s4:partition LDB module - change counter variables to "unsigned" where appropriate
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         unsigned int num_requests;
45         unsigned int finished_requests;
46
47         const char **referrals;
48 };
49
50 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
51 {
52         struct partition_context *ac;
53
54         ac = talloc_zero(req, struct partition_context);
55         if (ac == NULL) {
56                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
57                 return NULL;
58         }
59
60         ac->module = module;
61         ac->req = req;
62
63         return ac;
64 }
65
66 /*
67  * helper functions to call the next module in chain
68  */
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         unsigned int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct partition_private_data *data;
138         struct ldb_control *partition_ctrl;
139
140         ac = talloc_get_type(req->context, struct partition_context);
141         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
142
143         if (!ares) {
144                 return ldb_module_done(ac->req, NULL, NULL,
145                                         LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150                 /* If we didn't fan this request out to mulitple partitions,
151                  * or this is an individual search result, we can
152                  * deterministily tell the caller what partition this was
153                  * written to (repl_meta_data likes to know) */
154                 ret = ldb_reply_add_control(ares,
155                                             DSDB_CONTROL_CURRENT_PARTITION_OID,
156                                             false, partition_ctrl->data);
157                 if (ret != LDB_SUCCESS) {
158                         return ldb_module_done(ac->req, NULL, NULL,
159                                                ret);
160                 }
161         }
162
163         if (ares->error != LDB_SUCCESS) {
164                 return ldb_module_done(ac->req, ares->controls,
165                                         ares->response, ares->error);
166         }
167
168         switch (ares->type) {
169         case LDB_REPLY_REFERRAL:
170                 /* ignore referrals for now */
171                 break;
172
173         case LDB_REPLY_ENTRY:
174                 if (ac->req->operation != LDB_SEARCH) {
175                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
176                                 "partition_req_callback:"
177                                 " Unsupported reply type for this request");
178                         return ldb_module_done(ac->req, NULL, NULL,
179                                                 LDB_ERR_OPERATIONS_ERROR);
180                 }
181                 
182                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
183
184         case LDB_REPLY_DONE:
185                 if (ac->req->operation == LDB_EXTENDED) {
186                         /* FIXME: check for ares->response, replmd does not fill it ! */
187                         if (ares->response) {
188                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
189                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
190                                                           "partition_req_callback:"
191                                                           " Unknown extended reply, "
192                                                           "only supports START_TLS");
193                                         talloc_free(ares);
194                                         return ldb_module_done(ac->req, NULL, NULL,
195                                                                 LDB_ERR_OPERATIONS_ERROR);
196                                 }
197                         }
198                 }
199
200                 ac->finished_requests++;
201                 if (ac->finished_requests == ac->num_requests) {
202                         /* Send back referrals if they do exist (search ops) */
203                         if (ac->referrals != NULL) {
204                                 const char **ref;
205                                 for (ref = ac->referrals; *ref != NULL; ++ref) {
206                                         ret = ldb_module_send_referral(ac->req,
207                                                                        talloc_strdup(ac->req, *ref));
208                                         if (ret != LDB_SUCCESS) {
209                                                 return ldb_module_done(ac->req, NULL, NULL,
210                                                                        ret);
211                                         }
212                                 }
213                         }
214
215                         /* this was the last one, call callback */
216                         return ldb_module_done(ac->req, ares->controls,
217                                                ares->response, 
218                                                ares->error);
219                 }
220
221                 /* not the last, now call the next one */
222                 module = ac->part_req[ac->finished_requests].module;
223                 nreq = ac->part_req[ac->finished_requests].req;
224
225                 ret = partition_request(module, nreq);
226                 if (ret != LDB_SUCCESS) {
227                         talloc_free(ares);
228                         return ldb_module_done(ac->req, NULL, NULL, ret);
229                 }
230
231                 break;
232         }
233
234         talloc_free(ares);
235         return LDB_SUCCESS;
236 }
237
238 static int partition_prep_request(struct partition_context *ac,
239                                   struct dsdb_partition *partition)
240 {
241         int ret;
242         struct ldb_request *req;
243
244         ac->part_req = talloc_realloc(ac, ac->part_req,
245                                         struct part_request,
246                                         ac->num_requests + 1);
247         if (ac->part_req == NULL) {
248                 ldb_oom(ldb_module_get_ctx(ac->module));
249                 return LDB_ERR_OPERATIONS_ERROR;
250         }
251
252         switch (ac->req->operation) {
253         case LDB_SEARCH:
254                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
255                                         ac->part_req,
256                                         ac->req->op.search.base,
257                                         ac->req->op.search.scope,
258                                         ac->req->op.search.tree,
259                                         ac->req->op.search.attrs,
260                                         ac->req->controls,
261                                         ac, partition_req_callback,
262                                         ac->req);
263                 break;
264         case LDB_ADD:
265                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
266                                         ac->req->op.add.message,
267                                         ac->req->controls,
268                                         ac, partition_req_callback,
269                                         ac->req);
270                 break;
271         case LDB_MODIFY:
272                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273                                         ac->req->op.mod.message,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 break;
278         case LDB_DELETE:
279                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280                                         ac->req->op.del.dn,
281                                         ac->req->controls,
282                                         ac, partition_req_callback,
283                                         ac->req);
284                 break;
285         case LDB_RENAME:
286                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
287                                         ac->req->op.rename.olddn,
288                                         ac->req->op.rename.newdn,
289                                         ac->req->controls,
290                                         ac, partition_req_callback,
291                                         ac->req);
292                 break;
293         case LDB_EXTENDED:
294                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
295                                         ac->part_req,
296                                         ac->req->op.extended.oid,
297                                         ac->req->op.extended.data,
298                                         ac->req->controls,
299                                         ac, partition_req_callback,
300                                         ac->req);
301                 break;
302         default:
303                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
304                                   "Unsupported request type!");
305                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
306         }
307
308         if (ret != LDB_SUCCESS) {
309                 return ret;
310         }
311
312         ac->part_req[ac->num_requests].req = req;
313
314         if (ac->req->controls) {
315                 req->controls = talloc_memdup(req, ac->req->controls,
316                                         talloc_get_size(ac->req->controls));
317                 if (req->controls == NULL) {
318                         ldb_oom(ldb_module_get_ctx(ac->module));
319                         return LDB_ERR_OPERATIONS_ERROR;
320                 }
321         }
322
323         if (partition) {
324                 ac->part_req[ac->num_requests].module = partition->module;
325
326                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
327                         ret = ldb_request_add_control(req,
328                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
329                                                       false, partition->ctrl);
330                         if (ret != LDB_SUCCESS) {
331                                 return ret;
332                         }
333                 }
334
335                 if (req->operation == LDB_SEARCH) {
336                         /* If the search is for 'more' than this partition,
337                          * then change the basedn, so a remote LDAP server
338                          * doesn't object */
339                         if (ldb_dn_compare_base(partition->ctrl->dn,
340                                                 req->op.search.base) != 0) {
341                                 req->op.search.base = partition->ctrl->dn;
342                         }
343                 }
344
345         } else {
346                 /* make sure you put the module here, or
347                  * or ldb_next_request() will skip a module */
348                 ac->part_req[ac->num_requests].module = ac->module;
349         }
350
351         ac->num_requests++;
352
353         return LDB_SUCCESS;
354 }
355
356 static int partition_call_first(struct partition_context *ac)
357 {
358         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
359 }
360
361 /**
362  * Send a request down to all the partitions
363  */
364 static int partition_send_all(struct ldb_module *module, 
365                               struct partition_context *ac, 
366                               struct ldb_request *req) 
367 {
368         unsigned int i;
369         struct partition_private_data *data = talloc_get_type(module->private_data, 
370                                                               struct partition_private_data);
371         int ret = partition_prep_request(ac, NULL);
372         if (ret != LDB_SUCCESS) {
373                 return ret;
374         }
375         for (i=0; data && data->partitions && data->partitions[i]; i++) {
376                 ret = partition_prep_request(ac, data->partitions[i]);
377                 if (ret != LDB_SUCCESS) {
378                         return ret;
379                 }
380         }
381
382         /* fire the first one */
383         return partition_call_first(ac);
384 }
385
386 /**
387  * Figure out which backend a request needs to be aimed at.  Some
388  * requests must be replicated to all backends
389  */
390 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
391 {
392         struct partition_context *ac;
393         unsigned int i;
394         int ret;
395         struct dsdb_partition *partition;
396         struct partition_private_data *data = talloc_get_type(module->private_data, 
397                                                               struct partition_private_data);
398         if (!data || !data->partitions) {
399                 return ldb_next_request(module, req);
400         }
401
402         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
403                 /* Is this a special DN, we need to replicate to every backend? */
404                 for (i=0; data->replicate && data->replicate[i]; i++) {
405                         if (ldb_dn_compare(data->replicate[i], 
406                                            dn) == 0) {
407                                 
408                                 ac = partition_init_ctx(module, req);
409                                 if (!ac) {
410                                         return LDB_ERR_OPERATIONS_ERROR;
411                                 }
412                                 
413                                 return partition_send_all(module, ac, req);
414                         }
415                 }
416         }
417
418         /* Otherwise, we need to find the partition to fire it to */
419
420         /* Find partition */
421         partition = find_partition(data, dn, req);
422         if (!partition) {
423                 /*
424                  * if we haven't found a matching partition
425                  * pass the request to the main ldb
426                  *
427                  * TODO: we should maybe return an error here
428                  *       if it's not a special dn
429                  */
430
431                 return ldb_next_request(module, req);
432         }
433
434         ac = partition_init_ctx(module, req);
435         if (!ac) {
436                 return LDB_ERR_OPERATIONS_ERROR;
437         }
438
439         /* we need to add a control but we never touch the original request */
440         ret = partition_prep_request(ac, partition);
441         if (ret != LDB_SUCCESS) {
442                 return ret;
443         }
444
445         /* fire the first one */
446         return partition_call_first(ac);
447 }
448
449 /* search */
450 static int partition_search(struct ldb_module *module, struct ldb_request *req)
451 {
452         struct ldb_control **saved_controls;
453         /* Find backend */
454         struct partition_private_data *data = talloc_get_type(module->private_data, 
455                                                               struct partition_private_data);
456         struct partition_context *ac;
457         struct ldb_context *ldb;
458         struct loadparm_context *lp_ctx;
459
460         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
461         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
462         
463         struct ldb_search_options_control *search_options = NULL;
464         struct dsdb_partition *p;
465         unsigned int i, j;
466         int ret;
467         bool domain_scope = false, phantom_root = false;
468         
469         ret = partition_reload_if_required(module, data);
470         if (ret != LDB_SUCCESS) {
471                 return ret;
472         }
473
474         p = find_partition(data, NULL, req);
475         if (p != NULL) {
476                 /* the caller specified what partition they want the
477                  * search - just pass it on
478                  */
479                 return ldb_next_request(p->module, req);                
480         }
481
482         /* Get back the search options from the search control, and mark it as
483          * non-critical (to make backends and also dcpromo happy).
484          */
485         if (search_control) {
486                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
487                 search_control->critical = 0;
488
489         }
490
491         /* Remove the "domain_scope" control, so we don't confuse a backend
492          * server */
493         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
494                 ldb_oom(ldb_module_get_ctx(module));
495                 return LDB_ERR_OPERATIONS_ERROR;
496         }
497
498         /* Locate the options */
499         domain_scope = (search_options
500                 && (search_options->search_options & LDB_SEARCH_OPTION_DOMAIN_SCOPE))
501                 || domain_scope_control;
502         phantom_root = search_options
503                 && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT);
504
505         /* Remove handled options from the search control flag */
506         if (search_options) {
507                 search_options->search_options = search_options->search_options
508                         & ~LDB_SEARCH_OPTION_DOMAIN_SCOPE
509                         & ~LDB_SEARCH_OPTION_PHANTOM_ROOT;
510         }
511
512         if (!data || !data->partitions) {
513                 return ldb_next_request(module, req);
514         }
515
516         ac = partition_init_ctx(module, req);
517         if (!ac) {
518                 return LDB_ERR_OPERATIONS_ERROR;
519         }
520
521         ldb = ldb_module_get_ctx(ac->module);
522         lp_ctx = talloc_get_type(ldb_get_opaque(ldb, "loadparm"),
523                                                 struct loadparm_context);
524
525         /* Search from the base DN */
526         if (ldb_dn_is_null(req->op.search.base)) {
527                 return partition_send_all(module, ac, req);
528         }
529
530         for (i=0; data->partitions[i]; i++) {
531                 bool match = false, stop = false;
532
533                 if (phantom_root) {
534                         /* Phantom root: Find all partitions under the
535                          * search base. We match if:
536                          *
537                          * 1) the DN we are looking for exactly matches a
538                          *    certain partition and always stop
539                          * 2) the DN we are looking for is a parent of certain
540                          *    partitions and it isn't a scope base search
541                          * 3) the DN we are looking for is a child of a certain
542                          *    partition and always stop
543                          *    - we don't need to go any further up in the
544                          *    hierarchy!
545                          */
546                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn,
547                                            req->op.search.base) == 0) {
548                                 match = true;
549                                 stop = true;
550                         }
551                         if (!match &&
552                             (ldb_dn_compare_base(req->op.search.base,
553                                                  data->partitions[i]->ctrl->dn) == 0 &&
554                              req->op.search.scope != LDB_SCOPE_BASE)) {
555                                 match = true;
556                         }
557                         if (!match &&
558                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn,
559                                                 req->op.search.base) == 0) {
560                                 match = true;
561                                 stop = true; /* note that this relies on partition ordering */
562                         }
563                 } else {
564                         /* Domain scope: Find all partitions under the search
565                          * base.
566                          *
567                          * We generate referral candidates if we haven't
568                          * specified the domain scope control, haven't a base
569                          * search* scope and the DN we are looking for is a real
570                          * predecessor of certain partitions. When a new
571                          * referral candidate is nearer to the DN than an
572                          * existing one delete the latter (we want to have only
573                          * the closest ones). When we checked this for all
574                          * candidates we have the final referrals.
575                          *
576                          * We match if the DN we are looking for is a child of
577                          * a certain partition or the partition
578                          * DN itself - we don't need to go any further
579                          * up in the hierarchy!
580                          */
581                         if ((!domain_scope) &&
582                             (req->op.search.scope != LDB_SCOPE_BASE) &&
583                             (ldb_dn_compare_base(req->op.search.base,
584                                                  data->partitions[i]->ctrl->dn) == 0) &&
585                             (ldb_dn_compare(req->op.search.base,
586                                             data->partitions[i]->ctrl->dn) != 0)) {
587                                 char *ref = talloc_asprintf(ac,
588                                                             "ldap://%s/%s%s",
589                                                             lp_dnsdomain(lp_ctx),
590                                                             ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
591                                                             req->op.search.scope == LDB_SCOPE_ONELEVEL ? "??base" : "");
592
593                                 if (ref == NULL) {
594                                         ldb_oom(ldb);
595                                         return LDB_ERR_OPERATIONS_ERROR;
596                                 }
597
598                                 /* Initialise the referrals list */
599                                 if (ac->referrals == NULL) {
600                                         ac->referrals = (const char **) str_list_make_empty(ac);
601                                         if (ac->referrals == NULL) {
602                                                 ldb_oom(ldb);
603                                                 return LDB_ERR_OPERATIONS_ERROR;
604                                         }
605                                 }
606
607                                 /* Check if the new referral candidate is
608                                  * closer to the base DN than already
609                                  * saved ones and delete the latters */
610                                 j = 0;
611                                 while (ac->referrals[j] != NULL) {
612                                         if (strstr(ac->referrals[j],
613                                                    ldb_dn_get_linearized(data->partitions[i]->ctrl->dn)) != NULL) {
614                                                 str_list_remove(ac->referrals,
615                                                                 ac->referrals[j]);
616                                         } else {
617                                                 ++j;
618                                         }
619                                 }
620
621                                 /* Add our new candidate */
622                                 ac->referrals = str_list_add(ac->referrals, ref);
623
624                                 talloc_free(ref);
625
626                                 if (ac->referrals == NULL) {
627                                         ldb_oom(ldb);
628                                         return LDB_ERR_OPERATIONS_ERROR;
629                                 }
630                         }
631                         if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
632                                 match = true;
633                                 stop = true; /* note that this relies on partition ordering */
634                         }
635                 }
636
637                 if (match) {
638                         ret = partition_prep_request(ac, data->partitions[i]);
639                         if (ret != LDB_SUCCESS) {
640                                 return ret;
641                         }
642                 }
643
644                 if (stop) break;
645         }
646
647         /* Perhaps we didn't match any partitions. Try the main partition */
648         if (ac->num_requests == 0) {
649                 talloc_free(ac);
650                 return ldb_next_request(module, req);
651         }
652
653         /* fire the first one */
654         return partition_call_first(ac);
655 }
656
657 /* add */
658 static int partition_add(struct ldb_module *module, struct ldb_request *req)
659 {
660         return partition_replicate(module, req, req->op.add.message->dn);
661 }
662
663 /* modify */
664 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
665 {
666         return partition_replicate(module, req, req->op.mod.message->dn);
667 }
668
669 /* delete */
670 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
671 {
672         return partition_replicate(module, req, req->op.del.dn);
673 }
674
675 /* rename */
676 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
677 {
678         /* Find backend */
679         struct dsdb_partition *backend, *backend2;
680         
681         struct partition_private_data *data = talloc_get_type(module->private_data, 
682                                                               struct partition_private_data);
683
684         /* Skip the lot if 'data' isn't here yet (initialisation) */
685         if (!data) {
686                 return LDB_ERR_OPERATIONS_ERROR;
687         }
688
689         backend = find_partition(data, req->op.rename.olddn, req);
690         backend2 = find_partition(data, req->op.rename.newdn, req);
691
692         if ((backend && !backend2) || (!backend && backend2)) {
693                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
694         }
695
696         if (backend != backend2) {
697                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
698                                        "Cannot rename from %s in %s to %s in %s: %s",
699                                        ldb_dn_get_linearized(req->op.rename.olddn),
700                                        ldb_dn_get_linearized(backend->ctrl->dn),
701                                        ldb_dn_get_linearized(req->op.rename.newdn),
702                                        ldb_dn_get_linearized(backend2->ctrl->dn),
703                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
704                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
705         }
706
707         return partition_replicate(module, req, req->op.rename.olddn);
708 }
709
710 /* start a transaction */
711 static int partition_start_trans(struct ldb_module *module)
712 {
713         unsigned int i;
714         int ret;
715         struct partition_private_data *data = talloc_get_type(module->private_data, 
716                                                               struct partition_private_data);
717         /* Look at base DN */
718         /* Figure out which partition it is under */
719         /* Skip the lot if 'data' isn't here yet (initialization) */
720         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
721                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
722         }
723         ret = ldb_next_start_trans(module);
724         if (ret != LDB_SUCCESS) {
725                 return ret;
726         }
727
728         ret = partition_reload_if_required(module, data);
729         if (ret != LDB_SUCCESS) {
730                 return ret;
731         }
732
733         for (i=0; data && data->partitions && data->partitions[i]; i++) {
734                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
735                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
736                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
737                 }
738                 ret = ldb_next_start_trans(data->partitions[i]->module);
739                 if (ret != LDB_SUCCESS) {
740                         /* Back it out, if it fails on one */
741                         for (i--; i >= 0; i--) {
742                                 ldb_next_del_trans(data->partitions[i]->module);
743                         }
744                         ldb_next_del_trans(module);
745                         return ret;
746                 }
747         }
748
749         data->in_transaction++;
750
751         return LDB_SUCCESS;
752 }
753
754 /* prepare for a commit */
755 static int partition_prepare_commit(struct ldb_module *module)
756 {
757         unsigned int i;
758         struct partition_private_data *data = talloc_get_type(module->private_data, 
759                                                               struct partition_private_data);
760
761         for (i=0; data && data->partitions && data->partitions[i]; i++) {
762                 int ret;
763
764                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
765                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
766                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
767                 }
768                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
769                 if (ret != LDB_SUCCESS) {
770                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s",
771                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
772                                                ldb_errstring(module->ldb));
773                         return ret;
774                 }
775         }
776
777         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
778                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
779         }
780         return ldb_next_prepare_commit(module);
781 }
782
783
784 /* end a transaction */
785 static int partition_end_trans(struct ldb_module *module)
786 {
787         int ret, ret2;
788         unsigned int i;
789         struct partition_private_data *data = talloc_get_type(module->private_data, 
790                                                               struct partition_private_data);
791
792         ret = LDB_SUCCESS;
793
794         if (data->in_transaction == 0) {
795                 DEBUG(0,("partition end transaction mismatch\n"));
796                 ret = LDB_ERR_OPERATIONS_ERROR;
797         } else {
798                 data->in_transaction--;
799         }
800
801         for (i=0; data && data->partitions && data->partitions[i]; i++) {
802                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
803                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
804                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
805                 }
806                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
807                 if (ret2 != LDB_SUCCESS) {
808                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s",
809                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
810                                                ldb_errstring(module->ldb));
811                         ret = ret2;
812                 }
813         }
814
815         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
816                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
817         }
818         ret2 = ldb_next_end_trans(module);
819         if (ret2 != LDB_SUCCESS) {
820                 ret = ret2;
821         }
822         return ret;
823 }
824
825 /* delete a transaction */
826 static int partition_del_trans(struct ldb_module *module)
827 {
828         int ret, final_ret = LDB_SUCCESS;
829         unsigned int i;
830         struct partition_private_data *data = talloc_get_type(module->private_data, 
831                                                               struct partition_private_data);
832         for (i=0; data && data->partitions && data->partitions[i]; i++) {
833                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
834                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
835                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
836                 }
837                 ret = ldb_next_del_trans(data->partitions[i]->module);
838                 if (ret != LDB_SUCCESS) {
839                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s",
840                                                ldb_dn_get_linearized(data->partitions[i]->ctrl->dn),
841                                                ldb_errstring(module->ldb));
842                         final_ret = ret;
843                 }
844         }       
845
846         if (data->in_transaction == 0) {
847                 DEBUG(0,("partition del transaction mismatch\n"));
848                 return LDB_ERR_OPERATIONS_ERROR;
849         }
850         data->in_transaction--;
851
852         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
853                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
854         }
855         ret = ldb_next_del_trans(module);
856         if (ret != LDB_SUCCESS) {
857                 final_ret = ret;
858         }
859         return final_ret;
860 }
861
862 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
863                                      enum ldb_sequence_type type, uint64_t *seq_number) 
864 {
865         int ret;
866         struct ldb_result *res;
867         struct ldb_seqnum_request *tseq;
868         struct ldb_request *treq;
869         struct ldb_seqnum_result *seqr;
870         res = talloc_zero(mem_ctx, struct ldb_result);
871         if (res == NULL) {
872                 return LDB_ERR_OPERATIONS_ERROR;
873         }
874         tseq = talloc_zero(res, struct ldb_seqnum_request);
875         if (tseq == NULL) {
876                 talloc_free(res);
877                 return LDB_ERR_OPERATIONS_ERROR;
878         }
879         tseq->type = type;
880         
881         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
882                                      LDB_EXTENDED_SEQUENCE_NUMBER,
883                                      tseq,
884                                      NULL,
885                                      res,
886                                      ldb_extended_default_callback,
887                                      NULL);
888         if (ret != LDB_SUCCESS) {
889                 talloc_free(res);
890                 return ret;
891         }
892         
893         ret = ldb_next_request(module, treq);
894         if (ret != LDB_SUCCESS) {
895                 talloc_free(res);
896                 return ret;
897         }
898         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
899         if (ret != LDB_SUCCESS) {
900                 talloc_free(res);
901                 return ret;
902         }
903         
904         seqr = talloc_get_type(res->extended->data,
905                                struct ldb_seqnum_result);
906         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
907                 ret = LDB_ERR_OPERATIONS_ERROR;
908                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
909                 talloc_free(res);
910                 return ret;
911         } else {
912                 *seq_number = seqr->seq_num;
913         }
914         talloc_free(res);
915         return LDB_SUCCESS;
916 }
917
918 /* FIXME: This function is still semi-async */
919 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
920 {
921         int ret;
922         unsigned int i;
923         uint64_t seq_number = 0;
924         uint64_t timestamp_sequence = 0;
925         uint64_t timestamp = 0;
926         struct partition_private_data *data = talloc_get_type(module->private_data, 
927                                                               struct partition_private_data);
928         struct ldb_seqnum_request *seq;
929         struct ldb_seqnum_result *seqr;
930         struct ldb_request *treq;
931         struct ldb_seqnum_request *tseq;
932         struct ldb_seqnum_result *tseqr;
933         struct ldb_extended *ext;
934         struct ldb_result *res;
935         struct dsdb_partition *p;
936
937         p = find_partition(data, NULL, req);
938         if (p != NULL) {
939                 /* the caller specified what partition they want the
940                  * sequence number operation on - just pass it on
941                  */
942                 return ldb_next_request(p->module, req);                
943         }
944
945         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
946
947         switch (seq->type) {
948         case LDB_SEQ_NEXT:
949         case LDB_SEQ_HIGHEST_SEQ:
950
951                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
952                 if (ret != LDB_SUCCESS) {
953                         return ret;
954                 }
955
956                 /* Skip the lot if 'data' isn't here yet (initialisation) */
957                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
958
959                         res = talloc_zero(req, struct ldb_result);
960                         if (res == NULL) {
961                                 return LDB_ERR_OPERATIONS_ERROR;
962                         }
963                         tseq = talloc_zero(res, struct ldb_seqnum_request);
964                         if (tseq == NULL) {
965                                 talloc_free(res);
966                                 return LDB_ERR_OPERATIONS_ERROR;
967                         }
968                         tseq->type = seq->type;
969
970                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
971                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
972                                                      tseq,
973                                                      NULL,
974                                                      res,
975                                                      ldb_extended_default_callback,
976                                                      NULL);
977                         if (ret != LDB_SUCCESS) {
978                                 talloc_free(res);
979                                 return ret;
980                         }
981
982                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
983                                 ret = ldb_request_add_control(treq,
984                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
985                                                               false, data->partitions[i]->ctrl);
986                                 if (ret != LDB_SUCCESS) {
987                                         talloc_free(res);
988                                         return ret;
989                                 }
990                         }
991
992                         ret = partition_request(data->partitions[i]->module, treq);
993                         if (ret != LDB_SUCCESS) {
994                                 talloc_free(res);
995                                 return ret;
996                         }
997                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
998                         if (ret != LDB_SUCCESS) {
999                                 talloc_free(res);
1000                                 return ret;
1001                         }
1002                         tseqr = talloc_get_type(res->extended->data,
1003                                                 struct ldb_seqnum_result);
1004                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
1005                                 timestamp_sequence = MAX(timestamp_sequence,
1006                                                          tseqr->seq_num);
1007                         } else {
1008                                 seq_number += tseqr->seq_num;
1009                         }
1010                         talloc_free(res);
1011                 }
1012                 /* fall through */
1013         case LDB_SEQ_HIGHEST_TIMESTAMP:
1014
1015                 res = talloc_zero(req, struct ldb_result);
1016                 if (res == NULL) {
1017                         return LDB_ERR_OPERATIONS_ERROR;
1018                 }
1019
1020                 tseq = talloc_zero(res, struct ldb_seqnum_request);
1021                 if (tseq == NULL) {
1022                         talloc_free(res);
1023                         return LDB_ERR_OPERATIONS_ERROR;
1024                 }
1025                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1026
1027                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1028                                              LDB_EXTENDED_SEQUENCE_NUMBER,
1029                                              tseq,
1030                                              NULL,
1031                                              res,
1032                                              ldb_extended_default_callback,
1033                                              NULL);
1034                 if (ret != LDB_SUCCESS) {
1035                         talloc_free(res);
1036                         return ret;
1037                 }
1038
1039                 ret = ldb_next_request(module, treq);
1040                 if (ret != LDB_SUCCESS) {
1041                         talloc_free(res);
1042                         return ret;
1043                 }
1044                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1045                 if (ret != LDB_SUCCESS) {
1046                         talloc_free(res);
1047                         return ret;
1048                 }
1049
1050                 tseqr = talloc_get_type(res->extended->data,
1051                                            struct ldb_seqnum_result);
1052                 timestamp = tseqr->seq_num;
1053
1054                 talloc_free(res);
1055
1056                 /* Skip the lot if 'data' isn't here yet (initialisation) */
1057                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
1058
1059                         res = talloc_zero(req, struct ldb_result);
1060                         if (res == NULL) {
1061                                 return LDB_ERR_OPERATIONS_ERROR;
1062                         }
1063
1064                         tseq = talloc_zero(res, struct ldb_seqnum_request);
1065                         if (tseq == NULL) {
1066                                 talloc_free(res);
1067                                 return LDB_ERR_OPERATIONS_ERROR;
1068                         }
1069                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
1070
1071                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
1072                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
1073                                                      tseq,
1074                                                      NULL,
1075                                                      res,
1076                                                      ldb_extended_default_callback,
1077                                                      NULL);
1078                         if (ret != LDB_SUCCESS) {
1079                                 talloc_free(res);
1080                                 return ret;
1081                         }
1082
1083                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
1084                                 ret = ldb_request_add_control(treq,
1085                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
1086                                                               false, data->partitions[i]->ctrl);
1087                                 if (ret != LDB_SUCCESS) {
1088                                         talloc_free(res);
1089                                         return ret;
1090                                 }
1091                         }
1092
1093                         ret = partition_request(data->partitions[i]->module, treq);
1094                         if (ret != LDB_SUCCESS) {
1095                                 talloc_free(res);
1096                                 return ret;
1097                         }
1098                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1099                         if (ret != LDB_SUCCESS) {
1100                                 talloc_free(res);
1101                                 return ret;
1102                         }
1103
1104                         tseqr = talloc_get_type(res->extended->data,
1105                                                   struct ldb_seqnum_result);
1106                         timestamp = MAX(timestamp, tseqr->seq_num);
1107
1108                         talloc_free(res);
1109                 }
1110
1111                 break;
1112         }
1113
1114         ext = talloc_zero(req, struct ldb_extended);
1115         if (!ext) {
1116                 return LDB_ERR_OPERATIONS_ERROR;
1117         }
1118         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1119         if (seqr == NULL) {
1120                 talloc_free(ext);
1121                 return LDB_ERR_OPERATIONS_ERROR;
1122         }
1123         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1124         ext->data = seqr;
1125
1126         switch (seq->type) {
1127         case LDB_SEQ_NEXT:
1128         case LDB_SEQ_HIGHEST_SEQ:
1129
1130                 /* Has someone above set a timebase sequence? */
1131                 if (timestamp_sequence) {
1132                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1133                 } else {
1134                         seqr->seq_num = seq_number;
1135                 }
1136
1137                 if (timestamp_sequence > seqr->seq_num) {
1138                         seqr->seq_num = timestamp_sequence;
1139                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1140                 }
1141
1142                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1143                 break;
1144         case LDB_SEQ_HIGHEST_TIMESTAMP:
1145                 seqr->seq_num = timestamp;
1146                 break;
1147         }
1148
1149         if (seq->type == LDB_SEQ_NEXT) {
1150                 seqr->seq_num++;
1151         }
1152
1153         /* send request done */
1154         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1155 }
1156
1157 /* extended */
1158 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1159 {
1160         struct partition_private_data *data;
1161         struct partition_context *ac;
1162         int ret;
1163
1164         data = talloc_get_type(module->private_data, struct partition_private_data);
1165         if (!data) {
1166                 return ldb_next_request(module, req);
1167         }
1168
1169         ret = partition_reload_if_required(module, data);
1170         if (ret != LDB_SUCCESS) {
1171                 return ret;
1172         }
1173         
1174         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1175                 return partition_sequence_number(module, req);
1176         }
1177
1178         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1179                 return partition_create(module, req);
1180         }
1181
1182         /* 
1183          * as the extended operation has no dn
1184          * we need to send it to all partitions
1185          */
1186
1187         ac = partition_init_ctx(module, req);
1188         if (!ac) {
1189                 return LDB_ERR_OPERATIONS_ERROR;
1190         }
1191
1192         return partition_send_all(module, ac, req);
1193 }
1194
1195 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1196         .name              = "partition",
1197         .init_context      = partition_init,
1198         .search            = partition_search,
1199         .add               = partition_add,
1200         .modify            = partition_modify,
1201         .del               = partition_delete,
1202         .rename            = partition_rename,
1203         .extended          = partition_extended,
1204         .start_transaction = partition_start_trans,
1205         .prepare_commit    = partition_prepare_commit,
1206         .end_transaction   = partition_end_trans,
1207         .del_transaction   = partition_del_trans,
1208 };