s4-dsdb: use ldb_operr() in the dsdb code
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct partition_private_data *data;
138         struct ldb_control *partition_ctrl;
139
140         ac = talloc_get_type(req->context, struct partition_context);
141         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
142
143         if (!ares) {
144                 return ldb_module_done(ac->req, NULL, NULL,
145                                         LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150                 /* If we didn't fan this request out to mulitple partitions,
151                  * or this is an individual search result, we can
152                  * deterministily tell the caller what partition this was
153                  * written to (repl_meta_data likes to know) */
154                 ret = ldb_reply_add_control(ares,
155                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
156                                             false, partition_ctrl->data);
157                 if (ret != LDB_SUCCESS) {
158                         return ldb_module_done(ac->req, NULL, NULL,
159                                                ret);
160                 }
161         }
162
163         if (ares->error != LDB_SUCCESS) {
164                 return ldb_module_done(ac->req, ares->controls,
165                                         ares->response, ares->error);
166         }
167
168         switch (ares->type) {
169         case LDB_REPLY_REFERRAL:
170                 return ldb_module_send_referral(ac->req, ares->referral);
171
172         case LDB_REPLY_ENTRY:
173                 if (ac->req->operation != LDB_SEARCH) {
174                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
175                                 "partition_req_callback:"
176                                 " Unsupported reply type for this request");
177                         return ldb_module_done(ac->req, NULL, NULL,
178                                                 LDB_ERR_OPERATIONS_ERROR);
179                 }
180                 
181                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182
183         case LDB_REPLY_DONE:
184                 if (ac->req->operation == LDB_EXTENDED) {
185                         /* FIXME: check for ares->response, replmd does not fill it ! */
186                         if (ares->response) {
187                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
189                                                           "partition_req_callback:"
190                                                           " Unknown extended reply, "
191                                                           "only supports START_TLS");
192                                         talloc_free(ares);
193                                         return ldb_module_done(ac->req, NULL, NULL,
194                                                                 LDB_ERR_OPERATIONS_ERROR);
195                                 }
196                         }
197                 }
198
199                 ac->finished_requests++;
200                 if (ac->finished_requests == ac->num_requests) {
201                         /* Send back referrals if they do exist (search ops) */
202                         if (ac->referrals != NULL) {
203                                 const char **ref;
204                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
205                                         ret = ldb_module_send_referral(ac->req,
206                                                                        talloc_strdup(ac->req, *ref));
207                                         if (ret != LDB_SUCCESS) {
208                                                 return ldb_module_done(ac->req, NULL, NULL,
209                                                                        ret);
210                                         }
211                                 }
212                         }
213
214                         /* this was the last one, call callback */
215                         return ldb_module_done(ac->req, ares->controls,
216                                                ares->response, 
217                                                ares->error);
218                 }
219
220                 /* not the last, now call the next one */
221                 module = ac->part_req[ac->finished_requests].module;
222                 nreq = ac->part_req[ac->finished_requests].req;
223
224                 ret = partition_request(module, nreq);
225                 if (ret != LDB_SUCCESS) {
226                         talloc_free(ares);
227                         return ldb_module_done(ac->req, NULL, NULL, ret);
228                 }
229
230                 break;
231         }
232
233         talloc_free(ares);
234         return LDB_SUCCESS;
235 }
236
237 static int partition_prep_request(struct partition_context *ac,
238                                   struct dsdb_partition *partition)
239 {
240         int ret;
241         struct ldb_request *req;
242
243         ac->part_req = talloc_realloc(ac, ac->part_req,
244                                         struct part_request,
245                                         ac->num_requests + 1);
246         if (ac->part_req == NULL) {
247                 return ldb_oom(ldb_module_get_ctx(ac->module));
248         }
249
250         switch (ac->req->operation) {
251         case LDB_SEARCH:
252                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
253                                         ac->part_req,
254                                         ac->req->op.search.base,
255                                         ac->req->op.search.scope,
256                                         ac->req->op.search.tree,
257                                         ac->req->op.search.attrs,
258                                         ac->req->controls,
259                                         ac, partition_req_callback,
260                                         ac->req);
261                 break;
262         case LDB_ADD:
263                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
264                                         ac->req->op.add.message,
265                                         ac->req->controls,
266                                         ac, partition_req_callback,
267                                         ac->req);
268                 break;
269         case LDB_MODIFY:
270                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
271                                         ac->req->op.mod.message,
272                                         ac->req->controls,
273                                         ac, partition_req_callback,
274                                         ac->req);
275                 break;
276         case LDB_DELETE:
277                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
278                                         ac->req->op.del.dn,
279                                         ac->req->controls,
280                                         ac, partition_req_callback,
281                                         ac->req);
282                 break;
283         case LDB_RENAME:
284                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
285                                         ac->req->op.rename.olddn,
286                                         ac->req->op.rename.newdn,
287                                         ac->req->controls,
288                                         ac, partition_req_callback,
289                                         ac->req);
290                 break;
291         case LDB_EXTENDED:
292                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
293                                         ac->part_req,
294                                         ac->req->op.extended.oid,
295                                         ac->req->op.extended.data,
296                                         ac->req->controls,
297                                         ac, partition_req_callback,
298                                         ac->req);
299                 break;
300         default:
301                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
302                                   "Unsupported request type!");
303                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
304         }
305
306         if (ret != LDB_SUCCESS) {
307                 return ret;
308         }
309
310         ac->part_req[ac->num_requests].req = req;
311
312         if (ac->req->controls) {
313                 req->controls = talloc_memdup(req, ac->req->controls,
314                                         talloc_get_size(ac->req->controls));
315                 if (req->controls == NULL) {
316                         return ldb_oom(ldb_module_get_ctx(ac->module));
317                 }
318         }
319
320         if (partition) {
321                 ac->part_req[ac->num_requests].module = partition->module;
322
323                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
324                         ret = ldb_request_add_control(req,
325                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
326                                                       false, partition->ctrl);
327                         if (ret != LDB_SUCCESS) {
328                                 return ret;
329                         }
330                 }
331
332                 if (req->operation == LDB_SEARCH) {
333                         /* If the search is for 'more' than this partition,
334                          * then change the basedn, so a remote LDAP server
335                          * doesn't object */
336                         if (ldb_dn_compare_base(partition->ctrl->dn,
337                                                 req->op.search.base) != 0) {
338                                 req->op.search.base = partition->ctrl->dn;
339                         }
340                 }
341
342         } else {
343                 /* make sure you put the module here, or
344                  * or ldb_next_request() will skip a module */
345                 ac->part_req[ac->num_requests].module = ac->module;
346         }
347
348         ac->num_requests++;
349
350         return LDB_SUCCESS;
351 }
352
353 static int partition_call_first(struct partition_context *ac)
354 {
355         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
356 }
357
358 /**
359  * Send a request down to all the partitions
360  */
361 static int partition_send_all(struct ldb_module *module, 
362                               struct partition_context *ac, 
363                               struct ldb_request *req) 
364 {
365         unsigned int i;
366         struct partition_private_data *data = talloc_get_type(module->private_data, 
367                                                               struct partition_private_data);
368         int ret = partition_prep_request(ac, NULL);
369         if (ret != LDB_SUCCESS) {
370                 return ret;
371         }
372         for (i=0; data && data->partitions && data->partitions[i]; i++) {
373                 ret = partition_prep_request(ac, data->partitions[i]);
374                 if (ret != LDB_SUCCESS) {
375                         return ret;
376                 }
377         }
378
379         /* fire the first one */
380         return partition_call_first(ac);
381 }
382
383 /**
384  * Figure out which backend a request needs to be aimed at.  Some
385  * requests must be replicated to all backends
386  */
387 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
388 {
389         struct partition_context *ac;
390         unsigned int i;
391         int ret;
392         struct dsdb_partition *partition;
393         struct partition_private_data *data = talloc_get_type(module->private_data, 
394                                                               struct partition_private_data);
395         if (!data || !data->partitions) {
396                 return ldb_next_request(module, req);
397         }
398
399         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
400                 /* Is this a special DN, we need to replicate to every backend? */
401                 for (i=0; data->replicate && data->replicate[i]; i++) {
402                         if (ldb_dn_compare(data->replicate[i], 
403                                            dn) == 0) {
404                                 
405                                 ac = partition_init_ctx(module, req);
406                                 if (!ac) {
407                                         return ldb_operr(ldb_module_get_ctx(module));
408                                 }
409                                 
410                                 return partition_send_all(module, ac, req);
411                         }
412                 }
413         }
414
415         /* Otherwise, we need to find the partition to fire it to */
416
417         /* Find partition */
418         partition = find_partition(data, dn, req);
419         if (!partition) {
420                 /*
421                  * if we haven't found a matching partition
422                  * pass the request to the main ldb
423                  *
424                  * TODO: we should maybe return an error here
425                  *       if it's not a special dn
426                  */
427
428                 return ldb_next_request(module, req);
429         }
430
431         ac = partition_init_ctx(module, req);
432         if (!ac) {
433                 return ldb_operr(ldb_module_get_ctx(module));
434         }
435
436         /* we need to add a control but we never touch the original request */
437         ret = partition_prep_request(ac, partition);
438         if (ret != LDB_SUCCESS) {
439                 return ret;
440         }
441
442         /* fire the first one */
443         return partition_call_first(ac);
444 }
445
446 /* search */
447 static int partition_search(struct ldb_module *module, struct ldb_request *req)
448 {
449         struct ldb_control **saved_controls;
450         /* Find backend */
451         struct partition_private_data *data = talloc_get_type(module->private_data, 
452                                                               struct partition_private_data);
453         struct partition_context *ac;
454         struct ldb_context *ldb;
455         struct loadparm_context *lp_ctx;
456
457         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
458         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
459         
460         struct ldb_search_options_control *search_options = NULL;
461         struct dsdb_partition *p;
462         unsigned int i, j;
463         int ret;
464         bool domain_scope = false, phantom_root = false;
465         
466         ret = partition_reload_if_required(module, data);
467         if (ret != LDB_SUCCESS) {
468                 return ret;
469         }
470
471         p = find_partition(data, NULL, req);
472         if (p != NULL) {
473                 /* the caller specified what partition they want the
474                  * search - just pass it on
475                  */
476                 return ldb_next_request(p->module, req);
477         }
478
479         /* Get back the search options from the search control, and mark it as
480          * non-critical (to make backends and also dcpromo happy).
481          */
482         if (search_control) {
483                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
484                 search_control->critical = 0;
485
486         }
487
488         /* Remove the "domain_scope" control, so we don't confuse a backend
489          * server */
490         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
491                 return ldb_oom(ldb_module_get_ctx(module));
492         }
493
494         /* Locate the options */
495         domain_scope = (search_options
496                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
497                 || domain_scope_control;
498         phantom_root = search_options
499                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
500
501         /* Remove handled options from the search control flag */
502         if (search_options) {
503                 search_options->search_options = search_options->search_options
504                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
505                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
506         }
507
508         if (!data || !data->partitions) {
509                 return ldb_next_request(module, req);
510         }
511
512         ac = partition_init_ctx(module, req);
513         if (!ac) {
514                 return ldb_operr(ldb_module_get_ctx(module));
515         }
516
517         ldb = ldb_module_get_ctx(ac->module);
518         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
519                                                 struct loadparm_context);
520
521         /* Search from the base DN */
522         if (ldb_dn_is_null(req->op.search.base)) {
523                 return partition_send_all(module, ac, req);
524         }
525
526         for (i=0; data->partitions[i]; i++) {
527                 bool match = false, stop = false;
528
529                 if (phantom_root) {
530                         /* Phantom root: Find all partitions under the
531                          * search base. We match if:
532                          *
533                          * 1) the DN we are looking for exactly matches a
534                          *    certain partition and always stop
535                          * 2) the DN we are looking for is a parent of certain
536                          *    partitions and it isn't a scope base search
537                          * 3) the DN we are looking for is a child of a certain
538                          *    partition and always stop
539                          *    - we don't need to go any further up in the
540                          *    hierarchy!
541                          */
542                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
543                                            req->op.search.base) == 0) {
544                                 match = true;
545                                 stop = true;
546                         }
547                         if (!match &&
548                             (ldb_dn_compare_base(req->op.search.base,
549                                                  data->partitions[i]->ctrl->dn) == 0 &&
550                              req->op.search.scope != LDB_SCOPE_BASE)) {
551                                 match = true;
552                         }
553                         if (!match &&
554                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
555                                                 req->op.search.base) == 0) {
556                                 match = true;
557                                 stop = true; /* note that this relies on partition ordering */
558                         }
559                 } else {
560                         /* Domain scope: Find all partitions under the search
561                          * base.
562                          *
563                          * We generate referral candidates if we haven't
564                          * specified the domain scope control, haven't a base
565                          * search* scope and the DN we are looking for is a real
566                          * predecessor of certain partitions. When a new
567                          * referral candidate is nearer to the DN than an
568                          * existing one delete the latter (we want to have only
569                          * the closest ones). When we checked this for all
570                          * candidates we have the final referrals.
571                          *
572                          * We match if the DN we are looking for is a child of
573                          * a certain partition or the partition
574                          * DN itself - we don't need to go any further
575                          * up in the hierarchy!
576                          */
577                         if ((!domain_scope) &&
578                             (req->op.search.scope != LDB_SCOPE_BASE) &&
579                             (ldb_dn_compare_base(req->op.search.base,
580                                                  data->partitions[i]->ctrl->dn) == 0) &&
581                             (ldb_dn_compare(req->op.search.base,
582                                             data->partitions[i]->ctrl->dn) != 0)) {
583                                 char *ref = talloc_asprintf(ac,
584                                                             "ldap://%s/%s%s",
585                                                             lp_dnsdomain(lp_ctx),
586                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
587                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
588
589                                 if (ref == NULL) {
590                                         return ldb_oom(ldb);
591                                 }
592
593                                 /* Initialise the referrals list */
594                                 if (ac->referrals == NULL) {
595                                         ac->referrals = (const char **) str_list_make_empty(ac);
596                                         if (ac->referrals == NULL) {
597                                                 return ldb_oom(ldb);
598                                         }
599                                 }
600
601                                 /* Check if the new referral candidate is
602                                  * closer to the base DN than already
603                                  * saved ones and delete the latters */
604                                 j = 0;
605                                 while (ac->referrals[j] != NULL) {
606                                         if (strstr(ac->referrals[j],
607                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
608                                                 str_list_remove(ac->referrals,
609                                                                 ac->referrals[j]);
610                                         } else {
611                                                 ++j;
612                                         }
613                                 }
614
615                                 /* Add our new candidate */
616                                 ac->referrals = str_list_add(ac->referrals, ref);
617
618                                 talloc_free(ref);
619
620                                 if (ac->referrals == NULL) {
621                                         return ldb_oom(ldb);
622                                 }
623                         }
624                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
625                                 match = true;
626                                 stop = true; /* note that this relies on partition ordering */
627                         }
628                 }
629
630                 if (match) {
631                         ret = partition_prep_request(ac, data->partitions[i]);
632                         if (ret != LDB_SUCCESS) {
633                                 return ret;
634                         }
635                 }
636
637                 if (stop) break;
638         }
639
640         /* Perhaps we didn't match any partitions. Try the main partition */
641         if (ac->num_requests == 0) {
642                 talloc_free(ac);
643                 return ldb_next_request(module, req);
644         }
645
646         /* fire the first one */
647         return partition_call_first(ac);
648 }
649
650 /* add */
651 static int partition_add(struct ldb_module *module, struct ldb_request *req)
652 {
653         return partition_replicate(module, req, req->op.add.message->dn);
654 }
655
656 /* modify */
657 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
658 {
659         return partition_replicate(module, req, req->op.mod.message->dn);
660 }
661
662 /* delete */
663 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
664 {
665         return partition_replicate(module, req, req->op.del.dn);
666 }
667
668 /* rename */
669 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
670 {
671         /* Find backend */
672         struct dsdb_partition *backend, *backend2;
673         
674         struct partition_private_data *data = talloc_get_type(module->private_data, 
675                                                               struct partition_private_data);
676
677         /* Skip the lot if 'data' isn't here yet (initialisation) */
678         if (!data) {
679                 return ldb_operr(ldb_module_get_ctx(module));
680         }
681
682         backend = find_partition(data, req->op.rename.olddn, req);
683         backend2 = find_partition(data, req->op.rename.newdn, req);
684
685         if ((backend && !backend2) || (!backend && backend2)) {
686                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
687         }
688
689         if (backend != backend2) {
690                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
691                                        "Cannot rename from %s in %s to %s in %s: %s",
692                                        ldb_dn_get_linearized(req->op.rename.olddn),
693                                        ldb_dn_get_linearized(backend->ctrl->dn),
694                                        ldb_dn_get_linearized(req->op.rename.newdn),
695                                        ldb_dn_get_linearized(backend2->ctrl->dn),
696                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
697                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
698         }
699
700         return partition_replicate(module, req, req->op.rename.olddn);
701 }
702
703 /* start a transaction */
704 static int partition_start_trans(struct ldb_module *module)
705 {
706         unsigned int i;
707         int ret;
708         struct partition_private_data *data = talloc_get_type(module->private_data, 
709                                                               struct partition_private_data);
710         /* Look at base DN */
711         /* Figure out which partition it is under */
712         /* Skip the lot if 'data' isn't here yet (initialization) */
713         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
714                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
715         }
716         ret = ldb_next_start_trans(module);
717         if (ret != LDB_SUCCESS) {
718                 return ret;
719         }
720
721         ret = partition_reload_if_required(module, data);
722         if (ret != LDB_SUCCESS) {
723                 return ret;
724         }
725
726         for (i=0; data && data->partitions && data->partitions[i]; i++) {
727                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
728                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
729                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
730                 }
731                 ret = ldb_next_start_trans(data->partitions[i]->module);
732                 if (ret != LDB_SUCCESS) {
733                         /* Back it out, if it fails on one */
734                         for (i--; i >= 0; i--) {
735                                 ldb_next_del_trans(data->partitions[i]->module);
736                         }
737                         ldb_next_del_trans(module);
738                         return ret;
739                 }
740         }
741
742         data->in_transaction++;
743
744         return LDB_SUCCESS;
745 }
746
747 /* prepare for a commit */
748 static int partition_prepare_commit(struct ldb_module *module)
749 {
750         unsigned int i;
751         struct partition_private_data *data = talloc_get_type(module->private_data, 
752                                                               struct partition_private_data);
753
754         for (i=0; data && data->partitions && data->partitions[i]; i++) {
755                 int ret;
756
757                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
758                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
759                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
760                 }
761                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
762                 if (ret != LDB_SUCCESS) {
763                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s",
764                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
765                                                ldb_errstring(module->ldb));
766                         return ret;
767                 }
768         }
769
770         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
771                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
772         }
773         return ldb_next_prepare_commit(module);
774 }
775
776
777 /* end a transaction */
778 static int partition_end_trans(struct ldb_module *module)
779 {
780         int ret, ret2;
781         unsigned int i;
782         struct partition_private_data *data = talloc_get_type(module->private_data, 
783                                                               struct partition_private_data);
784
785         ret = LDB_SUCCESS;
786
787         if (data->in_transaction == 0) {
788                 DEBUG(0,("partition end transaction mismatch\n"));
789                 ret = LDB_ERR_OPERATIONS_ERROR;
790         } else {
791                 data->in_transaction--;
792         }
793
794         for (i=0; data && data->partitions && data->partitions[i]; i++) {
795                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
796                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
797                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
798                 }
799                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
800                 if (ret2 != LDB_SUCCESS) {
801                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s",
802                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
803                                                ldb_errstring(module->ldb));
804                         ret = ret2;
805                 }
806         }
807
808         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
809                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
810         }
811         ret2 = ldb_next_end_trans(module);
812         if (ret2 != LDB_SUCCESS) {
813                 ret = ret2;
814         }
815         return ret;
816 }
817
818 /* delete a transaction */
819 static int partition_del_trans(struct ldb_module *module)
820 {
821         int ret, final_ret = LDB_SUCCESS;
822         unsigned int i;
823         struct partition_private_data *data = talloc_get_type(module->private_data, 
824                                                               struct partition_private_data);
825         for (i=0; data && data->partitions && data->partitions[i]; i++) {
826                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
827                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
828                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
829                 }
830                 ret = ldb_next_del_trans(data->partitions[i]->module);
831                 if (ret != LDB_SUCCESS) {
832                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s",
833                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
834                                                ldb_errstring(module->ldb));
835                         final_ret = ret;
836                 }
837         }       
838
839         if (data->in_transaction == 0) {
840                 DEBUG(0,("partition del transaction mismatch\n"));
841                 return ldb_operr(ldb_module_get_ctx(module));
842         }
843         data->in_transaction--;
844
845         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
846                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
847         }
848         ret = ldb_next_del_trans(module);
849         if (ret != LDB_SUCCESS) {
850                 final_ret = ret;
851         }
852         return final_ret;
853 }
854
855 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
856                                      enum ldb_sequence_type type, uint64_t *seq_number) 
857 {
858         int ret;
859         struct ldb_result *res;
860         struct ldb_seqnum_request *tseq;
861         struct ldb_request *treq;
862         struct ldb_seqnum_result *seqr;
863         res = talloc_zero(mem_ctx, struct ldb_result);
864         if (res == NULL) {
865                 return ldb_oom(ldb_module_get_ctx(module));
866         }
867         tseq = talloc_zero(res, struct ldb_seqnum_request);
868         if (tseq == NULL) {
869                 talloc_free(res);
870                 return ldb_oom(ldb_module_get_ctx(module));
871         }
872         tseq->type = type;
873         
874         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
875                                      LDB_EXTENDED_SEQUENCE_NUMBER,
876                                      tseq,
877                                      NULL,
878                                      res,
879                                      ldb_extended_default_callback,
880                                      NULL);
881         if (ret != LDB_SUCCESS) {
882                 talloc_free(res);
883                 return ret;
884         }
885         
886         ret = ldb_next_request(module, treq);
887         if (ret != LDB_SUCCESS) {
888                 talloc_free(res);
889                 return ret;
890         }
891         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
892         if (ret != LDB_SUCCESS) {
893                 talloc_free(res);
894                 return ret;
895         }
896         
897         seqr = talloc_get_type(res->extended->data,
898                                struct ldb_seqnum_result);
899         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
900                 ret = LDB_ERR_OPERATIONS_ERROR;
901                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
902                 talloc_free(res);
903                 return ret;
904         } else {
905                 *seq_number = seqr->seq_num;
906         }
907         talloc_free(res);
908         return LDB_SUCCESS;
909 }
910
911 /* FIXME: This function is still semi-async */
912 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
913 {
914         int ret;
915         unsigned int i;
916         uint64_t seq_number = 0;
917         uint64_t timestamp_sequence = 0;
918         uint64_t timestamp = 0;
919         struct partition_private_data *data = talloc_get_type(module->private_data, 
920                                                               struct partition_private_data);
921         struct ldb_seqnum_request *seq;
922         struct ldb_seqnum_result *seqr;
923         struct ldb_request *treq;
924         struct ldb_seqnum_request *tseq;
925         struct ldb_seqnum_result *tseqr;
926         struct ldb_extended *ext;
927         struct ldb_result *res;
928         struct dsdb_partition *p;
929
930         p = find_partition(data, NULL, req);
931         if (p != NULL) {
932                 /* the caller specified what partition they want the
933                  * sequence number operation on - just pass it on
934                  */
935                 return ldb_next_request(p->module, req);                
936         }
937
938         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
939
940         switch (seq->type) {
941         case LDB_SEQ_NEXT:
942         case LDB_SEQ_HIGHEST_SEQ:
943
944                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
945                 if (ret != LDB_SUCCESS) {
946                         return ret;
947                 }
948
949                 /* Skip the lot if 'data' isn't here yet (initialisation) */
950                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
951
952                         res = talloc_zero(req, struct ldb_result);
953                         if (res == NULL) {
954                                 return ldb_oom(ldb_module_get_ctx(module));
955                         }
956                         tseq = talloc_zero(res, struct ldb_seqnum_request);
957                         if (tseq == NULL) {
958                                 talloc_free(res);
959                                 return ldb_oom(ldb_module_get_ctx(module));
960                         }
961                         tseq->type = seq->type;
962
963                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
964                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
965                                                      tseq,
966                                                      NULL,
967                                                      res,
968                                                      ldb_extended_default_callback,
969                                                      NULL);
970                         if (ret != LDB_SUCCESS) {
971                                 talloc_free(res);
972                                 return ret;
973                         }
974
975                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
976                                 ret = ldb_request_add_control(treq,
977                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
978                                                               false, data->partitions[i]->ctrl);
979                                 if (ret != LDB_SUCCESS) {
980                                         talloc_free(res);
981                                         return ret;
982                                 }
983                         }
984
985                         ret = partition_request(data->partitions[i]->module, treq);
986                         if (ret != LDB_SUCCESS) {
987                                 talloc_free(res);
988                                 return ret;
989                         }
990                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
991                         if (ret != LDB_SUCCESS) {
992                                 talloc_free(res);
993                                 return ret;
994                         }
995                         tseqr = talloc_get_type(res->extended->data,
996                                                 struct ldb_seqnum_result);
997                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
998                                 timestamp_sequence = MAX(timestamp_sequence,
999                                                          tseqr->seq_num);
1000                         } else {
1001                                 seq_number += tseqr->seq_num;
1002                         }
1003                         talloc_free(res);
1004                 }
1005                 /* fall through */
1006         case LDB_SEQ_HIGHEST_TIMESTAMP:
1007
1008                 res = talloc_zero(req, struct ldb_result);
1009                 if (res == NULL) {
1010                         return ldb_oom(ldb_module_get_ctx(module));
1011                 }
1012
1013                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1014                 if (tseq == NULL) {
1015                         talloc_free(res);
1016                         return ldb_oom(ldb_module_get_ctx(module));
1017                 }
1018                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1019
1020                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1021                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1022                                              tseq,
1023                                              NULL,
1024                                              res,
1025                                              ldb_extended_default_callback,
1026                                              NULL);
1027                 if (ret != LDB_SUCCESS) {
1028                         talloc_free(res);
1029                         return ret;
1030                 }
1031
1032                 ret = ldb_next_request(module, treq);
1033                 if (ret != LDB_SUCCESS) {
1034                         talloc_free(res);
1035                         return ret;
1036                 }
1037                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1038                 if (ret != LDB_SUCCESS) {
1039                         talloc_free(res);
1040                         return ret;
1041                 }
1042
1043                 tseqr = talloc_get_type(res->extended->data,
1044                                            struct ldb_seqnum_result);
1045                 timestamp = tseqr->seq_num;
1046
1047                 talloc_free(res);
1048
1049                 /* Skip the lot if 'data' isn't here yet (initialisation) */
1050                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1051
1052                         res = talloc_zero(req, struct ldb_result);
1053                         if (res == NULL) {
1054                                 return ldb_oom(ldb_module_get_ctx(module));
1055                         }
1056
1057                         tseq = talloc_zero(res, struct ldb_seqnum_request);
1058                         if (tseq == NULL) {
1059                                 talloc_free(res);
1060                                 return ldb_oom(ldb_module_get_ctx(module));
1061                         }
1062                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1063
1064                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1065                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
1066                                                      tseq,
1067                                                      NULL,
1068                                                      res,
1069                                                      ldb_extended_default_callback,
1070                                                      NULL);
1071                         if (ret != LDB_SUCCESS) {
1072                                 talloc_free(res);
1073                                 return ret;
1074                         }
1075
1076                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
1077                                 ret = ldb_request_add_control(treq,
1078                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
1079                                                               false, data->partitions[i]->ctrl);
1080                                 if (ret != LDB_SUCCESS) {
1081                                         talloc_free(res);
1082                                         return ret;
1083                                 }
1084                         }
1085
1086                         ret = partition_request(data->partitions[i]->module, treq);
1087                         if (ret != LDB_SUCCESS) {
1088                                 talloc_free(res);
1089                                 return ret;
1090                         }
1091                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1092                         if (ret != LDB_SUCCESS) {
1093                                 talloc_free(res);
1094                                 return ret;
1095                         }
1096
1097                         tseqr = talloc_get_type(res->extended->data,
1098                                                   struct ldb_seqnum_result);
1099                         timestamp = MAX(timestamp, tseqr->seq_num);
1100
1101                         talloc_free(res);
1102                 }
1103
1104                 break;
1105         }
1106
1107         ext = talloc_zero(req, struct ldb_extended);
1108         if (!ext) {
1109                 return ldb_oom(ldb_module_get_ctx(module));
1110         }
1111         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1112         if (seqr == NULL) {
1113                 talloc_free(ext);
1114                 return ldb_oom(ldb_module_get_ctx(module));
1115         }
1116         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1117         ext->data = seqr;
1118
1119         switch (seq->type) {
1120         case LDB_SEQ_NEXT:
1121         case LDB_SEQ_HIGHEST_SEQ:
1122
1123                 /* Has someone above set a timebase sequence? */
1124                 if (timestamp_sequence) {
1125                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1126                 } else {
1127                         seqr->seq_num = seq_number;
1128                 }
1129
1130                 if (timestamp_sequence > seqr->seq_num) {
1131                         seqr->seq_num = timestamp_sequence;
1132                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1133                 }
1134
1135                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1136                 break;
1137         case LDB_SEQ_HIGHEST_TIMESTAMP:
1138                 seqr->seq_num = timestamp;
1139                 break;
1140         }
1141
1142         if (seq->type == LDB_SEQ_NEXT) {
1143                 seqr->seq_num++;
1144         }
1145
1146         /* send request done */
1147         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1148 }
1149
1150 /* extended */
1151 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1152 {
1153         struct partition_private_data *data;
1154         struct partition_context *ac;
1155         int ret;
1156
1157         data = talloc_get_type(module->private_data, struct partition_private_data);
1158         if (!data) {
1159                 return ldb_next_request(module, req);
1160         }
1161
1162         ret = partition_reload_if_required(module, data);
1163         if (ret != LDB_SUCCESS) {
1164                 return ret;
1165         }
1166         
1167         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1168                 return partition_sequence_number(module, req);
1169         }
1170
1171         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1172                 return partition_create(module, req);
1173         }
1174
1175         /* 
1176          * as the extended operation has no dn
1177          * we need to send it to all partitions
1178          */
1179
1180         ac = partition_init_ctx(module, req);
1181         if (!ac) {
1182                 return ldb_operr(ldb_module_get_ctx(module));
1183         }
1184
1185         return partition_send_all(module, ac, req);
1186 }
1187
1188 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1189         .name              = "partition",
1190         .init_context      = partition_init,
1191         .search            = partition_search,
1192         .add               = partition_add,
1193         .modify            = partition_modify,
1194         .del               = partition_delete,
1195         .rename            = partition_rename,
1196         .extended          = partition_extended,
1197         .start_transaction = partition_start_trans,
1198         .prepare_commit    = partition_prepare_commit,
1199         .end_transaction   = partition_end_trans,
1200         .del_transaction   = partition_del_trans,
1201 };