dsdb: Remove dead code in partition_prep_request()
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> %s",
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140
141         if (!ares) {
142                 return ldb_module_done(ac->req, NULL, NULL,
143                                         LDB_ERR_OPERATIONS_ERROR);
144         }
145
146         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
147         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
148                 /* If we didn't fan this request out to mulitple partitions,
149                  * or this is an individual search result, we can
150                  * deterministically tell the caller what partition this was
151                  * written to (repl_meta_data likes to know) */
152                 ret = ldb_reply_add_control(ares,
153                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
154                                             false, partition_ctrl->data);
155                 if (ret != LDB_SUCCESS) {
156                         return ldb_module_done(ac->req, NULL, NULL,
157                                                ret);
158                 }
159         }
160
161         if (ares->error != LDB_SUCCESS) {
162                 return ldb_module_done(ac->req, ares->controls,
163                                         ares->response, ares->error);
164         }
165
166         switch (ares->type) {
167         case LDB_REPLY_REFERRAL:
168                 return ldb_module_send_referral(ac->req, ares->referral);
169
170         case LDB_REPLY_ENTRY:
171                 if (ac->req->operation != LDB_SEARCH) {
172                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
173                                 "partition_req_callback:"
174                                 " Unsupported reply type for this request");
175                         return ldb_module_done(ac->req, NULL, NULL,
176                                                 LDB_ERR_OPERATIONS_ERROR);
177                 }
178                 
179                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
180
181         case LDB_REPLY_DONE:
182                 if (ac->req->operation == LDB_EXTENDED) {
183                         /* FIXME: check for ares->response, replmd does not fill it ! */
184                         if (ares->response) {
185                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
186                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
187                                                           "partition_req_callback:"
188                                                           " Unknown extended reply, "
189                                                           "only supports START_TLS");
190                                         talloc_free(ares);
191                                         return ldb_module_done(ac->req, NULL, NULL,
192                                                                 LDB_ERR_OPERATIONS_ERROR);
193                                 }
194                         }
195                 }
196
197                 ac->finished_requests++;
198                 if (ac->finished_requests == ac->num_requests) {
199                         /* Send back referrals if they do exist (search ops) */
200                         if (ac->referrals != NULL) {
201                                 const char **ref;
202                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
203                                         ret = ldb_module_send_referral(ac->req,
204                                                                        talloc_strdup(ac->req, *ref));
205                                         if (ret != LDB_SUCCESS) {
206                                                 return ldb_module_done(ac->req, NULL, NULL,
207                                                                        ret);
208                                         }
209                                 }
210                         }
211
212                         /* this was the last one, call callback */
213                         return ldb_module_done(ac->req, ares->controls,
214                                                ares->response, 
215                                                ares->error);
216                 }
217
218                 /* not the last, now call the next one */
219                 module = ac->part_req[ac->finished_requests].module;
220                 nreq = ac->part_req[ac->finished_requests].req;
221
222                 ret = partition_request(module, nreq);
223                 if (ret != LDB_SUCCESS) {
224                         talloc_free(ares);
225                         return ldb_module_done(ac->req, NULL, NULL, ret);
226                 }
227
228                 break;
229         }
230
231         talloc_free(ares);
232         return LDB_SUCCESS;
233 }
234
235 static int partition_prep_request(struct partition_context *ac,
236                                   struct dsdb_partition *partition)
237 {
238         int ret;
239         struct ldb_request *req;
240         struct ldb_control *partition_ctrl = NULL;
241         void *part_data = NULL;
242
243         ac->part_req = talloc_realloc(ac, ac->part_req,
244                                         struct part_request,
245                                         ac->num_requests + 1);
246         if (ac->part_req == NULL) {
247                 return ldb_oom(ldb_module_get_ctx(ac->module));
248         }
249
250         switch (ac->req->operation) {
251         case LDB_SEARCH:
252                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
253                                         ac->part_req,
254                                         ac->req->op.search.base,
255                                         ac->req->op.search.scope,
256                                         ac->req->op.search.tree,
257                                         ac->req->op.search.attrs,
258                                         ac->req->controls,
259                                         ac, partition_req_callback,
260                                         ac->req);
261                 LDB_REQ_SET_LOCATION(req);
262                 break;
263         case LDB_ADD:
264                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265                                         ac->req->op.add.message,
266                                         ac->req->controls,
267                                         ac, partition_req_callback,
268                                         ac->req);
269                 LDB_REQ_SET_LOCATION(req);
270                 break;
271         case LDB_MODIFY:
272                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273                                         ac->req->op.mod.message,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 LDB_REQ_SET_LOCATION(req);
278                 break;
279         case LDB_DELETE:
280                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
281                                         ac->req->op.del.dn,
282                                         ac->req->controls,
283                                         ac, partition_req_callback,
284                                         ac->req);
285                 LDB_REQ_SET_LOCATION(req);
286                 break;
287         case LDB_RENAME:
288                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
289                                         ac->req->op.rename.olddn,
290                                         ac->req->op.rename.newdn,
291                                         ac->req->controls,
292                                         ac, partition_req_callback,
293                                         ac->req);
294                 LDB_REQ_SET_LOCATION(req);
295                 break;
296         case LDB_EXTENDED:
297                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298                                         ac->part_req,
299                                         ac->req->op.extended.oid,
300                                         ac->req->op.extended.data,
301                                         ac->req->controls,
302                                         ac, partition_req_callback,
303                                         ac->req);
304                 LDB_REQ_SET_LOCATION(req);
305                 break;
306         default:
307                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
308                                   "Unsupported request type!");
309                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
310         }
311
312         if (ret != LDB_SUCCESS) {
313                 return ret;
314         }
315
316         ac->part_req[ac->num_requests].req = req;
317
318         if (ac->req->controls) {
319                 /* Duplicate everything beside the current partition control */
320                 partition_ctrl = ldb_request_get_control(ac->req,
321                                                          DSDB_CONTROL_CURRENT_PARTITION_OID);
322                 if (!ldb_save_controls(partition_ctrl, req, NULL)) {
323                         return ldb_module_oom(ac->module);
324                 }
325         }
326
327         part_data = partition->ctrl;
328
329         ac->part_req[ac->num_requests].module = partition->module;
330
331         if (partition_ctrl != NULL) {
332                 if (partition_ctrl->data != NULL) {
333                         part_data = partition_ctrl->data;
334                 }
335
336                 /*
337                  * If the provided current partition control is without
338                  * data then use the calculated one.
339                  */
340                 ret = ldb_request_add_control(req,
341                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
342                                               false, part_data);
343                 if (ret != LDB_SUCCESS) {
344                         return ret;
345                 }
346         }
347
348         if (req->operation == LDB_SEARCH) {
349                 /* If the search is for 'more' than this partition,
350                  * then change the basedn, so a remote LDAP server
351                  * doesn't object */
352                 if (ldb_dn_compare_base(partition->ctrl->dn,
353                                         req->op.search.base) != 0) {
354                         req->op.search.base = partition->ctrl->dn;
355                 }
356         }
357
358         ac->num_requests++;
359
360         return LDB_SUCCESS;
361 }
362
363 static int partition_call_first(struct partition_context *ac)
364 {
365         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
366 }
367
368 /**
369  * Send a request down to all the partitions (but not the sam.ldb file)
370  */
371 static int partition_send_all(struct ldb_module *module, 
372                               struct partition_context *ac, 
373                               struct ldb_request *req) 
374 {
375         unsigned int i;
376         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
377                                                               struct partition_private_data);
378         int ret;
379
380         for (i=0; data && data->partitions && data->partitions[i]; i++) {
381                 ret = partition_prep_request(ac, data->partitions[i]);
382                 if (ret != LDB_SUCCESS) {
383                         return ret;
384                 }
385         }
386
387         /* fire the first one */
388         return partition_call_first(ac);
389 }
390
391 struct partition_copy_context {
392         struct ldb_module *module;
393         struct partition_context *partition_context;
394         struct ldb_request *request;
395         struct ldb_dn *dn;
396 };
397
398 /*
399  * A special DN has been updated in the primary partition. Now propagate those
400  * changes to the remaining partitions.
401  *
402  * Note: that the operations are asynchronous and this function is called
403  *       from partition_copy_all_callback_handler in response to an async
404  *       callback.
405  */
406 static int partition_copy_all_callback_action(
407         struct ldb_module *module,
408         struct partition_context *ac,
409         struct ldb_request *req,
410         struct ldb_dn *dn)
411
412 {
413
414         unsigned int i;
415         struct partition_private_data *data =
416                 talloc_get_type(
417                         ldb_module_get_private(module),
418                         struct partition_private_data);
419         int search_ret;
420         struct ldb_result *res;
421         /* now fetch the resulting object, and then copy it to all the
422          * other partitions. We need this approach to cope with the
423          * partitions getting out of sync. If for example the
424          * @ATTRIBUTES object exists on one partition but not the
425          * others then just doing each of the partitions in turn will
426          * lead to an error
427          */
428         search_ret = dsdb_module_search_dn(module, ac, &res, dn, NULL, DSDB_FLAG_NEXT_MODULE, req);
429         if (search_ret != LDB_SUCCESS) {
430                 return search_ret;
431         }
432
433         /* now delete the object in the other partitions, if requried
434         */
435         if (search_ret == LDB_ERR_NO_SUCH_OBJECT) {
436                 for (i=0; data->partitions && data->partitions[i]; i++) {
437                         int pret;
438                         pret = dsdb_module_del(data->partitions[i]->module,
439                                                dn,
440                                                DSDB_FLAG_NEXT_MODULE,
441                                                req);
442                         if (pret != LDB_SUCCESS && pret != LDB_ERR_NO_SUCH_OBJECT) {
443                                 /* we should only get success or no
444                                    such object from the other partitions */
445                                 return pret;
446                         }
447                 }
448
449                 return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
450         }
451
452         /* now add/modify in the other partitions */
453         for (i=0; data->partitions && data->partitions[i]; i++) {
454                 struct ldb_message *modify_msg = NULL;
455                 int pret;
456                 unsigned int el_idx;
457
458                 pret = dsdb_module_add(data->partitions[i]->module,
459                                        res->msgs[0],
460                                        DSDB_FLAG_NEXT_MODULE,
461                                        req);
462                 if (pret == LDB_SUCCESS) {
463                         continue;
464                 }
465
466                 if (pret != LDB_ERR_ENTRY_ALREADY_EXISTS) {
467                         return pret;
468                 }
469
470                 modify_msg = ldb_msg_copy(req, res->msgs[0]);
471                 if (modify_msg == NULL) {
472                         return ldb_module_oom(module);
473                 }
474
475                 /*
476                  * mark all the message elements as
477                  * LDB_FLAG_MOD_REPLACE
478                  */
479                 for (el_idx=0;
480                      el_idx < modify_msg->num_elements;
481                      el_idx++) {
482                         modify_msg->elements[el_idx].flags
483                                 = LDB_FLAG_MOD_REPLACE;
484                 }
485
486                 if (req->operation == LDB_MODIFY) {
487                         const struct ldb_message *req_msg = req->op.mod.message;
488                         /*
489                          * mark elements to be removed, if there were
490                          * deleted entirely above we need to delete
491                          * them here too
492                          */
493                         for (el_idx=0; el_idx < req_msg->num_elements; el_idx++) {
494                                 if (req_msg->elements[el_idx].flags & LDB_FLAG_MOD_DELETE
495                                     || ((req_msg->elements[el_idx].flags & LDB_FLAG_MOD_REPLACE) &&
496                                         req_msg->elements[el_idx].num_values == 0)) {
497                                         if (ldb_msg_find_element(modify_msg,
498                                                                  req_msg->elements[el_idx].name) != NULL) {
499                                                 continue;
500                                         }
501                                         pret = ldb_msg_add_empty(
502                                                 modify_msg,
503                                                 req_msg->elements[el_idx].name,
504                                                 LDB_FLAG_MOD_REPLACE,
505                                                 NULL);
506                                         if (pret != LDB_SUCCESS) {
507                                                 return pret;
508                                         }
509                                 }
510                         }
511                 }
512
513                 pret = dsdb_module_modify(data->partitions[i]->module,
514                                           modify_msg,
515                                           DSDB_FLAG_NEXT_MODULE,
516                                           req);
517
518                 if (pret != LDB_SUCCESS) {
519                         return pret;
520                 }
521         }
522
523         return ldb_module_done(req, NULL, NULL, LDB_SUCCESS);
524 }
525
526
527 /*
528  * @brief call back function for the ldb operations on special DN's.
529  *
530  * As the LDB operations are async, and we wish to use the result
531  * the operations, a callback needs to be registered to process the results
532  * of the LDB operations.
533  *
534  * @param req the ldb request
535  * @param res the result of the operation
536  *
537  * @return the LDB_STATUS
538  */
539 static int partition_copy_all_callback_handler(
540         struct ldb_request *req,
541         struct ldb_reply *ares)
542 {
543         struct partition_copy_context *ac = NULL;
544
545         ac = talloc_get_type(
546                 req->context,
547                 struct partition_copy_context);
548
549         if (!ares) {
550                 return ldb_module_done(
551                         ac->request,
552                         NULL,
553                         NULL,
554                         LDB_ERR_OPERATIONS_ERROR);
555         }
556
557         /* pass on to the callback */
558         switch (ares->type) {
559         case LDB_REPLY_ENTRY:
560                 return ldb_module_send_entry(
561                         ac->request,
562                         ares->message,
563                         ares->controls);
564
565         case LDB_REPLY_REFERRAL:
566                 return ldb_module_send_referral(
567                         ac->request,
568                         ares->referral);
569
570         case LDB_REPLY_DONE: {
571                 int error = ares->error;
572                 if (error == LDB_SUCCESS) {
573                         error = partition_copy_all_callback_action(
574                                 ac->module,
575                                 ac->partition_context,
576                                 ac->request,
577                                 ac->dn);
578                 }
579                 return ldb_module_done(
580                         ac->request,
581                         ares->controls,
582                         ares->response,
583                         error);
584         }
585
586         default:
587                 /* Can't happen */
588                 return LDB_ERR_OPERATIONS_ERROR;
589         }
590 }
591
592 /**
593  * send an operation to the top partition, then copy the resulting
594  * object to all other partitions.
595  */
596 static int partition_copy_all(
597         struct ldb_module *module,
598         struct partition_context *partition_context,
599         struct ldb_request *req,
600         struct ldb_dn *dn)
601 {
602         struct ldb_request *new_req = NULL;
603         struct ldb_context *ldb = NULL;
604         struct partition_copy_context *context = NULL;
605
606         int ret;
607
608         ldb = ldb_module_get_ctx(module);
609
610         context = talloc_zero(req, struct partition_copy_context);
611         if (context == NULL) {
612                 return ldb_oom(ldb);
613         }
614         context->module = module;
615         context->request = req;
616         context->dn = dn;
617         context->partition_context = partition_context;
618
619         switch (req->operation) {
620         case LDB_ADD:
621                 ret = ldb_build_add_req(
622                         &new_req,
623                         ldb,
624                         req,
625                         req->op.add.message,
626                         req->controls,
627                         context,
628                         partition_copy_all_callback_handler,
629                         req);
630                 break;
631         case LDB_MODIFY:
632                 ret = ldb_build_mod_req(
633                         &new_req,
634                         ldb,
635                         req,
636                         req->op.mod.message,
637                         req->controls,
638                         context,
639                         partition_copy_all_callback_handler,
640                         req);
641                 break;
642         case LDB_DELETE:
643                 ret = ldb_build_del_req(
644                         &new_req,
645                         ldb,
646                         req,
647                         req->op.del.dn,
648                         req->controls,
649                         context,
650                         partition_copy_all_callback_handler,
651                         req);
652                 break;
653         case LDB_RENAME:
654                 ret = ldb_build_rename_req(
655                         &new_req,
656                         ldb,
657                         req,
658                         req->op.rename.olddn,
659                         req->op.rename.newdn,
660                         req->controls,
661                         context,
662                         partition_copy_all_callback_handler,
663                         req);
664                 break;
665         default:
666                 /*
667                  * Shouldn't happen.
668                  */
669                 ldb_debug(
670                         ldb,
671                         LDB_DEBUG_ERROR,
672                         "Unexpected operation type (%d)\n", req->operation);
673                 ret = LDB_ERR_OPERATIONS_ERROR;
674                 break;
675         }
676         if (ret != LDB_SUCCESS) {
677                 return ret;
678         }
679         return ldb_next_request(module, new_req);
680 }
681 /**
682  * Figure out which backend a request needs to be aimed at.  Some
683  * requests must be replicated to all backends
684  */
685 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
686 {
687         struct partition_context *ac;
688         unsigned int i;
689         int ret;
690         struct dsdb_partition *partition;
691         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
692                                                               struct partition_private_data);
693
694         /* if we aren't initialised yet go further */
695         if (!data || !data->partitions) {
696                 return ldb_next_request(module, req);
697         }
698
699         if (ldb_dn_is_special(dn)) {
700                 /* Is this a special DN, we need to replicate to every backend? */
701                 for (i=0; data->replicate && data->replicate[i]; i++) {
702                         if (ldb_dn_compare(data->replicate[i], 
703                                            dn) == 0) {
704                                 
705                                 ac = partition_init_ctx(module, req);
706                                 if (!ac) {
707                                         return ldb_operr(ldb_module_get_ctx(module));
708                                 }
709                                 
710                                 return partition_copy_all(module, ac, req, dn);
711                         }
712                 }
713         }
714
715         /* Otherwise, we need to find the partition to fire it to */
716
717         /* Find partition */
718         partition = find_partition(data, dn, req);
719         if (!partition) {
720                 /*
721                  * if we haven't found a matching partition
722                  * pass the request to the main ldb
723                  *
724                  * TODO: we should maybe return an error here
725                  *       if it's not a special dn
726                  */
727
728                 return ldb_next_request(module, req);
729         }
730
731         ac = partition_init_ctx(module, req);
732         if (!ac) {
733                 return ldb_operr(ldb_module_get_ctx(module));
734         }
735
736         /* we need to add a control but we never touch the original request */
737         ret = partition_prep_request(ac, partition);
738         if (ret != LDB_SUCCESS) {
739                 return ret;
740         }
741
742         /* fire the first one */
743         return partition_call_first(ac);
744 }
745
746 /* search */
747 static int partition_search(struct ldb_module *module, struct ldb_request *req)
748 {
749         /* Find backend */
750         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
751                                                               struct partition_private_data);
752         struct partition_context *ac;
753         struct ldb_context *ldb;
754         struct loadparm_context *lp_ctx;
755
756         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
757         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
758         struct ldb_control *no_gc_control = ldb_request_get_control(req, DSDB_CONTROL_NO_GLOBAL_CATALOG);
759         
760         struct ldb_search_options_control *search_options = NULL;
761         struct dsdb_partition *p;
762         unsigned int i, j;
763         int ret;
764         bool domain_scope = false, phantom_root = false;
765
766         p = find_partition(data, NULL, req);
767         if (p != NULL) {
768                 /* the caller specified what partition they want the
769                  * search - just pass it on
770                  */
771                 return ldb_next_request(p->module, req);
772         }
773
774         /* Get back the search options from the search control, and mark it as
775          * non-critical (to make backends and also dcpromo happy).
776          */
777         if (search_control) {
778                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
779                 search_control->critical = 0;
780
781         }
782
783         /* if we aren't initialised yet go further */
784         if (!data || !data->partitions) {
785                 return ldb_next_request(module, req);
786         }
787
788         /* Special DNs without specified partition should go further */
789         if (ldb_dn_is_special(req->op.search.base)) {
790                 return ldb_next_request(module, req);
791         }
792
793         /* Locate the options */
794         domain_scope = (search_options
795                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
796                 || domain_scope_control;
797         phantom_root = search_options
798                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
799
800         /* Remove handled options from the search control flag */
801         if (search_options) {
802                 search_options->search_options = search_options->search_options
803                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
804                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
805         }
806
807         ac = partition_init_ctx(module, req);
808         if (!ac) {
809                 return ldb_operr(ldb_module_get_ctx(module));
810         }
811
812         ldb = ldb_module_get_ctx(ac->module);
813         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
814                                                 struct loadparm_context);
815
816         /* Search from the base DN */
817         if (ldb_dn_is_null(req->op.search.base)) {
818                 if (!phantom_root) {
819                         return ldb_error(ldb, LDB_ERR_NO_SUCH_OBJECT, "empty base DN");
820                 }
821                 return partition_send_all(module, ac, req);
822         }
823
824         for (i=0; data->partitions[i]; i++) {
825                 bool match = false, stop = false;
826
827                 if (data->partitions[i]->partial_replica && no_gc_control != NULL) {
828                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
829                                                 req->op.search.base) == 0) {
830                                 /* base DN is in a partial replica
831                                    with the NO_GLOBAL_CATALOG
832                                    control. This partition is invisible */
833                                 /* DEBUG(0,("DENYING NON-GC OP: %s\n", ldb_module_call_chain(req, req))); */
834                                 continue;
835                         }
836                 }
837
838                 if (phantom_root) {
839                         /* Phantom root: Find all partitions under the
840                          * search base. We match if:
841                          *
842                          * 1) the DN we are looking for exactly matches a
843                          *    certain partition and always stop
844                          * 2) the DN we are looking for is a parent of certain
845                          *    partitions and it isn't a scope base search
846                          * 3) the DN we are looking for is a child of a certain
847                          *    partition and always stop
848                          *    - we don't need to go any further up in the
849                          *    hierarchy!
850                          */
851                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
852                                            req->op.search.base) == 0) {
853                                 match = true;
854                                 stop = true;
855                         }
856                         if (!match &&
857                             (ldb_dn_compare_base(req->op.search.base,
858                                                  data->partitions[i]->ctrl->dn) == 0 &&
859                              req->op.search.scope != LDB_SCOPE_BASE)) {
860                                 match = true;
861                         }
862                         if (!match &&
863                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
864                                                 req->op.search.base) == 0) {
865                                 match = true;
866                                 stop = true; /* note that this relies on partition ordering */
867                         }
868                 } else {
869                         /* Domain scope: Find all partitions under the search
870                          * base.
871                          *
872                          * We generate referral candidates if we haven't
873                          * specified the domain scope control, haven't a base
874                          * search* scope and the DN we are looking for is a real
875                          * predecessor of certain partitions. When a new
876                          * referral candidate is nearer to the DN than an
877                          * existing one delete the latter (we want to have only
878                          * the closest ones). When we checked this for all
879                          * candidates we have the final referrals.
880                          *
881                          * We match if the DN we are looking for is a child of
882                          * a certain partition or the partition
883                          * DN itself - we don't need to go any further
884                          * up in the hierarchy!
885                          */
886                         if ((!domain_scope) &&
887                             (req->op.search.scope != LDB_SCOPE_BASE) &&
888                             (ldb_dn_compare_base(req->op.search.base,
889                                                  data->partitions[i]->ctrl->dn) == 0) &&
890                             (ldb_dn_compare(req->op.search.base,
891                                             data->partitions[i]->ctrl->dn) != 0)) {
892                                 const char *scheme = ldb_get_opaque(
893                                     ldb, LDAP_REFERRAL_SCHEME_OPAQUE);
894                                 char *ref = talloc_asprintf(
895                                         ac,
896                                         "%s://%s/%s%s",
897                                         scheme == NULL ? "ldap" : scheme,
898                                         lpcfg_dnsdomain(lp_ctx),
899                                         ldb_dn_get_linearized(
900                                             data->partitions[i]->ctrl->dn),
901                                         req->op.search.scope ==
902                                             LDB_SCOPE_ONELEVEL ? "??base" : "");
903
904                                 if (ref == NULL) {
905                                         return ldb_oom(ldb);
906                                 }
907
908                                 /* Initialise the referrals list */
909                                 if (ac->referrals == NULL) {
910                                         char **l = str_list_make_empty(ac);
911                                         ac->referrals = discard_const_p(const char *, l);
912                                         if (ac->referrals == NULL) {
913                                                 return ldb_oom(ldb);
914                                         }
915                                 }
916
917                                 /* Check if the new referral candidate is
918                                  * closer to the base DN than already
919                                  * saved ones and delete the latters */
920                                 j = 0;
921                                 while (ac->referrals[j] != NULL) {
922                                         if (strstr(ac->referrals[j],
923                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
924                                                 str_list_remove(ac->referrals,
925                                                                 ac->referrals[j]);
926                                         } else {
927                                                 ++j;
928                                         }
929                                 }
930
931                                 /* Add our new candidate */
932                                 ac->referrals = str_list_add(ac->referrals, ref);
933
934                                 talloc_free(ref);
935
936                                 if (ac->referrals == NULL) {
937                                         return ldb_oom(ldb);
938                                 }
939                         }
940                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
941                                 match = true;
942                                 stop = true; /* note that this relies on partition ordering */
943                         }
944                 }
945
946                 if (match) {
947                         ret = partition_prep_request(ac, data->partitions[i]);
948                         if (ret != LDB_SUCCESS) {
949                                 return ret;
950                         }
951                 }
952
953                 if (stop) break;
954         }
955
956         /* Perhaps we didn't match any partitions. Try the main partition */
957         if (ac->num_requests == 0) {
958                 talloc_free(ac);
959                 return ldb_next_request(module, req);
960         }
961
962         /* fire the first one */
963         return partition_call_first(ac);
964 }
965
966 /* add */
967 static int partition_add(struct ldb_module *module, struct ldb_request *req)
968 {
969         return partition_replicate(module, req, req->op.add.message->dn);
970 }
971
972 /* modify */
973 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
974 {
975         return partition_replicate(module, req, req->op.mod.message->dn);
976 }
977
978 /* delete */
979 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
980 {
981         return partition_replicate(module, req, req->op.del.dn);
982 }
983
984 /* rename */
985 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
986 {
987         /* Find backend */
988         struct dsdb_partition *backend, *backend2;
989         
990         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
991                                                               struct partition_private_data);
992
993         /* Skip the lot if 'data' isn't here yet (initialisation) */
994         if (!data) {
995                 return ldb_operr(ldb_module_get_ctx(module));
996         }
997
998         backend = find_partition(data, req->op.rename.olddn, req);
999         backend2 = find_partition(data, req->op.rename.newdn, req);
1000
1001         if ((backend && !backend2) || (!backend && backend2)) {
1002                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1003         }
1004
1005         if (backend != backend2) {
1006                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
1007                                        "Cannot rename from %s in %s to %s in %s: %s",
1008                                        ldb_dn_get_linearized(req->op.rename.olddn),
1009                                        ldb_dn_get_linearized(backend->ctrl->dn),
1010                                        ldb_dn_get_linearized(req->op.rename.newdn),
1011                                        ldb_dn_get_linearized(backend2->ctrl->dn),
1012                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
1013                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
1014         }
1015
1016         return partition_replicate(module, req, req->op.rename.olddn);
1017 }
1018
1019 /* start a transaction */
1020 int partition_start_trans(struct ldb_module *module)
1021 {
1022         int i = 0;
1023         int ret = 0;
1024         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1025                                                               struct partition_private_data);
1026         /* Look at base DN */
1027         /* Figure out which partition it is under */
1028         /* Skip the lot if 'data' isn't here yet (initialization) */
1029         if (ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING) {
1030                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
1031         }
1032
1033         /*
1034          * We start a transaction on metadata.tdb first and end it last in
1035          * end_trans. This makes locking semantics follow TDB rather than MDB,
1036          * and effectively locks all partitions at once.
1037          * Detail:
1038          * Samba AD is special in that the partitions module (this file)
1039          * combines multiple independently locked databases into one overall
1040          * transaction. Changes across multiple partition DBs in a single
1041          * transaction must ALL be either visible or invisible.
1042          * The way this is achieved is by taking out a write lock on
1043          * metadata.tdb at the start of prepare_commit, while unlocking it at
1044          * the end of end_trans. This is matched by read_lock, ensuring it
1045          * can't progress until that write lock is released.
1046          *
1047          * metadata.tdb needs to be a TDB file because MDB uses independent
1048          * locks, which means a read lock and a write lock can be held at the
1049          * same time, whereas in TDB, the two locks block each other. The TDB
1050          * behaviour is required to implement the functionality described
1051          * above.
1052          *
1053          * An important additional detail here is that if prepare_commit is
1054          * called on a TDB without any changes being made, no write lock is
1055          * taken. We address this by storing a sequence number in metadata.tdb
1056          * which is updated every time a replicated attribute is modified.
1057          * The possibility of a few unreplicated attributes being out of date
1058          * turns out not to be a problem.
1059          * For this reason, a lock on sam.ldb (which is a TDB) won't achieve
1060          * the same end as locking metadata.tdb, unless we made a modification
1061          * to the @ records found there before every prepare_commit.
1062          */
1063         ret = partition_metadata_start_trans(module);
1064         if (ret != LDB_SUCCESS) {
1065                 return ret;
1066         }
1067
1068         ret = ldb_next_start_trans(module);
1069         if (ret != LDB_SUCCESS) {
1070                 partition_metadata_del_trans(module);
1071                 return ret;
1072         }
1073
1074         ret = partition_reload_if_required(module, data, NULL);
1075         if (ret != LDB_SUCCESS) {
1076                 ldb_next_del_trans(module);
1077                 partition_metadata_del_trans(module);
1078                 return ret;
1079         }
1080
1081         /*
1082          * The following per partition locks are required mostly because TDB
1083          * and MDB require locks before read and write ops are permitted.
1084          */
1085         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1086                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1087                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_start_trans() -> %s",
1088                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1089                 }
1090                 ret = ldb_next_start_trans(data->partitions[i]->module);
1091                 if (ret != LDB_SUCCESS) {
1092                         /* Back it out, if it fails on one */
1093                         for (i--; i >= 0; i--) {
1094                                 ldb_next_del_trans(data->partitions[i]->module);
1095                         }
1096                         ldb_next_del_trans(module);
1097                         partition_metadata_del_trans(module);
1098                         return ret;
1099                 }
1100         }
1101
1102         data->in_transaction++;
1103
1104         return LDB_SUCCESS;
1105 }
1106
1107 /* prepare for a commit */
1108 int partition_prepare_commit(struct ldb_module *module)
1109 {
1110         unsigned int i;
1111         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1112                                                               struct partition_private_data);
1113         int ret;
1114
1115         /*
1116          * Order of prepare_commit calls must match that in
1117          * partition_start_trans. See comment in that function for detail.
1118          */
1119         ret = partition_metadata_prepare_commit(module);
1120         if (ret != LDB_SUCCESS) {
1121                 return ret;
1122         }
1123
1124         ret = ldb_next_prepare_commit(module);
1125         if (ret != LDB_SUCCESS) {
1126                 return ret;
1127         }
1128
1129         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1130                 if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1131                         ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s",
1132                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
1133                 }
1134                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
1135                 if (ret != LDB_SUCCESS) {
1136                         ldb_asprintf_errstring(ldb_module_get_ctx(module), "prepare_commit error on %s: %s",
1137                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
1138                                                ldb_errstring(ldb_module_get_ctx(module)));
1139                         return ret;
1140                 }
1141         }
1142
1143         if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
1144                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
1145         }
1146
1147         return LDB_SUCCESS;
1148 }
1149
1150
1151 /* end a transaction */
1152 int partition_end_trans(struct ldb_module *module)
1153 {
1154         int ret, ret2;
1155         int i;
1156         struct ldb_context *ldb = ldb_module_get_ctx(module);
1157         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1158                                                               struct partition_private_data);
1159         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1160
1161         ret = LDB_SUCCESS;
1162
1163         if (data->in_transaction == 0) {
1164                 DEBUG(0,("partition end transaction mismatch\n"));
1165                 ret = LDB_ERR_OPERATIONS_ERROR;
1166         } else {
1167                 data->in_transaction--;
1168         }
1169
1170         /*
1171          * Order of end_trans calls must be the reverse of that in
1172          * partition_start_trans. See comment in that function for detail.
1173          */
1174         if (data && data->partitions) {
1175                 /* Just counting the partitions */
1176                 for (i=0; data->partitions[i]; i++) {}
1177
1178                 /* now walk them backwards */
1179                 for (i--; i>=0; i--) {
1180                         struct dsdb_partition *p = data->partitions[i];
1181                         if (trace) {
1182                                 ldb_debug(ldb,
1183                                           LDB_DEBUG_TRACE,
1184                                           "partition_end_trans() -> %s",
1185                                           ldb_dn_get_linearized(p->ctrl->dn));
1186                         }
1187                         ret2 = ldb_next_end_trans(p->module);
1188                         if (ret2 != LDB_SUCCESS) {
1189                                 ldb_asprintf_errstring(ldb,
1190                                         "end_trans error on %s: %s",
1191                                         ldb_dn_get_linearized(p->ctrl->dn),
1192                                         ldb_errstring(ldb));
1193                                 ret = ret2;
1194                         }
1195                 }
1196         }
1197
1198         if (trace) {
1199                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
1200         }
1201         ret2 = ldb_next_end_trans(module);
1202         if (ret2 != LDB_SUCCESS) {
1203                 ret = ret2;
1204         }
1205
1206         ret2 = partition_metadata_end_trans(module);
1207         if (ret2 != LDB_SUCCESS) {
1208                 ret = ret2;
1209         }
1210
1211         return ret;
1212 }
1213
1214 /* delete a transaction */
1215 int partition_del_trans(struct ldb_module *module)
1216 {
1217         int ret, final_ret = LDB_SUCCESS;
1218         int i;
1219         struct ldb_context *ldb = ldb_module_get_ctx(module);
1220         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1221                                                               struct partition_private_data);
1222         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1223
1224         if (data == NULL) {
1225                 DEBUG(0,("partion delete transaction with no private data\n"));
1226                 return ldb_operr(ldb);
1227         }
1228
1229         /*
1230          * Order of del_trans calls must be the reverse of that in
1231          * partition_start_trans. See comment in that function for detail.
1232          */
1233         if (data->partitions) {
1234                 /* Just counting the partitions */
1235                 for (i=0; data->partitions[i]; i++) {}
1236
1237                 /* now walk them backwards */
1238                 for (i--; i>=0; i--) {
1239                         struct dsdb_partition *p = data->partitions[i];
1240                         if (trace) {
1241                                 ldb_debug(ldb,
1242                                           LDB_DEBUG_TRACE,
1243                                           "partition_del_trans() -> %s",
1244                                           ldb_dn_get_linearized(p->ctrl->dn));
1245                         }
1246                         ret = ldb_next_del_trans(p->module);
1247                         if (ret != LDB_SUCCESS) {
1248                                 ldb_asprintf_errstring(ldb,
1249                                         "del_trans error on %s: %s",
1250                                         ldb_dn_get_linearized(p->ctrl->dn),
1251                                         ldb_errstring(ldb));
1252                                 final_ret = ret;
1253                         }
1254                 }
1255         }
1256
1257         if (trace) {
1258                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
1259         }
1260         ret = ldb_next_del_trans(module);
1261         if (ret != LDB_SUCCESS) {
1262                 final_ret = ret;
1263         }
1264
1265         ret = partition_metadata_del_trans(module);
1266         if (ret != LDB_SUCCESS) {
1267                 final_ret = ret;
1268         }
1269
1270         if (data->in_transaction == 0) {
1271                 DEBUG(0,("partition del transaction mismatch\n"));
1272                 return ldb_operr(ldb_module_get_ctx(module));
1273         }
1274         data->in_transaction--;
1275
1276         return final_ret;
1277 }
1278
1279 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
1280                                       uint64_t *seq_number,
1281                                       struct ldb_request *parent)
1282 {
1283         int ret;
1284         struct ldb_result *res;
1285         struct ldb_seqnum_request *tseq;
1286         struct ldb_seqnum_result *seqr;
1287
1288         tseq = talloc_zero(mem_ctx, struct ldb_seqnum_request);
1289         if (tseq == NULL) {
1290                 return ldb_oom(ldb_module_get_ctx(module));
1291         }
1292         tseq->type = LDB_SEQ_HIGHEST_SEQ;
1293         
1294         ret = dsdb_module_extended(module, tseq, &res,
1295                                    LDB_EXTENDED_SEQUENCE_NUMBER,
1296                                    tseq,
1297                                    DSDB_FLAG_NEXT_MODULE,
1298                                    parent);
1299         if (ret != LDB_SUCCESS) {
1300                 talloc_free(tseq);
1301                 return ret;
1302         }
1303         
1304         seqr = talloc_get_type_abort(res->extended->data,
1305                                      struct ldb_seqnum_result);
1306         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1307                 talloc_free(res);
1308                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1309                         "Primary backend in partition module returned a timestamp based seq");
1310         }
1311
1312         *seq_number = seqr->seq_num;
1313         talloc_free(tseq);
1314         return LDB_SUCCESS;
1315 }
1316
1317
1318 /*
1319  * Older version of sequence number as sum of sequence numbers for each partition
1320  */
1321 int partition_sequence_number_from_partitions(struct ldb_module *module,
1322                                               uint64_t *seqr)
1323 {
1324         int ret;
1325         unsigned int i;
1326         uint64_t seq_number = 0;
1327         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1328                                                               struct partition_private_data);
1329
1330         ret = partition_primary_sequence_number(module, data, &seq_number, NULL);
1331         if (ret != LDB_SUCCESS) {
1332                 return ret;
1333         }
1334         
1335         /* Skip the lot if 'data' isn't here yet (initialisation) */
1336         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1337                 struct ldb_seqnum_request *tseq;
1338                 struct ldb_seqnum_result *tseqr;
1339                 struct ldb_request *treq;
1340                 struct ldb_result *res = talloc_zero(data, struct ldb_result);
1341                 if (res == NULL) {
1342                         return ldb_oom(ldb_module_get_ctx(module));
1343                 }
1344                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1345                 if (tseq == NULL) {
1346                         talloc_free(res);
1347                         return ldb_oom(ldb_module_get_ctx(module));
1348                 }
1349                 tseq->type = LDB_SEQ_HIGHEST_SEQ;
1350                 
1351                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1352                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1353                                              tseq,
1354                                              NULL,
1355                                              res,
1356                                              ldb_extended_default_callback,
1357                                              NULL);
1358                 LDB_REQ_SET_LOCATION(treq);
1359                 if (ret != LDB_SUCCESS) {
1360                         talloc_free(res);
1361                         return ret;
1362                 }
1363                 
1364                 ret = partition_request(data->partitions[i]->module, treq);
1365                 if (ret != LDB_SUCCESS) {
1366                         talloc_free(res);
1367                         return ret;
1368                 }
1369                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1370                 if (ret != LDB_SUCCESS) {
1371                         talloc_free(res);
1372                         return ret;
1373                 }
1374                 tseqr = talloc_get_type(res->extended->data,
1375                                         struct ldb_seqnum_result);
1376                 seq_number += tseqr->seq_num;
1377                 talloc_free(res);
1378         }
1379
1380         *seqr = seq_number;
1381         return LDB_SUCCESS;
1382 }
1383
1384
1385 /*
1386  * Newer version of sequence number using metadata tdb
1387  */
1388 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
1389 {
1390         struct ldb_extended *ext;
1391         struct ldb_seqnum_request *seq;
1392         struct ldb_seqnum_result *seqr;
1393         uint64_t seq_number;
1394         int ret;
1395
1396         seq = talloc_get_type_abort(req->op.extended.data, struct ldb_seqnum_request);
1397         switch (seq->type) {
1398         case LDB_SEQ_NEXT:
1399                 ret = partition_metadata_sequence_number_increment(module, &seq_number);
1400                 if (ret != LDB_SUCCESS) {
1401                         return ret;
1402                 }
1403                 break;
1404
1405         case LDB_SEQ_HIGHEST_SEQ:
1406                 ret = partition_metadata_sequence_number(module, &seq_number);
1407                 if (ret != LDB_SUCCESS) {
1408                         return ret;
1409                 }
1410                 break;
1411
1412         case LDB_SEQ_HIGHEST_TIMESTAMP:
1413                 return ldb_module_error(module, LDB_ERR_OPERATIONS_ERROR,
1414                                         "LDB_SEQ_HIGHEST_TIMESTAMP not supported");
1415         }
1416
1417         ext = talloc_zero(req, struct ldb_extended);
1418         if (!ext) {
1419                 return ldb_module_oom(module);
1420         }
1421         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1422         if (seqr == NULL) {
1423                 talloc_free(ext);
1424                 return ldb_module_oom(module);
1425         }
1426         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1427         ext->data = seqr;
1428
1429         seqr->seq_num = seq_number;
1430         seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1431
1432         /* send request done */
1433         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1434 }
1435
1436 /* lock all the backends */
1437 int partition_read_lock(struct ldb_module *module)
1438 {
1439         int i = 0;
1440         int ret = 0;
1441         int ret2 = 0;
1442         struct ldb_context *ldb = ldb_module_get_ctx(module);
1443         struct partition_private_data *data = \
1444                 talloc_get_type(ldb_module_get_private(module),
1445                                 struct partition_private_data);
1446
1447         if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
1448                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1449                           "partition_read_lock() -> (metadata partition)");
1450         }
1451
1452         /*
1453          * It is important to only do this for LOCK because:
1454          * - we don't want to unlock what we did not lock
1455          *
1456          * - we don't want to make a new lock on the sam.ldb
1457          *   (triggered inside this routine due to the seq num check)
1458          *   during an unlock phase as that will violate the lock
1459          *   ordering
1460          */
1461
1462         if (data == NULL) {
1463                 TALLOC_CTX *mem_ctx = talloc_new(module);
1464
1465                 data = talloc_zero(mem_ctx, struct partition_private_data);
1466                 if (data == NULL) {
1467                         talloc_free(mem_ctx);
1468                         return ldb_operr(ldb);
1469                 }
1470
1471                 /*
1472                  * When used from Samba4, this message is set by the
1473                  * samba4 module, as a fixed value not read from the
1474                  * DB.  This avoids listing modules in the DB
1475                  */
1476                 data->forced_module_msg = talloc_get_type(
1477                         ldb_get_opaque(ldb,
1478                                        DSDB_OPAQUE_PARTITION_MODULE_MSG_OPAQUE_NAME),
1479                         struct ldb_message);
1480
1481                 ldb_module_set_private(module, talloc_steal(module,
1482                                                             data));
1483                 talloc_free(mem_ctx);
1484         }
1485
1486         /*
1487          * This will lock sam.ldb and will also call event loops,
1488          * so we do it before we get the whole db lock.
1489          */
1490         ret = partition_reload_if_required(module, data, NULL);
1491         if (ret != LDB_SUCCESS) {
1492                 return ret;
1493         }
1494
1495         /*
1496          * Order of read_lock calls must match that in partition_start_trans.
1497          * See comment in that function for detail.
1498          */
1499         ret = partition_metadata_read_lock(module);
1500         if (ret != LDB_SUCCESS) {
1501                 goto failed;
1502         }
1503
1504         /*
1505          * The top level DB (sam.ldb) lock is not enough to block another
1506          * process in prepare_commit(), because if nothing was changed in the
1507          * specific backend, then prepare_commit() is a no-op. Therefore the
1508          * metadata.tdb lock is taken out above, as it is the best we can do
1509          * right now.
1510          */
1511         ret = ldb_next_read_lock(module);
1512         if (ret != LDB_SUCCESS) {
1513                 ldb_debug_set(ldb,
1514                               LDB_DEBUG_FATAL,
1515                               "Failed to lock db: %s / %s for metadata partition",
1516                               ldb_errstring(ldb),
1517                               ldb_strerror(ret));
1518
1519                 return ret;
1520         }
1521
1522         /*
1523          * The following per partition locks are required mostly because TDB
1524          * and MDB require locks before reads are permitted.
1525          */
1526         for (i=0; data && data->partitions && data->partitions[i]; i++) {
1527                 if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
1528                         ldb_debug(ldb, LDB_DEBUG_TRACE,
1529                                   "partition_read_lock() -> %s",
1530                                   ldb_dn_get_linearized(
1531                                           data->partitions[i]->ctrl->dn));
1532                 }
1533                 ret = ldb_next_read_lock(data->partitions[i]->module);
1534                 if (ret == LDB_SUCCESS) {
1535                         continue;
1536                 }
1537
1538                 ldb_debug_set(ldb,
1539                               LDB_DEBUG_FATAL,
1540                               "Failed to lock db: %s / %s for %s",
1541                               ldb_errstring(ldb),
1542                               ldb_strerror(ret),
1543                               ldb_dn_get_linearized(
1544                                       data->partitions[i]->ctrl->dn));
1545
1546                 goto failed;
1547         }
1548
1549         return LDB_SUCCESS;
1550
1551 failed:
1552         /* Back it out, if it fails on one */
1553         for (i--; i >= 0; i--) {
1554                 ret2 = ldb_next_read_unlock(data->partitions[i]->module);
1555                 if (ret2 != LDB_SUCCESS) {
1556                         ldb_debug(ldb,
1557                                   LDB_DEBUG_FATAL,
1558                                   "Failed to unlock db: %s / %s",
1559                                   ldb_errstring(ldb),
1560                                   ldb_strerror(ret2));
1561                 }
1562         }
1563         ret2 = ldb_next_read_unlock(module);
1564         if (ret2 != LDB_SUCCESS) {
1565                 ldb_debug(ldb,
1566                           LDB_DEBUG_FATAL,
1567                           "Failed to unlock db: %s / %s",
1568                           ldb_errstring(ldb),
1569                           ldb_strerror(ret2));
1570         }
1571         return ret;
1572 }
1573
1574 /* unlock all the backends */
1575 int partition_read_unlock(struct ldb_module *module)
1576 {
1577         int i;
1578         int ret = LDB_SUCCESS;
1579         int ret2;
1580         struct ldb_context *ldb = ldb_module_get_ctx(module);
1581         struct partition_private_data *data = \
1582                 talloc_get_type(ldb_module_get_private(module),
1583                                 struct partition_private_data);
1584         bool trace = module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING;
1585
1586         /*
1587          * Order of read_unlock calls must be the reverse of that in
1588          * partition_start_trans. See comment in that function for detail.
1589          */
1590         if (data && data->partitions) {
1591                 /* Just counting the partitions */
1592                 for (i=0; data->partitions[i]; i++) {}
1593
1594                 /* now walk them backwards */
1595                 for (i--; i>=0; i--) {
1596                         struct dsdb_partition *p = data->partitions[i];
1597                         if (trace) {
1598                                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1599                                           "partition_read_unlock() -> %s",
1600                                           ldb_dn_get_linearized(p->ctrl->dn));
1601                         }
1602                         ret2 = ldb_next_read_unlock(p->module);
1603                         if (ret2 != LDB_SUCCESS) {
1604                                 ldb_debug_set(ldb,
1605                                            LDB_DEBUG_FATAL,
1606                                            "Failed to lock db: %s / %s for %s",
1607                                            ldb_errstring(ldb),
1608                                            ldb_strerror(ret),
1609                                            ldb_dn_get_linearized(p->ctrl->dn));
1610
1611                                 /*
1612                                  * Don't overwrite the original failure code
1613                                  * if there was one
1614                                  */
1615                                 if (ret == LDB_SUCCESS) {
1616                                         ret = ret2;
1617                                 }
1618                         }
1619                 }
1620         }
1621
1622         if (trace) {
1623                 ldb_debug(ldb, LDB_DEBUG_TRACE,
1624                           "partition_read_unlock() -> (metadata partition)");
1625         }
1626
1627         ret2 = ldb_next_read_unlock(module);
1628         if (ret2 != LDB_SUCCESS) {
1629                 ldb_debug_set(ldb,
1630                               LDB_DEBUG_FATAL,
1631                               "Failed to unlock db: %s / %s for metadata partition",
1632                               ldb_errstring(ldb),
1633                               ldb_strerror(ret2));
1634
1635                 /*
1636                  * Don't overwrite the original failure code
1637                  * if there was one
1638                  */
1639                 if (ret == LDB_SUCCESS) {
1640                         ret = ret2;
1641                 }
1642         }
1643
1644         ret = partition_metadata_read_unlock(module);
1645
1646         /*
1647          * Don't overwrite the original failure code
1648          * if there was one
1649          */
1650         if (ret == LDB_SUCCESS) {
1651                 ret = ret2;
1652         }
1653
1654         return ret;
1655 }
1656
1657 /* extended */
1658 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1659 {
1660         struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
1661                                                               struct partition_private_data);
1662         struct partition_context *ac;
1663         int ret;
1664
1665         /* if we aren't initialised yet go further */
1666         if (!data) {
1667                 return ldb_next_request(module, req);
1668         }
1669
1670         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1671                 /* Update the metadata.tdb to increment the schema version if needed*/
1672                 DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
1673                 ret = partition_metadata_inc_schema_sequence(module);
1674                 return ldb_module_done(req, NULL, NULL, ret);
1675         }
1676         
1677         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1678                 return partition_sequence_number(module, req);
1679         }
1680
1681         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1682                 return partition_create(module, req);
1683         }
1684
1685         /* 
1686          * as the extended operation has no dn
1687          * we need to send it to all partitions
1688          */
1689
1690         ac = partition_init_ctx(module, req);
1691         if (!ac) {
1692                 return ldb_operr(ldb_module_get_ctx(module));
1693         }
1694
1695         return partition_send_all(module, ac, req);
1696 }
1697
1698 static const struct ldb_module_ops ldb_partition_module_ops = {
1699         .name              = "partition",
1700         .init_context      = partition_init,
1701         .search            = partition_search,
1702         .add               = partition_add,
1703         .modify            = partition_modify,
1704         .del               = partition_delete,
1705         .rename            = partition_rename,
1706         .extended          = partition_extended,
1707         .start_transaction = partition_start_trans,
1708         .prepare_commit    = partition_prepare_commit,
1709         .end_transaction   = partition_end_trans,
1710         .del_transaction   = partition_del_trans,
1711         .read_lock         = partition_read_lock,
1712         .read_unlock       = partition_read_unlock
1713 };
1714
1715 int ldb_partition_module_init(const char *version)
1716 {
1717         LDB_MODULE_CHECK_VERSION(version);
1718         return ldb_register_module(&ldb_partition_module_ops);
1719 }