s4-dsdb: always cancel transactions on all partitions
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42         bool got_success;
43
44         struct part_request *part_req;
45         int num_requests;
46         int finished_requests;
47 };
48
49 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
50 {
51         struct partition_context *ac;
52
53         ac = talloc_zero(req, struct partition_context);
54         if (ac == NULL) {
55                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
56                 return NULL;
57         }
58
59         ac->module = module;
60         ac->req = req;
61
62         return ac;
63 }
64
65 /*
66  *    helper functions to call the next module in chain
67  *    */
68
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
72                 const struct dsdb_control_current_partition *partition = NULL;
73                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
74                 if (partition_ctrl) {
75                         partition = talloc_get_type(partition_ctrl->data,
76                                                     struct dsdb_control_current_partition);
77                 }
78
79                 if (partition != NULL) {
80                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
81                                   ldb_dn_get_linearized(partition->dn));                        
82                 } else {
83                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
84                 }
85         }
86
87         return ldb_next_request(module, request);
88 }
89
90 static struct dsdb_partition *find_partition(struct partition_private_data *data,
91                                              struct ldb_dn *dn,
92                                              struct ldb_request *req)
93 {
94         int i;
95         struct ldb_control *partition_ctrl;
96
97         /* see if the request has the partition DN specified in a
98          * control. The repl_meta_data module can specify this to
99          * ensure that replication happens to the right partition
100          */
101         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
102         if (partition_ctrl) {
103                 const struct dsdb_control_current_partition *partition;
104                 partition = talloc_get_type(partition_ctrl->data,
105                                             struct dsdb_control_current_partition);
106                 if (partition != NULL) {
107                         dn = partition->dn;
108                 }
109         }
110
111         if (dn == NULL) {
112                 return NULL;
113         }
114
115         /* Look at base DN */
116         /* Figure out which partition it is under */
117         /* Skip the lot if 'data' isn't here yet (initialisation) */
118         for (i=0; data && data->partitions && data->partitions[i]; i++) {
119                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
120                         return data->partitions[i];
121                 }
122         }
123
124         return NULL;
125 }
126
127 /**
128  * fire the caller's callback for every entry, but only send 'done' once.
129  */
130 static int partition_req_callback(struct ldb_request *req,
131                                   struct ldb_reply *ares)
132 {
133         struct partition_context *ac;
134         struct ldb_module *module;
135         struct ldb_request *nreq;
136         int ret;
137         struct partition_private_data *data;
138         struct ldb_control *partition_ctrl;
139
140         ac = talloc_get_type(req->context, struct partition_context);
141         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
142
143         if (!ares) {
144                 return ldb_module_done(ac->req, NULL, NULL,
145                                         LDB_ERR_OPERATIONS_ERROR);
146         }
147
148         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
149         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
150                 /* If we didn't fan this request out to mulitple partitions,
151                  * or this is an individual search result, we can
152                  * deterministily tell the caller what partition this was
153                  * written to (repl_meta_data likes to know) */
154                         ret = ldb_reply_add_control(ares,
155                                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
156                                                     false, partition_ctrl->data);
157                         if (ret != LDB_SUCCESS) {
158                                 return ldb_module_done(ac->req, NULL, NULL,
159                                                        ret);
160                         }
161         }
162
163         if (ares->error != LDB_SUCCESS && !ac->got_success) {
164                 return ldb_module_done(ac->req, ares->controls,
165                                         ares->response, ares->error);
166         }
167
168         switch (ares->type) {
169         case LDB_REPLY_REFERRAL:
170                 /* ignore referrals for now */
171                 break;
172
173         case LDB_REPLY_ENTRY:
174                 if (ac->req->operation != LDB_SEARCH) {
175                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
176                                 "partition_req_callback:"
177                                 " Unsupported reply type for this request");
178                         return ldb_module_done(ac->req, NULL, NULL,
179                                                 LDB_ERR_OPERATIONS_ERROR);
180                 }
181                 
182                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
183
184         case LDB_REPLY_DONE:
185                 if (ares->error == LDB_SUCCESS) {
186                         ac->got_success = true;
187                 }
188                 if (ac->req->operation == LDB_EXTENDED) {
189                         /* FIXME: check for ares->response, replmd does not fill it ! */
190                         if (ares->response) {
191                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
192                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
193                                                           "partition_req_callback:"
194                                                           " Unknown extended reply, "
195                                                           "only supports START_TLS");
196                                         talloc_free(ares);
197                                         return ldb_module_done(ac->req, NULL, NULL,
198                                                                 LDB_ERR_OPERATIONS_ERROR);
199                                 }
200                         }
201                 }
202
203                 ac->finished_requests++;
204                 if (ac->finished_requests == ac->num_requests) {
205                         /* this was the last one, call callback */
206                         return ldb_module_done(ac->req, ares->controls,
207                                                ares->response, 
208                                                ac->got_success?LDB_SUCCESS:ares->error);
209                 }
210
211                 /* not the last, now call the next one */
212                 module = ac->part_req[ac->finished_requests].module;
213                 nreq = ac->part_req[ac->finished_requests].req;
214
215                 ret = partition_request(module, nreq);
216                 if (ret != LDB_SUCCESS) {
217                         talloc_free(ares);
218                         return ldb_module_done(ac->req, NULL, NULL, ret);
219                 }
220
221                 break;
222         }
223
224         talloc_free(ares);
225         return LDB_SUCCESS;
226 }
227
228 static int partition_prep_request(struct partition_context *ac,
229                                   struct dsdb_partition *partition)
230 {
231         int ret;
232         struct ldb_request *req;
233
234         ac->part_req = talloc_realloc(ac, ac->part_req,
235                                         struct part_request,
236                                         ac->num_requests + 1);
237         if (ac->part_req == NULL) {
238                 ldb_oom(ldb_module_get_ctx(ac->module));
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         switch (ac->req->operation) {
243         case LDB_SEARCH:
244                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
245                                         ac->part_req,
246                                         ac->req->op.search.base,
247                                         ac->req->op.search.scope,
248                                         ac->req->op.search.tree,
249                                         ac->req->op.search.attrs,
250                                         ac->req->controls,
251                                         ac, partition_req_callback,
252                                         ac->req);
253                 break;
254         case LDB_ADD:
255                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
256                                         ac->req->op.add.message,
257                                         ac->req->controls,
258                                         ac, partition_req_callback,
259                                         ac->req);
260                 break;
261         case LDB_MODIFY:
262                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
263                                         ac->req->op.mod.message,
264                                         ac->req->controls,
265                                         ac, partition_req_callback,
266                                         ac->req);
267                 break;
268         case LDB_DELETE:
269                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
270                                         ac->req->op.del.dn,
271                                         ac->req->controls,
272                                         ac, partition_req_callback,
273                                         ac->req);
274                 break;
275         case LDB_RENAME:
276                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
277                                         ac->req->op.rename.olddn,
278                                         ac->req->op.rename.newdn,
279                                         ac->req->controls,
280                                         ac, partition_req_callback,
281                                         ac->req);
282                 break;
283         case LDB_EXTENDED:
284                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
285                                         ac->part_req,
286                                         ac->req->op.extended.oid,
287                                         ac->req->op.extended.data,
288                                         ac->req->controls,
289                                         ac, partition_req_callback,
290                                         ac->req);
291                 break;
292         default:
293                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
294                                   "Unsupported request type!");
295                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
296         }
297
298         if (ret != LDB_SUCCESS) {
299                 return ret;
300         }
301
302         ac->part_req[ac->num_requests].req = req;
303
304         if (ac->req->controls) {
305                 req->controls = talloc_memdup(req, ac->req->controls,
306                                         talloc_get_size(ac->req->controls));
307                 if (req->controls == NULL) {
308                         ldb_oom(ldb_module_get_ctx(ac->module));
309                         return LDB_ERR_OPERATIONS_ERROR;
310                 }
311         }
312
313         if (partition) {
314                 ac->part_req[ac->num_requests].module = partition->module;
315
316                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
317                         ret = ldb_request_add_control(req,
318                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
319                                                       false, partition->ctrl);
320                         if (ret != LDB_SUCCESS) {
321                                 return ret;
322                         }
323                 }
324
325                 if (req->operation == LDB_SEARCH) {
326                         /* If the search is for 'more' than this partition,
327                          * then change the basedn, so a remote LDAP server
328                          * doesn't object */
329                         if (ldb_dn_compare_base(partition->ctrl->dn,
330                                                 req->op.search.base) != 0) {
331                                 req->op.search.base = partition->ctrl->dn;
332                         }
333                 }
334
335         } else {
336                 /* make sure you put the module here, or
337                  * or ldb_next_request() will skip a module */
338                 ac->part_req[ac->num_requests].module = ac->module;
339         }
340
341         ac->num_requests++;
342
343         return LDB_SUCCESS;
344 }
345
346 static int partition_call_first(struct partition_context *ac)
347 {
348         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
349 }
350
351 /**
352  * Send a request down to all the partitions
353  */
354 static int partition_send_all(struct ldb_module *module, 
355                               struct partition_context *ac, 
356                               struct ldb_request *req) 
357 {
358         int i;
359         struct partition_private_data *data = talloc_get_type(module->private_data, 
360                                                               struct partition_private_data);
361         int ret = partition_prep_request(ac, NULL);
362         if (ret != LDB_SUCCESS) {
363                 return ret;
364         }
365         for (i=0; data && data->partitions && data->partitions[i]; i++) {
366                 ret = partition_prep_request(ac, data->partitions[i]);
367                 if (ret != LDB_SUCCESS) {
368                         return ret;
369                 }
370         }
371
372         /* fire the first one */
373         return partition_call_first(ac);
374 }
375
376 /**
377  * Figure out which backend a request needs to be aimed at.  Some
378  * requests must be replicated to all backends
379  */
380 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
381 {
382         struct partition_context *ac;
383         unsigned i;
384         int ret;
385         struct dsdb_partition *partition;
386         struct partition_private_data *data = talloc_get_type(module->private_data, 
387                                                               struct partition_private_data);
388         if (!data || !data->partitions) {
389                 return ldb_next_request(module, req);
390         }
391
392         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
393                 /* Is this a special DN, we need to replicate to every backend? */
394                 for (i=0; data->replicate && data->replicate[i]; i++) {
395                         if (ldb_dn_compare(data->replicate[i], 
396                                            dn) == 0) {
397                                 
398                                 ac = partition_init_ctx(module, req);
399                                 if (!ac) {
400                                         return LDB_ERR_OPERATIONS_ERROR;
401                                 }
402                                 
403                                 return partition_send_all(module, ac, req);
404                         }
405                 }
406         }
407
408         /* Otherwise, we need to find the partition to fire it to */
409
410         /* Find partition */
411         partition = find_partition(data, dn, req);
412         if (!partition) {
413                 /*
414                  * if we haven't found a matching partition
415                  * pass the request to the main ldb
416                  *
417                  * TODO: we should maybe return an error here
418                  *       if it's not a special dn
419                  */
420
421                 return ldb_next_request(module, req);
422         }
423
424         ac = partition_init_ctx(module, req);
425         if (!ac) {
426                 return LDB_ERR_OPERATIONS_ERROR;
427         }
428
429         /* we need to add a control but we never touch the original request */
430         ret = partition_prep_request(ac, partition);
431         if (ret != LDB_SUCCESS) {
432                 return ret;
433         }
434
435         /* fire the first one */
436         return partition_call_first(ac);
437 }
438
439 /* search */
440 static int partition_search(struct ldb_module *module, struct ldb_request *req)
441 {
442         int ret;
443         struct ldb_control **saved_controls;
444         /* Find backend */
445         struct partition_private_data *data = talloc_get_type(module->private_data, 
446                                                               struct partition_private_data);
447
448         /* issue request */
449
450         /* (later) consider if we should be searching multiple
451          * partitions (for 'invisible' partition behaviour */
452
453         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
454         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
455         
456         struct ldb_search_options_control *search_options = NULL;
457         struct dsdb_partition *p;
458         
459         ret = partition_reload_if_required(module, data);
460         if (ret != LDB_SUCCESS) {
461                 return ret;
462         }
463
464         p = find_partition(data, NULL, req);
465         if (p != NULL) {
466                 /* the caller specified what partition they want the
467                  * search - just pass it on
468                  */
469                 return ldb_next_request(p->module, req);                
470         }
471
472
473         if (search_control) {
474                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
475         }
476
477         /* Remove the domain_scope control, so we don't confuse a backend server */
478         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
479                 ldb_oom(ldb_module_get_ctx(module));
480                 return LDB_ERR_OPERATIONS_ERROR;
481         }
482
483         /*
484          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
485          * down as uncritical to make windows 2008 dcpromo happy.
486          */
487         if (search_control) {
488                 search_control->critical = 0;
489         }
490
491         /* TODO:
492            Generate referrals (look for a partition under this DN) if we don't have the above control specified
493         */
494         
495         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
496                 int i;
497                 struct partition_context *ac;
498                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
499                         /* We have processed this flag, so we are done with this control now */
500
501                         /* Remove search control, so we don't confuse a backend server */
502                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
503                                 ldb_oom(ldb_module_get_ctx(module));
504                                 return LDB_ERR_OPERATIONS_ERROR;
505                         }
506                 }
507                 ac = partition_init_ctx(module, req);
508                 if (!ac) {
509                         return LDB_ERR_OPERATIONS_ERROR;
510                 }
511
512                 /* Search from the base DN */
513                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
514                         return partition_send_all(module, ac, req);
515                 }
516                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
517                         bool match = false, stop = false;
518                         /* Find all partitions under the search base 
519                            
520                            we match if:
521
522                               1) the DN we are looking for exactly matches the partition
523                              or
524                               2) the DN we are looking for is a parent of the partition and it isn't
525                                  a scope base search
526                              or
527                               3) the DN we are looking for is a child of the partition
528                          */
529                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
530                                 match = true;
531                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
532                                         stop = true;
533                                 }
534                         }
535                         if (!match && 
536                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
537                              req->op.search.scope != LDB_SCOPE_BASE)) {
538                                 match = true;
539                         }
540                         if (!match &&
541                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
542                                 match = true;
543                                 stop = true; /* note that this relies on partition ordering */
544                         }
545                         if (match) {
546                                 ret = partition_prep_request(ac, data->partitions[i]);
547                                 if (ret != LDB_SUCCESS) {
548                                         return ret;
549                                 }
550                         }
551                         if (stop) break;
552                 }
553
554                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
555                 if (ac->num_requests == 0) {
556                         talloc_free(ac);
557                         return ldb_next_request(module, req);
558                 }
559
560                 /* fire the first one */
561                 return partition_call_first(ac);
562
563         } else {
564                 /* Handle this like all other requests */
565                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
566                         /* We have processed this flag, so we are done with this control now */
567
568                         /* Remove search control, so we don't confuse a backend server */
569                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
570                                 ldb_oom(ldb_module_get_ctx(module));
571                                 return LDB_ERR_OPERATIONS_ERROR;
572                         }
573                 }
574
575                 return partition_replicate(module, req, req->op.search.base);
576         }
577 }
578
579 /* add */
580 static int partition_add(struct ldb_module *module, struct ldb_request *req)
581 {
582         return partition_replicate(module, req, req->op.add.message->dn);
583 }
584
585 /* modify */
586 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
587 {
588         return partition_replicate(module, req, req->op.mod.message->dn);
589 }
590
591 /* delete */
592 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
593 {
594         return partition_replicate(module, req, req->op.del.dn);
595 }
596
597 /* rename */
598 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
599 {
600         /* Find backend */
601         struct dsdb_partition *backend, *backend2;
602         
603         struct partition_private_data *data = talloc_get_type(module->private_data, 
604                                                               struct partition_private_data);
605
606         /* Skip the lot if 'data' isn't here yet (initialisation) */
607         if (!data) {
608                 return LDB_ERR_OPERATIONS_ERROR;
609         }
610
611         backend = find_partition(data, req->op.rename.olddn, req);
612         backend2 = find_partition(data, req->op.rename.newdn, req);
613
614         if ((backend && !backend2) || (!backend && backend2)) {
615                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
616         }
617
618         if (backend != backend2) {
619                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
620                                        "Cannot rename from %s in %s to %s in %s: %s",
621                                        ldb_dn_get_linearized(req->op.rename.olddn),
622                                        ldb_dn_get_linearized(backend->ctrl->dn),
623                                        ldb_dn_get_linearized(req->op.rename.newdn),
624                                        ldb_dn_get_linearized(backend2->ctrl->dn),
625                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
626                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
627         }
628
629         return partition_replicate(module, req, req->op.rename.olddn);
630 }
631
632 /* start a transaction */
633 static int partition_start_trans(struct ldb_module *module)
634 {
635         int i, ret;
636         struct partition_private_data *data = talloc_get_type(module->private_data, 
637                                                               struct partition_private_data);
638         /* Look at base DN */
639         /* Figure out which partition it is under */
640         /* Skip the lot if 'data' isn't here yet (initialization) */
641         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
642                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
643         }
644         ret = ldb_next_start_trans(module);
645         if (ret != LDB_SUCCESS) {
646                 return ret;
647         }
648
649         ret = partition_reload_if_required(module, data);
650         if (ret != LDB_SUCCESS) {
651                 return ret;
652         }
653
654         for (i=0; data && data->partitions && data->partitions[i]; i++) {
655                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
656                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
657                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
658                 }
659                 ret = ldb_next_start_trans(data->partitions[i]->module);
660                 if (ret != LDB_SUCCESS) {
661                         /* Back it out, if it fails on one */
662                         for (i--; i >= 0; i--) {
663                                 ldb_next_del_trans(data->partitions[i]->module);
664                         }
665                         ldb_next_del_trans(module);
666                         return ret;
667                 }
668         }
669
670         data->in_transaction++;
671
672         return LDB_SUCCESS;
673 }
674
675 /* prepare for a commit */
676 static int partition_prepare_commit(struct ldb_module *module)
677 {
678         int i;
679         struct partition_private_data *data = talloc_get_type(module->private_data, 
680                                                               struct partition_private_data);
681
682         for (i=0; data && data->partitions && data->partitions[i]; i++) {
683                 int ret;
684
685                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
686                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
687                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
688                 }
689                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
690                 if (ret != LDB_SUCCESS) {
691                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
692                         return ret;
693                 }
694         }
695
696         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
697                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
698         }
699         return ldb_next_prepare_commit(module);
700 }
701
702
703 /* end a transaction */
704 static int partition_end_trans(struct ldb_module *module)
705 {
706         int i, ret, ret2;
707         struct partition_private_data *data = talloc_get_type(module->private_data, 
708                                                               struct partition_private_data);
709
710         ret = LDB_SUCCESS;
711
712         if (data->in_transaction == 0) {
713                 DEBUG(0,("partition end transaction mismatch\n"));
714                 ret = LDB_ERR_OPERATIONS_ERROR;
715         } else {
716                 data->in_transaction--;
717         }
718
719         for (i=0; data && data->partitions && data->partitions[i]; i++) {
720                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
721                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
722                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
723                 }
724                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
725                 if (ret2 != LDB_SUCCESS) {
726                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
727                         ret = ret2;
728                 }
729         }
730
731         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
732                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
733         }
734         ret2 = ldb_next_end_trans(module);
735         if (ret2 != LDB_SUCCESS) {
736                 ret = ret2;
737         }
738         return ret;
739 }
740
741 /* delete a transaction */
742 static int partition_del_trans(struct ldb_module *module)
743 {
744         int i, ret, final_ret = LDB_SUCCESS;
745         struct partition_private_data *data = talloc_get_type(module->private_data, 
746                                                               struct partition_private_data);
747         for (i=0; data && data->partitions && data->partitions[i]; i++) {
748                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
749                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
750                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
751                 }
752                 ret = ldb_next_del_trans(data->partitions[i]->module);
753                 if (ret != LDB_SUCCESS) {
754                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
755                         final_ret = ret;
756                 }
757         }       
758
759         if (data->in_transaction == 0) {
760                 DEBUG(0,("partition del transaction mismatch\n"));
761                 return LDB_ERR_OPERATIONS_ERROR;
762         }
763         data->in_transaction--;
764
765         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
766                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
767         }
768         ret = ldb_next_del_trans(module);
769         if (ret != LDB_SUCCESS) {
770                 final_ret = ret;
771         }
772         return final_ret;
773 }
774
775 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
776                                      enum ldb_sequence_type type, uint64_t *seq_number) 
777 {
778         int ret;
779         struct ldb_result *res;
780         struct ldb_seqnum_request *tseq;
781         struct ldb_request *treq;
782         struct ldb_seqnum_result *seqr;
783         res = talloc_zero(mem_ctx, struct ldb_result);
784         if (res == NULL) {
785                 return LDB_ERR_OPERATIONS_ERROR;
786         }
787         tseq = talloc_zero(res, struct ldb_seqnum_request);
788         if (tseq == NULL) {
789                 talloc_free(res);
790                 return LDB_ERR_OPERATIONS_ERROR;
791         }
792         tseq->type = type;
793         
794         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
795                                      LDB_EXTENDED_SEQUENCE_NUMBER,
796                                      tseq,
797                                      NULL,
798                                      res,
799                                      ldb_extended_default_callback,
800                                      NULL);
801         if (ret != LDB_SUCCESS) {
802                 talloc_free(res);
803                 return ret;
804         }
805         
806         ret = ldb_next_request(module, treq);
807         if (ret != LDB_SUCCESS) {
808                 talloc_free(res);
809                 return ret;
810         }
811         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
812         if (ret != LDB_SUCCESS) {
813                 talloc_free(res);
814                 return ret;
815         }
816         
817         seqr = talloc_get_type(res->extended->data,
818                                struct ldb_seqnum_result);
819         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
820                 ret = LDB_ERR_OPERATIONS_ERROR;
821                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
822                 talloc_free(res);
823                 return ret;
824         } else {
825                 *seq_number = seqr->seq_num;
826         }
827         talloc_free(res);
828         return LDB_SUCCESS;
829 }
830
831 /* FIXME: This function is still semi-async */
832 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
833 {
834         int i, ret;
835         uint64_t seq_number = 0;
836         uint64_t timestamp_sequence = 0;
837         uint64_t timestamp = 0;
838         struct partition_private_data *data = talloc_get_type(module->private_data, 
839                                                               struct partition_private_data);
840         struct ldb_seqnum_request *seq;
841         struct ldb_seqnum_result *seqr;
842         struct ldb_request *treq;
843         struct ldb_seqnum_request *tseq;
844         struct ldb_seqnum_result *tseqr;
845         struct ldb_extended *ext;
846         struct ldb_result *res;
847         struct dsdb_partition *p;
848
849         p = find_partition(data, NULL, req);
850         if (p != NULL) {
851                 /* the caller specified what partition they want the
852                  * sequence number operation on - just pass it on
853                  */
854                 return ldb_next_request(p->module, req);                
855         }
856
857         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
858
859         switch (seq->type) {
860         case LDB_SEQ_NEXT:
861         case LDB_SEQ_HIGHEST_SEQ:
862
863                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
864                 if (ret != LDB_SUCCESS) {
865                         return ret;
866                 }
867
868                 /* Skip the lot if 'data' isn't here yet (initialisation) */
869                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
870
871                         res = talloc_zero(req, struct ldb_result);
872                         if (res == NULL) {
873                                 return LDB_ERR_OPERATIONS_ERROR;
874                         }
875                         tseq = talloc_zero(res, struct ldb_seqnum_request);
876                         if (tseq == NULL) {
877                                 talloc_free(res);
878                                 return LDB_ERR_OPERATIONS_ERROR;
879                         }
880                         tseq->type = seq->type;
881
882                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
883                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
884                                                      tseq,
885                                                      NULL,
886                                                      res,
887                                                      ldb_extended_default_callback,
888                                                      NULL);
889                         if (ret != LDB_SUCCESS) {
890                                 talloc_free(res);
891                                 return ret;
892                         }
893
894                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
895                                 ret = ldb_request_add_control(treq,
896                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
897                                                               false, data->partitions[i]->ctrl);
898                                 if (ret != LDB_SUCCESS) {
899                                         talloc_free(res);
900                                         return ret;
901                                 }
902                         }
903
904                         ret = partition_request(data->partitions[i]->module, treq);
905                         if (ret != LDB_SUCCESS) {
906                                 talloc_free(res);
907                                 return ret;
908                         }
909                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
910                         if (ret != LDB_SUCCESS) {
911                                 talloc_free(res);
912                                 return ret;
913                         }
914                         tseqr = talloc_get_type(res->extended->data,
915                                                 struct ldb_seqnum_result);
916                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
917                                 timestamp_sequence = MAX(timestamp_sequence,
918                                                          tseqr->seq_num);
919                         } else {
920                                 seq_number += tseqr->seq_num;
921                         }
922                         talloc_free(res);
923                 }
924                 /* fall through */
925         case LDB_SEQ_HIGHEST_TIMESTAMP:
926
927                 res = talloc_zero(req, struct ldb_result);
928                 if (res == NULL) {
929                         return LDB_ERR_OPERATIONS_ERROR;
930                 }
931
932                 tseq = talloc_zero(res, struct ldb_seqnum_request);
933                 if (tseq == NULL) {
934                         talloc_free(res);
935                         return LDB_ERR_OPERATIONS_ERROR;
936                 }
937                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
938
939                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
940                                              LDB_EXTENDED_SEQUENCE_NUMBER,
941                                              tseq,
942                                              NULL,
943                                              res,
944                                              ldb_extended_default_callback,
945                                              NULL);
946                 if (ret != LDB_SUCCESS) {
947                         talloc_free(res);
948                         return ret;
949                 }
950
951                 ret = ldb_next_request(module, treq);
952                 if (ret != LDB_SUCCESS) {
953                         talloc_free(res);
954                         return ret;
955                 }
956                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
957                 if (ret != LDB_SUCCESS) {
958                         talloc_free(res);
959                         return ret;
960                 }
961
962                 tseqr = talloc_get_type(res->extended->data,
963                                            struct ldb_seqnum_result);
964                 timestamp = tseqr->seq_num;
965
966                 talloc_free(res);
967
968                 /* Skip the lot if 'data' isn't here yet (initialisation) */
969                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
970
971                         res = talloc_zero(req, struct ldb_result);
972                         if (res == NULL) {
973                                 return LDB_ERR_OPERATIONS_ERROR;
974                         }
975
976                         tseq = talloc_zero(res, struct ldb_seqnum_request);
977                         if (tseq == NULL) {
978                                 talloc_free(res);
979                                 return LDB_ERR_OPERATIONS_ERROR;
980                         }
981                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
982
983                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
984                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
985                                                      tseq,
986                                                      NULL,
987                                                      res,
988                                                      ldb_extended_default_callback,
989                                                      NULL);
990                         if (ret != LDB_SUCCESS) {
991                                 talloc_free(res);
992                                 return ret;
993                         }
994
995                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
996                                 ret = ldb_request_add_control(treq,
997                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
998                                                               false, data->partitions[i]->ctrl);
999                                 if (ret != LDB_SUCCESS) {
1000                                         talloc_free(res);
1001                                         return ret;
1002                                 }
1003                         }
1004
1005                         ret = partition_request(data->partitions[i]->module, treq);
1006                         if (ret != LDB_SUCCESS) {
1007                                 talloc_free(res);
1008                                 return ret;
1009                         }
1010                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1011                         if (ret != LDB_SUCCESS) {
1012                                 talloc_free(res);
1013                                 return ret;
1014                         }
1015
1016                         tseqr = talloc_get_type(res->extended->data,
1017                                                   struct ldb_seqnum_result);
1018                         timestamp = MAX(timestamp, tseqr->seq_num);
1019
1020                         talloc_free(res);
1021                 }
1022
1023                 break;
1024         }
1025
1026         ext = talloc_zero(req, struct ldb_extended);
1027         if (!ext) {
1028                 return LDB_ERR_OPERATIONS_ERROR;
1029         }
1030         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1031         if (seqr == NULL) {
1032                 talloc_free(ext);
1033                 return LDB_ERR_OPERATIONS_ERROR;
1034         }
1035         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1036         ext->data = seqr;
1037
1038         switch (seq->type) {
1039         case LDB_SEQ_NEXT:
1040         case LDB_SEQ_HIGHEST_SEQ:
1041
1042                 /* Has someone above set a timebase sequence? */
1043                 if (timestamp_sequence) {
1044                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1045                 } else {
1046                         seqr->seq_num = seq_number;
1047                 }
1048
1049                 if (timestamp_sequence > seqr->seq_num) {
1050                         seqr->seq_num = timestamp_sequence;
1051                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1052                 }
1053
1054                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1055                 break;
1056         case LDB_SEQ_HIGHEST_TIMESTAMP:
1057                 seqr->seq_num = timestamp;
1058                 break;
1059         }
1060
1061         if (seq->type == LDB_SEQ_NEXT) {
1062                 seqr->seq_num++;
1063         }
1064
1065         /* send request done */
1066         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1067 }
1068
1069 /* extended */
1070 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1071 {
1072         struct partition_private_data *data;
1073         struct partition_context *ac;
1074         int ret;
1075
1076         data = talloc_get_type(module->private_data, struct partition_private_data);
1077         if (!data) {
1078                 return ldb_next_request(module, req);
1079         }
1080
1081         ret = partition_reload_if_required(module, data);
1082         if (ret != LDB_SUCCESS) {
1083                 return ret;
1084         }
1085         
1086         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1087                 return partition_sequence_number(module, req);
1088         }
1089
1090         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1091                 return partition_create(module, req);
1092         }
1093
1094         /* 
1095          * as the extended operation has no dn
1096          * we need to send it to all partitions
1097          */
1098
1099         ac = partition_init_ctx(module, req);
1100         if (!ac) {
1101                 return LDB_ERR_OPERATIONS_ERROR;
1102         }
1103
1104         return partition_send_all(module, ac, req);
1105 }
1106
1107 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1108         .name              = "partition",
1109         .init_context      = partition_init,
1110         .search            = partition_search,
1111         .add               = partition_add,
1112         .modify            = partition_modify,
1113         .del               = partition_delete,
1114         .rename            = partition_rename,
1115         .extended          = partition_extended,
1116         .start_transaction = partition_start_trans,
1117         .prepare_commit    = partition_prepare_commit,
1118         .end_transaction   = partition_end_trans,
1119         .del_transaction   = partition_del_trans,
1120 };