s4-partition: fixed selection of partitions on exact match
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
/*
 * One queued sub-request: the request that was built for a
 * destination and the module it is handed to when it is fired.
 */
struct part_request {
        /* module passed to partition_request() for this sub-request */
        struct ldb_module *module;
        /* the sub-request itself */
        struct ldb_request *req;
};
38
/*
 * Per-request state shared by all sub-requests that were created on
 * behalf of one original request.
 */
struct partition_context {
        /* the partition module instance handling the request */
        struct ldb_module *module;
        /* the original request that is being answered */
        struct ldb_request *req;

        /* array of queued sub-requests, num_requests entries long */
        struct part_request *part_req;
        /* number of entries in part_req */
        int num_requests;
        /* number of sub-requests that have reported LDB_REPLY_DONE */
        int finished_requests;
};
47
48 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
49 {
50         struct partition_context *ac;
51
52         ac = talloc_zero(req, struct partition_context);
53         if (ac == NULL) {
54                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
55                 return NULL;
56         }
57
58         ac->module = module;
59         ac->req = req;
60
61         return ac;
62 }
63
64 /*
65  *    helper functions to call the next module in chain
66  *    */
67
68 int partition_request(struct ldb_module *module, struct ldb_request *request)
69 {
70         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
71                 const struct dsdb_control_current_partition *partition = NULL;
72                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
73                 if (partition_ctrl) {
74                         partition = talloc_get_type(partition_ctrl->data,
75                                                     struct dsdb_control_current_partition);
76                 }
77
78                 if (partition != NULL) {
79                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
80                                   ldb_dn_get_linearized(partition->dn));                        
81                 } else {
82                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
83                 }
84         }
85
86         return ldb_next_request(module, request);
87 }
88
89 static struct dsdb_partition *find_partition(struct partition_private_data *data,
90                                              struct ldb_dn *dn,
91                                              struct ldb_request *req)
92 {
93         int i;
94         struct ldb_control *partition_ctrl;
95
96         /* see if the request has the partition DN specified in a
97          * control. The repl_meta_data module can specify this to
98          * ensure that replication happens to the right partition
99          */
100         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
101         if (partition_ctrl) {
102                 const struct dsdb_control_current_partition *partition;
103                 partition = talloc_get_type(partition_ctrl->data,
104                                             struct dsdb_control_current_partition);
105                 if (partition != NULL) {
106                         dn = partition->dn;
107                 }
108         }
109
110         if (dn == NULL) {
111                 return NULL;
112         }
113
114         /* Look at base DN */
115         /* Figure out which partition it is under */
116         /* Skip the lot if 'data' isn't here yet (initialisation) */
117         for (i=0; data && data->partitions && data->partitions[i]; i++) {
118                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
119                         return data->partitions[i];
120                 }
121         }
122
123         return NULL;
124 }
125
126 /**
127  * fire the caller's callback for every entry, but only send 'done' once.
128  */
129 static int partition_req_callback(struct ldb_request *req,
130                                   struct ldb_reply *ares)
131 {
132         struct partition_context *ac;
133         struct ldb_module *module;
134         struct ldb_request *nreq;
135         int ret;
136         struct partition_private_data *data;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
141
142         if (!ares) {
143                 return ldb_module_done(ac->req, NULL, NULL,
144                                         LDB_ERR_OPERATIONS_ERROR);
145         }
146
147         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
148         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
149                 /* If we didn't fan this request out to mulitple partitions,
150                  * or this is an individual search result, we can
151                  * deterministily tell the caller what partition this was
152                  * written to (repl_meta_data likes to know) */
153                         ret = ldb_reply_add_control(ares,
154                                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
155                                                     false, partition_ctrl->data);
156                         if (ret != LDB_SUCCESS) {
157                                 return ldb_module_done(ac->req, NULL, NULL,
158                                                        ret);
159                         }
160         }
161
162         if (ares->error != LDB_SUCCESS) {
163                 return ldb_module_done(ac->req, ares->controls,
164                                         ares->response, ares->error);
165         }
166
167         switch (ares->type) {
168         case LDB_REPLY_REFERRAL:
169                 /* ignore referrals for now */
170                 break;
171
172         case LDB_REPLY_ENTRY:
173                 if (ac->req->operation != LDB_SEARCH) {
174                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
175                                 "partition_req_callback:"
176                                 " Unsupported reply type for this request");
177                         return ldb_module_done(ac->req, NULL, NULL,
178                                                 LDB_ERR_OPERATIONS_ERROR);
179                 }
180                 
181                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182
183         case LDB_REPLY_DONE:
184                 if (ac->req->operation == LDB_EXTENDED) {
185                         /* FIXME: check for ares->response, replmd does not fill it ! */
186                         if (ares->response) {
187                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
189                                                           "partition_req_callback:"
190                                                           " Unknown extended reply, "
191                                                           "only supports START_TLS");
192                                         talloc_free(ares);
193                                         return ldb_module_done(ac->req, NULL, NULL,
194                                                                 LDB_ERR_OPERATIONS_ERROR);
195                                 }
196                         }
197                 }
198
199                 ac->finished_requests++;
200                 if (ac->finished_requests == ac->num_requests) {
201                         /* this was the last one, call callback */
202                         return ldb_module_done(ac->req, ares->controls,
203                                                ares->response, 
204                                                ares->error);
205                 }
206
207                 /* not the last, now call the next one */
208                 module = ac->part_req[ac->finished_requests].module;
209                 nreq = ac->part_req[ac->finished_requests].req;
210
211                 ret = partition_request(module, nreq);
212                 if (ret != LDB_SUCCESS) {
213                         talloc_free(ares);
214                         return ldb_module_done(ac->req, NULL, NULL, ret);
215                 }
216
217                 break;
218         }
219
220         talloc_free(ares);
221         return LDB_SUCCESS;
222 }
223
224 static int partition_prep_request(struct partition_context *ac,
225                                   struct dsdb_partition *partition)
226 {
227         int ret;
228         struct ldb_request *req;
229
230         ac->part_req = talloc_realloc(ac, ac->part_req,
231                                         struct part_request,
232                                         ac->num_requests + 1);
233         if (ac->part_req == NULL) {
234                 ldb_oom(ldb_module_get_ctx(ac->module));
235                 return LDB_ERR_OPERATIONS_ERROR;
236         }
237
238         switch (ac->req->operation) {
239         case LDB_SEARCH:
240                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
241                                         ac->part_req,
242                                         ac->req->op.search.base,
243                                         ac->req->op.search.scope,
244                                         ac->req->op.search.tree,
245                                         ac->req->op.search.attrs,
246                                         ac->req->controls,
247                                         ac, partition_req_callback,
248                                         ac->req);
249                 break;
250         case LDB_ADD:
251                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
252                                         ac->req->op.add.message,
253                                         ac->req->controls,
254                                         ac, partition_req_callback,
255                                         ac->req);
256                 break;
257         case LDB_MODIFY:
258                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
259                                         ac->req->op.mod.message,
260                                         ac->req->controls,
261                                         ac, partition_req_callback,
262                                         ac->req);
263                 break;
264         case LDB_DELETE:
265                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
266                                         ac->req->op.del.dn,
267                                         ac->req->controls,
268                                         ac, partition_req_callback,
269                                         ac->req);
270                 break;
271         case LDB_RENAME:
272                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273                                         ac->req->op.rename.olddn,
274                                         ac->req->op.rename.newdn,
275                                         ac->req->controls,
276                                         ac, partition_req_callback,
277                                         ac->req);
278                 break;
279         case LDB_EXTENDED:
280                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
281                                         ac->part_req,
282                                         ac->req->op.extended.oid,
283                                         ac->req->op.extended.data,
284                                         ac->req->controls,
285                                         ac, partition_req_callback,
286                                         ac->req);
287                 break;
288         default:
289                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
290                                   "Unsupported request type!");
291                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
292         }
293
294         if (ret != LDB_SUCCESS) {
295                 return ret;
296         }
297
298         ac->part_req[ac->num_requests].req = req;
299
300         if (ac->req->controls) {
301                 req->controls = talloc_memdup(req, ac->req->controls,
302                                         talloc_get_size(ac->req->controls));
303                 if (req->controls == NULL) {
304                         ldb_oom(ldb_module_get_ctx(ac->module));
305                         return LDB_ERR_OPERATIONS_ERROR;
306                 }
307         }
308
309         if (partition) {
310                 ac->part_req[ac->num_requests].module = partition->module;
311
312                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
313                         ret = ldb_request_add_control(req,
314                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
315                                                       false, partition->ctrl);
316                         if (ret != LDB_SUCCESS) {
317                                 return ret;
318                         }
319                 }
320
321                 if (req->operation == LDB_SEARCH) {
322                         /* If the search is for 'more' than this partition,
323                          * then change the basedn, so a remote LDAP server
324                          * doesn't object */
325                         if (ldb_dn_compare_base(partition->ctrl->dn,
326                                                 req->op.search.base) != 0) {
327                                 req->op.search.base = partition->ctrl->dn;
328                         }
329                 }
330
331         } else {
332                 /* make sure you put the module here, or
333                  * or ldb_next_request() will skip a module */
334                 ac->part_req[ac->num_requests].module = ac->module;
335         }
336
337         ac->num_requests++;
338
339         return LDB_SUCCESS;
340 }
341
342 static int partition_call_first(struct partition_context *ac)
343 {
344         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
345 }
346
347 /**
348  * Send a request down to all the partitions
349  */
350 static int partition_send_all(struct ldb_module *module, 
351                               struct partition_context *ac, 
352                               struct ldb_request *req) 
353 {
354         int i;
355         struct partition_private_data *data = talloc_get_type(module->private_data, 
356                                                               struct partition_private_data);
357         int ret = partition_prep_request(ac, NULL);
358         if (ret != LDB_SUCCESS) {
359                 return ret;
360         }
361         for (i=0; data && data->partitions && data->partitions[i]; i++) {
362                 ret = partition_prep_request(ac, data->partitions[i]);
363                 if (ret != LDB_SUCCESS) {
364                         return ret;
365                 }
366         }
367
368         /* fire the first one */
369         return partition_call_first(ac);
370 }
371
372 /**
373  * Figure out which backend a request needs to be aimed at.  Some
374  * requests must be replicated to all backends
375  */
376 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
377 {
378         struct partition_context *ac;
379         unsigned i;
380         int ret;
381         struct dsdb_partition *partition;
382         struct partition_private_data *data = talloc_get_type(module->private_data, 
383                                                               struct partition_private_data);
384         if (!data || !data->partitions) {
385                 return ldb_next_request(module, req);
386         }
387
388         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
389                 /* Is this a special DN, we need to replicate to every backend? */
390                 for (i=0; data->replicate && data->replicate[i]; i++) {
391                         if (ldb_dn_compare(data->replicate[i], 
392                                            dn) == 0) {
393                                 
394                                 ac = partition_init_ctx(module, req);
395                                 if (!ac) {
396                                         return LDB_ERR_OPERATIONS_ERROR;
397                                 }
398                                 
399                                 return partition_send_all(module, ac, req);
400                         }
401                 }
402         }
403
404         /* Otherwise, we need to find the partition to fire it to */
405
406         /* Find partition */
407         partition = find_partition(data, dn, req);
408         if (!partition) {
409                 /*
410                  * if we haven't found a matching partition
411                  * pass the request to the main ldb
412                  *
413                  * TODO: we should maybe return an error here
414                  *       if it's not a special dn
415                  */
416
417                 return ldb_next_request(module, req);
418         }
419
420         ac = partition_init_ctx(module, req);
421         if (!ac) {
422                 return LDB_ERR_OPERATIONS_ERROR;
423         }
424
425         /* we need to add a control but we never touch the original request */
426         ret = partition_prep_request(ac, partition);
427         if (ret != LDB_SUCCESS) {
428                 return ret;
429         }
430
431         /* fire the first one */
432         return partition_call_first(ac);
433 }
434
435 /* search */
436 static int partition_search(struct ldb_module *module, struct ldb_request *req)
437 {
438         int ret;
439         struct ldb_control **saved_controls;
440         /* Find backend */
441         struct partition_private_data *data = talloc_get_type(module->private_data, 
442                                                               struct partition_private_data);
443
444         /* issue request */
445
446         /* (later) consider if we should be searching multiple
447          * partitions (for 'invisible' partition behaviour */
448
449         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
450         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
451         
452         struct ldb_search_options_control *search_options = NULL;
453         struct dsdb_partition *p;
454         
455         ret = partition_reload_if_required(module, data);
456         if (ret != LDB_SUCCESS) {
457                 return ret;
458         }
459
460         p = find_partition(data, NULL, req);
461         if (p != NULL) {
462                 /* the caller specified what partition they want the
463                  * search - just pass it on
464                  */
465                 return ldb_next_request(p->module, req);                
466         }
467
468
469         if (search_control) {
470                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
471         }
472
473         /* Remove the domain_scope control, so we don't confuse a backend server */
474         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
475                 ldb_oom(ldb_module_get_ctx(module));
476                 return LDB_ERR_OPERATIONS_ERROR;
477         }
478
479         /*
480          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
481          * down as uncritical to make windows 2008 dcpromo happy.
482          */
483         if (search_control) {
484                 search_control->critical = 0;
485         }
486
487         /* TODO:
488            Generate referrals (look for a partition under this DN) if we don't have the above control specified
489         */
490         
491         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
492                 int i;
493                 struct partition_context *ac;
494                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
495                         /* We have processed this flag, so we are done with this control now */
496
497                         /* Remove search control, so we don't confuse a backend server */
498                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
499                                 ldb_oom(ldb_module_get_ctx(module));
500                                 return LDB_ERR_OPERATIONS_ERROR;
501                         }
502                 }
503                 ac = partition_init_ctx(module, req);
504                 if (!ac) {
505                         return LDB_ERR_OPERATIONS_ERROR;
506                 }
507
508                 /* Search from the base DN */
509                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
510                         return partition_send_all(module, ac, req);
511                 }
512                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
513                         bool match = false, stop = false;
514                         /* Find all partitions under the search base 
515                            
516                            we match if:
517
518                               1) the DN we are looking for exactly matches the partition
519                              or
520                               2) the DN we are looking for is a parent of the partition and it isn't
521                                  a scope base search
522                              or
523                               3) the DN we are looking for is a child of the partition
524                          */
525                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
526                                 match = true;
527                                 stop = true;
528                         }
529                         if (!match && 
530                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
531                              req->op.search.scope != LDB_SCOPE_BASE)) {
532                                 match = true;
533                         }
534                         if (!match &&
535                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
536                                 match = true;
537                                 stop = true; /* note that this relies on partition ordering */
538                         }
539                         if (match) {
540                                 ret = partition_prep_request(ac, data->partitions[i]);
541                                 if (ret != LDB_SUCCESS) {
542                                         return ret;
543                                 }
544                         }
545                         if (stop) break;
546                 }
547
548                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
549                 if (ac->num_requests == 0) {
550                         talloc_free(ac);
551                         return ldb_next_request(module, req);
552                 }
553
554                 /* fire the first one */
555                 return partition_call_first(ac);
556
557         } else {
558                 /* Handle this like all other requests */
559                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
560                         /* We have processed this flag, so we are done with this control now */
561
562                         /* Remove search control, so we don't confuse a backend server */
563                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
564                                 ldb_oom(ldb_module_get_ctx(module));
565                                 return LDB_ERR_OPERATIONS_ERROR;
566                         }
567                 }
568
569                 return partition_replicate(module, req, req->op.search.base);
570         }
571 }
572
573 /* add */
574 static int partition_add(struct ldb_module *module, struct ldb_request *req)
575 {
576         return partition_replicate(module, req, req->op.add.message->dn);
577 }
578
579 /* modify */
580 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
581 {
582         return partition_replicate(module, req, req->op.mod.message->dn);
583 }
584
585 /* delete */
586 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
587 {
588         return partition_replicate(module, req, req->op.del.dn);
589 }
590
591 /* rename */
592 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
593 {
594         /* Find backend */
595         struct dsdb_partition *backend, *backend2;
596         
597         struct partition_private_data *data = talloc_get_type(module->private_data, 
598                                                               struct partition_private_data);
599
600         /* Skip the lot if 'data' isn't here yet (initialisation) */
601         if (!data) {
602                 return LDB_ERR_OPERATIONS_ERROR;
603         }
604
605         backend = find_partition(data, req->op.rename.olddn, req);
606         backend2 = find_partition(data, req->op.rename.newdn, req);
607
608         if ((backend && !backend2) || (!backend && backend2)) {
609                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
610         }
611
612         if (backend != backend2) {
613                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
614                                        "Cannot rename from %s in %s to %s in %s: %s",
615                                        ldb_dn_get_linearized(req->op.rename.olddn),
616                                        ldb_dn_get_linearized(backend->ctrl->dn),
617                                        ldb_dn_get_linearized(req->op.rename.newdn),
618                                        ldb_dn_get_linearized(backend2->ctrl->dn),
619                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
620                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
621         }
622
623         return partition_replicate(module, req, req->op.rename.olddn);
624 }
625
626 /* start a transaction */
627 static int partition_start_trans(struct ldb_module *module)
628 {
629         int i, ret;
630         struct partition_private_data *data = talloc_get_type(module->private_data, 
631                                                               struct partition_private_data);
632         /* Look at base DN */
633         /* Figure out which partition it is under */
634         /* Skip the lot if 'data' isn't here yet (initialization) */
635         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
636                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
637         }
638         ret = ldb_next_start_trans(module);
639         if (ret != LDB_SUCCESS) {
640                 return ret;
641         }
642
643         ret = partition_reload_if_required(module, data);
644         if (ret != LDB_SUCCESS) {
645                 return ret;
646         }
647
648         for (i=0; data && data->partitions && data->partitions[i]; i++) {
649                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
650                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
651                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
652                 }
653                 ret = ldb_next_start_trans(data->partitions[i]->module);
654                 if (ret != LDB_SUCCESS) {
655                         /* Back it out, if it fails on one */
656                         for (i--; i >= 0; i--) {
657                                 ldb_next_del_trans(data->partitions[i]->module);
658                         }
659                         ldb_next_del_trans(module);
660                         return ret;
661                 }
662         }
663
664         data->in_transaction++;
665
666         return LDB_SUCCESS;
667 }
668
669 /* prepare for a commit */
670 static int partition_prepare_commit(struct ldb_module *module)
671 {
672         int i;
673         struct partition_private_data *data = talloc_get_type(module->private_data, 
674                                                               struct partition_private_data);
675
676         for (i=0; data && data->partitions && data->partitions[i]; i++) {
677                 int ret;
678
679                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
680                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
681                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
682                 }
683                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
684                 if (ret != LDB_SUCCESS) {
685                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
686                         return ret;
687                 }
688         }
689
690         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
691                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
692         }
693         return ldb_next_prepare_commit(module);
694 }
695
696
697 /* end a transaction */
698 static int partition_end_trans(struct ldb_module *module)
699 {
700         int i, ret, ret2;
701         struct partition_private_data *data = talloc_get_type(module->private_data, 
702                                                               struct partition_private_data);
703
704         ret = LDB_SUCCESS;
705
706         if (data->in_transaction == 0) {
707                 DEBUG(0,("partition end transaction mismatch\n"));
708                 ret = LDB_ERR_OPERATIONS_ERROR;
709         } else {
710                 data->in_transaction--;
711         }
712
713         for (i=0; data && data->partitions && data->partitions[i]; i++) {
714                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
715                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
716                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
717                 }
718                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
719                 if (ret2 != LDB_SUCCESS) {
720                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
721                         ret = ret2;
722                 }
723         }
724
725         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
726                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
727         }
728         ret2 = ldb_next_end_trans(module);
729         if (ret2 != LDB_SUCCESS) {
730                 ret = ret2;
731         }
732         return ret;
733 }
734
735 /* delete a transaction */
736 static int partition_del_trans(struct ldb_module *module)
737 {
738         int i, ret, final_ret = LDB_SUCCESS;
739         struct partition_private_data *data = talloc_get_type(module->private_data, 
740                                                               struct partition_private_data);
741         for (i=0; data && data->partitions && data->partitions[i]; i++) {
742                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
743                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
744                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
745                 }
746                 ret = ldb_next_del_trans(data->partitions[i]->module);
747                 if (ret != LDB_SUCCESS) {
748                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
749                         final_ret = ret;
750                 }
751         }       
752
753         if (data->in_transaction == 0) {
754                 DEBUG(0,("partition del transaction mismatch\n"));
755                 return LDB_ERR_OPERATIONS_ERROR;
756         }
757         data->in_transaction--;
758
759         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
760                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
761         }
762         ret = ldb_next_del_trans(module);
763         if (ret != LDB_SUCCESS) {
764                 final_ret = ret;
765         }
766         return final_ret;
767 }
768
769 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
770                                      enum ldb_sequence_type type, uint64_t *seq_number) 
771 {
772         int ret;
773         struct ldb_result *res;
774         struct ldb_seqnum_request *tseq;
775         struct ldb_request *treq;
776         struct ldb_seqnum_result *seqr;
777         res = talloc_zero(mem_ctx, struct ldb_result);
778         if (res == NULL) {
779                 return LDB_ERR_OPERATIONS_ERROR;
780         }
781         tseq = talloc_zero(res, struct ldb_seqnum_request);
782         if (tseq == NULL) {
783                 talloc_free(res);
784                 return LDB_ERR_OPERATIONS_ERROR;
785         }
786         tseq->type = type;
787         
788         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
789                                      LDB_EXTENDED_SEQUENCE_NUMBER,
790                                      tseq,
791                                      NULL,
792                                      res,
793                                      ldb_extended_default_callback,
794                                      NULL);
795         if (ret != LDB_SUCCESS) {
796                 talloc_free(res);
797                 return ret;
798         }
799         
800         ret = ldb_next_request(module, treq);
801         if (ret != LDB_SUCCESS) {
802                 talloc_free(res);
803                 return ret;
804         }
805         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
806         if (ret != LDB_SUCCESS) {
807                 talloc_free(res);
808                 return ret;
809         }
810         
811         seqr = talloc_get_type(res->extended->data,
812                                struct ldb_seqnum_result);
813         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
814                 ret = LDB_ERR_OPERATIONS_ERROR;
815                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
816                 talloc_free(res);
817                 return ret;
818         } else {
819                 *seq_number = seqr->seq_num;
820         }
821         talloc_free(res);
822         return LDB_SUCCESS;
823 }
824
825 /* FIXME: This function is still semi-async */
826 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
827 {
828         int i, ret;
829         uint64_t seq_number = 0;
830         uint64_t timestamp_sequence = 0;
831         uint64_t timestamp = 0;
832         struct partition_private_data *data = talloc_get_type(module->private_data, 
833                                                               struct partition_private_data);
834         struct ldb_seqnum_request *seq;
835         struct ldb_seqnum_result *seqr;
836         struct ldb_request *treq;
837         struct ldb_seqnum_request *tseq;
838         struct ldb_seqnum_result *tseqr;
839         struct ldb_extended *ext;
840         struct ldb_result *res;
841         struct dsdb_partition *p;
842
843         p = find_partition(data, NULL, req);
844         if (p != NULL) {
845                 /* the caller specified what partition they want the
846                  * sequence number operation on - just pass it on
847                  */
848                 return ldb_next_request(p->module, req);                
849         }
850
851         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
852
853         switch (seq->type) {
854         case LDB_SEQ_NEXT:
855         case LDB_SEQ_HIGHEST_SEQ:
856
857                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
858                 if (ret != LDB_SUCCESS) {
859                         return ret;
860                 }
861
862                 /* Skip the lot if 'data' isn't here yet (initialisation) */
863                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
864
865                         res = talloc_zero(req, struct ldb_result);
866                         if (res == NULL) {
867                                 return LDB_ERR_OPERATIONS_ERROR;
868                         }
869                         tseq = talloc_zero(res, struct ldb_seqnum_request);
870                         if (tseq == NULL) {
871                                 talloc_free(res);
872                                 return LDB_ERR_OPERATIONS_ERROR;
873                         }
874                         tseq->type = seq->type;
875
876                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
877                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
878                                                      tseq,
879                                                      NULL,
880                                                      res,
881                                                      ldb_extended_default_callback,
882                                                      NULL);
883                         if (ret != LDB_SUCCESS) {
884                                 talloc_free(res);
885                                 return ret;
886                         }
887
888                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
889                                 ret = ldb_request_add_control(treq,
890                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
891                                                               false, data->partitions[i]->ctrl);
892                                 if (ret != LDB_SUCCESS) {
893                                         talloc_free(res);
894                                         return ret;
895                                 }
896                         }
897
898                         ret = partition_request(data->partitions[i]->module, treq);
899                         if (ret != LDB_SUCCESS) {
900                                 talloc_free(res);
901                                 return ret;
902                         }
903                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
904                         if (ret != LDB_SUCCESS) {
905                                 talloc_free(res);
906                                 return ret;
907                         }
908                         tseqr = talloc_get_type(res->extended->data,
909                                                 struct ldb_seqnum_result);
910                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
911                                 timestamp_sequence = MAX(timestamp_sequence,
912                                                          tseqr->seq_num);
913                         } else {
914                                 seq_number += tseqr->seq_num;
915                         }
916                         talloc_free(res);
917                 }
918                 /* fall through */
919         case LDB_SEQ_HIGHEST_TIMESTAMP:
920
921                 res = talloc_zero(req, struct ldb_result);
922                 if (res == NULL) {
923                         return LDB_ERR_OPERATIONS_ERROR;
924                 }
925
926                 tseq = talloc_zero(res, struct ldb_seqnum_request);
927                 if (tseq == NULL) {
928                         talloc_free(res);
929                         return LDB_ERR_OPERATIONS_ERROR;
930                 }
931                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
932
933                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
934                                              LDB_EXTENDED_SEQUENCE_NUMBER,
935                                              tseq,
936                                              NULL,
937                                              res,
938                                              ldb_extended_default_callback,
939                                              NULL);
940                 if (ret != LDB_SUCCESS) {
941                         talloc_free(res);
942                         return ret;
943                 }
944
945                 ret = ldb_next_request(module, treq);
946                 if (ret != LDB_SUCCESS) {
947                         talloc_free(res);
948                         return ret;
949                 }
950                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
951                 if (ret != LDB_SUCCESS) {
952                         talloc_free(res);
953                         return ret;
954                 }
955
956                 tseqr = talloc_get_type(res->extended->data,
957                                            struct ldb_seqnum_result);
958                 timestamp = tseqr->seq_num;
959
960                 talloc_free(res);
961
962                 /* Skip the lot if 'data' isn't here yet (initialisation) */
963                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
964
965                         res = talloc_zero(req, struct ldb_result);
966                         if (res == NULL) {
967                                 return LDB_ERR_OPERATIONS_ERROR;
968                         }
969
970                         tseq = talloc_zero(res, struct ldb_seqnum_request);
971                         if (tseq == NULL) {
972                                 talloc_free(res);
973                                 return LDB_ERR_OPERATIONS_ERROR;
974                         }
975                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
976
977                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
978                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
979                                                      tseq,
980                                                      NULL,
981                                                      res,
982                                                      ldb_extended_default_callback,
983                                                      NULL);
984                         if (ret != LDB_SUCCESS) {
985                                 talloc_free(res);
986                                 return ret;
987                         }
988
989                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
990                                 ret = ldb_request_add_control(treq,
991                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
992                                                               false, data->partitions[i]->ctrl);
993                                 if (ret != LDB_SUCCESS) {
994                                         talloc_free(res);
995                                         return ret;
996                                 }
997                         }
998
999                         ret = partition_request(data->partitions[i]->module, treq);
1000                         if (ret != LDB_SUCCESS) {
1001                                 talloc_free(res);
1002                                 return ret;
1003                         }
1004                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1005                         if (ret != LDB_SUCCESS) {
1006                                 talloc_free(res);
1007                                 return ret;
1008                         }
1009
1010                         tseqr = talloc_get_type(res->extended->data,
1011                                                   struct ldb_seqnum_result);
1012                         timestamp = MAX(timestamp, tseqr->seq_num);
1013
1014                         talloc_free(res);
1015                 }
1016
1017                 break;
1018         }
1019
1020         ext = talloc_zero(req, struct ldb_extended);
1021         if (!ext) {
1022                 return LDB_ERR_OPERATIONS_ERROR;
1023         }
1024         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1025         if (seqr == NULL) {
1026                 talloc_free(ext);
1027                 return LDB_ERR_OPERATIONS_ERROR;
1028         }
1029         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1030         ext->data = seqr;
1031
1032         switch (seq->type) {
1033         case LDB_SEQ_NEXT:
1034         case LDB_SEQ_HIGHEST_SEQ:
1035
1036                 /* Has someone above set a timebase sequence? */
1037                 if (timestamp_sequence) {
1038                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1039                 } else {
1040                         seqr->seq_num = seq_number;
1041                 }
1042
1043                 if (timestamp_sequence > seqr->seq_num) {
1044                         seqr->seq_num = timestamp_sequence;
1045                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1046                 }
1047
1048                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1049                 break;
1050         case LDB_SEQ_HIGHEST_TIMESTAMP:
1051                 seqr->seq_num = timestamp;
1052                 break;
1053         }
1054
1055         if (seq->type == LDB_SEQ_NEXT) {
1056                 seqr->seq_num++;
1057         }
1058
1059         /* send request done */
1060         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1061 }
1062
1063 /* extended */
1064 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1065 {
1066         struct partition_private_data *data;
1067         struct partition_context *ac;
1068         int ret;
1069
1070         data = talloc_get_type(module->private_data, struct partition_private_data);
1071         if (!data) {
1072                 return ldb_next_request(module, req);
1073         }
1074
1075         ret = partition_reload_if_required(module, data);
1076         if (ret != LDB_SUCCESS) {
1077                 return ret;
1078         }
1079         
1080         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1081                 return partition_sequence_number(module, req);
1082         }
1083
1084         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1085                 return partition_create(module, req);
1086         }
1087
1088         /* 
1089          * as the extended operation has no dn
1090          * we need to send it to all partitions
1091          */
1092
1093         ac = partition_init_ctx(module, req);
1094         if (!ac) {
1095                 return LDB_ERR_OPERATIONS_ERROR;
1096         }
1097
1098         return partition_send_all(module, ac, req);
1099 }
1100
1101 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1102         .name              = "partition",
1103         .init_context      = partition_init,
1104         .search            = partition_search,
1105         .add               = partition_add,
1106         .modify            = partition_modify,
1107         .del               = partition_delete,
1108         .rename            = partition_rename,
1109         .extended          = partition_extended,
1110         .start_transaction = partition_start_trans,
1111         .prepare_commit    = partition_prepare_commit,
1112         .end_transaction   = partition_end_trans,
1113         .del_transaction   = partition_del_trans,
1114 };