s4/dsdb: Fixed partition_search() not to pass special DNs to the LDAP backend.
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
/*
 * One sub-request of a (possibly fanned-out) operation: the request
 * itself together with the module it is handed to by
 * partition_request().
 */
struct part_request {
	/* module passed to partition_request() for this sub-request */
	struct ldb_module *module;
	/* the sub-request built by partition_prep_request() */
	struct ldb_request *req;
};
38
/*
 * Per-operation state of the partition module, allocated by
 * partition_init_ctx() as a talloc child of the caller's request.
 */
struct partition_context {
	/* the partition module instance handling the operation */
	struct ldb_module *module;
	/* the caller's original request */
	struct ldb_request *req;

	/* array of prepared sub-requests, num_requests entries long */
	struct part_request *part_req;
	unsigned int num_requests;
	/* number of sub-requests that have reported LDB_REPLY_DONE */
	unsigned int finished_requests;

	/* NULL-terminated list of referral URLs collected by a search, or NULL */
	const char **referrals;
};
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct partition_private_data *data;
138         struct ldb_control *partition_ctrl;
139
140         ac = talloc_get_type(req->context, struct partition_context);
141         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
142
143         if (!ares) {
144                 return ldb_module_done(ac->req, NULL, NULL,
145                                         LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150                 /* If we didn't fan this request out to mulitple partitions,
151                  * or this is an individual search result, we can
152                  * deterministily tell the caller what partition this was
153                  * written to (repl_meta_data likes to know) */
154                 ret = ldb_reply_add_control(ares,
155                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
156                                             false, partition_ctrl->data);
157                 if (ret != LDB_SUCCESS) {
158                         return ldb_module_done(ac->req, NULL, NULL,
159                                                ret);
160                 }
161         }
162
163         if (ares->error != LDB_SUCCESS) {
164                 return ldb_module_done(ac->req, ares->controls,
165                                         ares->response, ares->error);
166         }
167
168         switch (ares->type) {
169         case LDB_REPLY_REFERRAL:
170                 return ldb_module_send_referral(ac->req, ares->referral);
171
172         case LDB_REPLY_ENTRY:
173                 if (ac->req->operation != LDB_SEARCH) {
174                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
175                                 "partition_req_callback:"
176                                 " Unsupported reply type for this request");
177                         return ldb_module_done(ac->req, NULL, NULL,
178                                                 LDB_ERR_OPERATIONS_ERROR);
179                 }
180                 
181                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182
183         case LDB_REPLY_DONE:
184                 if (ac->req->operation == LDB_EXTENDED) {
185                         /* FIXME: check for ares->response, replmd does not fill it ! */
186                         if (ares->response) {
187                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
189                                                           "partition_req_callback:"
190                                                           " Unknown extended reply, "
191                                                           "only supports START_TLS");
192                                         talloc_free(ares);
193                                         return ldb_module_done(ac->req, NULL, NULL,
194                                                                 LDB_ERR_OPERATIONS_ERROR);
195                                 }
196                         }
197                 }
198
199                 ac->finished_requests++;
200                 if (ac->finished_requests == ac->num_requests) {
201                         /* Send back referrals if they do exist (search ops) */
202                         if (ac->referrals != NULL) {
203                                 const char **ref;
204                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
205                                         ret = ldb_module_send_referral(ac->req,
206                                                                        talloc_strdup(ac->req, *ref));
207                                         if (ret != LDB_SUCCESS) {
208                                                 return ldb_module_done(ac->req, NULL, NULL,
209                                                                        ret);
210                                         }
211                                 }
212                         }
213
214                         /* this was the last one, call callback */
215                         return ldb_module_done(ac->req, ares->controls,
216                                                ares->response, 
217                                                ares->error);
218                 }
219
220                 /* not the last, now call the next one */
221                 module = ac->part_req[ac->finished_requests].module;
222                 nreq = ac->part_req[ac->finished_requests].req;
223
224                 ret = partition_request(module, nreq);
225                 if (ret != LDB_SUCCESS) {
226                         talloc_free(ares);
227                         return ldb_module_done(ac->req, NULL, NULL, ret);
228                 }
229
230                 break;
231         }
232
233         talloc_free(ares);
234         return LDB_SUCCESS;
235 }
236
237 static int partition_prep_request(struct partition_context *ac,
238                                   struct dsdb_partition *partition)
239 {
240         int ret;
241         struct ldb_request *req;
242
243         ac->part_req = talloc_realloc(ac, ac->part_req,
244                                         struct part_request,
245                                         ac->num_requests + 1);
246         if (ac->part_req == NULL) {
247                 ldb_oom(ldb_module_get_ctx(ac->module));
248                 return LDB_ERR_OPERATIONS_ERROR;
249         }
250
251         switch (ac->req->operation) {
252         case LDB_SEARCH:
253                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
254                                         ac->part_req,
255                                         ac->req->op.search.base,
256                                         ac->req->op.search.scope,
257                                         ac->req->op.search.tree,
258                                         ac->req->op.search.attrs,
259                                         ac->req->controls,
260                                         ac, partition_req_callback,
261                                         ac->req);
262                 break;
263         case LDB_ADD:
264                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
265                                         ac->req->op.add.message,
266                                         ac->req->controls,
267                                         ac, partition_req_callback,
268                                         ac->req);
269                 break;
270         case LDB_MODIFY:
271                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
272                                         ac->req->op.mod.message,
273                                         ac->req->controls,
274                                         ac, partition_req_callback,
275                                         ac->req);
276                 break;
277         case LDB_DELETE:
278                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
279                                         ac->req->op.del.dn,
280                                         ac->req->controls,
281                                         ac, partition_req_callback,
282                                         ac->req);
283                 break;
284         case LDB_RENAME:
285                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
286                                         ac->req->op.rename.olddn,
287                                         ac->req->op.rename.newdn,
288                                         ac->req->controls,
289                                         ac, partition_req_callback,
290                                         ac->req);
291                 break;
292         case LDB_EXTENDED:
293                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
294                                         ac->part_req,
295                                         ac->req->op.extended.oid,
296                                         ac->req->op.extended.data,
297                                         ac->req->controls,
298                                         ac, partition_req_callback,
299                                         ac->req);
300                 break;
301         default:
302                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
303                                   "Unsupported request type!");
304                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
305         }
306
307         if (ret != LDB_SUCCESS) {
308                 return ret;
309         }
310
311         ac->part_req[ac->num_requests].req = req;
312
313         if (ac->req->controls) {
314                 req->controls = talloc_memdup(req, ac->req->controls,
315                                         talloc_get_size(ac->req->controls));
316                 if (req->controls == NULL) {
317                         ldb_oom(ldb_module_get_ctx(ac->module));
318                         return LDB_ERR_OPERATIONS_ERROR;
319                 }
320         }
321
322         if (partition) {
323                 ac->part_req[ac->num_requests].module = partition->module;
324
325                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
326                         ret = ldb_request_add_control(req,
327                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
328                                                       false, partition->ctrl);
329                         if (ret != LDB_SUCCESS) {
330                                 return ret;
331                         }
332                 }
333
334                 if (req->operation == LDB_SEARCH) {
335                         /* If the search is for 'more' than this partition,
336                          * then change the basedn, so a remote LDAP server
337                          * doesn't object */
338                         if (ldb_dn_compare_base(partition->ctrl->dn,
339                                                 req->op.search.base) != 0) {
340                                 req->op.search.base = partition->ctrl->dn;
341                         }
342                 }
343
344         } else {
345                 /* make sure you put the module here, or
346                  * or ldb_next_request() will skip a module */
347                 ac->part_req[ac->num_requests].module = ac->module;
348         }
349
350         ac->num_requests++;
351
352         return LDB_SUCCESS;
353 }
354
355 static int partition_call_first(struct partition_context *ac)
356 {
357         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
358 }
359
360 /**
361  * Send a request down to all the partitions
362  */
363 static int partition_send_all(struct ldb_module *module, 
364                               struct partition_context *ac, 
365                               struct ldb_request *req) 
366 {
367         unsigned int i;
368         struct partition_private_data *data = talloc_get_type(module->private_data, 
369                                                               struct partition_private_data);
370         int ret = partition_prep_request(ac, NULL);
371         if (ret != LDB_SUCCESS) {
372                 return ret;
373         }
374         for (i=0; data && data->partitions && data->partitions[i]; i++) {
375                 ret = partition_prep_request(ac, data->partitions[i]);
376                 if (ret != LDB_SUCCESS) {
377                         return ret;
378                 }
379         }
380
381         /* fire the first one */
382         return partition_call_first(ac);
383 }
384
385 /**
386  * Figure out which backend a request needs to be aimed at.  Some
387  * requests must be replicated to all backends
388  */
389 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
390 {
391         struct partition_context *ac;
392         unsigned int i;
393         int ret;
394         struct dsdb_partition *partition;
395         struct partition_private_data *data = talloc_get_type(module->private_data, 
396                                                               struct partition_private_data);
397         if (!data || !data->partitions) {
398                 return ldb_next_request(module, req);
399         }
400
401         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
402                 /* Is this a special DN, we need to replicate to every backend? */
403                 for (i=0; data->replicate && data->replicate[i]; i++) {
404                         if (ldb_dn_compare(data->replicate[i], 
405                                            dn) == 0) {
406                                 
407                                 ac = partition_init_ctx(module, req);
408                                 if (!ac) {
409                                         return LDB_ERR_OPERATIONS_ERROR;
410                                 }
411                                 
412                                 return partition_send_all(module, ac, req);
413                         }
414                 }
415         }
416
417         /* Otherwise, we need to find the partition to fire it to */
418
419         /* Find partition */
420         partition = find_partition(data, dn, req);
421         if (!partition) {
422                 /*
423                  * if we haven't found a matching partition
424                  * pass the request to the main ldb
425                  *
426                  * TODO: we should maybe return an error here
427                  *       if it's not a special dn
428                  */
429
430                 return ldb_next_request(module, req);
431         }
432
433         ac = partition_init_ctx(module, req);
434         if (!ac) {
435                 return LDB_ERR_OPERATIONS_ERROR;
436         }
437
438         /* we need to add a control but we never touch the original request */
439         ret = partition_prep_request(ac, partition);
440         if (ret != LDB_SUCCESS) {
441                 return ret;
442         }
443
444         /* fire the first one */
445         return partition_call_first(ac);
446 }
447
448 /* search */
449 static int partition_search(struct ldb_module *module, struct ldb_request *req)
450 {
451         struct ldb_control **saved_controls;
452         /* Find backend */
453         struct partition_private_data *data = talloc_get_type(module->private_data, 
454                                                               struct partition_private_data);
455         struct partition_context *ac;
456         struct ldb_context *ldb;
457         struct loadparm_context *lp_ctx;
458
459         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
460         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
461         
462         struct ldb_search_options_control *search_options = NULL;
463         struct dsdb_partition *p;
464         unsigned int i, j;
465         int ret;
466         bool domain_scope = false, phantom_root = false;
467         
468         ret = partition_reload_if_required(module, data);
469         if (ret != LDB_SUCCESS) {
470                 return ret;
471         }
472
473         if (!ldb_dn_is_special(req->op.search.base)) {
474                 p = find_partition(data, NULL, req);
475                 if (p != NULL) {
476                         /* the caller specified what partition they want the
477                         * search - just pass it on
478                         */
479                         return ldb_next_request(p->module, req);
480                 }
481         }
482
483         /* Get back the search options from the search control, and mark it as
484          * non-critical (to make backends and also dcpromo happy).
485          */
486         if (search_control) {
487                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
488                 search_control->critical = 0;
489
490         }
491
492         /* Remove the "domain_scope" control, so we don't confuse a backend
493          * server */
494         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
495                 ldb_oom(ldb_module_get_ctx(module));
496                 return LDB_ERR_OPERATIONS_ERROR;
497         }
498
499         /* Locate the options */
500         domain_scope = (search_options
501                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
502                 || domain_scope_control;
503         phantom_root = search_options
504                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
505
506         /* Remove handled options from the search control flag */
507         if (search_options) {
508                 search_options->search_options = search_options->search_options
509                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
510                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
511         }
512
513         if (!data || !data->partitions) {
514                 return ldb_next_request(module, req);
515         }
516
517         ac = partition_init_ctx(module, req);
518         if (!ac) {
519                 return LDB_ERR_OPERATIONS_ERROR;
520         }
521
522         ldb = ldb_module_get_ctx(ac->module);
523         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
524                                                 struct loadparm_context);
525
526         /* Search from the base DN */
527         if (ldb_dn_is_null(req->op.search.base)) {
528                 return partition_send_all(module, ac, req);
529         }
530
531         for (i=0; data->partitions[i]; i++) {
532                 bool match = false, stop = false;
533
534                 if (phantom_root) {
535                         /* Phantom root: Find all partitions under the
536                          * search base. We match if:
537                          *
538                          * 1) the DN we are looking for exactly matches a
539                          *    certain partition and always stop
540                          * 2) the DN we are looking for is a parent of certain
541                          *    partitions and it isn't a scope base search
542                          * 3) the DN we are looking for is a child of a certain
543                          *    partition and always stop
544                          *    - we don't need to go any further up in the
545                          *    hierarchy!
546                          */
547                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
548                                            req->op.search.base) == 0) {
549                                 match = true;
550                                 stop = true;
551                         }
552                         if (!match &&
553                             (ldb_dn_compare_base(req->op.search.base,
554                                                  data->partitions[i]->ctrl->dn) == 0 &&
555                              req->op.search.scope != LDB_SCOPE_BASE)) {
556                                 match = true;
557                         }
558                         if (!match &&
559                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
560                                                 req->op.search.base) == 0) {
561                                 match = true;
562                                 stop = true; /* note that this relies on partition ordering */
563                         }
564                 } else {
565                         /* Domain scope: Find all partitions under the search
566                          * base.
567                          *
568                          * We generate referral candidates if we haven't
569                          * specified the domain scope control, haven't a base
570                          * search* scope and the DN we are looking for is a real
571                          * predecessor of certain partitions. When a new
572                          * referral candidate is nearer to the DN than an
573                          * existing one delete the latter (we want to have only
574                          * the closest ones). When we checked this for all
575                          * candidates we have the final referrals.
576                          *
577                          * We match if the DN we are looking for is a child of
578                          * a certain partition or the partition
579                          * DN itself - we don't need to go any further
580                          * up in the hierarchy!
581                          */
582                         if ((!domain_scope) &&
583                             (req->op.search.scope != LDB_SCOPE_BASE) &&
584                             (ldb_dn_compare_base(req->op.search.base,
585                                                  data->partitions[i]->ctrl->dn) == 0) &&
586                             (ldb_dn_compare(req->op.search.base,
587                                             data->partitions[i]->ctrl->dn) != 0)) {
588                                 char *ref = talloc_asprintf(ac,
589                                                             "ldap://%s/%s%s",
590                                                             lp_dnsdomain(lp_ctx),
591                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
592                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
593
594                                 if (ref == NULL) {
595                                         ldb_oom(ldb);
596                                         return LDB_ERR_OPERATIONS_ERROR;
597                                 }
598
599                                 /* Initialise the referrals list */
600                                 if (ac->referrals == NULL) {
601                                         ac->referrals = (const char **) str_list_make_empty(ac);
602                                         if (ac->referrals == NULL) {
603                                                 ldb_oom(ldb);
604                                                 return LDB_ERR_OPERATIONS_ERROR;
605                                         }
606                                 }
607
608                                 /* Check if the new referral candidate is
609                                  * closer to the base DN than already
610                                  * saved ones and delete the latters */
611                                 j = 0;
612                                 while (ac->referrals[j] != NULL) {
613                                         if (strstr(ac->referrals[j],
614                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
615                                                 str_list_remove(ac->referrals,
616                                                                 ac->referrals[j]);
617                                         } else {
618                                                 ++j;
619                                         }
620                                 }
621
622                                 /* Add our new candidate */
623                                 ac->referrals = str_list_add(ac->referrals, ref);
624
625                                 talloc_free(ref);
626
627                                 if (ac->referrals == NULL) {
628                                         ldb_oom(ldb);
629                                         return LDB_ERR_OPERATIONS_ERROR;
630                                 }
631                         }
632                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
633                                 match = true;
634                                 stop = true; /* note that this relies on partition ordering */
635                         }
636                 }
637
638                 if (match) {
639                         ret = partition_prep_request(ac, data->partitions[i]);
640                         if (ret != LDB_SUCCESS) {
641                                 return ret;
642                         }
643                 }
644
645                 if (stop) break;
646         }
647
648         /* Perhaps we didn't match any partitions. Try the main partition */
649         if (ac->num_requests == 0) {
650                 talloc_free(ac);
651                 return ldb_next_request(module, req);
652         }
653
654         /* fire the first one */
655         return partition_call_first(ac);
656 }
657
658 /* add */
659 static int partition_add(struct ldb_module *module, struct ldb_request *req)
660 {
661         return partition_replicate(module, req, req->op.add.message->dn);
662 }
663
664 /* modify */
665 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
666 {
667         return partition_replicate(module, req, req->op.mod.message->dn);
668 }
669
670 /* delete */
671 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
672 {
673         return partition_replicate(module, req, req->op.del.dn);
674 }
675
676 /* rename */
677 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
678 {
679         /* Find backend */
680         struct dsdb_partition *backend, *backend2;
681         
682         struct partition_private_data *data = talloc_get_type(module->private_data, 
683                                                               struct partition_private_data);
684
685         /* Skip the lot if 'data' isn't here yet (initialisation) */
686         if (!data) {
687                 return LDB_ERR_OPERATIONS_ERROR;
688         }
689
690         backend = find_partition(data, req->op.rename.olddn, req);
691         backend2 = find_partition(data, req->op.rename.newdn, req);
692
693         if ((backend && !backend2) || (!backend && backend2)) {
694                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
695         }
696
697         if (backend != backend2) {
698                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
699                                        "Cannot rename from %s in %s to %s in %s: %s",
700                                        ldb_dn_get_linearized(req->op.rename.olddn),
701                                        ldb_dn_get_linearized(backend->ctrl->dn),
702                                        ldb_dn_get_linearized(req->op.rename.newdn),
703                                        ldb_dn_get_linearized(backend2->ctrl->dn),
704                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
705                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
706         }
707
708         return partition_replicate(module, req, req->op.rename.olddn);
709 }
710
711 /* start a transaction */
712 static int partition_start_trans(struct ldb_module *module)
713 {
714         unsigned int i;
715         int ret;
716         struct partition_private_data *data = talloc_get_type(module->private_data, 
717                                                               struct partition_private_data);
718         /* Look at base DN */
719         /* Figure out which partition it is under */
720         /* Skip the lot if 'data' isn't here yet (initialization) */
721         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
722                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
723         }
724         ret = ldb_next_start_trans(module);
725         if (ret != LDB_SUCCESS) {
726                 return ret;
727         }
728
729         ret = partition_reload_if_required(module, data);
730         if (ret != LDB_SUCCESS) {
731                 return ret;
732         }
733
734         for (i=0; data && data->partitions && data->partitions[i]; i++) {
735                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
736                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
737                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
738                 }
739                 ret = ldb_next_start_trans(data->partitions[i]->module);
740                 if (ret != LDB_SUCCESS) {
741                         /* Back it out, if it fails on one */
742                         for (i--; i >= 0; i--) {
743                                 ldb_next_del_trans(data->partitions[i]->module);
744                         }
745                         ldb_next_del_trans(module);
746                         return ret;
747                 }
748         }
749
750         data->in_transaction++;
751
752         return LDB_SUCCESS;
753 }
754
755 /* prepare for a commit */
756 static int partition_prepare_commit(struct ldb_module *module)
757 {
758         unsigned int i;
759         struct partition_private_data *data = talloc_get_type(module->private_data, 
760                                                               struct partition_private_data);
761
762         for (i=0; data && data->partitions && data->partitions[i]; i++) {
763                 int ret;
764
765                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
766                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
767                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
768                 }
769                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
770                 if (ret != LDB_SUCCESS) {
771                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s",
772                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
773                                                ldb_errstring(module->ldb));
774                         return ret;
775                 }
776         }
777
778         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
779                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
780         }
781         return ldb_next_prepare_commit(module);
782 }
783
784
785 /* end a transaction */
786 static int partition_end_trans(struct ldb_module *module)
787 {
788         int ret, ret2;
789         unsigned int i;
790         struct partition_private_data *data = talloc_get_type(module->private_data, 
791                                                               struct partition_private_data);
792
793         ret = LDB_SUCCESS;
794
795         if (data->in_transaction == 0) {
796                 DEBUG(0,("partition end transaction mismatch\n"));
797                 ret = LDB_ERR_OPERATIONS_ERROR;
798         } else {
799                 data->in_transaction--;
800         }
801
802         for (i=0; data && data->partitions && data->partitions[i]; i++) {
803                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
804                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
805                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
806                 }
807                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
808                 if (ret2 != LDB_SUCCESS) {
809                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s",
810                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
811                                                ldb_errstring(module->ldb));
812                         ret = ret2;
813                 }
814         }
815
816         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
817                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
818         }
819         ret2 = ldb_next_end_trans(module);
820         if (ret2 != LDB_SUCCESS) {
821                 ret = ret2;
822         }
823         return ret;
824 }
825
826 /* delete a transaction */
827 static int partition_del_trans(struct ldb_module *module)
828 {
829         int ret, final_ret = LDB_SUCCESS;
830         unsigned int i;
831         struct partition_private_data *data = talloc_get_type(module->private_data, 
832                                                               struct partition_private_data);
833         for (i=0; data && data->partitions && data->partitions[i]; i++) {
834                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
835                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
836                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
837                 }
838                 ret = ldb_next_del_trans(data->partitions[i]->module);
839                 if (ret != LDB_SUCCESS) {
840                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s",
841                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
842                                                ldb_errstring(module->ldb));
843                         final_ret = ret;
844                 }
845         }       
846
847         if (data->in_transaction == 0) {
848                 DEBUG(0,("partition del transaction mismatch\n"));
849                 return LDB_ERR_OPERATIONS_ERROR;
850         }
851         data->in_transaction--;
852
853         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
854                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
855         }
856         ret = ldb_next_del_trans(module);
857         if (ret != LDB_SUCCESS) {
858                 final_ret = ret;
859         }
860         return final_ret;
861 }
862
863 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
864                                      enum ldb_sequence_type type, uint64_t *seq_number) 
865 {
866         int ret;
867         struct ldb_result *res;
868         struct ldb_seqnum_request *tseq;
869         struct ldb_request *treq;
870         struct ldb_seqnum_result *seqr;
871         res = talloc_zero(mem_ctx, struct ldb_result);
872         if (res == NULL) {
873                 return LDB_ERR_OPERATIONS_ERROR;
874         }
875         tseq = talloc_zero(res, struct ldb_seqnum_request);
876         if (tseq == NULL) {
877                 talloc_free(res);
878                 return LDB_ERR_OPERATIONS_ERROR;
879         }
880         tseq->type = type;
881         
882         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
883                                      LDB_EXTENDED_SEQUENCE_NUMBER,
884                                      tseq,
885                                      NULL,
886                                      res,
887                                      ldb_extended_default_callback,
888                                      NULL);
889         if (ret != LDB_SUCCESS) {
890                 talloc_free(res);
891                 return ret;
892         }
893         
894         ret = ldb_next_request(module, treq);
895         if (ret != LDB_SUCCESS) {
896                 talloc_free(res);
897                 return ret;
898         }
899         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
900         if (ret != LDB_SUCCESS) {
901                 talloc_free(res);
902                 return ret;
903         }
904         
905         seqr = talloc_get_type(res->extended->data,
906                                struct ldb_seqnum_result);
907         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
908                 ret = LDB_ERR_OPERATIONS_ERROR;
909                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
910                 talloc_free(res);
911                 return ret;
912         } else {
913                 *seq_number = seqr->seq_num;
914         }
915         talloc_free(res);
916         return LDB_SUCCESS;
917 }
918
919 /* FIXME: This function is still semi-async */
920 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
921 {
922         int ret;
923         unsigned int i;
924         uint64_t seq_number = 0;
925         uint64_t timestamp_sequence = 0;
926         uint64_t timestamp = 0;
927         struct partition_private_data *data = talloc_get_type(module->private_data, 
928                                                               struct partition_private_data);
929         struct ldb_seqnum_request *seq;
930         struct ldb_seqnum_result *seqr;
931         struct ldb_request *treq;
932         struct ldb_seqnum_request *tseq;
933         struct ldb_seqnum_result *tseqr;
934         struct ldb_extended *ext;
935         struct ldb_result *res;
936         struct dsdb_partition *p;
937
938         p = find_partition(data, NULL, req);
939         if (p != NULL) {
940                 /* the caller specified what partition they want the
941                  * sequence number operation on - just pass it on
942                  */
943                 return ldb_next_request(p->module, req);                
944         }
945
946         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
947
948         switch (seq->type) {
949         case LDB_SEQ_NEXT:
950         case LDB_SEQ_HIGHEST_SEQ:
951
952                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
953                 if (ret != LDB_SUCCESS) {
954                         return ret;
955                 }
956
957                 /* Skip the lot if 'data' isn't here yet (initialisation) */
958                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
959
960                         res = talloc_zero(req, struct ldb_result);
961                         if (res == NULL) {
962                                 return LDB_ERR_OPERATIONS_ERROR;
963                         }
964                         tseq = talloc_zero(res, struct ldb_seqnum_request);
965                         if (tseq == NULL) {
966                                 talloc_free(res);
967                                 return LDB_ERR_OPERATIONS_ERROR;
968                         }
969                         tseq->type = seq->type;
970
971                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
972                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
973                                                      tseq,
974                                                      NULL,
975                                                      res,
976                                                      ldb_extended_default_callback,
977                                                      NULL);
978                         if (ret != LDB_SUCCESS) {
979                                 talloc_free(res);
980                                 return ret;
981                         }
982
983                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
984                                 ret = ldb_request_add_control(treq,
985                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
986                                                               false, data->partitions[i]->ctrl);
987                                 if (ret != LDB_SUCCESS) {
988                                         talloc_free(res);
989                                         return ret;
990                                 }
991                         }
992
993                         ret = partition_request(data->partitions[i]->module, treq);
994                         if (ret != LDB_SUCCESS) {
995                                 talloc_free(res);
996                                 return ret;
997                         }
998                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
999                         if (ret != LDB_SUCCESS) {
1000                                 talloc_free(res);
1001                                 return ret;
1002                         }
1003                         tseqr = talloc_get_type(res->extended->data,
1004                                                 struct ldb_seqnum_result);
1005                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1006                                 timestamp_sequence = MAX(timestamp_sequence,
1007                                                          tseqr->seq_num);
1008                         } else {
1009                                 seq_number += tseqr->seq_num;
1010                         }
1011                         talloc_free(res);
1012                 }
1013                 /* fall through */
1014         case LDB_SEQ_HIGHEST_TIMESTAMP:
1015
1016                 res = talloc_zero(req, struct ldb_result);
1017                 if (res == NULL) {
1018                         return LDB_ERR_OPERATIONS_ERROR;
1019                 }
1020
1021                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1022                 if (tseq == NULL) {
1023                         talloc_free(res);
1024                         return LDB_ERR_OPERATIONS_ERROR;
1025                 }
1026                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1027
1028                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1029                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1030                                              tseq,
1031                                              NULL,
1032                                              res,
1033                                              ldb_extended_default_callback,
1034                                              NULL);
1035                 if (ret != LDB_SUCCESS) {
1036                         talloc_free(res);
1037                         return ret;
1038                 }
1039
1040                 ret = ldb_next_request(module, treq);
1041                 if (ret != LDB_SUCCESS) {
1042                         talloc_free(res);
1043                         return ret;
1044                 }
1045                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1046                 if (ret != LDB_SUCCESS) {
1047                         talloc_free(res);
1048                         return ret;
1049                 }
1050
1051                 tseqr = talloc_get_type(res->extended->data,
1052                                            struct ldb_seqnum_result);
1053                 timestamp = tseqr->seq_num;
1054
1055                 talloc_free(res);
1056
1057                 /* Skip the lot if 'data' isn't here yet (initialisation) */
1058                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1059
1060                         res = talloc_zero(req, struct ldb_result);
1061                         if (res == NULL) {
1062                                 return LDB_ERR_OPERATIONS_ERROR;
1063                         }
1064
1065                         tseq = talloc_zero(res, struct ldb_seqnum_request);
1066                         if (tseq == NULL) {
1067                                 talloc_free(res);
1068                                 return LDB_ERR_OPERATIONS_ERROR;
1069                         }
1070                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1071
1072                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1073                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
1074                                                      tseq,
1075                                                      NULL,
1076                                                      res,
1077                                                      ldb_extended_default_callback,
1078                                                      NULL);
1079                         if (ret != LDB_SUCCESS) {
1080                                 talloc_free(res);
1081                                 return ret;
1082                         }
1083
1084                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
1085                                 ret = ldb_request_add_control(treq,
1086                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
1087                                                               false, data->partitions[i]->ctrl);
1088                                 if (ret != LDB_SUCCESS) {
1089                                         talloc_free(res);
1090                                         return ret;
1091                                 }
1092                         }
1093
1094                         ret = partition_request(data->partitions[i]->module, treq);
1095                         if (ret != LDB_SUCCESS) {
1096                                 talloc_free(res);
1097                                 return ret;
1098                         }
1099                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1100                         if (ret != LDB_SUCCESS) {
1101                                 talloc_free(res);
1102                                 return ret;
1103                         }
1104
1105                         tseqr = talloc_get_type(res->extended->data,
1106                                                   struct ldb_seqnum_result);
1107                         timestamp = MAX(timestamp, tseqr->seq_num);
1108
1109                         talloc_free(res);
1110                 }
1111
1112                 break;
1113         }
1114
1115         ext = talloc_zero(req, struct ldb_extended);
1116         if (!ext) {
1117                 return LDB_ERR_OPERATIONS_ERROR;
1118         }
1119         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1120         if (seqr == NULL) {
1121                 talloc_free(ext);
1122                 return LDB_ERR_OPERATIONS_ERROR;
1123         }
1124         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1125         ext->data = seqr;
1126
1127         switch (seq->type) {
1128         case LDB_SEQ_NEXT:
1129         case LDB_SEQ_HIGHEST_SEQ:
1130
1131                 /* Has someone above set a timebase sequence? */
1132                 if (timestamp_sequence) {
1133                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1134                 } else {
1135                         seqr->seq_num = seq_number;
1136                 }
1137
1138                 if (timestamp_sequence > seqr->seq_num) {
1139                         seqr->seq_num = timestamp_sequence;
1140                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1141                 }
1142
1143                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1144                 break;
1145         case LDB_SEQ_HIGHEST_TIMESTAMP:
1146                 seqr->seq_num = timestamp;
1147                 break;
1148         }
1149
1150         if (seq->type == LDB_SEQ_NEXT) {
1151                 seqr->seq_num++;
1152         }
1153
1154         /* send request done */
1155         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1156 }
1157
1158 /* extended */
1159 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1160 {
1161         struct partition_private_data *data;
1162         struct partition_context *ac;
1163         int ret;
1164
1165         data = talloc_get_type(module->private_data, struct partition_private_data);
1166         if (!data) {
1167                 return ldb_next_request(module, req);
1168         }
1169
1170         ret = partition_reload_if_required(module, data);
1171         if (ret != LDB_SUCCESS) {
1172                 return ret;
1173         }
1174         
1175         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1176                 return partition_sequence_number(module, req);
1177         }
1178
1179         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1180                 return partition_create(module, req);
1181         }
1182
1183         /* 
1184          * as the extended operation has no dn
1185          * we need to send it to all partitions
1186          */
1187
1188         ac = partition_init_ctx(module, req);
1189         if (!ac) {
1190                 return LDB_ERR_OPERATIONS_ERROR;
1191         }
1192
1193         return partition_send_all(module, ac, req);
1194 }
1195
1196 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1197         .name              = "partition",
1198         .init_context      = partition_init,
1199         .search            = partition_search,
1200         .add               = partition_add,
1201         .modify            = partition_modify,
1202         .del               = partition_delete,
1203         .rename            = partition_rename,
1204         .extended          = partition_extended,
1205         .start_transaction = partition_start_trans,
1206         .prepare_commit    = partition_prepare_commit,
1207         .end_transaction   = partition_end_trans,
1208         .del_transaction   = partition_del_trans,
1209 };