ldb: mark the location of a lot more ldb requests
[nivanova/samba-autobuild/.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct partition_private_data *data;
138         struct ldb_control *partition_ctrl;
139
140         ac = talloc_get_type(req->context, struct partition_context);
141         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
142
143         if (!ares) {
144                 return ldb_module_done(ac->req, NULL, NULL,
145                                         LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150                 /* If we didn't fan this request out to mulitple partitions,
151                  * or this is an individual search result, we can
152                  * deterministily tell the caller what partition this was
153                  * written to (repl_meta_data likes to know) */
154                 ret = ldb_reply_add_control(ares,
155                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
156                                             false, partition_ctrl->data);
157                 if (ret != LDB_SUCCESS) {
158                         return ldb_module_done(ac->req, NULL, NULL,
159                                                ret);
160                 }
161         }
162
163         if (ares->error != LDB_SUCCESS) {
164                 return ldb_module_done(ac->req, ares->controls,
165                                         ares->response, ares->error);
166         }
167
168         switch (ares->type) {
169         case LDB_REPLY_REFERRAL:
170                 return ldb_module_send_referral(ac->req, ares->referral);
171
172         case LDB_REPLY_ENTRY:
173                 if (ac->req->operation != LDB_SEARCH) {
174                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
175                                 "partition_req_callback:"
176                                 " Unsupported reply type for this request");
177                         return ldb_module_done(ac->req, NULL, NULL,
178                                                 LDB_ERR_OPERATIONS_ERROR);
179                 }
180                 
181                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182
183         case LDB_REPLY_DONE:
184                 if (ac->req->operation == LDB_EXTENDED) {
185                         /* FIXME: check for ares->response, replmd does not fill it ! */
186                         if (ares->response) {
187                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
189                                                           "partition_req_callback:"
190                                                           " Unknown extended reply, "
191                                                           "only supports START_TLS");
192                                         talloc_free(ares);
193                                         return ldb_module_done(ac->req, NULL, NULL,
194                                                                 LDB_ERR_OPERATIONS_ERROR);
195                                 }
196                         }
197                 }
198
199                 ac->finished_requests++;
200                 if (ac->finished_requests == ac->num_requests) {
201                         /* Send back referrals if they do exist (search ops) */
202                         if (ac->referrals != NULL) {
203                                 const char **ref;
204                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
205                                         ret = ldb_module_send_referral(ac->req,
206                                                                        talloc_strdup(ac->req, *ref));
207                                         if (ret != LDB_SUCCESS) {
208                                                 return ldb_module_done(ac->req, NULL, NULL,
209                                                                        ret);
210                                         }
211                                 }
212                         }
213
214                         /* this was the last one, call callback */
215                         return ldb_module_done(ac->req, ares->controls,
216                                                ares->response, 
217                                                ares->error);
218                 }
219
220                 /* not the last, now call the next one */
221                 module = ac->part_req[ac->finished_requests].module;
222                 nreq = ac->part_req[ac->finished_requests].req;
223
224                 ret = partition_request(module, nreq);
225                 if (ret != LDB_SUCCESS) {
226                         talloc_free(ares);
227                         return ldb_module_done(ac->req, NULL, NULL, ret);
228                 }
229
230                 break;
231         }
232
233         talloc_free(ares);
234         return LDB_SUCCESS;
235 }
236
237 static int partition_prep_request(struct partition_context *ac,
238                                   struct dsdb_partition *partition)
239 {
240         int ret;
241         struct ldb_request *req;
242
243         ac->part_req = talloc_realloc(ac, ac->part_req,
244                                         struct part_request,
245                                         ac->num_requests + 1);
246         if (ac->part_req == NULL) {
247                 return ldb_oom(ldb_module_get_ctx(ac->module));
248         }
249
250         switch (ac->req->operation) {
251         case LDB_SEARCH:
252                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
253                                         ac->part_req,
254                                         ac->req->op.search.base,
255                                         ac->req->op.search.scope,
256                                         ac->req->op.search.tree,
257                                         ac->req->op.search.attrs,
258                                         ac->req->controls,
259                                         ac, partition_req_callback,
260                                         ac->req);
261                 LDB_REQ_SET_LOCATION(req);
262                 break;
263         case LDB_ADD:
264                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265                                         ac->req->op.add.message,
266                                         ac->req->controls,
267                                         ac, partition_req_callback,
268                                         ac->req);
269                 LDB_REQ_SET_LOCATION(req);
270                 break;
271         case LDB_MODIFY:
272                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273                                         ac->req->op.mod.message,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 LDB_REQ_SET_LOCATION(req);
278                 break;
279         case LDB_DELETE:
280                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
281                                         ac->req->op.del.dn,
282                                         ac->req->controls,
283                                         ac, partition_req_callback,
284                                         ac->req);
285                 LDB_REQ_SET_LOCATION(req);
286                 break;
287         case LDB_RENAME:
288                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
289                                         ac->req->op.rename.olddn,
290                                         ac->req->op.rename.newdn,
291                                         ac->req->controls,
292                                         ac, partition_req_callback,
293                                         ac->req);
294                 LDB_REQ_SET_LOCATION(req);
295                 break;
296         case LDB_EXTENDED:
297                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
298                                         ac->part_req,
299                                         ac->req->op.extended.oid,
300                                         ac->req->op.extended.data,
301                                         ac->req->controls,
302                                         ac, partition_req_callback,
303                                         ac->req);
304                 LDB_REQ_SET_LOCATION(req);
305                 break;
306         default:
307                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
308                                   "Unsupported request type!");
309                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
310         }
311
312         if (ret != LDB_SUCCESS) {
313                 return ret;
314         }
315
316         ac->part_req[ac->num_requests].req = req;
317
318         if (ac->req->controls) {
319                 req->controls = talloc_memdup(req, ac->req->controls,
320                                         talloc_get_size(ac->req->controls));
321                 if (req->controls == NULL) {
322                         return ldb_oom(ldb_module_get_ctx(ac->module));
323                 }
324         }
325
326         if (partition) {
327                 ac->part_req[ac->num_requests].module = partition->module;
328
329                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
330                         ret = ldb_request_add_control(req,
331                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
332                                                       false, partition->ctrl);
333                         if (ret != LDB_SUCCESS) {
334                                 return ret;
335                         }
336                 }
337
338                 if (req->operation == LDB_SEARCH) {
339                         /* If the search is for 'more' than this partition,
340                          * then change the basedn, so a remote LDAP server
341                          * doesn't object */
342                         if (ldb_dn_compare_base(partition->ctrl->dn,
343                                                 req->op.search.base) != 0) {
344                                 req->op.search.base = partition->ctrl->dn;
345                         }
346                 }
347
348         } else {
349                 /* make sure you put the module here, or
350                  * or ldb_next_request() will skip a module */
351                 ac->part_req[ac->num_requests].module = ac->module;
352         }
353
354         ac->num_requests++;
355
356         return LDB_SUCCESS;
357 }
358
359 static int partition_call_first(struct partition_context *ac)
360 {
361         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
362 }
363
364 /**
365  * Send a request down to all the partitions
366  */
367 static int partition_send_all(struct ldb_module *module, 
368                               struct partition_context *ac, 
369                               struct ldb_request *req) 
370 {
371         unsigned int i;
372         struct partition_private_data *data = talloc_get_type(module->private_data, 
373                                                               struct partition_private_data);
374         int ret = partition_prep_request(ac, NULL);
375         if (ret != LDB_SUCCESS) {
376                 return ret;
377         }
378         for (i=0; data && data->partitions && data->partitions[i]; i++) {
379                 ret = partition_prep_request(ac, data->partitions[i]);
380                 if (ret != LDB_SUCCESS) {
381                         return ret;
382                 }
383         }
384
385         /* fire the first one */
386         return partition_call_first(ac);
387 }
388
389 /**
390  * Figure out which backend a request needs to be aimed at.  Some
391  * requests must be replicated to all backends
392  */
393 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
394 {
395         struct partition_context *ac;
396         unsigned int i;
397         int ret;
398         struct dsdb_partition *partition;
399         struct partition_private_data *data = talloc_get_type(module->private_data, 
400                                                               struct partition_private_data);
401         if (!data || !data->partitions) {
402                 return ldb_next_request(module, req);
403         }
404
405         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
406                 /* Is this a special DN, we need to replicate to every backend? */
407                 for (i=0; data->replicate && data->replicate[i]; i++) {
408                         if (ldb_dn_compare(data->replicate[i], 
409                                            dn) == 0) {
410                                 
411                                 ac = partition_init_ctx(module, req);
412                                 if (!ac) {
413                                         return ldb_operr(ldb_module_get_ctx(module));
414                                 }
415                                 
416                                 return partition_send_all(module, ac, req);
417                         }
418                 }
419         }
420
421         /* Otherwise, we need to find the partition to fire it to */
422
423         /* Find partition */
424         partition = find_partition(data, dn, req);
425         if (!partition) {
426                 /*
427                  * if we haven't found a matching partition
428                  * pass the request to the main ldb
429                  *
430                  * TODO: we should maybe return an error here
431                  *       if it's not a special dn
432                  */
433
434                 return ldb_next_request(module, req);
435         }
436
437         ac = partition_init_ctx(module, req);
438         if (!ac) {
439                 return ldb_operr(ldb_module_get_ctx(module));
440         }
441
442         /* we need to add a control but we never touch the original request */
443         ret = partition_prep_request(ac, partition);
444         if (ret != LDB_SUCCESS) {
445                 return ret;
446         }
447
448         /* fire the first one */
449         return partition_call_first(ac);
450 }
451
452 /* search */
453 static int partition_search(struct ldb_module *module, struct ldb_request *req)
454 {
455         struct ldb_control **saved_controls;
456         /* Find backend */
457         struct partition_private_data *data = talloc_get_type(module->private_data, 
458                                                               struct partition_private_data);
459         struct partition_context *ac;
460         struct ldb_context *ldb;
461         struct loadparm_context *lp_ctx;
462
463         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
464         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
465         
466         struct ldb_search_options_control *search_options = NULL;
467         struct dsdb_partition *p;
468         unsigned int i, j;
469         int ret;
470         bool domain_scope = false, phantom_root = false;
471         
472         ret = partition_reload_if_required(module, data);
473         if (ret != LDB_SUCCESS) {
474                 return ret;
475         }
476
477         p = find_partition(data, NULL, req);
478         if (p != NULL) {
479                 /* the caller specified what partition they want the
480                  * search - just pass it on
481                  */
482                 return ldb_next_request(p->module, req);
483         }
484
485         /* Get back the search options from the search control, and mark it as
486          * non-critical (to make backends and also dcpromo happy).
487          */
488         if (search_control) {
489                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
490                 search_control->critical = 0;
491
492         }
493
494         /* Remove the "domain_scope" control, so we don't confuse a backend
495          * server */
496         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
497                 return ldb_oom(ldb_module_get_ctx(module));
498         }
499
500         /* Locate the options */
501         domain_scope = (search_options
502                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
503                 || domain_scope_control;
504         phantom_root = search_options
505                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
506
507         /* Remove handled options from the search control flag */
508         if (search_options) {
509                 search_options->search_options = search_options->search_options
510                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
511                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
512         }
513
514         if (!data || !data->partitions) {
515                 return ldb_next_request(module, req);
516         }
517
518         ac = partition_init_ctx(module, req);
519         if (!ac) {
520                 return ldb_operr(ldb_module_get_ctx(module));
521         }
522
523         ldb = ldb_module_get_ctx(ac->module);
524         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
525                                                 struct loadparm_context);
526
527         /* Search from the base DN */
528         if (ldb_dn_is_null(req->op.search.base)) {
529                 return partition_send_all(module, ac, req);
530         }
531
532         for (i=0; data->partitions[i]; i++) {
533                 bool match = false, stop = false;
534
535                 if (phantom_root) {
536                         /* Phantom root: Find all partitions under the
537                          * search base. We match if:
538                          *
539                          * 1) the DN we are looking for exactly matches a
540                          *    certain partition and always stop
541                          * 2) the DN we are looking for is a parent of certain
542                          *    partitions and it isn't a scope base search
543                          * 3) the DN we are looking for is a child of a certain
544                          *    partition and always stop
545                          *    - we don't need to go any further up in the
546                          *    hierarchy!
547                          */
548                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
549                                            req->op.search.base) == 0) {
550                                 match = true;
551                                 stop = true;
552                         }
553                         if (!match &&
554                             (ldb_dn_compare_base(req->op.search.base,
555                                                  data->partitions[i]->ctrl->dn) == 0 &&
556                              req->op.search.scope != LDB_SCOPE_BASE)) {
557                                 match = true;
558                         }
559                         if (!match &&
560                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
561                                                 req->op.search.base) == 0) {
562                                 match = true;
563                                 stop = true; /* note that this relies on partition ordering */
564                         }
565                 } else {
566                         /* Domain scope: Find all partitions under the search
567                          * base.
568                          *
569                          * We generate referral candidates if we haven't
570                          * specified the domain scope control, haven't a base
571                          * search* scope and the DN we are looking for is a real
572                          * predecessor of certain partitions. When a new
573                          * referral candidate is nearer to the DN than an
574                          * existing one delete the latter (we want to have only
575                          * the closest ones). When we checked this for all
576                          * candidates we have the final referrals.
577                          *
578                          * We match if the DN we are looking for is a child of
579                          * a certain partition or the partition
580                          * DN itself - we don't need to go any further
581                          * up in the hierarchy!
582                          */
583                         if ((!domain_scope) &&
584                             (req->op.search.scope != LDB_SCOPE_BASE) &&
585                             (ldb_dn_compare_base(req->op.search.base,
586                                                  data->partitions[i]->ctrl->dn) == 0) &&
587                             (ldb_dn_compare(req->op.search.base,
588                                             data->partitions[i]->ctrl->dn) != 0)) {
589                                 char *ref = talloc_asprintf(ac,
590                                                             "ldap://%s/%s%s",
591                                                             lpcfg_dnsdomain(lp_ctx),
592                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
593                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
594
595                                 if (ref == NULL) {
596                                         return ldb_oom(ldb);
597                                 }
598
599                                 /* Initialise the referrals list */
600                                 if (ac->referrals == NULL) {
601                                         ac->referrals = (const char **) str_list_make_empty(ac);
602                                         if (ac->referrals == NULL) {
603                                                 return ldb_oom(ldb);
604                                         }
605                                 }
606
607                                 /* Check if the new referral candidate is
608                                  * closer to the base DN than already
609                                  * saved ones and delete the latters */
610                                 j = 0;
611                                 while (ac->referrals[j] != NULL) {
612                                         if (strstr(ac->referrals[j],
613                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
614                                                 str_list_remove(ac->referrals,
615                                                                 ac->referrals[j]);
616                                         } else {
617                                                 ++j;
618                                         }
619                                 }
620
621                                 /* Add our new candidate */
622                                 ac->referrals = str_list_add(ac->referrals, ref);
623
624                                 talloc_free(ref);
625
626                                 if (ac->referrals == NULL) {
627                                         return ldb_oom(ldb);
628                                 }
629                         }
630                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
631                                 match = true;
632                                 stop = true; /* note that this relies on partition ordering */
633                         }
634                 }
635
636                 if (match) {
637                         ret = partition_prep_request(ac, data->partitions[i]);
638                         if (ret != LDB_SUCCESS) {
639                                 return ret;
640                         }
641                 }
642
643                 if (stop) break;
644         }
645
646         /* Perhaps we didn't match any partitions. Try the main partition */
647         if (ac->num_requests == 0) {
648                 talloc_free(ac);
649                 return ldb_next_request(module, req);
650         }
651
652         /* fire the first one */
653         return partition_call_first(ac);
654 }
655
656 /* add */
657 static int partition_add(struct ldb_module *module, struct ldb_request *req)
658 {
659         return partition_replicate(module, req, req->op.add.message->dn);
660 }
661
662 /* modify */
663 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
664 {
665         return partition_replicate(module, req, req->op.mod.message->dn);
666 }
667
668 /* delete */
669 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
670 {
671         return partition_replicate(module, req, req->op.del.dn);
672 }
673
674 /* rename */
675 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
676 {
677         /* Find backend */
678         struct dsdb_partition *backend, *backend2;
679         
680         struct partition_private_data *data = talloc_get_type(module->private_data, 
681                                                               struct partition_private_data);
682
683         /* Skip the lot if 'data' isn't here yet (initialisation) */
684         if (!data) {
685                 return ldb_operr(ldb_module_get_ctx(module));
686         }
687
688         backend = find_partition(data, req->op.rename.olddn, req);
689         backend2 = find_partition(data, req->op.rename.newdn, req);
690
691         if ((backend && !backend2) || (!backend && backend2)) {
692                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
693         }
694
695         if (backend != backend2) {
696                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
697                                        "Cannot rename from %s in %s to %s in %s: %s",
698                                        ldb_dn_get_linearized(req->op.rename.olddn),
699                                        ldb_dn_get_linearized(backend->ctrl->dn),
700                                        ldb_dn_get_linearized(req->op.rename.newdn),
701                                        ldb_dn_get_linearized(backend2->ctrl->dn),
702                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
703                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
704         }
705
706         return partition_replicate(module, req, req->op.rename.olddn);
707 }
708
709 /* start a transaction */
710 static int partition_start_trans(struct ldb_module *module)
711 {
712         unsigned int i;
713         int ret;
714         struct partition_private_data *data = talloc_get_type(module->private_data, 
715                                                               struct partition_private_data);
716         /* Look at base DN */
717         /* Figure out which partition it is under */
718         /* Skip the lot if 'data' isn't here yet (initialization) */
719         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
720                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
721         }
722         ret = ldb_next_start_trans(module);
723         if (ret != LDB_SUCCESS) {
724                 return ret;
725         }
726
727         ret = partition_reload_if_required(module, data);
728         if (ret != LDB_SUCCESS) {
729                 return ret;
730         }
731
732         for (i=0; data && data->partitions && data->partitions[i]; i++) {
733                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
734                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
735                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
736                 }
737                 ret = ldb_next_start_trans(data->partitions[i]->module);
738                 if (ret != LDB_SUCCESS) {
739                         /* Back it out, if it fails on one */
740                         for (i--; i >= 0; i--) {
741                                 ldb_next_del_trans(data->partitions[i]->module);
742                         }
743                         ldb_next_del_trans(module);
744                         return ret;
745                 }
746         }
747
748         data->in_transaction++;
749
750         return LDB_SUCCESS;
751 }
752
753 /* prepare for a commit */
754 static int partition_prepare_commit(struct ldb_module *module)
755 {
756         unsigned int i;
757         struct partition_private_data *data = talloc_get_type(module->private_data, 
758                                                               struct partition_private_data);
759
760         for (i=0; data && data->partitions && data->partitions[i]; i++) {
761                 int ret;
762
763                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
764                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
765                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
766                 }
767                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
768                 if (ret != LDB_SUCCESS) {
769                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s",
770                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
771                                                ldb_errstring(module->ldb));
772                         return ret;
773                 }
774         }
775
776         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
777                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
778         }
779         return ldb_next_prepare_commit(module);
780 }
781
782
783 /* end a transaction */
784 static int partition_end_trans(struct ldb_module *module)
785 {
786         int ret, ret2;
787         unsigned int i;
788         struct partition_private_data *data = talloc_get_type(module->private_data, 
789                                                               struct partition_private_data);
790
791         ret = LDB_SUCCESS;
792
793         if (data->in_transaction == 0) {
794                 DEBUG(0,("partition end transaction mismatch\n"));
795                 ret = LDB_ERR_OPERATIONS_ERROR;
796         } else {
797                 data->in_transaction--;
798         }
799
800         for (i=0; data && data->partitions && data->partitions[i]; i++) {
801                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
802                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
803                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
804                 }
805                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
806                 if (ret2 != LDB_SUCCESS) {
807                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s",
808                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
809                                                ldb_errstring(module->ldb));
810                         ret = ret2;
811                 }
812         }
813
814         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
815                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
816         }
817         ret2 = ldb_next_end_trans(module);
818         if (ret2 != LDB_SUCCESS) {
819                 ret = ret2;
820         }
821         return ret;
822 }
823
824 /* delete a transaction */
825 static int partition_del_trans(struct ldb_module *module)
826 {
827         int ret, final_ret = LDB_SUCCESS;
828         unsigned int i;
829         struct partition_private_data *data = talloc_get_type(module->private_data, 
830                                                               struct partition_private_data);
831         for (i=0; data && data->partitions && data->partitions[i]; i++) {
832                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
833                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
834                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
835                 }
836                 ret = ldb_next_del_trans(data->partitions[i]->module);
837                 if (ret != LDB_SUCCESS) {
838                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s",
839                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
840                                                ldb_errstring(module->ldb));
841                         final_ret = ret;
842                 }
843         }       
844
845         if (data->in_transaction == 0) {
846                 DEBUG(0,("partition del transaction mismatch\n"));
847                 return ldb_operr(ldb_module_get_ctx(module));
848         }
849         data->in_transaction--;
850
851         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
852                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
853         }
854         ret = ldb_next_del_trans(module);
855         if (ret != LDB_SUCCESS) {
856                 final_ret = ret;
857         }
858         return final_ret;
859 }
860
861 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
862                                      enum ldb_sequence_type type, uint64_t *seq_number) 
863 {
864         int ret;
865         struct ldb_result *res;
866         struct ldb_seqnum_request *tseq;
867         struct ldb_request *treq;
868         struct ldb_seqnum_result *seqr;
869         res = talloc_zero(mem_ctx, struct ldb_result);
870         if (res == NULL) {
871                 return ldb_oom(ldb_module_get_ctx(module));
872         }
873         tseq = talloc_zero(res, struct ldb_seqnum_request);
874         if (tseq == NULL) {
875                 talloc_free(res);
876                 return ldb_oom(ldb_module_get_ctx(module));
877         }
878         tseq->type = type;
879         
880         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
881                                      LDB_EXTENDED_SEQUENCE_NUMBER,
882                                      tseq,
883                                      NULL,
884                                      res,
885                                      ldb_extended_default_callback,
886                                      NULL);
887         LDB_REQ_SET_LOCATION(treq);
888         if (ret != LDB_SUCCESS) {
889                 talloc_free(res);
890                 return ret;
891         }
892         
893         ret = ldb_next_request(module, treq);
894         if (ret != LDB_SUCCESS) {
895                 talloc_free(res);
896                 return ret;
897         }
898         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
899         if (ret != LDB_SUCCESS) {
900                 talloc_free(res);
901                 return ret;
902         }
903         
904         seqr = talloc_get_type(res->extended->data,
905                                struct ldb_seqnum_result);
906         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
907                 ret = LDB_ERR_OPERATIONS_ERROR;
908                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
909                 talloc_free(res);
910                 return ret;
911         } else {
912                 *seq_number = seqr->seq_num;
913         }
914         talloc_free(res);
915         return LDB_SUCCESS;
916 }
917
918 /* FIXME: This function is still semi-async */
919 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
920 {
921         int ret;
922         unsigned int i;
923         uint64_t seq_number = 0;
924         uint64_t timestamp_sequence = 0;
925         uint64_t timestamp = 0;
926         struct partition_private_data *data = talloc_get_type(module->private_data, 
927                                                               struct partition_private_data);
928         struct ldb_seqnum_request *seq;
929         struct ldb_seqnum_result *seqr;
930         struct ldb_request *treq;
931         struct ldb_seqnum_request *tseq;
932         struct ldb_seqnum_result *tseqr;
933         struct ldb_extended *ext;
934         struct ldb_result *res;
935         struct dsdb_partition *p;
936
937         p = find_partition(data, NULL, req);
938         if (p != NULL) {
939                 /* the caller specified what partition they want the
940                  * sequence number operation on - just pass it on
941                  */
942                 return ldb_next_request(p->module, req);                
943         }
944
945         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
946
947         switch (seq->type) {
948         case LDB_SEQ_NEXT:
949         case LDB_SEQ_HIGHEST_SEQ:
950
951                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
952                 if (ret != LDB_SUCCESS) {
953                         return ret;
954                 }
955
956                 /* Skip the lot if 'data' isn't here yet (initialisation) */
957                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
958
959                         res = talloc_zero(req, struct ldb_result);
960                         if (res == NULL) {
961                                 return ldb_oom(ldb_module_get_ctx(module));
962                         }
963                         tseq = talloc_zero(res, struct ldb_seqnum_request);
964                         if (tseq == NULL) {
965                                 talloc_free(res);
966                                 return ldb_oom(ldb_module_get_ctx(module));
967                         }
968                         tseq->type = seq->type;
969
970                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
971                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
972                                                      tseq,
973                                                      NULL,
974                                                      res,
975                                                      ldb_extended_default_callback,
976                                                      NULL);
977                         LDB_REQ_SET_LOCATION(treq);
978                         if (ret != LDB_SUCCESS) {
979                                 talloc_free(res);
980                                 return ret;
981                         }
982
983                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
984                                 ret = ldb_request_add_control(treq,
985                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
986                                                               false, data->partitions[i]->ctrl);
987                                 if (ret != LDB_SUCCESS) {
988                                         talloc_free(res);
989                                         return ret;
990                                 }
991                         }
992
993                         ret = partition_request(data->partitions[i]->module, treq);
994                         if (ret != LDB_SUCCESS) {
995                                 talloc_free(res);
996                                 return ret;
997                         }
998                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
999                         if (ret != LDB_SUCCESS) {
1000                                 talloc_free(res);
1001                                 return ret;
1002                         }
1003                         tseqr = talloc_get_type(res->extended->data,
1004                                                 struct ldb_seqnum_result);
1005                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1006                                 timestamp_sequence = MAX(timestamp_sequence,
1007                                                          tseqr->seq_num);
1008                         } else {
1009                                 seq_number += tseqr->seq_num;
1010                         }
1011                         talloc_free(res);
1012                 }
1013                 /* fall through */
1014         case LDB_SEQ_HIGHEST_TIMESTAMP:
1015
1016                 res = talloc_zero(req, struct ldb_result);
1017                 if (res == NULL) {
1018                         return ldb_oom(ldb_module_get_ctx(module));
1019                 }
1020
1021                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1022                 if (tseq == NULL) {
1023                         talloc_free(res);
1024                         return ldb_oom(ldb_module_get_ctx(module));
1025                 }
1026                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1027
1028                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1029                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1030                                              tseq,
1031                                              NULL,
1032                                              res,
1033                                              ldb_extended_default_callback,
1034                                              NULL);
1035                 LDB_REQ_SET_LOCATION(treq);
1036                 if (ret != LDB_SUCCESS) {
1037                         talloc_free(res);
1038                         return ret;
1039                 }
1040
1041                 ret = ldb_next_request(module, treq);
1042                 if (ret != LDB_SUCCESS) {
1043                         talloc_free(res);
1044                         return ret;
1045                 }
1046                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1047                 if (ret != LDB_SUCCESS) {
1048                         talloc_free(res);
1049                         return ret;
1050                 }
1051
1052                 tseqr = talloc_get_type(res->extended->data,
1053                                            struct ldb_seqnum_result);
1054                 timestamp = tseqr->seq_num;
1055
1056                 talloc_free(res);
1057
1058                 /* Skip the lot if 'data' isn't here yet (initialisation) */
1059                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1060
1061                         res = talloc_zero(req, struct ldb_result);
1062                         if (res == NULL) {
1063                                 return ldb_oom(ldb_module_get_ctx(module));
1064                         }
1065
1066                         tseq = talloc_zero(res, struct ldb_seqnum_request);
1067                         if (tseq == NULL) {
1068                                 talloc_free(res);
1069                                 return ldb_oom(ldb_module_get_ctx(module));
1070                         }
1071                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1072
1073                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1074                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
1075                                                      tseq,
1076                                                      NULL,
1077                                                      res,
1078                                                      ldb_extended_default_callback,
1079                                                      NULL);
1080                         LDB_REQ_SET_LOCATION(treq);
1081                         if (ret != LDB_SUCCESS) {
1082                                 talloc_free(res);
1083                                 return ret;
1084                         }
1085
1086                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
1087                                 ret = ldb_request_add_control(treq,
1088                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
1089                                                               false, data->partitions[i]->ctrl);
1090                                 if (ret != LDB_SUCCESS) {
1091                                         talloc_free(res);
1092                                         return ret;
1093                                 }
1094                         }
1095
1096                         ret = partition_request(data->partitions[i]->module, treq);
1097                         if (ret != LDB_SUCCESS) {
1098                                 talloc_free(res);
1099                                 return ret;
1100                         }
1101                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1102                         if (ret != LDB_SUCCESS) {
1103                                 talloc_free(res);
1104                                 return ret;
1105                         }
1106
1107                         tseqr = talloc_get_type(res->extended->data,
1108                                                   struct ldb_seqnum_result);
1109                         timestamp = MAX(timestamp, tseqr->seq_num);
1110
1111                         talloc_free(res);
1112                 }
1113
1114                 break;
1115         }
1116
1117         ext = talloc_zero(req, struct ldb_extended);
1118         if (!ext) {
1119                 return ldb_oom(ldb_module_get_ctx(module));
1120         }
1121         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1122         if (seqr == NULL) {
1123                 talloc_free(ext);
1124                 return ldb_oom(ldb_module_get_ctx(module));
1125         }
1126         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1127         ext->data = seqr;
1128
1129         switch (seq->type) {
1130         case LDB_SEQ_NEXT:
1131         case LDB_SEQ_HIGHEST_SEQ:
1132
1133                 /* Has someone above set a timebase sequence? */
1134                 if (timestamp_sequence) {
1135                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1136                 } else {
1137                         seqr->seq_num = seq_number;
1138                 }
1139
1140                 if (timestamp_sequence > seqr->seq_num) {
1141                         seqr->seq_num = timestamp_sequence;
1142                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1143                 }
1144
1145                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1146                 break;
1147         case LDB_SEQ_HIGHEST_TIMESTAMP:
1148                 seqr->seq_num = timestamp;
1149                 break;
1150         }
1151
1152         if (seq->type == LDB_SEQ_NEXT) {
1153                 seqr->seq_num++;
1154         }
1155
1156         /* send request done */
1157         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1158 }
1159
1160 /* extended */
1161 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1162 {
1163         struct partition_private_data *data;
1164         struct partition_context *ac;
1165         int ret;
1166
1167         data = talloc_get_type(module->private_data, struct partition_private_data);
1168         if (!data) {
1169                 return ldb_next_request(module, req);
1170         }
1171
1172         ret = partition_reload_if_required(module, data);
1173         if (ret != LDB_SUCCESS) {
1174                 return ret;
1175         }
1176         
1177         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1178                 return partition_sequence_number(module, req);
1179         }
1180
1181         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1182                 return partition_create(module, req);
1183         }
1184
1185         /* 
1186          * as the extended operation has no dn
1187          * we need to send it to all partitions
1188          */
1189
1190         ac = partition_init_ctx(module, req);
1191         if (!ac) {
1192                 return ldb_operr(ldb_module_get_ctx(module));
1193         }
1194
1195         return partition_send_all(module, ac, req);
1196 }
1197
1198 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1199         .name              = "partition",
1200         .init_context      = partition_init,
1201         .search            = partition_search,
1202         .add               = partition_add,
1203         .modify            = partition_modify,
1204         .del               = partition_delete,
1205         .rename            = partition_rename,
1206         .extended          = partition_extended,
1207         .start_transaction = partition_start_trans,
1208         .prepare_commit    = partition_prepare_commit,
1209         .end_transaction   = partition_end_trans,
1210         .del_transaction   = partition_del_trans,
1211 };