6b0fbe728bce594ed4ce88c34045350ba5cd04a3
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140
141         if (!ares) {
142                 return ldb_module_done(ac->req, NULL, NULL,
143                                         LDB_ERR_OPERATIONS_ERROR);
144         }
145
146         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148                 /* If we didn't fan this request out to mulitple partitions,
149                  * or this is an individual search result, we can
150                  * deterministically tell the caller what partition this was
151                  * written to (repl_meta_data likes to know) */
152                 ret = ldb_reply_add_control(ares,
153                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
154                                             false, partition_ctrl->data);
155                 if (ret != LDB_SUCCESS) {
156                         return ldb_module_done(ac->req, NULL, NULL,
157                                                ret);
158                 }
159         }
160
161         if (ares->error != LDB_SUCCESS) {
162                 return ldb_module_done(ac->req, ares->controls,
163                                         ares->response, ares->error);
164         }
165
166         switch (ares->type) {
167         case LDB_REPLY_REFERRAL:
168                 return ldb_module_send_referral(ac->req, ares->referral);
169
170         case LDB_REPLY_ENTRY:
171                 if (ac->req->operation != LDB_SEARCH) {
172                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
173                                 "partition_req_callback:"
174                                 " Unsupported reply type for this request");
175                         return ldb_module_done(ac->req, NULL, NULL,
176                                                 LDB_ERR_OPERATIONS_ERROR);
177                 }
178                 
179                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
180
181         case LDB_REPLY_DONE:
182                 if (ac->req->operation == LDB_EXTENDED) {
183                         /* FIXME: check for ares->response, replmd does not fill it ! */
184                         if (ares->response) {
185                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
187                                                           "partition_req_callback:"
188                                                           " Unknown extended reply, "
189                                                           "only supports START_TLS");
190                                         talloc_free(ares);
191                                         return ldb_module_done(ac->req, NULL, NULL,
192                                                                 LDB_ERR_OPERATIONS_ERROR);
193                                 }
194                         }
195                 }
196
197                 ac->finished_requests++;
198                 if (ac->finished_requests == ac->num_requests) {
199                         /* Send back referrals if they do exist (search ops) */
200                         if (ac->referrals != NULL) {
201                                 const char **ref;
202                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
203                                         ret = ldb_module_send_referral(ac->req,
204                                                                        talloc_strdup(ac->req, *ref));
205                                         if (ret != LDB_SUCCESS) {
206                                                 return ldb_module_done(ac->req, NULL, NULL,
207                                                                        ret);
208                                         }
209                                 }
210                         }
211
212                         /* this was the last one, call callback */
213                         return ldb_module_done(ac->req, ares->controls,
214                                                ares->response, 
215                                                ares->error);
216                 }
217
218                 /* not the last, now call the next one */
219                 module = ac->part_req[ac->finished_requests].module;
220                 nreq = ac->part_req[ac->finished_requests].req;
221
222                 ret = partition_request(module, nreq);
223                 if (ret != LDB_SUCCESS) {
224                         talloc_free(ares);
225                         return ldb_module_done(ac->req, NULL, NULL, ret);
226                 }
227
228                 break;
229         }
230
231         talloc_free(ares);
232         return LDB_SUCCESS;
233 }
234
235 static int partition_prep_request(struct partition_context *ac,
236                                   struct dsdb_partition *partition)
237 {
238         int ret;
239         struct ldb_request *req;
240         struct ldb_control *partition_ctrl = NULL;
241
242         ac->part_req = talloc_realloc(ac, ac->part_req,
243                                         struct part_request,
244                                         ac->num_requests + 1);
245         if (ac->part_req == NULL) {
246                 return ldb_oom(ldb_module_get_ctx(ac->module));
247         }
248
249         switch (ac->req->operation) {
250         case LDB_SEARCH:
251                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
252                                         ac->part_req,
253                                         ac->req->op.search.base,
254                                         ac->req->op.search.scope,
255                                         ac->req->op.search.tree,
256                                         ac->req->op.search.attrs,
257                                         ac->req->controls,
258                                         ac, partition_req_callback,
259                                         ac->req);
260                 LDB_REQ_SET_LOCATION(req);
261                 break;
262         case LDB_ADD:
263                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264                                         ac->req->op.add.message,
265                                         ac->req->controls,
266                                         ac, partition_req_callback,
267                                         ac->req);
268                 LDB_REQ_SET_LOCATION(req);
269                 break;
270         case LDB_MODIFY:
271                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272                                         ac->req->op.mod.message,
273                                         ac->req->controls,
274                                         ac, partition_req_callback,
275                                         ac->req);
276                 LDB_REQ_SET_LOCATION(req);
277                 break;
278         case LDB_DELETE:
279                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280                                         ac->req->op.del.dn,
281                                         ac->req->controls,
282                                         ac, partition_req_callback,
283                                         ac->req);
284                 LDB_REQ_SET_LOCATION(req);
285                 break;
286         case LDB_RENAME:
287                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288                                         ac->req->op.rename.olddn,
289                                         ac->req->op.rename.newdn,
290                                         ac->req->controls,
291                                         ac, partition_req_callback,
292                                         ac->req);
293                 LDB_REQ_SET_LOCATION(req);
294                 break;
295         case LDB_EXTENDED:
296                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
297                                         ac->part_req,
298                                         ac->req->op.extended.oid,
299                                         ac->req->op.extended.data,
300                                         ac->req->controls,
301                                         ac, partition_req_callback,
302                                         ac->req);
303                 LDB_REQ_SET_LOCATION(req);
304                 break;
305         default:
306                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
307                                   "Unsupported request type!");
308                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
309         }
310
311         if (ret != LDB_SUCCESS) {
312                 return ret;
313         }
314
315         ac->part_req[ac->num_requests].req = req;
316
317         if (ac->req->controls) {
318                 /* Duplicate everything beside the current partition control */
319                 partition_ctrl = ldb_request_get_control(ac->req,
320                                                          DSDB_CONTROL_CURRENT_PARTITION_OID);
321                 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
322                         return ldb_module_oom(ac->module);
323                 }
324         }
325
326         if (partition) {
327                 void *part_data = partition->ctrl;
328
329                 ac->part_req[ac->num_requests].module = partition->module;
330
331                 if (partition_ctrl != NULL) {
332                         if (partition_ctrl->data != NULL) {
333                                 part_data = partition_ctrl->data;
334                         }
335
336                         /*
337                          * If the provided current partition control is without
338                          * data then use the calculated one.
339                          */
340                         ret = ldb_request_add_control(req,
341                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
342                                                       false, part_data);
343                         if (ret != LDB_SUCCESS) {
344                                 return ret;
345                         }
346                 }
347
348                 if (req->operation == LDB_SEARCH) {
349                         /* If the search is for 'more' than this partition,
350                          * then change the basedn, so a remote LDAP server
351                          * doesn't object */
352                         if (ldb_dn_compare_base(partition->ctrl->dn,
353                                                 req->op.search.base) != 0) {
354                                 req->op.search.base = partition->ctrl->dn;
355                         }
356                 }
357
358         } else {
359                 /* make sure you put the module here, or
360                  * or ldb_next_request() will skip a module */
361                 ac->part_req[ac->num_requests].module = ac->module;
362         }
363
364         ac->num_requests++;
365
366         return LDB_SUCCESS;
367 }
368
369 static int partition_call_first(struct partition_context *ac)
370 {
371         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
372 }
373
374 /**
375  * Send a request down to all the partitions (but not the sam.ldb file)
376  */
377 static int partition_send_all(struct ldb_module *module, 
378                               struct partition_context *ac, 
379                               struct ldb_request *req) 
380 {
381         unsigned int i;
382         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
383                                                               struct partition_private_data);
384         int ret;
385
386         for (i=0; data && data->partitions && data->partitions[i]; i++) {
387                 ret = partition_prep_request(ac, data->partitions[i]);
388                 if (ret != LDB_SUCCESS) {
389                         return ret;
390                 }
391         }
392
393         /* fire the first one */
394         return partition_call_first(ac);
395 }
396
397 struct partition_copy_context {
398         struct ldb_module *module;
399         struct partition_context *partition_context;
400         struct ldb_request *request;
401         struct ldb_dn *dn;
402 };
403
404 /*
405  * A special DN has been updated in the primary partition. Now propagate those
406  * changes to the remaining partitions.
407  *
408  * Note: that the operations are asynchronous and this function is called
409  *       from partition_copy_all_callback_handler in response to an async
410  *       callback.
411  */
412 static int partition_copy_all_callback_action(
413         struct ldb_module *module,
414         struct partition_context *ac,
415         struct ldb_request *req,
416         struct ldb_dn *dn)
417
418 {
419
420         unsigned int i;
421         struct partition_private_data *data =
422                 talloc_get_type(
423                         ldb_module_get_private(module),
424                         struct partition_private_data);
425         int search_ret;
426         struct ldb_result *res;
427         /* now fetch the resulting object, and then copy it to all the
428          * other partitions. We need this approach to cope with the
429          * partitions getting out of sync. If for example the
430          * @ATTRIBUTES object exists on one partition but not the
431          * others then just doing each of the partitions in turn will
432          * lead to an error
433          */
434         search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
435         if (search_ret != LDB_SUCCESS) {
436                 return search_ret;
437         }
438
439         /* now delete the object in the other partitions, if requried
440         */
441         if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
442                 for (i=0; data->partitions && data->partitions[i]; i++) {
443                         int pret;
444                         pret = dsdb_module_del(data->partitions[i]->module,
445                                                dn,
446                                                DSDB_FLAG_NEXT_MODULE,
447                                                req);
448                         if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
449                                 /* we should only get success or no
450                                    such object from the other partitions */
451                                 return pret;
452                         }
453                 }
454
455                 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
456         }
457
458         /* now add/modify in the other partitions */
459         for (i=0; data->partitions && data->partitions[i]; i++) {
460                 struct ldb_message *modify_msg = NULL;
461                 int pret;
462                 unsigned int el_idx;
463
464                 pret = dsdb_module_add(data->partitions[i]->module,
465                                        res->msgs[0],
466                                        DSDB_FLAG_NEXT_MODULE,
467                                        req);
468                 if (pret == LDB_SUCCESS) {
469                         continue;
470                 }
471
472                 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
473                         return pret;
474                 }
475
476                 modify_msg = ldb_msg_copy(req, res->msgs[0]);
477                 if (modify_msg == NULL) {
478                         return ldb_module_oom(module);
479                 }
480
481                 /*
482                  * mark all the message elements as
483                  * LDB_FLAG_MOD_REPLACE
484                  */
485                 for (el_idx=0;
486                      el_idx < modify_msg->num_elements;
487                      el_idx++) {
488                         modify_msg->elements[el_idx].flags
489                                 = LDB_FLAG_MOD_REPLACE;
490                 }
491
492                 if (req->operation == LDB_MODIFY) {
493                         const struct ldb_message *req_msg = req->op.mod.message;
494                         /*
495                          * mark elements to be removed, if there were
496                          * deleted entirely above we need to delete
497                          * them here too
498                          */
499                         for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
500                                 if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
501                                     || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
502                                         req_msg->elements[el_idx].num_values == 0)) {
503                                         if (ldb_msg_find_element(modify_msg,
504                                                                  req_msg->elements[el_idx].name) != NULL) {
505                                                 continue;
506                                         }
507                                         pret = ldb_msg_add_empty(
508                                                 modify_msg,
509                                                 req_msg->elements[el_idx].name,
510                                                 LDB_FLAG_MOD_REPLACE,
511                                                 NULL);
512                                         if (pret != LDB_SUCCESS) {
513                                                 return pret;
514                                         }
515                                 }
516                         }
517                 }
518
519                 pret = dsdb_module_modify(data->partitions[i]->module,
520                                           modify_msg,
521                                           DSDB_FLAG_NEXT_MODULE,
522                                           req);
523
524                 if (pret != LDB_SUCCESS) {
525                         return pret;
526                 }
527         }
528
529         return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
530 }
531
532
533 /*
534  * @brief call back function for the ldb operations on special DN's.
535  *
536  * As the LDB operations are async, and we wish to use the result
537  * the operations, a callback needs to be registered to process the results
538  * of the LDB operations.
539  *
540  * @param req the ldb request
541  * @param res the result of the operation
542  *
543  * @return the LDB_STATUS
544  */
545 static int partition_copy_all_callback_handler(
546         struct ldb_request *req,
547         struct ldb_reply *ares)
548 {
549         struct partition_copy_context *ac = NULL;
550
551         ac = talloc_get_type(
552                 req->context,
553                 struct partition_copy_context);
554
555         if (!ares) {
556                 return ldb_module_done(
557                         ac->request,
558                         NULL,
559                         NULL,
560                         LDB_ERR_OPERATIONS_ERROR);
561         }
562
563         /* pass on to the callback */
564         switch (ares->type) {
565         case LDB_REPLY_ENTRY:
566                 return ldb_module_send_entry(
567                         ac->request,
568                         ares->message,
569                         ares->controls);
570
571         case LDB_REPLY_REFERRAL:
572                 return ldb_module_send_referral(
573                         ac->request,
574                         ares->referral);
575
576         case LDB_REPLY_DONE: {
577                 int error = ares->error;
578                 if (error == LDB_SUCCESS) {
579                         error = partition_copy_all_callback_action(
580                                 ac->module,
581                                 ac->partition_context,
582                                 ac->request,
583                                 ac->dn);
584                 }
585                 return ldb_module_done(
586                         ac->request,
587                         ares->controls,
588                         ares->response,
589                         error);
590         }
591
592         default:
593                 /* Can't happen */
594                 return LDB_ERR_OPERATIONS_ERROR;
595         }
596 }
597
598 /**
599  * send an operation to the top partition, then copy the resulting
600  * object to all other partitions.
601  */
602 static int partition_copy_all(
603         struct ldb_module *module,
604         struct partition_context *partition_context,
605         struct ldb_request *req,
606         struct ldb_dn *dn)
607 {
608         struct ldb_request *new_req = NULL;
609         struct ldb_context *ldb = NULL;
610         struct partition_copy_context *context = NULL;
611
612         int ret;
613
614         ldb = ldb_module_get_ctx(module);
615
616         context = talloc_zero(req, struct partition_copy_context);
617         if (context == NULL) {
618                 return ldb_oom(ldb);
619         }
620         context->module = module;
621         context->request = req;
622         context->dn = dn;
623         context->partition_context = partition_context;
624
625         switch (req->operation) {
626         case LDB_ADD:
627                 ret = ldb_build_add_req(
628                         &new_req,
629                         ldb,
630                         req,
631                         req->op.add.message,
632                         req->controls,
633                         context,
634                         partition_copy_all_callback_handler,
635                         req);
636                 break;
637         case LDB_MODIFY:
638                 ret = ldb_build_mod_req(
639                         &new_req,
640                         ldb,
641                         req,
642                         req->op.mod.message,
643                         req->controls,
644                         context,
645                         partition_copy_all_callback_handler,
646                         req);
647                 break;
648         case LDB_DELETE:
649                 ret = ldb_build_del_req(
650                         &new_req,
651                         ldb,
652                         req,
653                         req->op.del.dn,
654                         req->controls,
655                         context,
656                         partition_copy_all_callback_handler,
657                         req);
658                 break;
659         case LDB_RENAME:
660                 ret = ldb_build_rename_req(
661                         &new_req,
662                         ldb,
663                         req,
664                         req->op.rename.olddn,
665                         req->op.rename.newdn,
666                         req->controls,
667                         context,
668                         partition_copy_all_callback_handler,
669                         req);
670                 break;
671         default:
672                 /*
673                  * Shouldn't happen.
674                  */
675                 ldb_debug(
676                         ldb,
677                         LDB_DEBUG_ERROR,
678                         "Unexpected operation type (%d)\n", req->operation);
679                 ret = LDB_ERR_OPERATIONS_ERROR;
680                 break;
681         }
682         if (ret != LDB_SUCCESS) {
683                 return ret;
684         }
685         return ldb_next_request(module, new_req);
686 }
687 /**
688  * Figure out which backend a request needs to be aimed at.  Some
689  * requests must be replicated to all backends
690  */
691 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
692 {
693         struct partition_context *ac;
694         unsigned int i;
695         int ret;
696         struct dsdb_partition *partition;
697         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
698                                                               struct partition_private_data);
699
700         /* if we aren't initialised yet go further */
701         if (!data || !data->partitions) {
702                 return ldb_next_request(module, req);
703         }
704
705         if (ldb_dn_is_special(dn)) {
706                 /* Is this a special DN, we need to replicate to every backend? */
707                 for (i=0; data->replicate && data->replicate[i]; i++) {
708                         if (ldb_dn_compare(data->replicate[i], 
709                                            dn) == 0) {
710                                 
711                                 ac = partition_init_ctx(module, req);
712                                 if (!ac) {
713                                         return ldb_operr(ldb_module_get_ctx(module));
714                                 }
715                                 
716                                 return partition_copy_all(module, ac, req, dn);
717                         }
718                 }
719         }
720
721         /* Otherwise, we need to find the partition to fire it to */
722
723         /* Find partition */
724         partition = find_partition(data, dn, req);
725         if (!partition) {
726                 /*
727                  * if we haven't found a matching partition
728                  * pass the request to the main ldb
729                  *
730                  * TODO: we should maybe return an error here
731                  *       if it's not a special dn
732                  */
733
734                 return ldb_next_request(module, req);
735         }
736
737         ac = partition_init_ctx(module, req);
738         if (!ac) {
739                 return ldb_operr(ldb_module_get_ctx(module));
740         }
741
742         /* we need to add a control but we never touch the original request */
743         ret = partition_prep_request(ac, partition);
744         if (ret != LDB_SUCCESS) {
745                 return ret;
746         }
747
748         /* fire the first one */
749         return partition_call_first(ac);
750 }
751
752 /* search */
753 static int partition_search(struct ldb_module *module, struct ldb_request *req)
754 {
755         struct ldb_control **saved_controls;
756         /* Find backend */
757         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
758                                                               struct partition_private_data);
759         struct partition_context *ac;
760         struct ldb_context *ldb;
761         struct loadparm_context *lp_ctx;
762
763         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
764         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
765         struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
766         
767         struct ldb_search_options_control *search_options = NULL;
768         struct dsdb_partition *p;
769         unsigned int i, j;
770         int ret;
771         bool domain_scope = false, phantom_root = false;
772
773         p = find_partition(data, NULL, req);
774         if (p != NULL) {
775                 /* the caller specified what partition they want the
776                  * search - just pass it on
777                  */
778                 return ldb_next_request(p->module, req);
779         }
780
781         /* Get back the search options from the search control, and mark it as
782          * non-critical (to make backends and also dcpromo happy).
783          */
784         if (search_control) {
785                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
786                 search_control->critical = 0;
787
788         }
789
790         /* Remove the "domain_scope" control, so we don't confuse a backend
791          * server */
792         if (domain_scope_control && !ldb_save_controls(domain_scope_control, req, &saved_controls)) {
793                 return ldb_oom(ldb_module_get_ctx(module));
794         }
795
796         /* if we aren't initialised yet go further */
797         if (!data || !data->partitions) {
798                 return ldb_next_request(module, req);
799         }
800
801         /* Special DNs without specified partition should go further */
802         if (ldb_dn_is_special(req->op.search.base)) {
803                 return ldb_next_request(module, req);
804         }
805
806         /* Locate the options */
807         domain_scope = (search_options
808                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
809                 || domain_scope_control;
810         phantom_root = search_options
811                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
812
813         /* Remove handled options from the search control flag */
814         if (search_options) {
815                 search_options->search_options = search_options->search_options
816                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
817                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
818         }
819
820         ac = partition_init_ctx(module, req);
821         if (!ac) {
822                 return ldb_operr(ldb_module_get_ctx(module));
823         }
824
825         ldb = ldb_module_get_ctx(ac->module);
826         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
827                                                 struct loadparm_context);
828
829         /* Search from the base DN */
830         if (ldb_dn_is_null(req->op.search.base)) {
831                 if (!phantom_root) {
832                         return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
833                 }
834                 return partition_send_all(module, ac, req);
835         }
836
837         for (i=0; data->partitions[i]; i++) {
838                 bool match = false, stop = false;
839
840                 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
841                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
842                                                 req->op.search.base) == 0) {
843                                 /* base DN is in a partial replica
844                                    with the NO_GLOBAL_CATALOG
845                                    control. This partition is invisible */
846                                 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
847                                 continue;
848                         }
849                 }
850
851                 if (phantom_root) {
852                         /* Phantom root: Find all partitions under the
853                          * search base. We match if:
854                          *
855                          * 1) the DN we are looking for exactly matches a
856                          *    certain partition and always stop
857                          * 2) the DN we are looking for is a parent of certain
858                          *    partitions and it isn't a scope base search
859                          * 3) the DN we are looking for is a child of a certain
860                          *    partition and always stop
861                          *    - we don't need to go any further up in the
862                          *    hierarchy!
863                          */
864                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
865                                            req->op.search.base) == 0) {
866                                 match = true;
867                                 stop = true;
868                         }
869                         if (!match &&
870                             (ldb_dn_compare_base(req->op.search.base,
871                                                  data->partitions[i]->ctrl->dn) == 0 &&
872                              req->op.search.scope != LDB_SCOPE_BASE)) {
873                                 match = true;
874                         }
875                         if (!match &&
876                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
877                                                 req->op.search.base) == 0) {
878                                 match = true;
879                                 stop = true; /* note that this relies on partition ordering */
880                         }
881                 } else {
882                         /* Domain scope: Find all partitions under the search
883                          * base.
884                          *
885                          * We generate referral candidates if we haven't
886                          * specified the domain scope control, haven't a base
887                          * search* scope and the DN we are looking for is a real
888                          * predecessor of certain partitions. When a new
889                          * referral candidate is nearer to the DN than an
890                          * existing one delete the latter (we want to have only
891                          * the closest ones). When we checked this for all
892                          * candidates we have the final referrals.
893                          *
894                          * We match if the DN we are looking for is a child of
895                          * a certain partition or the partition
896                          * DN itself - we don't need to go any further
897                          * up in the hierarchy!
898                          */
899                         if ((!domain_scope) &&
900                             (req->op.search.scope != LDB_SCOPE_BASE) &&
901                             (ldb_dn_compare_base(req->op.search.base,
902                                                  data->partitions[i]->ctrl->dn) == 0) &&
903                             (ldb_dn_compare(req->op.search.base,
904                                             data->partitions[i]->ctrl->dn) != 0)) {
905                                 const char *scheme = ldb_get_opaque(
906                                     ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
907                                 char *ref = talloc_asprintf(
908                                         ac,
909                                         "%s://%s/%s%s",
910                                         scheme == NULL ? "ldap" : scheme,
911                                         lpcfg_dnsdomain(lp_ctx),
912                                         ldb_dn_get_linearized(
913                                             data->partitions[i]->ctrl->dn),
914                                         req->op.search.scope ==
915                                             LDB_SCOPE_ONELEVEL ? "??base" : "");
916
917                                 if (ref == NULL) {
918                                         return ldb_oom(ldb);
919                                 }
920
921                                 /* Initialise the referrals list */
922                                 if (ac->referrals == NULL) {
923                                         char **l = str_list_make_empty(ac);
924                                         ac->referrals = discard_const_p(const char *, l);
925                                         if (ac->referrals == NULL) {
926                                                 return ldb_oom(ldb);
927                                         }
928                                 }
929
930                                 /* Check if the new referral candidate is
931                                  * closer to the base DN than already
932                                  * saved ones and delete the latters */
933                                 j = 0;
934                                 while (ac->referrals[j] != NULL) {
935                                         if (strstr(ac->referrals[j],
936                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
937                                                 str_list_remove(ac->referrals,
938                                                                 ac->referrals[j]);
939                                         } else {
940                                                 ++j;
941                                         }
942                                 }
943
944                                 /* Add our new candidate */
945                                 ac->referrals = str_list_add(ac->referrals, ref);
946
947                                 talloc_free(ref);
948
949                                 if (ac->referrals == NULL) {
950                                         return ldb_oom(ldb);
951                                 }
952                         }
953                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
954                                 match = true;
955                                 stop = true; /* note that this relies on partition ordering */
956                         }
957                 }
958
959                 if (match) {
960                         ret = partition_prep_request(ac, data->partitions[i]);
961                         if (ret != LDB_SUCCESS) {
962                                 return ret;
963                         }
964                 }
965
966                 if (stop) break;
967         }
968
969         /* Perhaps we didn't match any partitions. Try the main partition */
970         if (ac->num_requests == 0) {
971                 talloc_free(ac);
972                 return ldb_next_request(module, req);
973         }
974
975         /* fire the first one */
976         return partition_call_first(ac);
977 }
978
979 /* add */
980 static int partition_add(struct ldb_module *module, struct ldb_request *req)
981 {
982         return partition_replicate(module, req, req->op.add.message->dn);
983 }
984
985 /* modify */
986 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
987 {
988         return partition_replicate(module, req, req->op.mod.message->dn);
989 }
990
991 /* delete */
992 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
993 {
994         return partition_replicate(module, req, req->op.del.dn);
995 }
996
997 /* rename */
998 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
999 {
1000         /* Find backend */
1001         struct dsdb_partition *backend, *backend2;
1002         
1003         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1004                                                               struct partition_private_data);
1005
1006         /* Skip the lot if 'data' isn't here yet (initialisation) */
1007         if (!data) {
1008                 return ldb_operr(ldb_module_get_ctx(module));
1009         }
1010
1011         backend = find_partition(data, req->op.rename.olddn, req);
1012         backend2 = find_partition(data, req->op.rename.newdn, req);
1013
1014         if ((backend && !backend2) || (!backend && backend2)) {
1015                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1016         }
1017
1018         if (backend != backend2) {
1019                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1020                                        "Cannot rename from %s in %s to %s in %s: %s",
1021                                        ldb_dn_get_linearized(req->op.rename.olddn),
1022                                        ldb_dn_get_linearized(backend->ctrl->dn),
1023                                        ldb_dn_get_linearized(req->op.rename.newdn),
1024                                        ldb_dn_get_linearized(backend2->ctrl->dn),
1025                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1026                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1027         }
1028
1029         return partition_replicate(module, req, req->op.rename.olddn);
1030 }
1031
1032 /* start a transaction */
1033 int partition_start_trans(struct ldb_module *module)
1034 {
1035         int i = 0;
1036         int ret = 0;
1037         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1038                                                               struct partition_private_data);
1039         /* Look at base DN */
1040         /* Figure out which partition it is under */
1041         /* Skip the lot if 'data' isn't here yet (initialization) */
1042         if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1043                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1044         }
1045
1046         /*
1047          * We start a transaction on metadata.tdb first and end it last in
1048          * end_trans. This makes locking semantics follow TDB rather than MDB,
1049          * and effectively locks all partitions at once.
1050          * Detail:
1051          * Samba AD is special in that the partitions module (this file)
1052          * combines multiple independently locked databases into one overall
1053          * transaction. Changes across multiple partition DBs in a single
1054          * transaction must ALL be either visible or invisible.
1055          * The way this is achieved is by taking out a write lock on
1056          * metadata.tdb at the start of prepare_commit, while unlocking it at
1057          * the end of end_trans. This is matched by read_lock, ensuring it
1058          * can't progress until that write lock is released.
1059          *
1060          * metadata.tdb needs to be a TDB file because MDB uses independent
1061          * locks, which means a read lock and a write lock can be held at the
1062          * same time, whereas in TDB, the two locks block each other. The TDB
1063          * behaviour is required to implement the functionality described
1064          * above.
1065          *
1066          * An important additional detail here is that if prepare_commit is
1067          * called on a TDB without any changes being made, no write lock is
1068          * taken. We address this by storing a sequence number in metadata.tdb
1069          * which is updated every time a replicated attribute is modified.
1070          * The possibility of a few unreplicated attributes being out of date
1071          * turns out not to be a problem.
1072          * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
1073          * the same end as locking metadata.tdb, unless we made a modification
1074          * to the @ records found there before every prepare_commit.
1075          */
1076         ret = partition_metadata_start_trans(module);
1077         if (ret != LDB_SUCCESS) {
1078                 return ret;
1079         }
1080
1081         ret = ldb_next_start_trans(module);
1082         if (ret != LDB_SUCCESS) {
1083                 partition_metadata_del_trans(module);
1084                 return ret;
1085         }
1086
1087         ret = partition_reload_if_required(module, data, NULL);
1088         if (ret != LDB_SUCCESS) {
1089                 ldb_next_del_trans(module);
1090                 partition_metadata_del_trans(module);
1091                 return ret;
1092         }
1093
1094         /*
1095          * The following per partition locks are required mostly because TDB
1096          * and MDB require locks before read and write ops are permitted.
1097          */
1098         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1099                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1100                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1101                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1102                 }
1103                 ret = ldb_next_start_trans(data->partitions[i]->module);
1104                 if (ret != LDB_SUCCESS) {
1105                         /* Back it out, if it fails on one */
1106                         for (i--; i >= 0; i--) {
1107                                 ldb_next_del_trans(data->partitions[i]->module);
1108                         }
1109                         ldb_next_del_trans(module);
1110                         partition_metadata_del_trans(module);
1111                         return ret;
1112                 }
1113         }
1114
1115         data->in_transaction++;
1116
1117         return LDB_SUCCESS;
1118 }
1119
1120 /* prepare for a commit */
1121 int partition_prepare_commit(struct ldb_module *module)
1122 {
1123         unsigned int i;
1124         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1125                                                               struct partition_private_data);
1126         int ret;
1127
1128         /*
1129          * Order of prepare_commit calls must match that in
1130          * partition_start_trans. See comment in that function for detail.
1131          */
1132         ret = partition_metadata_prepare_commit(module);
1133         if (ret != LDB_SUCCESS) {
1134                 return ret;
1135         }
1136
1137         ret = ldb_next_prepare_commit(module);
1138         if (ret != LDB_SUCCESS) {
1139                 return ret;
1140         }
1141
1142         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1143                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1144                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1145                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1146                 }
1147                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1148                 if (ret != LDB_SUCCESS) {
1149                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1150                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1151                                                ldb_errstring(ldb_module_get_ctx(module)));
1152                         return ret;
1153                 }
1154         }
1155
1156         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1157                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1158         }
1159
1160         return LDB_SUCCESS;
1161 }
1162
1163
1164 /* end a transaction */
1165 int partition_end_trans(struct ldb_module *module)
1166 {
1167         int ret, ret2;
1168         int i;
1169         struct ldb_context *ldb = ldb_module_get_ctx(module);
1170         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1171                                                               struct partition_private_data);
1172         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1173
1174         ret = LDB_SUCCESS;
1175
1176         if (data->in_transaction == 0) {
1177                 DEBUG(0,("partition end transaction mismatch\n"));
1178                 ret = LDB_ERR_OPERATIONS_ERROR;
1179         } else {
1180                 data->in_transaction--;
1181         }
1182
1183         /*
1184          * Order of end_trans calls must be the reverse of that in
1185          * partition_start_trans. See comment in that function for detail.
1186          */
1187         if (data && data->partitions) {
1188                 /* Just counting the partitions */
1189                 for (i=0; data->partitions[i]; i++) {}
1190
1191                 /* now walk them backwards */
1192                 for (i--; i>=0; i--) {
1193                         struct dsdb_partition *p = data->partitions[i];
1194                         if (trace) {
1195                                 ldb_debug(ldb,
1196                                           LDB_DEBUG_TRACE,
1197                                           "partition_end_trans() -> %s",
1198                                           ldb_dn_get_linearized(p->ctrl->dn));
1199                         }
1200                         ret2 = ldb_next_end_trans(p->module);
1201                         if (ret2 != LDB_SUCCESS) {
1202                                 ldb_asprintf_errstring(ldb,
1203                                         "end_trans error on %s: %s",
1204                                         ldb_dn_get_linearized(p->ctrl->dn),
1205                                         ldb_errstring(ldb));
1206                                 ret = ret2;
1207                         }
1208                 }
1209         }
1210
1211         if (trace) {
1212                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1213         }
1214         ret2 = ldb_next_end_trans(module);
1215         if (ret2 != LDB_SUCCESS) {
1216                 ret = ret2;
1217         }
1218
1219         ret2 = partition_metadata_end_trans(module);
1220         if (ret2 != LDB_SUCCESS) {
1221                 ret = ret2;
1222         }
1223
1224         return ret;
1225 }
1226
1227 /* delete a transaction */
1228 int partition_del_trans(struct ldb_module *module)
1229 {
1230         int ret, final_ret = LDB_SUCCESS;
1231         int i;
1232         struct ldb_context *ldb = ldb_module_get_ctx(module);
1233         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1234                                                               struct partition_private_data);
1235         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1236
1237         if (data == NULL) {
1238                 DEBUG(0,("partion delete transaction with no private data\n"));
1239                 return ldb_operr(ldb);
1240         }
1241
1242         /*
1243          * Order of del_trans calls must be the reverse of that in
1244          * partition_start_trans. See comment in that function for detail.
1245          */
1246         if (data->partitions) {
1247                 /* Just counting the partitions */
1248                 for (i=0; data->partitions[i]; i++) {}
1249
1250                 /* now walk them backwards */
1251                 for (i--; i>=0; i--) {
1252                         struct dsdb_partition *p = data->partitions[i];
1253                         if (trace) {
1254                                 ldb_debug(ldb,
1255                                           LDB_DEBUG_TRACE,
1256                                           "partition_del_trans() -> %s",
1257                                           ldb_dn_get_linearized(p->ctrl->dn));
1258                         }
1259                         ret = ldb_next_del_trans(p->module);
1260                         if (ret != LDB_SUCCESS) {
1261                                 ldb_asprintf_errstring(ldb,
1262                                         "del_trans error on %s: %s",
1263                                         ldb_dn_get_linearized(p->ctrl->dn),
1264                                         ldb_errstring(ldb));
1265                                 final_ret = ret;
1266                         }
1267                 }
1268         }
1269
1270         if (trace) {
1271                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1272         }
1273         ret = ldb_next_del_trans(module);
1274         if (ret != LDB_SUCCESS) {
1275                 final_ret = ret;
1276         }
1277
1278         ret = partition_metadata_del_trans(module);
1279         if (ret != LDB_SUCCESS) {
1280                 final_ret = ret;
1281         }
1282
1283         if (data->in_transaction == 0) {
1284                 DEBUG(0,("partition del transaction mismatch\n"));
1285                 return ldb_operr(ldb_module_get_ctx(module));
1286         }
1287         data->in_transaction--;
1288
1289         return final_ret;
1290 }
1291
1292 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
1293                                       uint64_t *seq_number,
1294                                       struct ldb_request *parent)
1295 {
1296         int ret;
1297         struct ldb_result *res;
1298         struct ldb_seqnum_request *tseq;
1299         struct ldb_seqnum_result *seqr;
1300
1301         tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1302         if (tseq == NULL) {
1303                 return ldb_oom(ldb_module_get_ctx(module));
1304         }
1305         tseq->type = LDB_SEQ_HIGHEST_SEQ;
1306         
1307         ret = dsdb_module_extended(module, tseq, &res,
1308                                    LDB_EXTENDED_SEQUENCE_NUMBER,
1309                                    tseq,
1310                                    DSDB_FLAG_NEXT_MODULE,
1311                                    parent);
1312         if (ret != LDB_SUCCESS) {
1313                 talloc_free(tseq);
1314                 return ret;
1315         }
1316         
1317         seqr = talloc_get_type_abort(res->extended->data,
1318                                      struct ldb_seqnum_result);
1319         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1320                 talloc_free(res);
1321                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1322                         "Primary backend in partition module returned a timestamp based seq");
1323         }
1324
1325         *seq_number = seqr->seq_num;
1326         talloc_free(tseq);
1327         return LDB_SUCCESS;
1328 }
1329
1330
1331 /*
1332  * Older version of sequence number as sum of sequence numbers for each partition
1333  */
1334 int partition_sequence_number_from_partitions(struct ldb_module *module,
1335                                               uint64_t *seqr)
1336 {
1337         int ret;
1338         unsigned int i;
1339         uint64_t seq_number = 0;
1340         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1341                                                               struct partition_private_data);
1342
1343         ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1344         if (ret != LDB_SUCCESS) {
1345                 return ret;
1346         }
1347         
1348         /* Skip the lot if 'data' isn't here yet (initialisation) */
1349         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1350                 struct ldb_seqnum_request *tseq;
1351                 struct ldb_seqnum_result *tseqr;
1352                 struct ldb_request *treq;
1353                 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1354                 if (res == NULL) {
1355                         return ldb_oom(ldb_module_get_ctx(module));
1356                 }
1357                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1358                 if (tseq == NULL) {
1359                         talloc_free(res);
1360                         return ldb_oom(ldb_module_get_ctx(module));
1361                 }
1362                 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1363                 
1364                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1365                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1366                                              tseq,
1367                                              NULL,
1368                                              res,
1369                                              ldb_extended_default_callback,
1370                                              NULL);
1371                 LDB_REQ_SET_LOCATION(treq);
1372                 if (ret != LDB_SUCCESS) {
1373                         talloc_free(res);
1374                         return ret;
1375                 }
1376                 
1377                 ret = partition_request(data->partitions[i]->module, treq);
1378                 if (ret != LDB_SUCCESS) {
1379                         talloc_free(res);
1380                         return ret;
1381                 }
1382                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1383                 if (ret != LDB_SUCCESS) {
1384                         talloc_free(res);
1385                         return ret;
1386                 }
1387                 tseqr = talloc_get_type(res->extended->data,
1388                                         struct ldb_seqnum_result);
1389                 seq_number += tseqr->seq_num;
1390                 talloc_free(res);
1391         }
1392
1393         *seqr = seq_number;
1394         return LDB_SUCCESS;
1395 }
1396
1397
1398 /*
1399  * Newer version of sequence number using metadata tdb
1400  */
1401 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1402 {
1403         struct ldb_extended *ext;
1404         struct ldb_seqnum_request *seq;
1405         struct ldb_seqnum_result *seqr;
1406         uint64_t seq_number;
1407         int ret;
1408
1409         seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1410         switch (seq->type) {
1411         case LDB_SEQ_NEXT:
1412                 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1413                 if (ret != LDB_SUCCESS) {
1414                         return ret;
1415                 }
1416                 break;
1417
1418         case LDB_SEQ_HIGHEST_SEQ:
1419                 ret = partition_metadata_sequence_number(module, &seq_number);
1420                 if (ret != LDB_SUCCESS) {
1421                         return ret;
1422                 }
1423                 break;
1424
1425         case LDB_SEQ_HIGHEST_TIMESTAMP:
1426                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1427                                         "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1428         }
1429
1430         ext = talloc_zero(req, struct ldb_extended);
1431         if (!ext) {
1432                 return ldb_module_oom(module);
1433         }
1434         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1435         if (seqr == NULL) {
1436                 talloc_free(ext);
1437                 return ldb_module_oom(module);
1438         }
1439         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1440         ext->data = seqr;
1441
1442         seqr->seq_num = seq_number;
1443         seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1444
1445         /* send request done */
1446         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1447 }
1448
1449 /* lock all the backends */
1450 int partition_read_lock(struct ldb_module *module)
1451 {
1452         int i = 0;
1453         int ret = 0;
1454         int ret2 = 0;
1455         struct ldb_context *ldb = ldb_module_get_ctx(module);
1456         struct partition_private_data *data = \
1457                 talloc_get_type(ldb_module_get_private(module),
1458                                 struct partition_private_data);
1459
1460         if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1461                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1462                           "partition_read_lock() -> (metadata partition)");
1463         }
1464
1465         /*
1466          * It is important to only do this for LOCK because:
1467          * - we don't want to unlock what we did not lock
1468          *
1469          * - we don't want to make a new lock on the sam.ldb
1470          *   (triggered inside this routine due to the seq num check)
1471          *   during an unlock phase as that will violate the lock
1472          *   ordering
1473          */
1474
1475         if (data == NULL) {
1476                 TALLOC_CTX *mem_ctx = talloc_new(module);
1477
1478                 data = talloc_zero(mem_ctx, struct partition_private_data);
1479                 if (data == NULL) {
1480                         talloc_free(mem_ctx);
1481                         return ldb_operr(ldb);
1482                 }
1483
1484                 /*
1485                  * When used from Samba4, this message is set by the
1486                  * samba4 module, as a fixed value not read from the
1487                  * DB.  This avoids listing modules in the DB
1488                  */
1489                 data->forced_module_msg = talloc_get_type(
1490                         ldb_get_opaque(ldb,
1491                                        DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1492                         struct ldb_message);
1493
1494                 ldb_module_set_private(module, talloc_steal(module,
1495                                                             data));
1496                 talloc_free(mem_ctx);
1497         }
1498
1499         /*
1500          * This will lock sam.ldb and will also call event loops,
1501          * so we do it before we get the whole db lock.
1502          */
1503         ret = partition_reload_if_required(module, data, NULL);
1504         if (ret != LDB_SUCCESS) {
1505                 return ret;
1506         }
1507
1508         /*
1509          * Order of read_lock calls must match that in partition_start_trans.
1510          * See comment in that function for detail.
1511          */
1512         ret = partition_metadata_read_lock(module);
1513         if (ret != LDB_SUCCESS) {
1514                 goto failed;
1515         }
1516
1517         /*
1518          * The top level DB (sam.ldb) lock is not enough to block another
1519          * process in prepare_commit(), because if nothing was changed in the
1520          * specific backend, then prepare_commit() is a no-op. Therefore the
1521          * metadata.tdb lock is taken out above, as it is the best we can do
1522          * right now.
1523          */
1524         ret = ldb_next_read_lock(module);
1525         if (ret != LDB_SUCCESS) {
1526                 ldb_debug_set(ldb,
1527                               LDB_DEBUG_FATAL,
1528                               "Failed to lock db: %s / %s for metadata partition",
1529                               ldb_errstring(ldb),
1530                               ldb_strerror(ret));
1531
1532                 return ret;
1533         }
1534
1535         /*
1536          * The following per partition locks are required mostly because TDB
1537          * and MDB require locks before reads are permitted.
1538          */
1539         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1540                 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1541                         ldb_debug(ldb, LDB_DEBUG_TRACE,
1542                                   "partition_read_lock() -> %s",
1543                                   ldb_dn_get_linearized(
1544                                           data->partitions[i]->ctrl->dn));
1545                 }
1546                 ret = ldb_next_read_lock(data->partitions[i]->module);
1547                 if (ret == LDB_SUCCESS) {
1548                         continue;
1549                 }
1550
1551                 ldb_debug_set(ldb,
1552                               LDB_DEBUG_FATAL,
1553                               "Failed to lock db: %s / %s for %s",
1554                               ldb_errstring(ldb),
1555                               ldb_strerror(ret),
1556                               ldb_dn_get_linearized(
1557                                       data->partitions[i]->ctrl->dn));
1558
1559                 goto failed;
1560         }
1561
1562         return LDB_SUCCESS;
1563
1564 failed:
1565         /* Back it out, if it fails on one */
1566         for (i--; i >= 0; i--) {
1567                 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1568                 if (ret2 != LDB_SUCCESS) {
1569                         ldb_debug(ldb,
1570                                   LDB_DEBUG_FATAL,
1571                                   "Failed to unlock db: %s / %s",
1572                                   ldb_errstring(ldb),
1573                                   ldb_strerror(ret2));
1574                 }
1575         }
1576         ret2 = ldb_next_read_unlock(module);
1577         if (ret2 != LDB_SUCCESS) {
1578                 ldb_debug(ldb,
1579                           LDB_DEBUG_FATAL,
1580                           "Failed to unlock db: %s / %s",
1581                           ldb_errstring(ldb),
1582                           ldb_strerror(ret2));
1583         }
1584         return ret;
1585 }
1586
1587 /* unlock all the backends */
1588 int partition_read_unlock(struct ldb_module *module)
1589 {
1590         int i;
1591         int ret = LDB_SUCCESS;
1592         int ret2;
1593         struct ldb_context *ldb = ldb_module_get_ctx(module);
1594         struct partition_private_data *data = \
1595                 talloc_get_type(ldb_module_get_private(module),
1596                                 struct partition_private_data);
1597         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1598
1599         /*
1600          * Order of read_unlock calls must be the reverse of that in
1601          * partition_start_trans. See comment in that function for detail.
1602          */
1603         if (data && data->partitions) {
1604                 /* Just counting the partitions */
1605                 for (i=0; data->partitions[i]; i++) {}
1606
1607                 /* now walk them backwards */
1608                 for (i--; i>=0; i--) {
1609                         struct dsdb_partition *p = data->partitions[i];
1610                         if (trace) {
1611                                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1612                                           "partition_read_unlock() -> %s",
1613                                           ldb_dn_get_linearized(p->ctrl->dn));
1614                         }
1615                         ret2 = ldb_next_read_unlock(p->module);
1616                         if (ret2 != LDB_SUCCESS) {
1617                                 ldb_debug_set(ldb,
1618                                            LDB_DEBUG_FATAL,
1619                                            "Failed to lock db: %s / %s for %s",
1620                                            ldb_errstring(ldb),
1621                                            ldb_strerror(ret),
1622                                            ldb_dn_get_linearized(p->ctrl->dn));
1623
1624                                 /*
1625                                  * Don't overwrite the original failure code
1626                                  * if there was one
1627                                  */
1628                                 if (ret == LDB_SUCCESS) {
1629                                         ret = ret2;
1630                                 }
1631                         }
1632                 }
1633         }
1634
1635         if (trace) {
1636                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1637                           "partition_read_unlock() -> (metadata partition)");
1638         }
1639
1640         ret2 = ldb_next_read_unlock(module);
1641         if (ret2 != LDB_SUCCESS) {
1642                 ldb_debug_set(ldb,
1643                               LDB_DEBUG_FATAL,
1644                               "Failed to unlock db: %s / %s for metadata partition",
1645                               ldb_errstring(ldb),
1646                               ldb_strerror(ret2));
1647
1648                 /*
1649                  * Don't overwrite the original failure code
1650                  * if there was one
1651                  */
1652                 if (ret == LDB_SUCCESS) {
1653                         ret = ret2;
1654                 }
1655         }
1656
1657         ret = partition_metadata_read_unlock(module);
1658
1659         /*
1660          * Don't overwrite the original failure code
1661          * if there was one
1662          */
1663         if (ret == LDB_SUCCESS) {
1664                 ret = ret2;
1665         }
1666
1667         return ret;
1668 }
1669
1670 /* extended */
1671 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1672 {
1673         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1674                                                               struct partition_private_data);
1675         struct partition_context *ac;
1676         int ret;
1677
1678         /* if we aren't initialised yet go further */
1679         if (!data) {
1680                 return ldb_next_request(module, req);
1681         }
1682
1683         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1684                 /* Update the metadata.tdb to increment the schema version if needed*/
1685                 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1686                 ret = partition_metadata_inc_schema_sequence(module);
1687                 return ldb_module_done(req, NULL, NULL, ret);
1688         }
1689         
1690         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1691                 return partition_sequence_number(module, req);
1692         }
1693
1694         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1695                 return partition_create(module, req);
1696         }
1697
1698         /* 
1699          * as the extended operation has no dn
1700          * we need to send it to all partitions
1701          */
1702
1703         ac = partition_init_ctx(module, req);
1704         if (!ac) {
1705                 return ldb_operr(ldb_module_get_ctx(module));
1706         }
1707
1708         return partition_send_all(module, ac, req);
1709 }
1710
1711 static const struct ldb_module_ops ldb_partition_module_ops = {
1712         .name              = "partition",
1713         .init_context      = partition_init,
1714         .search            = partition_search,
1715         .add               = partition_add,
1716         .modify            = partition_modify,
1717         .del               = partition_delete,
1718         .rename            = partition_rename,
1719         .extended          = partition_extended,
1720         .start_transaction = partition_start_trans,
1721         .prepare_commit    = partition_prepare_commit,
1722         .end_transaction   = partition_end_trans,
1723         .del_transaction   = partition_del_trans,
1724         .read_lock         = partition_read_lock,
1725         .read_unlock       = partition_read_unlock
1726 };
1727
1728 int ldb_partition_module_init(const char *version)
1729 {
1730         LDB_MODULE_CHECK_VERSION(version);
1731         return ldb_register_module(&ldb_partition_module_ops);
1732 }