s4:dsdb Remove workaround for two partition head records
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42         bool got_success;
43
44         struct part_request *part_req;
45         int num_requests;
46         int finished_requests;
47 };
48
49 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
50 {
51         struct partition_context *ac;
52
53         ac = talloc_zero(req, struct partition_context);
54         if (ac == NULL) {
55                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
56                 return NULL;
57         }
58
59         ac->module = module;
60         ac->req = req;
61
62         return ac;
63 }
64
65 /*
66  *    helper functions to call the next module in chain
67  *    */
68
69 int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         int ret;
72         switch (request->operation) {
73         case LDB_SEARCH:
74                 PARTITION_FIND_OP(module, search);
75                 ret = module->ops->search(module, request);
76                 break;
77         case LDB_ADD:
78                 PARTITION_FIND_OP(module, add);
79                 ret = module->ops->add(module, request);
80                 break;
81         case LDB_MODIFY:
82                 PARTITION_FIND_OP(module, modify);
83                 ret = module->ops->modify(module, request);
84                 break;
85         case LDB_DELETE:
86                 PARTITION_FIND_OP(module, del);
87                 ret = module->ops->del(module, request);
88                 break;
89         case LDB_RENAME:
90                 PARTITION_FIND_OP(module, rename);
91                 ret = module->ops->rename(module, request);
92                 break;
93         case LDB_EXTENDED:
94                 PARTITION_FIND_OP(module, extended);
95                 ret = module->ops->extended(module, request);
96                 break;
97         default:
98                 PARTITION_FIND_OP(module, request);
99                 ret = module->ops->request(module, request);
100                 break;
101         }
102         if (ret == LDB_SUCCESS) {
103                 return ret;
104         }
105         if (!ldb_errstring(ldb_module_get_ctx(module))) {
106                 /* Set a default error string, to place the blame somewhere */
107                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
108                                         "error in module %s: %s (%d)",
109                                         module->ops->name,
110                                         ldb_strerror(ret), ret);
111         }
112         return ret;
113 }
114
115 static struct dsdb_partition *find_partition(struct partition_private_data *data,
116                                              struct ldb_dn *dn,
117                                              struct ldb_request *req)
118 {
119         int i;
120         struct ldb_control *partition_ctrl;
121
122         /* see if the request has the partition DN specified in a
123          * control. The repl_meta_data module can specify this to
124          * ensure that replication happens to the right partition
125          */
126         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
127         if (partition_ctrl) {
128                 const struct dsdb_control_current_partition *partition;
129                 partition = talloc_get_type(partition_ctrl->data,
130                                             struct dsdb_control_current_partition);
131                 if (partition != NULL) {
132                         dn = partition->dn;
133                 }
134         }
135
136         if (dn == NULL) {
137                 return NULL;
138         }
139
140         /* Look at base DN */
141         /* Figure out which partition it is under */
142         /* Skip the lot if 'data' isn't here yet (initialisation) */
143         for (i=0; data && data->partitions && data->partitions[i]; i++) {
144                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
145                         return data->partitions[i];
146                 }
147         }
148
149         return NULL;
150 }
151
152 /**
153  * fire the caller's callback for every entry, but only send 'done' once.
154  */
155 static int partition_req_callback(struct ldb_request *req,
156                                   struct ldb_reply *ares)
157 {
158         struct partition_context *ac;
159         struct ldb_module *module;
160         struct ldb_request *nreq;
161         int ret;
162         struct partition_private_data *data;
163         struct ldb_control *partition_ctrl;
164
165         ac = talloc_get_type(req->context, struct partition_context);
166         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
167
168         if (!ares) {
169                 return ldb_module_done(ac->req, NULL, NULL,
170                                         LDB_ERR_OPERATIONS_ERROR);
171         }
172
173         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
174         if (partition_ctrl && (ac->num_requests == 1 || ares->type == LDB_REPLY_ENTRY)) {
175                 /* If we didn't fan this request out to mulitple partitions,
176                  * or this is an individual search result, we can
177                  * deterministily tell the caller what partition this was
178                  * written to (repl_meta_data likes to know) */
179                         ret = ldb_reply_add_control(ares,
180                                                     DSDB_CONTROL_CURRENT_PARTITION_OID,
181                                                     false, partition_ctrl->data);
182                         if (ret != LDB_SUCCESS) {
183                                 return ldb_module_done(ac->req, NULL, NULL,
184                                                        ret);
185                         }
186         }
187
188         if (ares->error != LDB_SUCCESS && !ac->got_success) {
189                 return ldb_module_done(ac->req, ares->controls,
190                                         ares->response, ares->error);
191         }
192
193         switch (ares->type) {
194         case LDB_REPLY_REFERRAL:
195                 /* ignore referrals for now */
196                 break;
197
198         case LDB_REPLY_ENTRY:
199                 if (ac->req->operation != LDB_SEARCH) {
200                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
201                                 "partition_req_callback:"
202                                 " Unsupported reply type for this request");
203                         return ldb_module_done(ac->req, NULL, NULL,
204                                                 LDB_ERR_OPERATIONS_ERROR);
205                 }
206                 
207                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
208
209         case LDB_REPLY_DONE:
210                 if (ares->error == LDB_SUCCESS) {
211                         ac->got_success = true;
212                 }
213                 if (ac->req->operation == LDB_EXTENDED) {
214                         /* FIXME: check for ares->response, replmd does not fill it ! */
215                         if (ares->response) {
216                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
217                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
218                                                           "partition_req_callback:"
219                                                           " Unknown extended reply, "
220                                                           "only supports START_TLS");
221                                         talloc_free(ares);
222                                         return ldb_module_done(ac->req, NULL, NULL,
223                                                                 LDB_ERR_OPERATIONS_ERROR);
224                                 }
225                         }
226                 }
227
228                 ac->finished_requests++;
229                 if (ac->finished_requests == ac->num_requests) {
230                         /* this was the last one, call callback */
231                         return ldb_module_done(ac->req, ares->controls,
232                                                ares->response, 
233                                                ac->got_success?LDB_SUCCESS:ares->error);
234                 }
235
236                 /* not the last, now call the next one */
237                 module = ac->part_req[ac->finished_requests].module;
238                 nreq = ac->part_req[ac->finished_requests].req;
239
240                 ret = partition_request(module, nreq);
241                 if (ret != LDB_SUCCESS) {
242                         talloc_free(ares);
243                         return ldb_module_done(ac->req, NULL, NULL, ret);
244                 }
245
246                 break;
247         }
248
249         talloc_free(ares);
250         return LDB_SUCCESS;
251 }
252
253 static int partition_prep_request(struct partition_context *ac,
254                                   struct dsdb_partition *partition)
255 {
256         int ret;
257         struct ldb_request *req;
258
259         ac->part_req = talloc_realloc(ac, ac->part_req,
260                                         struct part_request,
261                                         ac->num_requests + 1);
262         if (ac->part_req == NULL) {
263                 ldb_oom(ldb_module_get_ctx(ac->module));
264                 return LDB_ERR_OPERATIONS_ERROR;
265         }
266
267         switch (ac->req->operation) {
268         case LDB_SEARCH:
269                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
270                                         ac->part_req,
271                                         ac->req->op.search.base,
272                                         ac->req->op.search.scope,
273                                         ac->req->op.search.tree,
274                                         ac->req->op.search.attrs,
275                                         ac->req->controls,
276                                         ac, partition_req_callback,
277                                         ac->req);
278                 break;
279         case LDB_ADD:
280                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
281                                         ac->req->op.add.message,
282                                         ac->req->controls,
283                                         ac, partition_req_callback,
284                                         ac->req);
285                 break;
286         case LDB_MODIFY:
287                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
288                                         ac->req->op.mod.message,
289                                         ac->req->controls,
290                                         ac, partition_req_callback,
291                                         ac->req);
292                 break;
293         case LDB_DELETE:
294                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
295                                         ac->req->op.del.dn,
296                                         ac->req->controls,
297                                         ac, partition_req_callback,
298                                         ac->req);
299                 break;
300         case LDB_RENAME:
301                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
302                                         ac->req->op.rename.olddn,
303                                         ac->req->op.rename.newdn,
304                                         ac->req->controls,
305                                         ac, partition_req_callback,
306                                         ac->req);
307                 break;
308         case LDB_EXTENDED:
309                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
310                                         ac->part_req,
311                                         ac->req->op.extended.oid,
312                                         ac->req->op.extended.data,
313                                         ac->req->controls,
314                                         ac, partition_req_callback,
315                                         ac->req);
316                 break;
317         default:
318                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
319                                   "Unsupported request type!");
320                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
321         }
322
323         if (ret != LDB_SUCCESS) {
324                 return ret;
325         }
326
327         ac->part_req[ac->num_requests].req = req;
328
329         if (ac->req->controls) {
330                 req->controls = talloc_memdup(req, ac->req->controls,
331                                         talloc_get_size(ac->req->controls));
332                 if (req->controls == NULL) {
333                         ldb_oom(ldb_module_get_ctx(ac->module));
334                         return LDB_ERR_OPERATIONS_ERROR;
335                 }
336         }
337
338         if (partition) {
339                 ac->part_req[ac->num_requests].module = partition->module;
340
341                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
342                         ret = ldb_request_add_control(req,
343                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
344                                                       false, partition->ctrl);
345                         if (ret != LDB_SUCCESS) {
346                                 return ret;
347                         }
348                 }
349
350                 if (req->operation == LDB_SEARCH) {
351                         /* If the search is for 'more' than this partition,
352                          * then change the basedn, so a remote LDAP server
353                          * doesn't object */
354                         if (ldb_dn_compare_base(partition->ctrl->dn,
355                                                 req->op.search.base) != 0) {
356                                 req->op.search.base = partition->ctrl->dn;
357                         }
358                 }
359
360         } else {
361                 /* make sure you put the NEXT module here, or
362                  * partition_request() will simply loop forever on itself */
363                 ac->part_req[ac->num_requests].module = ac->module->next;
364         }
365
366         ac->num_requests++;
367
368         return LDB_SUCCESS;
369 }
370
371 static int partition_call_first(struct partition_context *ac)
372 {
373         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
374 }
375
376 /**
377  * Send a request down to all the partitions
378  */
379 static int partition_send_all(struct ldb_module *module, 
380                               struct partition_context *ac, 
381                               struct ldb_request *req) 
382 {
383         int i;
384         struct partition_private_data *data = talloc_get_type(module->private_data, 
385                                                               struct partition_private_data);
386         int ret = partition_prep_request(ac, NULL);
387         if (ret != LDB_SUCCESS) {
388                 return ret;
389         }
390         for (i=0; data && data->partitions && data->partitions[i]; i++) {
391                 ret = partition_prep_request(ac, data->partitions[i]);
392                 if (ret != LDB_SUCCESS) {
393                         return ret;
394                 }
395         }
396
397         /* fire the first one */
398         return partition_call_first(ac);
399 }
400
401 /**
402  * Figure out which backend a request needs to be aimed at.  Some
403  * requests must be replicated to all backends
404  */
405 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
406 {
407         struct partition_context *ac;
408         unsigned i;
409         int ret;
410         struct dsdb_partition *partition;
411         struct partition_private_data *data = talloc_get_type(module->private_data, 
412                                                               struct partition_private_data);
413         if (!data || !data->partitions) {
414                 return ldb_next_request(module, req);
415         }
416
417         if (req->operation != LDB_SEARCH) {
418                 /* Is this a special DN, we need to replicate to every backend? */
419                 for (i=0; data->replicate && data->replicate[i]; i++) {
420                         if (ldb_dn_compare(data->replicate[i], 
421                                            dn) == 0) {
422                                 
423                                 ac = partition_init_ctx(module, req);
424                                 if (!ac) {
425                                         return LDB_ERR_OPERATIONS_ERROR;
426                                 }
427                                 
428                                 return partition_send_all(module, ac, req);
429                         }
430                 }
431         }
432
433         /* Otherwise, we need to find the partition to fire it to */
434
435         /* Find partition */
436         partition = find_partition(data, dn, req);
437         if (!partition) {
438                 /*
439                  * if we haven't found a matching partition
440                  * pass the request to the main ldb
441                  *
442                  * TODO: we should maybe return an error here
443                  *       if it's not a special dn
444                  */
445
446                 return ldb_next_request(module, req);
447         }
448
449         ac = partition_init_ctx(module, req);
450         if (!ac) {
451                 return LDB_ERR_OPERATIONS_ERROR;
452         }
453
454         /* we need to add a control but we never touch the original request */
455         ret = partition_prep_request(ac, partition);
456         if (ret != LDB_SUCCESS) {
457                 return ret;
458         }
459
460         /* fire the first one */
461         return partition_call_first(ac);
462 }
463
464 /* search */
465 static int partition_search(struct ldb_module *module, struct ldb_request *req)
466 {
467         int ret;
468         struct ldb_control **saved_controls;
469         /* Find backend */
470         struct partition_private_data *data = talloc_get_type(module->private_data, 
471                                                               struct partition_private_data);
472
473         /* issue request */
474
475         /* (later) consider if we should be searching multiple
476          * partitions (for 'invisible' partition behaviour */
477
478         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
479         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
480         
481         struct ldb_search_options_control *search_options = NULL;
482         struct dsdb_partition *p;
483         
484         ret = partition_reload_if_required(module, data);
485         if (ret != LDB_SUCCESS) {
486                 return ret;
487         }
488
489         p = find_partition(data, NULL, req);
490         if (p != NULL) {
491                 /* the caller specified what partition they want the
492                  * search - just pass it on
493                  */
494                 return ldb_next_request(p->module, req);                
495         }
496
497
498         if (search_control) {
499                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
500         }
501
502         /* Remove the domain_scope control, so we don't confuse a backend server */
503         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
504                 ldb_oom(ldb_module_get_ctx(module));
505                 return LDB_ERR_OPERATIONS_ERROR;
506         }
507
508         /*
509          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
510          * down as uncritical to make windows 2008 dcpromo happy.
511          */
512         if (search_control) {
513                 search_control->critical = 0;
514         }
515
516         /* TODO:
517            Generate referrals (look for a partition under this DN) if we don't have the above control specified
518         */
519         
520         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
521                 int i;
522                 struct partition_context *ac;
523                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
524                         /* We have processed this flag, so we are done with this control now */
525
526                         /* Remove search control, so we don't confuse a backend server */
527                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
528                                 ldb_oom(ldb_module_get_ctx(module));
529                                 return LDB_ERR_OPERATIONS_ERROR;
530                         }
531                 }
532                 ac = partition_init_ctx(module, req);
533                 if (!ac) {
534                         return LDB_ERR_OPERATIONS_ERROR;
535                 }
536
537                 /* Search from the base DN */
538                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
539                         return partition_send_all(module, ac, req);
540                 }
541                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
542                         bool match = false, stop = false;
543                         /* Find all partitions under the search base 
544                            
545                            we match if:
546
547                               1) the DN we are looking for exactly matches the partition
548                              or
549                               2) the DN we are looking for is a parent of the partition and it isn't
550                                  a scope base search
551                              or
552                               3) the DN we are looking for is a child of the partition
553                          */
554                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
555                                 match = true;
556                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
557                                         stop = true;
558                                 }
559                         }
560                         if (!match && 
561                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
562                              req->op.search.scope != LDB_SCOPE_BASE)) {
563                                 match = true;
564                         }
565                         if (!match &&
566                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
567                                 match = true;
568                                 stop = true; /* note that this relies on partition ordering */
569                         }
570                         if (match) {
571                                 ret = partition_prep_request(ac, data->partitions[i]);
572                                 if (ret != LDB_SUCCESS) {
573                                         return ret;
574                                 }
575                         }
576                         if (stop) break;
577                 }
578
579                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
580                 if (ac->num_requests == 0) {
581                         talloc_free(ac);
582                         return ldb_next_request(module, req);
583                 }
584
585                 /* fire the first one */
586                 return partition_call_first(ac);
587
588         } else {
589                 /* Handle this like all other requests */
590                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
591                         /* We have processed this flag, so we are done with this control now */
592
593                         /* Remove search control, so we don't confuse a backend server */
594                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
595                                 ldb_oom(ldb_module_get_ctx(module));
596                                 return LDB_ERR_OPERATIONS_ERROR;
597                         }
598                 }
599
600                 return partition_replicate(module, req, req->op.search.base);
601         }
602 }
603
604 /* add */
605 static int partition_add(struct ldb_module *module, struct ldb_request *req)
606 {
607         return partition_replicate(module, req, req->op.add.message->dn);
608 }
609
610 /* modify */
611 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
612 {
613         return partition_replicate(module, req, req->op.mod.message->dn);
614 }
615
616 /* delete */
617 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
618 {
619         return partition_replicate(module, req, req->op.del.dn);
620 }
621
622 /* rename */
623 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
624 {
625         /* Find backend */
626         struct dsdb_partition *backend, *backend2;
627         
628         struct partition_private_data *data = talloc_get_type(module->private_data, 
629                                                               struct partition_private_data);
630
631         /* Skip the lot if 'data' isn't here yet (initialisation) */
632         if (!data) {
633                 return LDB_ERR_OPERATIONS_ERROR;
634         }
635
636         backend = find_partition(data, req->op.rename.olddn, req);
637         backend2 = find_partition(data, req->op.rename.newdn, req);
638
639         if ((backend && !backend2) || (!backend && backend2)) {
640                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
641         }
642
643         if (backend != backend2) {
644                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
645                                        "Cannot rename from %s in %s to %s in %s: %s",
646                                        ldb_dn_get_linearized(req->op.rename.olddn),
647                                        ldb_dn_get_linearized(backend->ctrl->dn),
648                                        ldb_dn_get_linearized(req->op.rename.newdn),
649                                        ldb_dn_get_linearized(backend2->ctrl->dn),
650                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
651                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
652         }
653
654         return partition_replicate(module, req, req->op.rename.olddn);
655 }
656
657 /* start a transaction */
658 static int partition_start_trans(struct ldb_module *module)
659 {
660         int i, ret;
661         struct partition_private_data *data = talloc_get_type(module->private_data, 
662                                                               struct partition_private_data);
663         /* Look at base DN */
664         /* Figure out which partition it is under */
665         /* Skip the lot if 'data' isn't here yet (initialization) */
666         ret = ldb_next_start_trans(module);
667         if (ret != LDB_SUCCESS) {
668                 return ret;
669         }
670
671         ret = partition_reload_if_required(module, data);
672         if (ret != LDB_SUCCESS) {
673                 return ret;
674         }
675
676         for (i=0; data && data->partitions && data->partitions[i]; i++) {
677                 struct ldb_module *next = data->partitions[i]->module;
678                 PARTITION_FIND_OP(next, start_transaction);
679
680                 ret = next->ops->start_transaction(next);
681                 if (ret != LDB_SUCCESS) {
682                         /* Back it out, if it fails on one */
683                         for (i--; i >= 0; i--) {
684                                 next = data->partitions[i]->module;
685                                 PARTITION_FIND_OP(next, del_transaction);
686
687                                 next->ops->del_transaction(next);
688                         }
689                         ldb_next_del_trans(module);
690                         return ret;
691                 }
692         }
693         return LDB_SUCCESS;
694 }
695
696 /* prepare for a commit */
697 static int partition_prepare_commit(struct ldb_module *module)
698 {
699         int i;
700         struct partition_private_data *data = talloc_get_type(module->private_data, 
701                                                               struct partition_private_data);
702
703         for (i=0; data && data->partitions && data->partitions[i]; i++) {
704                 struct ldb_module *next_prepare = data->partitions[i]->module;
705                 int ret;
706
707                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
708                 if (next_prepare == NULL) {
709                         continue;
710                 }
711
712                 ret = next_prepare->ops->prepare_commit(next_prepare);
713                 if (ret != LDB_SUCCESS) {
714                         return ret;
715                 }
716         }
717
718         return ldb_next_prepare_commit(module);
719 }
720
721
722 /* end a transaction */
723 static int partition_end_trans(struct ldb_module *module)
724 {
725         int i;
726         struct partition_private_data *data = talloc_get_type(module->private_data, 
727                                                               struct partition_private_data);
728         for (i=0; data && data->partitions && data->partitions[i]; i++) {
729                 struct ldb_module *next_end = data->partitions[i]->module;
730                 int ret;
731
732                 PARTITION_FIND_OP(next_end, end_transaction);
733
734                 ret = next_end->ops->end_transaction(next_end);
735                 if (ret != LDB_SUCCESS) {
736                         return ret;
737                 }
738         }
739
740         return ldb_next_end_trans(module);
741 }
742
743 /* delete a transaction */
744 static int partition_del_trans(struct ldb_module *module)
745 {
746         int i, ret, final_ret = LDB_SUCCESS;
747         struct partition_private_data *data = talloc_get_type(module->private_data, 
748                                                               struct partition_private_data);
749         for (i=0; data && data->partitions && data->partitions[i]; i++) {
750                 struct ldb_module *next = data->partitions[i]->module;
751                 PARTITION_FIND_OP(next, del_transaction);
752
753                 ret = next->ops->del_transaction(next);
754                 if (ret != LDB_SUCCESS) {
755                         final_ret = ret;
756                 }
757         }       
758
759         ret = ldb_next_del_trans(module);
760         if (ret != LDB_SUCCESS) {
761                 final_ret = ret;
762         }
763         return final_ret;
764 }
765
766 int partition_primary_sequence_number(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
767                                      enum ldb_sequence_type type, uint64_t *seq_number) 
768 {
769         int ret;
770         struct ldb_result *res;
771         struct ldb_seqnum_request *tseq;
772         struct ldb_request *treq;
773         struct ldb_seqnum_result *seqr;
774         res = talloc_zero(mem_ctx, struct ldb_result);
775         if (res == NULL) {
776                 return LDB_ERR_OPERATIONS_ERROR;
777         }
778         tseq = talloc_zero(res, struct ldb_seqnum_request);
779         if (tseq == NULL) {
780                 talloc_free(res);
781                 return LDB_ERR_OPERATIONS_ERROR;
782         }
783         tseq->type = type;
784         
785         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
786                                      LDB_EXTENDED_SEQUENCE_NUMBER,
787                                      tseq,
788                                      NULL,
789                                      res,
790                                      ldb_extended_default_callback,
791                                      NULL);
792         if (ret != LDB_SUCCESS) {
793                 talloc_free(res);
794                 return ret;
795         }
796         
797         ret = ldb_next_request(module, treq);
798         if (ret != LDB_SUCCESS) {
799                 talloc_free(res);
800                 return ret;
801         }
802         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
803         if (ret != LDB_SUCCESS) {
804                 talloc_free(res);
805                 return ret;
806         }
807         
808         seqr = talloc_get_type(res->extended->data,
809                                struct ldb_seqnum_result);
810         if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
811                 ret = LDB_ERR_OPERATIONS_ERROR;
812                 ldb_set_errstring(ldb_module_get_ctx(module), "Primary backend in partitions module returned a timestamp based seq number (must return a normal number)");
813                 talloc_free(res);
814                 return ret;
815         } else {
816                 *seq_number = seqr->seq_num;
817         }
818         talloc_free(res);
819         return LDB_SUCCESS;
820 }
821
822 /* FIXME: This function is still semi-async */
823 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
824 {
825         int i, ret;
826         uint64_t seq_number = 0;
827         uint64_t timestamp_sequence = 0;
828         uint64_t timestamp = 0;
829         struct partition_private_data *data = talloc_get_type(module->private_data, 
830                                                               struct partition_private_data);
831         struct ldb_seqnum_request *seq;
832         struct ldb_seqnum_result *seqr;
833         struct ldb_request *treq;
834         struct ldb_seqnum_request *tseq;
835         struct ldb_seqnum_result *tseqr;
836         struct ldb_extended *ext;
837         struct ldb_result *res;
838         struct dsdb_partition *p;
839
840         p = find_partition(data, NULL, req);
841         if (p != NULL) {
842                 /* the caller specified what partition they want the
843                  * sequence number operation on - just pass it on
844                  */
845                 return ldb_next_request(p->module, req);                
846         }
847
848         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
849
850         switch (seq->type) {
851         case LDB_SEQ_NEXT:
852         case LDB_SEQ_HIGHEST_SEQ:
853
854                 ret = partition_primary_sequence_number(module, req, seq->type, &seq_number);
855                 if (ret != LDB_SUCCESS) {
856                         return ret;
857                 }
858
859                 /* Skip the lot if 'data' isn't here yet (initialisation) */
860                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
861
862                         res = talloc_zero(req, struct ldb_result);
863                         if (res == NULL) {
864                                 return LDB_ERR_OPERATIONS_ERROR;
865                         }
866                         tseq = talloc_zero(res, struct ldb_seqnum_request);
867                         if (tseq == NULL) {
868                                 talloc_free(res);
869                                 return LDB_ERR_OPERATIONS_ERROR;
870                         }
871                         tseq->type = seq->type;
872
873                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
874                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
875                                                      tseq,
876                                                      NULL,
877                                                      res,
878                                                      ldb_extended_default_callback,
879                                                      NULL);
880                         if (ret != LDB_SUCCESS) {
881                                 talloc_free(res);
882                                 return ret;
883                         }
884
885                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
886                                 ret = ldb_request_add_control(treq,
887                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
888                                                               false, data->partitions[i]->ctrl);
889                                 if (ret != LDB_SUCCESS) {
890                                         talloc_free(res);
891                                         return ret;
892                                 }
893                         }
894
895                         ret = partition_request(data->partitions[i]->module, treq);
896                         if (ret != LDB_SUCCESS) {
897                                 talloc_free(res);
898                                 return ret;
899                         }
900                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
901                         if (ret != LDB_SUCCESS) {
902                                 talloc_free(res);
903                                 return ret;
904                         }
905                         tseqr = talloc_get_type(res->extended->data,
906                                                 struct ldb_seqnum_result);
907                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
908                                 timestamp_sequence = MAX(timestamp_sequence,
909                                                          tseqr->seq_num);
910                         } else {
911                                 seq_number += tseqr->seq_num;
912                         }
913                         talloc_free(res);
914                 }
915                 /* fall through */
916         case LDB_SEQ_HIGHEST_TIMESTAMP:
917
918                 res = talloc_zero(req, struct ldb_result);
919                 if (res == NULL) {
920                         return LDB_ERR_OPERATIONS_ERROR;
921                 }
922
923                 tseq = talloc_zero(res, struct ldb_seqnum_request);
924                 if (tseq == NULL) {
925                         talloc_free(res);
926                         return LDB_ERR_OPERATIONS_ERROR;
927                 }
928                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
929
930                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
931                                              LDB_EXTENDED_SEQUENCE_NUMBER,
932                                              tseq,
933                                              NULL,
934                                              res,
935                                              ldb_extended_default_callback,
936                                              NULL);
937                 if (ret != LDB_SUCCESS) {
938                         talloc_free(res);
939                         return ret;
940                 }
941
942                 ret = ldb_next_request(module, treq);
943                 if (ret != LDB_SUCCESS) {
944                         talloc_free(res);
945                         return ret;
946                 }
947                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
948                 if (ret != LDB_SUCCESS) {
949                         talloc_free(res);
950                         return ret;
951                 }
952
953                 tseqr = talloc_get_type(res->extended->data,
954                                            struct ldb_seqnum_result);
955                 timestamp = tseqr->seq_num;
956
957                 talloc_free(res);
958
959                 /* Skip the lot if 'data' isn't here yet (initialisation) */
960                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
961
962                         res = talloc_zero(req, struct ldb_result);
963                         if (res == NULL) {
964                                 return LDB_ERR_OPERATIONS_ERROR;
965                         }
966
967                         tseq = talloc_zero(res, struct ldb_seqnum_request);
968                         if (tseq == NULL) {
969                                 talloc_free(res);
970                                 return LDB_ERR_OPERATIONS_ERROR;
971                         }
972                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
973
974                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
975                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
976                                                      tseq,
977                                                      NULL,
978                                                      res,
979                                                      ldb_extended_default_callback,
980                                                      NULL);
981                         if (ret != LDB_SUCCESS) {
982                                 talloc_free(res);
983                                 return ret;
984                         }
985
986                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
987                                 ret = ldb_request_add_control(treq,
988                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
989                                                               false, data->partitions[i]->ctrl);
990                                 if (ret != LDB_SUCCESS) {
991                                         talloc_free(res);
992                                         return ret;
993                                 }
994                         }
995
996                         ret = partition_request(data->partitions[i]->module, treq);
997                         if (ret != LDB_SUCCESS) {
998                                 talloc_free(res);
999                                 return ret;
1000                         }
1001                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
1002                         if (ret != LDB_SUCCESS) {
1003                                 talloc_free(res);
1004                                 return ret;
1005                         }
1006
1007                         tseqr = talloc_get_type(res->extended->data,
1008                                                   struct ldb_seqnum_result);
1009                         timestamp = MAX(timestamp, tseqr->seq_num);
1010
1011                         talloc_free(res);
1012                 }
1013
1014                 break;
1015         }
1016
1017         ext = talloc_zero(req, struct ldb_extended);
1018         if (!ext) {
1019                 return LDB_ERR_OPERATIONS_ERROR;
1020         }
1021         seqr = talloc_zero(ext, struct ldb_seqnum_result);
1022         if (seqr == NULL) {
1023                 talloc_free(ext);
1024                 return LDB_ERR_OPERATIONS_ERROR;
1025         }
1026         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1027         ext->data = seqr;
1028
1029         switch (seq->type) {
1030         case LDB_SEQ_NEXT:
1031         case LDB_SEQ_HIGHEST_SEQ:
1032
1033                 /* Has someone above set a timebase sequence? */
1034                 if (timestamp_sequence) {
1035                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1036                 } else {
1037                         seqr->seq_num = seq_number;
1038                 }
1039
1040                 if (timestamp_sequence > seqr->seq_num) {
1041                         seqr->seq_num = timestamp_sequence;
1042                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1043                 }
1044
1045                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1046                 break;
1047         case LDB_SEQ_HIGHEST_TIMESTAMP:
1048                 seqr->seq_num = timestamp;
1049                 break;
1050         }
1051
1052         if (seq->type == LDB_SEQ_NEXT) {
1053                 seqr->seq_num++;
1054         }
1055
1056         /* send request done */
1057         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1058 }
1059
1060 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1061 {
1062         struct dsdb_partition *partition;
1063         struct partition_private_data *data;
1064         struct ldb_dn *schema_dn;
1065         struct partition_context *ac;
1066         int ret;
1067
1068         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1069         if (!schema_dn) {
1070                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1071                 return LDB_ERR_PROTOCOL_ERROR;
1072         }
1073
1074         data = talloc_get_type(module->private_data, struct partition_private_data);
1075         if (!data) {
1076                 return LDB_ERR_OPERATIONS_ERROR;
1077         }
1078         
1079         partition = find_partition( data, schema_dn, req);
1080         if (!partition) {
1081                 return ldb_next_request(module, req);
1082         }
1083
1084         ac = partition_init_ctx(module, req);
1085         if (!ac) {
1086                 return LDB_ERR_OPERATIONS_ERROR;
1087         }
1088
1089         /* we need to add a control but we never touch the original request */
1090         ret = partition_prep_request(ac, partition);
1091         if (ret != LDB_SUCCESS) {
1092                 return ret;
1093         }
1094
1095         /* fire the first one */
1096         ret = partition_call_first(ac);
1097
1098         if (ret != LDB_SUCCESS){
1099                 return ret;
1100         }
1101
1102         return ldb_request_done(req, ret);
1103 }
1104
1105
1106 /* extended */
1107 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1108 {
1109         struct partition_private_data *data;
1110         struct partition_context *ac;
1111         int ret;
1112
1113         data = talloc_get_type(module->private_data, struct partition_private_data);
1114         if (!data) {
1115                 return ldb_next_request(module, req);
1116         }
1117
1118         ret = partition_reload_if_required(module, data);
1119         if (ret != LDB_SUCCESS) {
1120                 return ret;
1121         }
1122         
1123         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1124                 return partition_sequence_number(module, req);
1125         }
1126
1127         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1128                 return partition_create(module, req);
1129         }
1130
1131         /* forward schemaUpdateNow operation to schema_fsmo module*/
1132         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1133                 return partition_extended_schema_update_now( module, req );
1134         }       
1135
1136         /* 
1137          * as the extended operation has no dn
1138          * we need to send it to all partitions
1139          */
1140
1141         ac = partition_init_ctx(module, req);
1142         if (!ac) {
1143                 return LDB_ERR_OPERATIONS_ERROR;
1144         }
1145
1146         return partition_send_all(module, ac, req);
1147 }
1148
1149 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1150         .name              = "partition",
1151         .init_context      = partition_init,
1152         .search            = partition_search,
1153         .add               = partition_add,
1154         .modify            = partition_modify,
1155         .del               = partition_delete,
1156         .rename            = partition_rename,
1157         .extended          = partition_extended,
1158         .start_transaction = partition_start_trans,
1159         .prepare_commit    = partition_prepare_commit,
1160         .end_transaction   = partition_end_trans,
1161         .del_transaction   = partition_del_trans,
1162 };