b879bc410610065f382c195df7b67055385b2843
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42
43         struct part_request *part_req;
44         int num_requests;
45         int finished_requests;
46 };
47
48 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
49 {
50         struct partition_context *ac;
51
52         ac = talloc_zero(req, struct partition_context);
53         if (ac == NULL) {
54                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
55                 return NULL;
56         }
57
58         ac->module = module;
59         ac->req = req;
60
61         return ac;
62 }
63
64 /*
65  *    helper functions to call the next module in chain
66  *    */
67
68 int partition_request(struct ldb_module *module, struct ldb_request *request)
69 {
70         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) { \
71                 const struct dsdb_control_current_partition *partition = NULL;
72                 struct ldb_control *partition_ctrl = ldb_request_get_control(request, DSDB_CONTROL_CURRENT_PARTITION_OID);
73                 if (partition_ctrl) {
74                         partition = talloc_get_type(partition_ctrl->data,
75                                                     struct dsdb_control_current_partition);
76                 }
77
78                 if (partition != NULL) {
79                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> %s", 
80                                   ldb_dn_get_linearized(partition->dn));                        
81                 } else {
82                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_request() -> (metadata partition)");                 
83                 }
84         }
85
86         return ldb_next_request(module, request);
87 }
88
89 static struct dsdb_partition *find_partition(struct partition_private_data *data,
90                                              struct ldb_dn *dn,
91                                              struct ldb_request *req)
92 {
93         int i;
94         struct ldb_control *partition_ctrl;
95
96         /* see if the request has the partition DN specified in a
97          * control. The repl_meta_data module can specify this to
98          * ensure that replication happens to the right partition
99          */
100         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
101         if (partition_ctrl) {
102                 const struct dsdb_control_current_partition *partition;
103                 partition = talloc_get_type(partition_ctrl->data,
104                                             struct dsdb_control_current_partition);
105                 if (partition != NULL) {
106                         dn = partition->dn;
107                 }
108         }
109
110         if (dn == NULL) {
111                 return NULL;
112         }
113
114         /* Look at base DN */
115         /* Figure out which partition it is under */
116         /* Skip the lot if 'data' isn't here yet (initialisation) */
117         for (i=0; data && data->partitions && data->partitions[i]; i++) {
118                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
119                         return data->partitions[i];
120                 }
121         }
122
123         return NULL;
124 }
125
126 /**
127  * fire the caller's callback for every entry, but only send 'done' once.
128  */
129 static int partition_req_callback(struct ldb_request *req,
130                                   struct ldb_reply *ares)
131 {
132         struct partition_context *ac;
133         struct ldb_module *module;
134         struct ldb_request *nreq;
135         int ret;
136         struct partition_private_data *data;
137         struct ldb_control *partition_ctrl;
138
139         ac = talloc_get_type(req->context, struct partition_context);
140         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
141
142         if (!ares) {
143                 return ldb_module_done(ac->req, NULL, NULL,
144                                         LDB_ERR_OPERATIONS_ERROR);
145         }
146
147         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
148         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
149                 /* If we didn't fan this request out to mulitple partitions,
150                  * or this is an individual search result, we can
151                  * deterministily tell the caller what partition this was
152                  * written to (repl_meta_data likes to know) */
153                         ret = ldb_reply_add_control(ares,
154                                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
155                                                     false, partition_ctrl->data);
156                         if (ret != LDB_SUCCESS) {
157                                 return ldb_module_done(ac->req, NULL, NULL,
158                                                        ret);
159                         }
160         }
161
162         if (ares->error != LDB_SUCCESS) {
163                 return ldb_module_done(ac->req, ares->controls,
164                                         ares->response, ares->error);
165         }
166
167         switch (ares->type) {
168         case LDB_REPLY_REFERRAL:
169                 /* ignore referrals for now */
170                 break;
171
172         case LDB_REPLY_ENTRY:
173                 if (ac->req->operation != LDB_SEARCH) {
174                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
175                                 "partition_req_callback:"
176                                 " Unsupported reply type for this request");
177                         return ldb_module_done(ac->req, NULL, NULL,
178                                                 LDB_ERR_OPERATIONS_ERROR);
179                 }
180                 
181                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
182
183         case LDB_REPLY_DONE:
184                 if (ac->req->operation == LDB_EXTENDED) {
185                         /* FIXME: check for ares->response, replmd does not fill it ! */
186                         if (ares->response) {
187                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
188                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
189                                                           "partition_req_callback:"
190                                                           " Unknown extended reply, "
191                                                           "only supports START_TLS");
192                                         talloc_free(ares);
193                                         return ldb_module_done(ac->req, NULL, NULL,
194                                                                 LDB_ERR_OPERATIONS_ERROR);
195                                 }
196                         }
197                 }
198
199                 ac->finished_requests++;
200                 if (ac->finished_requests == ac->num_requests) {
201                         /* this was the last one, call callback */
202                         return ldb_module_done(ac->req, ares->controls,
203                                                ares->response, 
204                                                ares->error);
205                 }
206
207                 /* not the last, now call the next one */
208                 module = ac->part_req[ac->finished_requests].module;
209                 nreq = ac->part_req[ac->finished_requests].req;
210
211                 ret = partition_request(module, nreq);
212                 if (ret != LDB_SUCCESS) {
213                         talloc_free(ares);
214                         return ldb_module_done(ac->req, NULL, NULL, ret);
215                 }
216
217                 break;
218         }
219
220         talloc_free(ares);
221         return LDB_SUCCESS;
222 }
223
224 static int partition_prep_request(struct partition_context *ac,
225                                   struct dsdb_partition *partition)
226 {
227         int ret;
228         struct ldb_request *req;
229
230         ac->part_req = talloc_realloc(ac, ac->part_req,
231                                         struct part_request,
232                                         ac->num_requests + 1);
233         if (ac->part_req == NULL) {
234                 ldb_oom(ldb_module_get_ctx(ac->module));
235                 return LDB_ERR_OPERATIONS_ERROR;
236         }
237
238         switch (ac->req->operation) {
239         case LDB_SEARCH:
240                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
241                                         ac->part_req,
242                                         ac->req->op.search.base,
243                                         ac->req->op.search.scope,
244                                         ac->req->op.search.tree,
245                                         ac->req->op.search.attrs,
246                                         ac->req->controls,
247                                         ac, partition_req_callback,
248                                         ac->req);
249                 break;
250         case LDB_ADD:
251                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
252                                         ac->req->op.add.message,
253                                         ac->req->controls,
254                                         ac, partition_req_callback,
255                                         ac->req);
256                 break;
257         case LDB_MODIFY:
258                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
259                                         ac->req->op.mod.message,
260                                         ac->req->controls,
261                                         ac, partition_req_callback,
262                                         ac->req);
263                 break;
264         case LDB_DELETE:
265                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
266                                         ac->req->op.del.dn,
267                                         ac->req->controls,
268                                         ac, partition_req_callback,
269                                         ac->req);
270                 break;
271         case LDB_RENAME:
272                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
273                                         ac->req->op.rename.olddn,
274                                         ac->req->op.rename.newdn,
275                                         ac->req->controls,
276                                         ac, partition_req_callback,
277                                         ac->req);
278                 break;
279         case LDB_EXTENDED:
280                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
281                                         ac->part_req,
282                                         ac->req->op.extended.oid,
283                                         ac->req->op.extended.data,
284                                         ac->req->controls,
285                                         ac, partition_req_callback,
286                                         ac->req);
287                 break;
288         default:
289                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
290                                   "Unsupported request type!");
291                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
292         }
293
294         if (ret != LDB_SUCCESS) {
295                 return ret;
296         }
297
298         ac->part_req[ac->num_requests].req = req;
299
300         if (ac->req->controls) {
301                 req->controls = talloc_memdup(req, ac->req->controls,
302                                         talloc_get_size(ac->req->controls));
303                 if (req->controls == NULL) {
304                         ldb_oom(ldb_module_get_ctx(ac->module));
305                         return LDB_ERR_OPERATIONS_ERROR;
306                 }
307         }
308
309         if (partition) {
310                 ac->part_req[ac->num_requests].module = partition->module;
311
312                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
313                         ret = ldb_request_add_control(req,
314                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
315                                                       false, partition->ctrl);
316                         if (ret != LDB_SUCCESS) {
317                                 return ret;
318                         }
319                 }
320
321                 if (req->operation == LDB_SEARCH) {
322                         /* If the search is for 'more' than this partition,
323                          * then change the basedn, so a remote LDAP server
324                          * doesn't object */
325                         if (ldb_dn_compare_base(partition->ctrl->dn,
326                                                 req->op.search.base) != 0) {
327                                 req->op.search.base = partition->ctrl->dn;
328                         }
329                 }
330
331         } else {
332                 /* make sure you put the module here, or
333                  * or ldb_next_request() will skip a module */
334                 ac->part_req[ac->num_requests].module = ac->module;
335         }
336
337         ac->num_requests++;
338
339         return LDB_SUCCESS;
340 }
341
342 static int partition_call_first(struct partition_context *ac)
343 {
344         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
345 }
346
347 /**
348  * Send a request down to all the partitions
349  */
350 static int partition_send_all(struct ldb_module *module, 
351                               struct partition_context *ac, 
352                               struct ldb_request *req) 
353 {
354         int i;
355         struct partition_private_data *data = talloc_get_type(module->private_data, 
356                                                               struct partition_private_data);
357         int ret = partition_prep_request(ac, NULL);
358         if (ret != LDB_SUCCESS) {
359                 return ret;
360         }
361         for (i=0; data && data->partitions && data->partitions[i]; i++) {
362                 ret = partition_prep_request(ac, data->partitions[i]);
363                 if (ret != LDB_SUCCESS) {
364                         return ret;
365                 }
366         }
367
368         /* fire the first one */
369         return partition_call_first(ac);
370 }
371
372 /**
373  * Figure out which backend a request needs to be aimed at.  Some
374  * requests must be replicated to all backends
375  */
376 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
377 {
378         struct partition_context *ac;
379         unsigned i;
380         int ret;
381         struct dsdb_partition *partition;
382         struct partition_private_data *data = talloc_get_type(module->private_data, 
383                                                               struct partition_private_data);
384         if (!data || !data->partitions) {
385                 return ldb_next_request(module, req);
386         }
387
388         if (req->operation != LDB_SEARCH && ldb_dn_is_special(dn)) {
389                 /* Is this a special DN, we need to replicate to every backend? */
390                 for (i=0; data->replicate && data->replicate[i]; i++) {
391                         if (ldb_dn_compare(data->replicate[i], 
392                                            dn) == 0) {
393                                 
394                                 ac = partition_init_ctx(module, req);
395                                 if (!ac) {
396                                         return LDB_ERR_OPERATIONS_ERROR;
397                                 }
398                                 
399                                 return partition_send_all(module, ac, req);
400                         }
401                 }
402         }
403
404         /* Otherwise, we need to find the partition to fire it to */
405
406         /* Find partition */
407         partition = find_partition(data, dn, req);
408         if (!partition) {
409                 /*
410                  * if we haven't found a matching partition
411                  * pass the request to the main ldb
412                  *
413                  * TODO: we should maybe return an error here
414                  *       if it's not a special dn
415                  */
416
417                 return ldb_next_request(module, req);
418         }
419
420         ac = partition_init_ctx(module, req);
421         if (!ac) {
422                 return LDB_ERR_OPERATIONS_ERROR;
423         }
424
425         /* we need to add a control but we never touch the original request */
426         ret = partition_prep_request(ac, partition);
427         if (ret != LDB_SUCCESS) {
428                 return ret;
429         }
430
431         /* fire the first one */
432         return partition_call_first(ac);
433 }
434
435 /* search */
436 static int partition_search(struct ldb_module *module, struct ldb_request *req)
437 {
438         int ret;
439         struct ldb_control **saved_controls;
440         /* Find backend */
441         struct partition_private_data *data = talloc_get_type(module->private_data, 
442                                                               struct partition_private_data);
443
444         /* issue request */
445
446         /* (later) consider if we should be searching multiple
447          * partitions (for 'invisible' partition behaviour */
448
449         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
450         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
451         
452         struct ldb_search_options_control *search_options = NULL;
453         struct dsdb_partition *p;
454         
455         ret = partition_reload_if_required(module, data);
456         if (ret != LDB_SUCCESS) {
457                 return ret;
458         }
459
460         p = find_partition(data, NULL, req);
461         if (p != NULL) {
462                 /* the caller specified what partition they want the
463                  * search - just pass it on
464                  */
465                 return ldb_next_request(p->module, req);                
466         }
467
468
469         if (search_control) {
470                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
471         }
472
473         /* Remove the domain_scope control, so we don't confuse a backend server */
474         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
475                 ldb_oom(ldb_module_get_ctx(module));
476                 return LDB_ERR_OPERATIONS_ERROR;
477         }
478
479         /*
480          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
481          * down as uncritical to make windows 2008 dcpromo happy.
482          */
483         if (search_control) {
484                 search_control->critical = 0;
485         }
486
487         /* TODO:
488            Generate referrals (look for a partition under this DN) if we don't have the above control specified
489         */
490         
491         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
492                 int i;
493                 struct partition_context *ac;
494                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
495                         /* We have processed this flag, so we are done with this control now */
496
497                         /* Remove search control, so we don't confuse a backend server */
498                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
499                                 ldb_oom(ldb_module_get_ctx(module));
500                                 return LDB_ERR_OPERATIONS_ERROR;
501                         }
502                 }
503                 ac = partition_init_ctx(module, req);
504                 if (!ac) {
505                         return LDB_ERR_OPERATIONS_ERROR;
506                 }
507
508                 /* Search from the base DN */
509                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
510                         return partition_send_all(module, ac, req);
511                 }
512                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
513                         bool match = false, stop = false;
514                         /* Find all partitions under the search base 
515                            
516                            we match if:
517
518                               1) the DN we are looking for exactly matches the partition
519                              or
520                               2) the DN we are looking for is a parent of the partition and it isn't
521                                  a scope base search
522                              or
523                               3) the DN we are looking for is a child of the partition
524                          */
525                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
526                                 match = true;
527                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
528                                         stop = true;
529                                 }
530                         }
531                         if (!match && 
532                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
533                              req->op.search.scope != LDB_SCOPE_BASE)) {
534                                 match = true;
535                         }
536                         if (!match &&
537                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
538                                 match = true;
539                                 stop = true; /* note that this relies on partition ordering */
540                         }
541                         if (match) {
542                                 ret = partition_prep_request(ac, data->partitions[i]);
543                                 if (ret != LDB_SUCCESS) {
544                                         return ret;
545                                 }
546                         }
547                         if (stop) break;
548                 }
549
550                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
551                 if (ac->num_requests == 0) {
552                         talloc_free(ac);
553                         return ldb_next_request(module, req);
554                 }
555
556                 /* fire the first one */
557                 return partition_call_first(ac);
558
559         } else {
560                 /* Handle this like all other requests */
561                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
562                         /* We have processed this flag, so we are done with this control now */
563
564                         /* Remove search control, so we don't confuse a backend server */
565                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
566                                 ldb_oom(ldb_module_get_ctx(module));
567                                 return LDB_ERR_OPERATIONS_ERROR;
568                         }
569                 }
570
571                 return partition_replicate(module, req, req->op.search.base);
572         }
573 }
574
575 /* add */
576 static int partition_add(struct ldb_module *module, struct ldb_request *req)
577 {
578         return partition_replicate(module, req, req->op.add.message->dn);
579 }
580
581 /* modify */
582 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
583 {
584         return partition_replicate(module, req, req->op.mod.message->dn);
585 }
586
587 /* delete */
588 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
589 {
590         return partition_replicate(module, req, req->op.del.dn);
591 }
592
593 /* rename */
594 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
595 {
596         /* Find backend */
597         struct dsdb_partition *backend, *backend2;
598         
599         struct partition_private_data *data = talloc_get_type(module->private_data, 
600                                                               struct partition_private_data);
601
602         /* Skip the lot if 'data' isn't here yet (initialisation) */
603         if (!data) {
604                 return LDB_ERR_OPERATIONS_ERROR;
605         }
606
607         backend = find_partition(data, req->op.rename.olddn, req);
608         backend2 = find_partition(data, req->op.rename.newdn, req);
609
610         if ((backend && !backend2) || (!backend && backend2)) {
611                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
612         }
613
614         if (backend != backend2) {
615                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
616                                        "Cannot rename from %s in %s to %s in %s: %s",
617                                        ldb_dn_get_linearized(req->op.rename.olddn),
618                                        ldb_dn_get_linearized(backend->ctrl->dn),
619                                        ldb_dn_get_linearized(req->op.rename.newdn),
620                                        ldb_dn_get_linearized(backend2->ctrl->dn),
621                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
622                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
623         }
624
625         return partition_replicate(module, req, req->op.rename.olddn);
626 }
627
628 /* start a transaction */
629 static int partition_start_trans(struct ldb_module *module)
630 {
631         int i, ret;
632         struct partition_private_data *data = talloc_get_type(module->private_data, 
633                                                               struct partition_private_data);
634         /* Look at base DN */
635         /* Figure out which partition it is under */
636         /* Skip the lot if 'data' isn't here yet (initialization) */
637         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
638                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> (metadata partition)");
639         }
640         ret = ldb_next_start_trans(module);
641         if (ret != LDB_SUCCESS) {
642                 return ret;
643         }
644
645         ret = partition_reload_if_required(module, data);
646         if (ret != LDB_SUCCESS) {
647                 return ret;
648         }
649
650         for (i=0; data && data->partitions && data->partitions[i]; i++) {
651                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
652                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_start_trans() -> %s", 
653                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
654                 }
655                 ret = ldb_next_start_trans(data->partitions[i]->module);
656                 if (ret != LDB_SUCCESS) {
657                         /* Back it out, if it fails on one */
658                         for (i--; i >= 0; i--) {
659                                 ldb_next_del_trans(data->partitions[i]->module);
660                         }
661                         ldb_next_del_trans(module);
662                         return ret;
663                 }
664         }
665
666         data->in_transaction++;
667
668         return LDB_SUCCESS;
669 }
670
671 /* prepare for a commit */
672 static int partition_prepare_commit(struct ldb_module *module)
673 {
674         int i;
675         struct partition_private_data *data = talloc_get_type(module->private_data, 
676                                                               struct partition_private_data);
677
678         for (i=0; data && data->partitions && data->partitions[i]; i++) {
679                 int ret;
680
681                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
682                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> %s", 
683                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
684                 }
685                 ret = ldb_next_prepare_commit(data->partitions[i]->module);
686                 if (ret != LDB_SUCCESS) {
687                         ldb_asprintf_errstring(module->ldb, "prepare_commit error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
688                         return ret;
689                 }
690         }
691
692         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
693                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_prepare_commit() -> (metadata partition)");
694         }
695         return ldb_next_prepare_commit(module);
696 }
697
698
699 /* end a transaction */
700 static int partition_end_trans(struct ldb_module *module)
701 {
702         int i, ret, ret2;
703         struct partition_private_data *data = talloc_get_type(module->private_data, 
704                                                               struct partition_private_data);
705
706         ret = LDB_SUCCESS;
707
708         if (data->in_transaction == 0) {
709                 DEBUG(0,("partition end transaction mismatch\n"));
710                 ret = LDB_ERR_OPERATIONS_ERROR;
711         } else {
712                 data->in_transaction--;
713         }
714
715         for (i=0; data && data->partitions && data->partitions[i]; i++) {
716                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
717                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> %s", 
718                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
719                 }
720                 ret2 = ldb_next_end_trans(data->partitions[i]->module);
721                 if (ret2 != LDB_SUCCESS) {
722                         ldb_asprintf_errstring(module->ldb, "end_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
723                         ret = ret2;
724                 }
725         }
726
727         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
728                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_end_trans() -> (metadata partition)");
729         }
730         ret2 = ldb_next_end_trans(module);
731         if (ret2 != LDB_SUCCESS) {
732                 ret = ret2;
733         }
734         return ret;
735 }
736
737 /* delete a transaction */
738 static int partition_del_trans(struct ldb_module *module)
739 {
740         int i, ret, final_ret = LDB_SUCCESS;
741         struct partition_private_data *data = talloc_get_type(module->private_data, 
742                                                               struct partition_private_data);
743         for (i=0; data && data->partitions && data->partitions[i]; i++) {
744                 if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
745                         ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> %s", 
746                                   ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
747                 }
748                 ret = ldb_next_del_trans(data->partitions[i]->module);
749                 if (ret != LDB_SUCCESS) {
750                         ldb_asprintf_errstring(module->ldb, "del_trans error on %s: %s", ldb_dn_get_linearized(data->partitions[i]->ctrl->dn), ldb_errstring(module->ldb));
751                         final_ret = ret;
752                 }
753         }       
754
755         if (data->in_transaction == 0) {
756                 DEBUG(0,("partition del transaction mismatch\n"));
757                 return LDB_ERR_OPERATIONS_ERROR;
758         }
759         data->in_transaction--;
760
761         if ((module && module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
762                 ldb_debug(module->ldb, LDB_DEBUG_TRACE, "partition_del_trans() -> (metadata partition)");
763         }
764         ret = ldb_next_del_trans(module);
765         if (ret != LDB_SUCCESS) {
766                 final_ret = ret;
767         }
768         return final_ret;
769 }
770
771 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
772                                      enum ldb_sequence_type type, uint64_t *seq_number) 
773 {
774         int ret;
775         struct ldb_result *res;
776         struct ldb_seqnum_request *tseq;
777         struct ldb_request *treq;
778         struct ldb_seqnum_result *seqr;
779         res = talloc_zero(mem_ctx, struct ldb_result);
780         if (res == NULL) {
781                 return LDB_ERR_OPERATIONS_ERROR;
782         }
783         tseq = talloc_zero(res, struct ldb_seqnum_request);
784         if (tseq == NULL) {
785                 talloc_free(res);
786                 return LDB_ERR_OPERATIONS_ERROR;
787         }
788         tseq->type = type;
789         
790         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
791                                      LDB_EXTENDED_SEQUENCE_NUMBER,
792                                      tseq,
793                                      NULL,
794                                      res,
795                                      ldb_extended_default_callback,
796                                      NULL);
797         if (ret != LDB_SUCCESS) {
798                 talloc_free(res);
799                 return ret;
800         }
801         
802         ret = ldb_next_request(module, treq);
803         if (ret != LDB_SUCCESS) {
804                 talloc_free(res);
805                 return ret;
806         }
807         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
808         if (ret != LDB_SUCCESS) {
809                 talloc_free(res);
810                 return ret;
811         }
812         
813         seqr = talloc_get_type(res->extended->data,
814                                struct ldb_seqnum_result);
815         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
816                 ret = LDB_ERR_OPERATIONS_ERROR;
817                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
818                 talloc_free(res);
819                 return ret;
820         } else {
821                 *seq_number = seqr->seq_num;
822         }
823         talloc_free(res);
824         return LDB_SUCCESS;
825 }
826
827 /* FIXME: This function is still semi-async */
828 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
829 {
830         int i, ret;
831         uint64_t seq_number = 0;
832         uint64_t timestamp_sequence = 0;
833         uint64_t timestamp = 0;
834         struct partition_private_data *data = talloc_get_type(module->private_data, 
835                                                               struct partition_private_data);
836         struct ldb_seqnum_request *seq;
837         struct ldb_seqnum_result *seqr;
838         struct ldb_request *treq;
839         struct ldb_seqnum_request *tseq;
840         struct ldb_seqnum_result *tseqr;
841         struct ldb_extended *ext;
842         struct ldb_result *res;
843         struct dsdb_partition *p;
844
845         p = find_partition(data, NULL, req);
846         if (p != NULL) {
847                 /* the caller specified what partition they want the
848                  * sequence number operation on - just pass it on
849                  */
850                 return ldb_next_request(p->module, req);                
851         }
852
853         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
854
855         switch (seq->type) {
856         case LDB_SEQ_NEXT:
857         case LDB_SEQ_HIGHEST_SEQ:
858
859                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
860                 if (ret != LDB_SUCCESS) {
861                         return ret;
862                 }
863
864                 /* Skip the lot if 'data' isn't here yet (initialisation) */
865                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
866
867                         res = talloc_zero(req, struct ldb_result);
868                         if (res == NULL) {
869                                 return LDB_ERR_OPERATIONS_ERROR;
870                         }
871                         tseq = talloc_zero(res, struct ldb_seqnum_request);
872                         if (tseq == NULL) {
873                                 talloc_free(res);
874                                 return LDB_ERR_OPERATIONS_ERROR;
875                         }
876                         tseq->type = seq->type;
877
878                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
879                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
880                                                      tseq,
881                                                      NULL,
882                                                      res,
883                                                      ldb_extended_default_callback,
884                                                      NULL);
885                         if (ret != LDB_SUCCESS) {
886                                 talloc_free(res);
887                                 return ret;
888                         }
889
890                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
891                                 ret = ldb_request_add_control(treq,
892                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
893                                                               false, data->partitions[i]->ctrl);
894                                 if (ret != LDB_SUCCESS) {
895                                         talloc_free(res);
896                                         return ret;
897                                 }
898                         }
899
900                         ret = partition_request(data->partitions[i]->module, treq);
901                         if (ret != LDB_SUCCESS) {
902                                 talloc_free(res);
903                                 return ret;
904                         }
905                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
906                         if (ret != LDB_SUCCESS) {
907                                 talloc_free(res);
908                                 return ret;
909                         }
910                         tseqr = talloc_get_type(res->extended->data,
911                                                 struct ldb_seqnum_result);
912                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
913                                 timestamp_sequence = MAX(timestamp_sequence,
914                                                          tseqr->seq_num);
915                         } else {
916                                 seq_number += tseqr->seq_num;
917                         }
918                         talloc_free(res);
919                 }
920                 /* fall through */
921         case LDB_SEQ_HIGHEST_TIMESTAMP:
922
923                 res = talloc_zero(req, struct ldb_result);
924                 if (res == NULL) {
925                         return LDB_ERR_OPERATIONS_ERROR;
926                 }
927
928                 tseq = talloc_zero(res, struct ldb_seqnum_request);
929                 if (tseq == NULL) {
930                         talloc_free(res);
931                         return LDB_ERR_OPERATIONS_ERROR;
932                 }
933                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
934
935                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
936                                              LDB_EXTENDED_SEQUENCE_NUMBER,
937                                              tseq,
938                                              NULL,
939                                              res,
940                                              ldb_extended_default_callback,
941                                              NULL);
942                 if (ret != LDB_SUCCESS) {
943                         talloc_free(res);
944                         return ret;
945                 }
946
947                 ret = ldb_next_request(module, treq);
948                 if (ret != LDB_SUCCESS) {
949                         talloc_free(res);
950                         return ret;
951                 }
952                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
953                 if (ret != LDB_SUCCESS) {
954                         talloc_free(res);
955                         return ret;
956                 }
957
958                 tseqr = talloc_get_type(res->extended->data,
959                                            struct ldb_seqnum_result);
960                 timestamp = tseqr->seq_num;
961
962                 talloc_free(res);
963
964                 /* Skip the lot if 'data' isn't here yet (initialisation) */
965                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
966
967                         res = talloc_zero(req, struct ldb_result);
968                         if (res == NULL) {
969                                 return LDB_ERR_OPERATIONS_ERROR;
970                         }
971
972                         tseq = talloc_zero(res, struct ldb_seqnum_request);
973                         if (tseq == NULL) {
974                                 talloc_free(res);
975                                 return LDB_ERR_OPERATIONS_ERROR;
976                         }
977                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
978
979                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
980                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
981                                                      tseq,
982                                                      NULL,
983                                                      res,
984                                                      ldb_extended_default_callback,
985                                                      NULL);
986                         if (ret != LDB_SUCCESS) {
987                                 talloc_free(res);
988                                 return ret;
989                         }
990
991                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
992                                 ret = ldb_request_add_control(treq,
993                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
994                                                               false, data->partitions[i]->ctrl);
995                                 if (ret != LDB_SUCCESS) {
996                                         talloc_free(res);
997                                         return ret;
998                                 }
999                         }
1000
1001                         ret = partition_request(data->partitions[i]->module, treq);
1002                         if (ret != LDB_SUCCESS) {
1003                                 talloc_free(res);
1004                                 return ret;
1005                         }
1006                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1007                         if (ret != LDB_SUCCESS) {
1008                                 talloc_free(res);
1009                                 return ret;
1010                         }
1011
1012                         tseqr = talloc_get_type(res->extended->data,
1013                                                   struct ldb_seqnum_result);
1014                         timestamp = MAX(timestamp, tseqr->seq_num);
1015
1016                         talloc_free(res);
1017                 }
1018
1019                 break;
1020         }
1021
1022         ext = talloc_zero(req, struct ldb_extended);
1023         if (!ext) {
1024                 return LDB_ERR_OPERATIONS_ERROR;
1025         }
1026         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1027         if (seqr == NULL) {
1028                 talloc_free(ext);
1029                 return LDB_ERR_OPERATIONS_ERROR;
1030         }
1031         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1032         ext->data = seqr;
1033
1034         switch (seq->type) {
1035         case LDB_SEQ_NEXT:
1036         case LDB_SEQ_HIGHEST_SEQ:
1037
1038                 /* Has someone above set a timebase sequence? */
1039                 if (timestamp_sequence) {
1040                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1041                 } else {
1042                         seqr->seq_num = seq_number;
1043                 }
1044
1045                 if (timestamp_sequence > seqr->seq_num) {
1046                         seqr->seq_num = timestamp_sequence;
1047                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1048                 }
1049
1050                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1051                 break;
1052         case LDB_SEQ_HIGHEST_TIMESTAMP:
1053                 seqr->seq_num = timestamp;
1054                 break;
1055         }
1056
1057         if (seq->type == LDB_SEQ_NEXT) {
1058                 seqr->seq_num++;
1059         }
1060
1061         /* send request done */
1062         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1063 }
1064
1065 /* extended */
1066 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1067 {
1068         struct partition_private_data *data;
1069         struct partition_context *ac;
1070         int ret;
1071
1072         data = talloc_get_type(module->private_data, struct partition_private_data);
1073         if (!data) {
1074                 return ldb_next_request(module, req);
1075         }
1076
1077         ret = partition_reload_if_required(module, data);
1078         if (ret != LDB_SUCCESS) {
1079                 return ret;
1080         }
1081         
1082         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1083                 return partition_sequence_number(module, req);
1084         }
1085
1086         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1087                 return partition_create(module, req);
1088         }
1089
1090         /* 
1091          * as the extended operation has no dn
1092          * we need to send it to all partitions
1093          */
1094
1095         ac = partition_init_ctx(module, req);
1096         if (!ac) {
1097                 return LDB_ERR_OPERATIONS_ERROR;
1098         }
1099
1100         return partition_send_all(module, ac, req);
1101 }
1102
1103 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1104         .name              = "partition",
1105         .init_context      = partition_init,
1106         .search            = partition_search,
1107         .add               = partition_add,
1108         .modify            = partition_modify,
1109         .del               = partition_delete,
1110         .rename            = partition_rename,
1111         .extended          = partition_extended,
1112         .start_transaction = partition_start_trans,
1113         .prepare_commit    = partition_prepare_commit,
1114         .end_transaction   = partition_end_trans,
1115         .del_transaction   = partition_del_trans,
1116 };