s4-ldb: fixed request handling for schemaUpdateNow op
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42         bool got_success;
43
44         struct part_request *part_req;
45         int num_requests;
46         int finished_requests;
47 };
48
49 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
50 {
51         struct partition_context *ac;
52
53         ac = talloc_zero(req, struct partition_context);
54         if (ac == NULL) {
55                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
56                 return NULL;
57         }
58
59         ac->module = module;
60         ac->req = req;
61
62         return ac;
63 }
64
65 /*
66  *    helper functions to call the next module in chain
67  *    */
68
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         int ret;
72         switch (request->operation) {
73         case LDB_SEARCH:
74                 PARTITION_FIND_OP(module, search);
75                 ret = module->ops->search(module, request);
76                 break;
77         case LDB_ADD:
78                 PARTITION_FIND_OP(module, add);
79                 ret = module->ops->add(module, request);
80                 break;
81         case LDB_MODIFY:
82                 PARTITION_FIND_OP(module, modify);
83                 ret = module->ops->modify(module, request);
84                 break;
85         case LDB_DELETE:
86                 PARTITION_FIND_OP(module, del);
87                 ret = module->ops->del(module, request);
88                 break;
89         case LDB_RENAME:
90                 PARTITION_FIND_OP(module, rename);
91                 ret = module->ops->rename(module, request);
92                 break;
93         case LDB_EXTENDED:
94                 PARTITION_FIND_OP(module, extended);
95                 ret = module->ops->extended(module, request);
96                 break;
97         default:
98                 PARTITION_FIND_OP(module, request);
99                 ret = module->ops->request(module, request);
100                 break;
101         }
102         if (ret == LDB_SUCCESS) {
103                 return ret;
104         }
105         if (!ldb_errstring(ldb_module_get_ctx(module))) {
106                 /* Set a default error string, to place the blame somewhere */
107                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
108                                         "error in module %s: %s (%d)",
109                                         module->ops->name,
110                                         ldb_strerror(ret), ret);
111         }
112         return ret;
113 }
114
115 static struct dsdb_partition *find_partition(struct partition_private_data *data,
116                                              struct ldb_dn *dn,
117                                              struct ldb_request *req)
118 {
119         int i;
120         struct ldb_control *partition_ctrl;
121
122         /* see if the request has the partition DN specified in a
123          * control. The repl_meta_data module can specify this to
124          * ensure that replication happens to the right partition
125          */
126         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
127         if (partition_ctrl) {
128                 const struct dsdb_control_current_partition *partition;
129                 partition = talloc_get_type(partition_ctrl->data,
130                                             struct dsdb_control_current_partition);
131                 if (partition != NULL) {
132                         dn = partition->dn;
133                 }
134         }
135
136         if (dn == NULL) {
137                 return NULL;
138         }
139
140         /* Look at base DN */
141         /* Figure out which partition it is under */
142         /* Skip the lot if 'data' isn't here yet (initialisation) */
143         for (i=0; data && data->partitions && data->partitions[i]; i++) {
144                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
145                         return data->partitions[i];
146                 }
147         }
148
149         return NULL;
150 }
151
152 /**
153  * fire the caller's callback for every entry, but only send 'done' once.
154  */
155 static int partition_req_callback(struct ldb_request *req,
156                                   struct ldb_reply *ares)
157 {
158         struct partition_context *ac;
159         struct ldb_module *module;
160         struct ldb_request *nreq;
161         int ret;
162         struct partition_private_data *data;
163         struct ldb_control *partition_ctrl;
164
165         ac = talloc_get_type(req->context, struct partition_context);
166         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
167
168         if (!ares) {
169                 return ldb_module_done(ac->req, NULL, NULL,
170                                         LDB_ERR_OPERATIONS_ERROR);
171         }
172
173         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
174         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
175                 /* If we didn't fan this request out to mulitple partitions,
176                  * or this is an individual search result, we can
177                  * deterministily tell the caller what partition this was
178                  * written to (repl_meta_data likes to know) */
179                         ret = ldb_reply_add_control(ares,
180                                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
181                                                     false, partition_ctrl->data);
182                         if (ret != LDB_SUCCESS) {
183                                 return ldb_module_done(ac->req, NULL, NULL,
184                                                        ret);
185                         }
186         }
187
188         if (ares->error != LDB_SUCCESS && !ac->got_success) {
189                 return ldb_module_done(ac->req, ares->controls,
190                                         ares->response, ares->error);
191         }
192
193         switch (ares->type) {
194         case LDB_REPLY_REFERRAL:
195                 /* ignore referrals for now */
196                 break;
197
198         case LDB_REPLY_ENTRY:
199                 if (ac->req->operation != LDB_SEARCH) {
200                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
201                                 "partition_req_callback:"
202                                 " Unsupported reply type for this request");
203                         return ldb_module_done(ac->req, NULL, NULL,
204                                                 LDB_ERR_OPERATIONS_ERROR);
205                 }
206                 
207                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
208
209         case LDB_REPLY_DONE:
210                 if (ares->error == LDB_SUCCESS) {
211                         ac->got_success = true;
212                 }
213                 if (ac->req->operation == LDB_EXTENDED) {
214                         /* FIXME: check for ares->response, replmd does not fill it ! */
215                         if (ares->response) {
216                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
217                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
218                                                           "partition_req_callback:"
219                                                           " Unknown extended reply, "
220                                                           "only supports START_TLS");
221                                         talloc_free(ares);
222                                         return ldb_module_done(ac->req, NULL, NULL,
223                                                                 LDB_ERR_OPERATIONS_ERROR);
224                                 }
225                         }
226                 }
227
228                 ac->finished_requests++;
229                 if (ac->finished_requests == ac->num_requests) {
230                         /* this was the last one, call callback */
231                         return ldb_module_done(ac->req, ares->controls,
232                                                ares->response, 
233                                                ac->got_success?LDB_SUCCESS:ares->error);
234                 }
235
236                 /* not the last, now call the next one */
237                 module = ac->part_req[ac->finished_requests].module;
238                 nreq = ac->part_req[ac->finished_requests].req;
239
240                 ret = partition_request(module, nreq);
241                 if (ret != LDB_SUCCESS) {
242                         talloc_free(ares);
243                         return ldb_module_done(ac->req, NULL, NULL, ret);
244                 }
245
246                 break;
247         }
248
249         talloc_free(ares);
250         return LDB_SUCCESS;
251 }
252
253 static int partition_prep_request(struct partition_context *ac,
254                                   struct dsdb_partition *partition)
255 {
256         int ret;
257         struct ldb_request *req;
258
259         ac->part_req = talloc_realloc(ac, ac->part_req,
260                                         struct part_request,
261                                         ac->num_requests + 1);
262         if (ac->part_req == NULL) {
263                 ldb_oom(ldb_module_get_ctx(ac->module));
264                 return LDB_ERR_OPERATIONS_ERROR;
265         }
266
267         switch (ac->req->operation) {
268         case LDB_SEARCH:
269                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
270                                         ac->part_req,
271                                         ac->req->op.search.base,
272                                         ac->req->op.search.scope,
273                                         ac->req->op.search.tree,
274                                         ac->req->op.search.attrs,
275                                         ac->req->controls,
276                                         ac, partition_req_callback,
277                                         ac->req);
278                 break;
279         case LDB_ADD:
280                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
281                                         ac->req->op.add.message,
282                                         ac->req->controls,
283                                         ac, partition_req_callback,
284                                         ac->req);
285                 break;
286         case LDB_MODIFY:
287                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288                                         ac->req->op.mod.message,
289                                         ac->req->controls,
290                                         ac, partition_req_callback,
291                                         ac->req);
292                 break;
293         case LDB_DELETE:
294                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
295                                         ac->req->op.del.dn,
296                                         ac->req->controls,
297                                         ac, partition_req_callback,
298                                         ac->req);
299                 break;
300         case LDB_RENAME:
301                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
302                                         ac->req->op.rename.olddn,
303                                         ac->req->op.rename.newdn,
304                                         ac->req->controls,
305                                         ac, partition_req_callback,
306                                         ac->req);
307                 break;
308         case LDB_EXTENDED:
309                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
310                                         ac->part_req,
311                                         ac->req->op.extended.oid,
312                                         ac->req->op.extended.data,
313                                         ac->req->controls,
314                                         ac, partition_req_callback,
315                                         ac->req);
316                 break;
317         default:
318                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
319                                   "Unsupported request type!");
320                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
321         }
322
323         if (ret != LDB_SUCCESS) {
324                 return ret;
325         }
326
327         ac->part_req[ac->num_requests].req = req;
328
329         if (ac->req->controls) {
330                 req->controls = talloc_memdup(req, ac->req->controls,
331                                         talloc_get_size(ac->req->controls));
332                 if (req->controls == NULL) {
333                         ldb_oom(ldb_module_get_ctx(ac->module));
334                         return LDB_ERR_OPERATIONS_ERROR;
335                 }
336         }
337
338         if (partition) {
339                 ac->part_req[ac->num_requests].module = partition->module;
340
341                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
342                         ret = ldb_request_add_control(req,
343                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
344                                                       false, partition->ctrl);
345                         if (ret != LDB_SUCCESS) {
346                                 return ret;
347                         }
348                 }
349
350                 if (req->operation == LDB_SEARCH) {
351                         /* If the search is for 'more' than this partition,
352                          * then change the basedn, so a remote LDAP server
353                          * doesn't object */
354                         if (ldb_dn_compare_base(partition->ctrl->dn,
355                                                 req->op.search.base) != 0) {
356                                 req->op.search.base = partition->ctrl->dn;
357                         }
358                 }
359
360         } else {
361                 /* make sure you put the NEXT module here, or
362                  * partition_request() will simply loop forever on itself */
363                 ac->part_req[ac->num_requests].module = ac->module->next;
364         }
365
366         ac->num_requests++;
367
368         return LDB_SUCCESS;
369 }
370
371 static int partition_call_first(struct partition_context *ac)
372 {
373         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
374 }
375
376 /**
377  * Send a request down to all the partitions
378  */
379 static int partition_send_all(struct ldb_module *module, 
380                               struct partition_context *ac, 
381                               struct ldb_request *req) 
382 {
383         int i;
384         struct partition_private_data *data = talloc_get_type(module->private_data, 
385                                                               struct partition_private_data);
386         int ret = partition_prep_request(ac, NULL);
387         if (ret != LDB_SUCCESS) {
388                 return ret;
389         }
390         for (i=0; data && data->partitions && data->partitions[i]; i++) {
391                 ret = partition_prep_request(ac, data->partitions[i]);
392                 if (ret != LDB_SUCCESS) {
393                         return ret;
394                 }
395         }
396
397         /* fire the first one */
398         return partition_call_first(ac);
399 }
400
401 /**
402  * Figure out which backend a request needs to be aimed at.  Some
403  * requests must be replicated to all backends
404  */
405 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
406 {
407         struct partition_context *ac;
408         unsigned i;
409         int ret;
410         struct dsdb_partition *partition;
411         struct partition_private_data *data = talloc_get_type(module->private_data, 
412                                                               struct partition_private_data);
413         if (!data || !data->partitions) {
414                 return ldb_next_request(module, req);
415         }
416
417         if (req->operation != LDB_SEARCH) {
418                 /* Is this a special DN, we need to replicate to every backend? */
419                 for (i=0; data->replicate && data->replicate[i]; i++) {
420                         if (ldb_dn_compare(data->replicate[i], 
421                                            dn) == 0) {
422                                 
423                                 ac = partition_init_ctx(module, req);
424                                 if (!ac) {
425                                         return LDB_ERR_OPERATIONS_ERROR;
426                                 }
427                                 
428                                 return partition_send_all(module, ac, req);
429                         }
430                 }
431         }
432
433         /* Otherwise, we need to find the partition to fire it to */
434
435         /* Find partition */
436         partition = find_partition(data, dn, req);
437         if (!partition) {
438                 /*
439                  * if we haven't found a matching partition
440                  * pass the request to the main ldb
441                  *
442                  * TODO: we should maybe return an error here
443                  *       if it's not a special dn
444                  */
445
446                 return ldb_next_request(module, req);
447         }
448
449         ac = partition_init_ctx(module, req);
450         if (!ac) {
451                 return LDB_ERR_OPERATIONS_ERROR;
452         }
453
454         /* we need to add a control but we never touch the original request */
455         ret = partition_prep_request(ac, partition);
456         if (ret != LDB_SUCCESS) {
457                 return ret;
458         }
459
460         /* fire the first one */
461         return partition_call_first(ac);
462 }
463
464 /* search */
465 static int partition_search(struct ldb_module *module, struct ldb_request *req)
466 {
467         int ret;
468         struct ldb_control **saved_controls;
469         /* Find backend */
470         struct partition_private_data *data = talloc_get_type(module->private_data, 
471                                                               struct partition_private_data);
472
473         /* issue request */
474
475         /* (later) consider if we should be searching multiple
476          * partitions (for 'invisible' partition behaviour */
477
478         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
479         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
480         
481         struct ldb_search_options_control *search_options = NULL;
482         struct dsdb_partition *p;
483         
484         ret = partition_reload_if_required(module, data);
485         if (ret != LDB_SUCCESS) {
486                 return ret;
487         }
488
489         p = find_partition(data, NULL, req);
490         if (p != NULL) {
491                 /* the caller specified what partition they want the
492                  * search - just pass it on
493                  */
494                 return ldb_next_request(p->module, req);                
495         }
496
497
498         if (search_control) {
499                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
500         }
501
502         /* Remove the domain_scope control, so we don't confuse a backend server */
503         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
504                 ldb_oom(ldb_module_get_ctx(module));
505                 return LDB_ERR_OPERATIONS_ERROR;
506         }
507
508         /*
509          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
510          * down as uncritical to make windows 2008 dcpromo happy.
511          */
512         if (search_control) {
513                 search_control->critical = 0;
514         }
515
516         /* TODO:
517            Generate referrals (look for a partition under this DN) if we don't have the above control specified
518         */
519         
520         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
521                 int i;
522                 struct partition_context *ac;
523                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
524                         /* We have processed this flag, so we are done with this control now */
525
526                         /* Remove search control, so we don't confuse a backend server */
527                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
528                                 ldb_oom(ldb_module_get_ctx(module));
529                                 return LDB_ERR_OPERATIONS_ERROR;
530                         }
531                 }
532                 ac = partition_init_ctx(module, req);
533                 if (!ac) {
534                         return LDB_ERR_OPERATIONS_ERROR;
535                 }
536
537                 /* Search from the base DN */
538                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
539                         return partition_send_all(module, ac, req);
540                 }
541                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
542                         bool match = false, stop = false;
543                         /* Find all partitions under the search base 
544                            
545                            we match if:
546
547                               1) the DN we are looking for exactly matches the partition
548                              or
549                               2) the DN we are looking for is a parent of the partition and it isn't
550                                  a scope base search
551                              or
552                               3) the DN we are looking for is a child of the partition
553                          */
554                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
555                                 match = true;
556                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
557                                         stop = true;
558                                 }
559                         }
560                         if (!match && 
561                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
562                              req->op.search.scope != LDB_SCOPE_BASE)) {
563                                 match = true;
564                         }
565                         if (!match &&
566                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
567                                 match = true;
568                                 stop = true; /* note that this relies on partition ordering */
569                         }
570                         if (match) {
571                                 ret = partition_prep_request(ac, data->partitions[i]);
572                                 if (ret != LDB_SUCCESS) {
573                                         return ret;
574                                 }
575                         }
576                         if (stop) break;
577                 }
578
579                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
580                 if (ac->num_requests == 0) {
581                         talloc_free(ac);
582                         return ldb_next_request(module, req);
583                 }
584
585                 /* fire the first one */
586                 return partition_call_first(ac);
587
588         } else {
589                 /* Handle this like all other requests */
590                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
591                         /* We have processed this flag, so we are done with this control now */
592
593                         /* Remove search control, so we don't confuse a backend server */
594                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
595                                 ldb_oom(ldb_module_get_ctx(module));
596                                 return LDB_ERR_OPERATIONS_ERROR;
597                         }
598                 }
599
600                 return partition_replicate(module, req, req->op.search.base);
601         }
602 }
603
604 /* add */
605 static int partition_add(struct ldb_module *module, struct ldb_request *req)
606 {
607         return partition_replicate(module, req, req->op.add.message->dn);
608 }
609
610 /* modify */
611 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
612 {
613         return partition_replicate(module, req, req->op.mod.message->dn);
614 }
615
616 /* delete */
617 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
618 {
619         return partition_replicate(module, req, req->op.del.dn);
620 }
621
622 /* rename */
623 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
624 {
625         /* Find backend */
626         struct dsdb_partition *backend, *backend2;
627         
628         struct partition_private_data *data = talloc_get_type(module->private_data, 
629                                                               struct partition_private_data);
630
631         /* Skip the lot if 'data' isn't here yet (initialisation) */
632         if (!data) {
633                 return LDB_ERR_OPERATIONS_ERROR;
634         }
635
636         backend = find_partition(data, req->op.rename.olddn, req);
637         backend2 = find_partition(data, req->op.rename.newdn, req);
638
639         if ((backend && !backend2) || (!backend && backend2)) {
640                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
641         }
642
643         if (backend != backend2) {
644                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
645                                        "Cannot rename from %s in %s to %s in %s: %s",
646                                        ldb_dn_get_linearized(req->op.rename.olddn),
647                                        ldb_dn_get_linearized(backend->ctrl->dn),
648                                        ldb_dn_get_linearized(req->op.rename.newdn),
649                                        ldb_dn_get_linearized(backend2->ctrl->dn),
650                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
651                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
652         }
653
654         return partition_replicate(module, req, req->op.rename.olddn);
655 }
656
657 /* start a transaction */
658 static int partition_start_trans(struct ldb_module *module)
659 {
660         int i, ret;
661         struct partition_private_data *data = talloc_get_type(module->private_data, 
662                                                               struct partition_private_data);
663         /* Look at base DN */
664         /* Figure out which partition it is under */
665         /* Skip the lot if 'data' isn't here yet (initialization) */
666         ret = ldb_next_start_trans(module);
667         if (ret != LDB_SUCCESS) {
668                 return ret;
669         }
670
671         ret = partition_reload_if_required(module, data);
672         if (ret != LDB_SUCCESS) {
673                 return ret;
674         }
675
676         for (i=0; data && data->partitions && data->partitions[i]; i++) {
677                 struct ldb_module *next = data->partitions[i]->module;
678                 PARTITION_FIND_OP(next, start_transaction);
679
680                 ret = next->ops->start_transaction(next);
681                 if (ret != LDB_SUCCESS) {
682                         /* Back it out, if it fails on one */
683                         for (i--; i >= 0; i--) {
684                                 next = data->partitions[i]->module;
685                                 PARTITION_FIND_OP(next, del_transaction);
686
687                                 next->ops->del_transaction(next);
688                         }
689                         ldb_next_del_trans(module);
690                         return ret;
691                 }
692         }
693
694         data->in_transaction++;
695
696         return LDB_SUCCESS;
697 }
698
699 /* prepare for a commit */
700 static int partition_prepare_commit(struct ldb_module *module)
701 {
702         int i;
703         struct partition_private_data *data = talloc_get_type(module->private_data, 
704                                                               struct partition_private_data);
705
706         for (i=0; data && data->partitions && data->partitions[i]; i++) {
707                 struct ldb_module *next_prepare = data->partitions[i]->module;
708                 int ret;
709
710                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
711                 if (next_prepare == NULL) {
712                         continue;
713                 }
714
715                 ret = next_prepare->ops->prepare_commit(next_prepare);
716                 if (ret != LDB_SUCCESS) {
717                         return ret;
718                 }
719         }
720
721         return ldb_next_prepare_commit(module);
722 }
723
724
725 /* end a transaction */
726 static int partition_end_trans(struct ldb_module *module)
727 {
728         int i;
729         struct partition_private_data *data = talloc_get_type(module->private_data, 
730                                                               struct partition_private_data);
731         for (i=0; data && data->partitions && data->partitions[i]; i++) {
732                 struct ldb_module *next_end = data->partitions[i]->module;
733                 int ret;
734
735                 PARTITION_FIND_OP(next_end, end_transaction);
736
737                 ret = next_end->ops->end_transaction(next_end);
738                 if (ret != LDB_SUCCESS) {
739                         return ret;
740                 }
741         }
742
743         if (data->in_transaction == 0) {
744                 DEBUG(0,("partition end transaction mismatch\n"));
745                 return LDB_ERR_OPERATIONS_ERROR;
746         }
747         data->in_transaction--;
748
749         return ldb_next_end_trans(module);
750 }
751
752 /* delete a transaction */
753 static int partition_del_trans(struct ldb_module *module)
754 {
755         int i, ret, final_ret = LDB_SUCCESS;
756         struct partition_private_data *data = talloc_get_type(module->private_data, 
757                                                               struct partition_private_data);
758         for (i=0; data && data->partitions && data->partitions[i]; i++) {
759                 struct ldb_module *next = data->partitions[i]->module;
760                 PARTITION_FIND_OP(next, del_transaction);
761
762                 ret = next->ops->del_transaction(next);
763                 if (ret != LDB_SUCCESS) {
764                         final_ret = ret;
765                 }
766         }       
767
768         if (data->in_transaction == 0) {
769                 DEBUG(0,("partition del transaction mismatch\n"));
770                 return LDB_ERR_OPERATIONS_ERROR;
771         }
772         data->in_transaction--;
773
774         ret = ldb_next_del_trans(module);
775         if (ret != LDB_SUCCESS) {
776                 final_ret = ret;
777         }
778         return final_ret;
779 }
780
781 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
782                                      enum ldb_sequence_type type, uint64_t *seq_number) 
783 {
784         int ret;
785         struct ldb_result *res;
786         struct ldb_seqnum_request *tseq;
787         struct ldb_request *treq;
788         struct ldb_seqnum_result *seqr;
789         res = talloc_zero(mem_ctx, struct ldb_result);
790         if (res == NULL) {
791                 return LDB_ERR_OPERATIONS_ERROR;
792         }
793         tseq = talloc_zero(res, struct ldb_seqnum_request);
794         if (tseq == NULL) {
795                 talloc_free(res);
796                 return LDB_ERR_OPERATIONS_ERROR;
797         }
798         tseq->type = type;
799         
800         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
801                                      LDB_EXTENDED_SEQUENCE_NUMBER,
802                                      tseq,
803                                      NULL,
804                                      res,
805                                      ldb_extended_default_callback,
806                                      NULL);
807         if (ret != LDB_SUCCESS) {
808                 talloc_free(res);
809                 return ret;
810         }
811         
812         ret = ldb_next_request(module, treq);
813         if (ret != LDB_SUCCESS) {
814                 talloc_free(res);
815                 return ret;
816         }
817         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
818         if (ret != LDB_SUCCESS) {
819                 talloc_free(res);
820                 return ret;
821         }
822         
823         seqr = talloc_get_type(res->extended->data,
824                                struct ldb_seqnum_result);
825         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
826                 ret = LDB_ERR_OPERATIONS_ERROR;
827                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
828                 talloc_free(res);
829                 return ret;
830         } else {
831                 *seq_number = seqr->seq_num;
832         }
833         talloc_free(res);
834         return LDB_SUCCESS;
835 }
836
837 /* FIXME: This function is still semi-async */
838 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
839 {
840         int i, ret;
841         uint64_t seq_number = 0;
842         uint64_t timestamp_sequence = 0;
843         uint64_t timestamp = 0;
844         struct partition_private_data *data = talloc_get_type(module->private_data, 
845                                                               struct partition_private_data);
846         struct ldb_seqnum_request *seq;
847         struct ldb_seqnum_result *seqr;
848         struct ldb_request *treq;
849         struct ldb_seqnum_request *tseq;
850         struct ldb_seqnum_result *tseqr;
851         struct ldb_extended *ext;
852         struct ldb_result *res;
853         struct dsdb_partition *p;
854
855         p = find_partition(data, NULL, req);
856         if (p != NULL) {
857                 /* the caller specified what partition they want the
858                  * sequence number operation on - just pass it on
859                  */
860                 return ldb_next_request(p->module, req);                
861         }
862
863         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
864
865         switch (seq->type) {
866         case LDB_SEQ_NEXT:
867         case LDB_SEQ_HIGHEST_SEQ:
868
869                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
870                 if (ret != LDB_SUCCESS) {
871                         return ret;
872                 }
873
874                 /* Skip the lot if 'data' isn't here yet (initialisation) */
875                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
876
877                         res = talloc_zero(req, struct ldb_result);
878                         if (res == NULL) {
879                                 return LDB_ERR_OPERATIONS_ERROR;
880                         }
881                         tseq = talloc_zero(res, struct ldb_seqnum_request);
882                         if (tseq == NULL) {
883                                 talloc_free(res);
884                                 return LDB_ERR_OPERATIONS_ERROR;
885                         }
886                         tseq->type = seq->type;
887
888                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
889                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
890                                                      tseq,
891                                                      NULL,
892                                                      res,
893                                                      ldb_extended_default_callback,
894                                                      NULL);
895                         if (ret != LDB_SUCCESS) {
896                                 talloc_free(res);
897                                 return ret;
898                         }
899
900                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
901                                 ret = ldb_request_add_control(treq,
902                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
903                                                               false, data->partitions[i]->ctrl);
904                                 if (ret != LDB_SUCCESS) {
905                                         talloc_free(res);
906                                         return ret;
907                                 }
908                         }
909
910                         ret = partition_request(data->partitions[i]->module, treq);
911                         if (ret != LDB_SUCCESS) {
912                                 talloc_free(res);
913                                 return ret;
914                         }
915                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
916                         if (ret != LDB_SUCCESS) {
917                                 talloc_free(res);
918                                 return ret;
919                         }
920                         tseqr = talloc_get_type(res->extended->data,
921                                                 struct ldb_seqnum_result);
922                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
923                                 timestamp_sequence = MAX(timestamp_sequence,
924                                                          tseqr->seq_num);
925                         } else {
926                                 seq_number += tseqr->seq_num;
927                         }
928                         talloc_free(res);
929                 }
930                 /* fall through */
931         case LDB_SEQ_HIGHEST_TIMESTAMP:
932
933                 res = talloc_zero(req, struct ldb_result);
934                 if (res == NULL) {
935                         return LDB_ERR_OPERATIONS_ERROR;
936                 }
937
938                 tseq = talloc_zero(res, struct ldb_seqnum_request);
939                 if (tseq == NULL) {
940                         talloc_free(res);
941                         return LDB_ERR_OPERATIONS_ERROR;
942                 }
943                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
944
945                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
946                                              LDB_EXTENDED_SEQUENCE_NUMBER,
947                                              tseq,
948                                              NULL,
949                                              res,
950                                              ldb_extended_default_callback,
951                                              NULL);
952                 if (ret != LDB_SUCCESS) {
953                         talloc_free(res);
954                         return ret;
955                 }
956
957                 ret = ldb_next_request(module, treq);
958                 if (ret != LDB_SUCCESS) {
959                         talloc_free(res);
960                         return ret;
961                 }
962                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
963                 if (ret != LDB_SUCCESS) {
964                         talloc_free(res);
965                         return ret;
966                 }
967
968                 tseqr = talloc_get_type(res->extended->data,
969                                            struct ldb_seqnum_result);
970                 timestamp = tseqr->seq_num;
971
972                 talloc_free(res);
973
974                 /* Skip the lot if 'data' isn't here yet (initialisation) */
975                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
976
977                         res = talloc_zero(req, struct ldb_result);
978                         if (res == NULL) {
979                                 return LDB_ERR_OPERATIONS_ERROR;
980                         }
981
982                         tseq = talloc_zero(res, struct ldb_seqnum_request);
983                         if (tseq == NULL) {
984                                 talloc_free(res);
985                                 return LDB_ERR_OPERATIONS_ERROR;
986                         }
987                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
988
989                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
990                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
991                                                      tseq,
992                                                      NULL,
993                                                      res,
994                                                      ldb_extended_default_callback,
995                                                      NULL);
996                         if (ret != LDB_SUCCESS) {
997                                 talloc_free(res);
998                                 return ret;
999                         }
1000
1001                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
1002                                 ret = ldb_request_add_control(treq,
1003                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
1004                                                               false, data->partitions[i]->ctrl);
1005                                 if (ret != LDB_SUCCESS) {
1006                                         talloc_free(res);
1007                                         return ret;
1008                                 }
1009                         }
1010
1011                         ret = partition_request(data->partitions[i]->module, treq);
1012                         if (ret != LDB_SUCCESS) {
1013                                 talloc_free(res);
1014                                 return ret;
1015                         }
1016                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1017                         if (ret != LDB_SUCCESS) {
1018                                 talloc_free(res);
1019                                 return ret;
1020                         }
1021
1022                         tseqr = talloc_get_type(res->extended->data,
1023                                                   struct ldb_seqnum_result);
1024                         timestamp = MAX(timestamp, tseqr->seq_num);
1025
1026                         talloc_free(res);
1027                 }
1028
1029                 break;
1030         }
1031
1032         ext = talloc_zero(req, struct ldb_extended);
1033         if (!ext) {
1034                 return LDB_ERR_OPERATIONS_ERROR;
1035         }
1036         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1037         if (seqr == NULL) {
1038                 talloc_free(ext);
1039                 return LDB_ERR_OPERATIONS_ERROR;
1040         }
1041         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1042         ext->data = seqr;
1043
1044         switch (seq->type) {
1045         case LDB_SEQ_NEXT:
1046         case LDB_SEQ_HIGHEST_SEQ:
1047
1048                 /* Has someone above set a timebase sequence? */
1049                 if (timestamp_sequence) {
1050                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1051                 } else {
1052                         seqr->seq_num = seq_number;
1053                 }
1054
1055                 if (timestamp_sequence > seqr->seq_num) {
1056                         seqr->seq_num = timestamp_sequence;
1057                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1058                 }
1059
1060                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1061                 break;
1062         case LDB_SEQ_HIGHEST_TIMESTAMP:
1063                 seqr->seq_num = timestamp;
1064                 break;
1065         }
1066
1067         if (seq->type == LDB_SEQ_NEXT) {
1068                 seqr->seq_num++;
1069         }
1070
1071         /* send request done */
1072         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1073 }
1074
1075 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1076 {
1077         struct dsdb_partition *partition;
1078         struct partition_private_data *data;
1079         struct ldb_dn *schema_dn;
1080         struct partition_context *ac;
1081         int ret;
1082
1083         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1084         if (!schema_dn) {
1085                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1086                 return LDB_ERR_PROTOCOL_ERROR;
1087         }
1088
1089         data = talloc_get_type(module->private_data, struct partition_private_data);
1090         if (!data) {
1091                 return LDB_ERR_OPERATIONS_ERROR;
1092         }
1093         
1094         partition = find_partition( data, schema_dn, req);
1095         if (!partition) {
1096                 return ldb_next_request(module, req);
1097         }
1098
1099         ac = partition_init_ctx(module, req);
1100         if (!ac) {
1101                 return LDB_ERR_OPERATIONS_ERROR;
1102         }
1103
1104         /* we need to add a control but we never touch the original request */
1105         ret = partition_prep_request(ac, partition);
1106         if (ret != LDB_SUCCESS) {
1107                 return ret;
1108         }
1109
1110         /* fire the first one */
1111         ret = partition_call_first(ac);
1112
1113         if (ret != LDB_SUCCESS){
1114                 return ret;
1115         }
1116
1117         return ldb_module_done(req, NULL, NULL, ret);
1118 }
1119
1120
1121 /* extended */
1122 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1123 {
1124         struct partition_private_data *data;
1125         struct partition_context *ac;
1126         int ret;
1127
1128         data = talloc_get_type(module->private_data, struct partition_private_data);
1129         if (!data) {
1130                 return ldb_next_request(module, req);
1131         }
1132
1133         ret = partition_reload_if_required(module, data);
1134         if (ret != LDB_SUCCESS) {
1135                 return ret;
1136         }
1137         
1138         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1139                 return partition_sequence_number(module, req);
1140         }
1141
1142         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1143                 return partition_create(module, req);
1144         }
1145
1146         /* forward schemaUpdateNow operation to schema_fsmo module*/
1147         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1148                 return partition_extended_schema_update_now( module, req );
1149         }       
1150
1151         /* 
1152          * as the extended operation has no dn
1153          * we need to send it to all partitions
1154          */
1155
1156         ac = partition_init_ctx(module, req);
1157         if (!ac) {
1158                 return LDB_ERR_OPERATIONS_ERROR;
1159         }
1160
1161         return partition_send_all(module, ac, req);
1162 }
1163
1164 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1165         .name              = "partition",
1166         .init_context      = partition_init,
1167         .search            = partition_search,
1168         .add               = partition_add,
1169         .modify            = partition_modify,
1170         .del               = partition_delete,
1171         .rename            = partition_rename,
1172         .extended          = partition_extended,
1173         .start_transaction = partition_start_trans,
1174         .prepare_commit    = partition_prepare_commit,
1175         .end_transaction   = partition_end_trans,
1176         .del_transaction   = partition_del_trans,
1177 };