ad3518e6b2ec61acf7e3374b9df75fb2f79862e2
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is released under the GNU GPL v3 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 3 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  *  Author: Stefan Metzmacher
34  */
35
36 #include "includes.h"
37 #include "ldb/include/ldb_includes.h"
38 #include "dsdb/samdb/samdb.h"
39
/* Private state of the partition module (stored in module->private_data). */
struct partition_private_data {
	/* known partitions; the array is walked until the first NULL entry */
	struct dsdb_control_current_partition **partitions;
	/* DNs whose non-search requests go to every backend; NULL terminated */
	struct ldb_dn **replicate;
};
44
/* One prepared sub-request and the module it will be handed to. */
struct part_request {
	struct ldb_module *module;	/* passed to partition_request() */
	struct ldb_request *req;	/* request built for that module */
};
49
/* State shared by all sub-requests generated for one incoming request. */
struct partition_context {
	struct ldb_module *module;	/* the module that received the request */
	struct ldb_request *req;	/* the caller's original request */

	struct part_request *part_req;	/* array of prepared sub-requests */
	int num_requests;		/* number of entries in part_req */
	int finished_requests;		/* sub-requests that reported LDB_REPLY_DONE */
};
58
59 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
60 {
61         struct partition_context *ac;
62
63         ac = talloc_zero(req, struct partition_context);
64         if (ac == NULL) {
65                 ldb_set_errstring(module->ldb, "Out of Memory");
66                 return NULL;
67         }
68
69         ac->module = module;
70         ac->req = req;
71
72         return ac;
73 }
74
/*
 * Advance 'module' along its ->next chain until it points at a module
 * whose ops table provides 'op'.
 *
 * 'module' must be an lvalue: it is updated in place.  If the end of the
 * chain is reached, an error string is set and the macro RETURNS
 * LDB_ERR_OPERATIONS_ERROR from the *enclosing function*.
 * Note that 'module' is dereferenced before it is tested for NULL.
 */
#define PARTITION_FIND_OP(module, op) do { \
	struct ldb_context *find_op_ldb = (module)->ldb; \
	for (; (module) != NULL; (module) = (module)->next) { \
		if ((module)->ops->op != NULL) { \
			break; \
		} \
	} \
	if ((module) == NULL) { \
		ldb_asprintf_errstring(find_op_ldb, \
			"Unable to find backend operation for " #op ); \
		return LDB_ERR_OPERATIONS_ERROR; \
	} \
} while (0)
84
85 /*
86  *    helper functions to call the next module in chain
87  *    */
88
89 int partition_request(struct ldb_module *module, struct ldb_request *request)
90 {
91         int ret;
92         switch (request->operation) {
93         case LDB_SEARCH:
94                 PARTITION_FIND_OP(module, search);
95                 ret = module->ops->search(module, request);
96                 break;
97         case LDB_ADD:
98                 PARTITION_FIND_OP(module, add);
99                 ret = module->ops->add(module, request);
100                 break;
101         case LDB_MODIFY:
102                 PARTITION_FIND_OP(module, modify);
103                 ret = module->ops->modify(module, request);
104                 break;
105         case LDB_DELETE:
106                 PARTITION_FIND_OP(module, del);
107                 ret = module->ops->del(module, request);
108                 break;
109         case LDB_RENAME:
110                 PARTITION_FIND_OP(module, rename);
111                 ret = module->ops->rename(module, request);
112                 break;
113         case LDB_EXTENDED:
114                 PARTITION_FIND_OP(module, extended);
115                 ret = module->ops->extended(module, request);
116                 break;
117         case LDB_SEQUENCE_NUMBER:
118                 PARTITION_FIND_OP(module, sequence_number);
119                 ret = module->ops->sequence_number(module, request);
120                 break;
121         default:
122                 PARTITION_FIND_OP(module, request);
123                 ret = module->ops->request(module, request);
124                 break;
125         }
126         if (ret == LDB_SUCCESS) {
127                 return ret;
128         }
129         if (!ldb_errstring(module->ldb)) {
130                 /* Set a default error string, to place the blame somewhere */
131                 ldb_asprintf_errstring(module->ldb,
132                                         "error in module %s: %s (%d)",
133                                         module->ops->name,
134                                         ldb_strerror(ret), ret);
135         }
136         return ret;
137 }
138
139 static struct dsdb_control_current_partition *find_partition(struct partition_private_data *data,
140                                                              struct ldb_dn *dn)
141 {
142         int i;
143
144         /* Look at base DN */
145         /* Figure out which partition it is under */
146         /* Skip the lot if 'data' isn't here yet (initialistion) */
147         for (i=0; data && data->partitions && data->partitions[i]; i++) {
148                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
149                         return data->partitions[i];
150                 }
151         }
152
153         return NULL;
154 };
155
156 /**
157  * fire the caller's callback for every entry, but only send 'done' once.
158  */
159 static int partition_req_callback(struct ldb_request *req,
160                                   struct ldb_reply *ares)
161 {
162         struct partition_context *ac;
163         struct ldb_module *module;
164         struct ldb_request *nreq;
165         int ret;
166
167         ac = talloc_get_type(req->context, struct partition_context);
168
169         if (!ares) {
170                 return ldb_module_done(ac->req, NULL, NULL,
171                                         LDB_ERR_OPERATIONS_ERROR);
172         }
173         if (ares->error != LDB_SUCCESS) {
174                 return ldb_module_done(ac->req, ares->controls,
175                                         ares->response, ares->error);
176         }
177
178         switch (ares->type) {
179         case LDB_REPLY_REFERRAL:
180                 /* ignore referrals for now */
181                 break;
182
183         case LDB_REPLY_ENTRY:
184                 if (ac->req->operation != LDB_SEARCH) {
185                         ldb_set_errstring(ac->module->ldb,
186                                 "partition_req_callback:"
187                                 " Unsupported reply type for this request");
188                         return ldb_module_done(ac->req, NULL, NULL,
189                                                 LDB_ERR_OPERATIONS_ERROR);
190                 }
191                 return ldb_module_send_entry(ac->req, ares->message);
192
193         case LDB_REPLY_DONE:
194                 if (ac->req->operation == LDB_EXTENDED) {
195                         /* FIXME: check for ares->response, replmd does not fill it ! */
196                         if (ares->response) {
197                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
198                                         ldb_set_errstring(ac->module->ldb,
199                                                           "partition_req_callback:"
200                                                           " Unknown extended reply, "
201                                                           "only supports START_TLS");
202                                         talloc_free(ares);
203                                         return ldb_module_done(ac->req, NULL, NULL,
204                                                                 LDB_ERR_OPERATIONS_ERROR);
205                                 }
206                         }
207                 }
208
209                 ac->finished_requests++;
210                 if (ac->finished_requests == ac->num_requests) {
211                         /* this was the last one, call callback */
212                         return ldb_module_done(ac->req, ares->controls,
213                                                 ares->response, ares->error);
214                 }
215
216                 /* not the last, now call the next one */
217                 module = ac->part_req[ac->finished_requests].module;
218                 nreq = ac->part_req[ac->finished_requests].req;
219
220                 ret = partition_request(module, nreq);
221                 if (ret != LDB_SUCCESS) {
222                         talloc_free(ares);
223                         return ldb_module_done(ac->req, NULL, NULL, ret);
224                 }
225
226                 break;
227         }
228
229         talloc_free(ares);
230         return LDB_SUCCESS;
231 }
232
233 static int partition_prep_request(struct partition_context *ac,
234                                   struct dsdb_control_current_partition *partition)
235 {
236         int ret;
237         struct ldb_request *req;
238
239         ac->part_req = talloc_realloc(ac, ac->part_req,
240                                         struct part_request,
241                                         ac->num_requests + 1);
242         if (ac->part_req == NULL) {
243                 ldb_oom(ac->module->ldb);
244                 return LDB_ERR_OPERATIONS_ERROR;
245         }
246
247         switch (ac->req->operation) {
248         case LDB_SEARCH:
249                 ret = ldb_build_search_req_ex(&req, ac->module->ldb,
250                                         ac->part_req,
251                                         ac->req->op.search.base,
252                                         ac->req->op.search.scope,
253                                         ac->req->op.search.tree,
254                                         ac->req->op.search.attrs,
255                                         ac->req->controls,
256                                         ac, partition_req_callback,
257                                         ac->req);
258                 break;
259         case LDB_ADD:
260                 ret = ldb_build_add_req(&req, ac->module->ldb, ac->part_req,
261                                         ac->req->op.add.message,
262                                         ac->req->controls,
263                                         ac, partition_req_callback,
264                                         ac->req);
265                 break;
266         case LDB_MODIFY:
267                 ret = ldb_build_mod_req(&req, ac->module->ldb, ac->part_req,
268                                         ac->req->op.mod.message,
269                                         ac->req->controls,
270                                         ac, partition_req_callback,
271                                         ac->req);
272                 break;
273         case LDB_DELETE:
274                 ret = ldb_build_del_req(&req, ac->module->ldb, ac->part_req,
275                                         ac->req->op.del.dn,
276                                         ac->req->controls,
277                                         ac, partition_req_callback,
278                                         ac->req);
279                 break;
280         case LDB_RENAME:
281                 ret = ldb_build_rename_req(&req, ac->module->ldb, ac->part_req,
282                                         ac->req->op.rename.olddn,
283                                         ac->req->op.rename.newdn,
284                                         ac->req->controls,
285                                         ac, partition_req_callback,
286                                         ac->req);
287                 break;
288         case LDB_EXTENDED:
289                 ret = ldb_build_extended_req(&req, ac->module->ldb,
290                                         ac->part_req,
291                                         ac->req->op.extended.oid,
292                                         ac->req->op.extended.data,
293                                         ac->req->controls,
294                                         ac, partition_req_callback,
295                                         ac->req);
296                 break;
297         default:
298                 ldb_set_errstring(ac->module->ldb,
299                                   "Unsupported request type!");
300                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
301         }
302
303         if (ret != LDB_SUCCESS) {
304                 return ret;
305         }
306
307         ac->part_req[ac->num_requests].req = req;
308
309         if (ac->req->controls) {
310                 req->controls = talloc_memdup(req, ac->req->controls,
311                                         talloc_get_size(ac->req->controls));
312                 if (req->controls == NULL) {
313                         ldb_oom(ac->module->ldb);
314                         return LDB_ERR_OPERATIONS_ERROR;
315                 }
316         }
317
318         if (partition) {
319                 ac->part_req[ac->num_requests].module = partition->module;
320
321                 ret = ldb_request_add_control(req,
322                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
323                                         false, partition);
324                 if (ret != LDB_SUCCESS) {
325                         return ret;
326                 }
327
328                 if (req->operation == LDB_SEARCH) {
329                         /* If the search is for 'more' than this partition,
330                          * then change the basedn, so a remote LDAP server
331                          * doesn't object */
332                         if (ldb_dn_compare_base(partition->dn,
333                                                 req->op.search.base) != 0) {
334                                 req->op.search.base = partition->dn;
335                         }
336                 }
337
338         } else {
339                 /* make sure you put the NEXT module here, or
340                  * partition_request() will simply loop forever on itself */
341                 ac->part_req[ac->num_requests].module = ac->module->next;
342         }
343
344         ac->num_requests++;
345
346         return LDB_SUCCESS;
347 }
348
349 static int partition_call_first(struct partition_context *ac)
350 {
351         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
352 }
353
354 /**
355  * Send a request down to all the partitions
356  */
357 static int partition_send_all(struct ldb_module *module, 
358                               struct partition_context *ac, 
359                               struct ldb_request *req) 
360 {
361         int i;
362         struct partition_private_data *data = talloc_get_type(module->private_data, 
363                                                               struct partition_private_data);
364         int ret = partition_prep_request(ac, NULL);
365         if (ret != LDB_SUCCESS) {
366                 return ret;
367         }
368         for (i=0; data && data->partitions && data->partitions[i]; i++) {
369                 ret = partition_prep_request(ac, data->partitions[i]);
370                 if (ret != LDB_SUCCESS) {
371                         return ret;
372                 }
373         }
374
375         /* fire the first one */
376         return partition_call_first(ac);
377 }
378
379 /**
380  * Figure out which backend a request needs to be aimed at.  Some
381  * requests must be replicated to all backends
382  */
383 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
384 {
385         struct partition_context *ac;
386         unsigned i;
387         int ret;
388         struct dsdb_control_current_partition *partition;
389         struct partition_private_data *data = talloc_get_type(module->private_data, 
390                                                               struct partition_private_data);
391         if (!data || !data->partitions) {
392                 return ldb_next_request(module, req);
393         }
394         
395         if (req->operation != LDB_SEARCH) {
396                 /* Is this a special DN, we need to replicate to every backend? */
397                 for (i=0; data->replicate && data->replicate[i]; i++) {
398                         if (ldb_dn_compare(data->replicate[i], 
399                                            dn) == 0) {
400                                 
401                                 ac = partition_init_ctx(module, req);
402                                 if (!ac) {
403                                         return LDB_ERR_OPERATIONS_ERROR;
404                                 }
405                                 
406                                 return partition_send_all(module, ac, req);
407                         }
408                 }
409         }
410
411         /* Otherwise, we need to find the partition to fire it to */
412
413         /* Find partition */
414         partition = find_partition(data, dn);
415         if (!partition) {
416                 /*
417                  * if we haven't found a matching partition
418                  * pass the request to the main ldb
419                  *
420                  * TODO: we should maybe return an error here
421                  *       if it's not a special dn
422                  */
423
424                 return ldb_next_request(module, req);
425         }
426
427         ac = partition_init_ctx(module, req);
428         if (!ac) {
429                 return LDB_ERR_OPERATIONS_ERROR;
430         }
431
432         /* we need to add a control but we never touch the original request */
433         ret = partition_prep_request(ac, partition);
434         if (ret != LDB_SUCCESS) {
435                 return ret;
436         }
437
438         /* fire the first one */
439         return partition_call_first(ac);
440 }
441
442 /* search */
443 static int partition_search(struct ldb_module *module, struct ldb_request *req)
444 {
445         struct ldb_control **saved_controls;
446         
447         /* Find backend */
448         struct partition_private_data *data = talloc_get_type(module->private_data, 
449                                                               struct partition_private_data);
450         /* issue request */
451
452         /* (later) consider if we should be searching multiple
453          * partitions (for 'invisible' partition behaviour */
454
455         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
456         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
457         
458         struct ldb_search_options_control *search_options = NULL;
459         if (search_control) {
460                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
461         }
462
463         /* Remove the domain_scope control, so we don't confuse a backend server */
464         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
465                 ldb_oom(module->ldb);
466                 return LDB_ERR_OPERATIONS_ERROR;
467         }
468
469         /*
470          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
471          * down as uncritical to make windows 2008 dcpromo happy.
472          */
473         if (search_control) {
474                 search_control->critical = 0;
475         }
476
477         /* TODO:
478            Generate referrals (look for a partition under this DN) if we don't have the above control specified
479         */
480         
481         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
482                 int ret, i;
483                 struct partition_context *ac;
484                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
485                         /* We have processed this flag, so we are done with this control now */
486
487                         /* Remove search control, so we don't confuse a backend server */
488                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
489                                 ldb_oom(module->ldb);
490                                 return LDB_ERR_OPERATIONS_ERROR;
491                         }
492                 }
493                 ac = partition_init_ctx(module, req);
494                 if (!ac) {
495                         return LDB_ERR_OPERATIONS_ERROR;
496                 }
497
498                 /* Search from the base DN */
499                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
500                         return partition_send_all(module, ac, req);
501                 }
502                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
503                         /* Find all partitions under the search base */
504                         if (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0) {
505                                 ret = partition_prep_request(ac, data->partitions[i]);
506                                 if (ret != LDB_SUCCESS) {
507                                         return ret;
508                                 }
509                         }
510                 }
511
512                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
513                 if (ac->num_requests == 0) {
514                         talloc_free(ac);
515                         return ldb_next_request(module, req);
516                 }
517
518                 /* fire the first one */
519                 return partition_call_first(ac);
520
521         } else {
522                 /* Handle this like all other requests */
523                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
524                         /* We have processed this flag, so we are done with this control now */
525
526                         /* Remove search control, so we don't confuse a backend server */
527                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
528                                 ldb_oom(module->ldb);
529                                 return LDB_ERR_OPERATIONS_ERROR;
530                         }
531                 }
532
533                 return partition_replicate(module, req, req->op.search.base);
534         }
535 }
536
537 /* add */
538 static int partition_add(struct ldb_module *module, struct ldb_request *req)
539 {
540         return partition_replicate(module, req, req->op.add.message->dn);
541 }
542
543 /* modify */
544 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
545 {
546         return partition_replicate(module, req, req->op.mod.message->dn);
547 }
548
549 /* delete */
550 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
551 {
552         return partition_replicate(module, req, req->op.del.dn);
553 }
554
555 /* rename */
556 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
557 {
558         int i, matched = -1;
559         /* Find backend */
560         struct dsdb_control_current_partition *backend, *backend2;
561         
562         struct partition_private_data *data = talloc_get_type(module->private_data, 
563                                                               struct partition_private_data);
564
565         /* Skip the lot if 'data' isn't here yet (initialization) */
566         if (!data) {
567                 return LDB_ERR_OPERATIONS_ERROR;
568         }
569
570         backend = find_partition(data, req->op.rename.olddn);
571         backend2 = find_partition(data, req->op.rename.newdn);
572
573         if ((backend && !backend2) || (!backend && backend2)) {
574                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
575         }
576
577         if (backend != backend2) {
578                 ldb_asprintf_errstring(module->ldb, 
579                                        "Cannot rename from %s in %s to %s in %s: %s",
580                                        ldb_dn_get_linearized(req->op.rename.olddn),
581                                        ldb_dn_get_linearized(backend->dn),
582                                        ldb_dn_get_linearized(req->op.rename.newdn),
583                                        ldb_dn_get_linearized(backend2->dn),
584                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
585                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
586         }
587
588         for (i=0; data && data->partitions && data->partitions[i]; i++) {
589                 if (ldb_dn_compare_base(req->op.rename.olddn, data->partitions[i]->dn) == 0) {
590                         matched = i;
591                 }
592         }
593
594         if (matched > 0) {
595                 ldb_asprintf_errstring(module->ldb, 
596                                        "Cannot rename from %s to %s, subtree rename would cross partition %s: %s",
597                                        ldb_dn_get_linearized(req->op.rename.olddn),
598                                        ldb_dn_get_linearized(req->op.rename.newdn),
599                                        ldb_dn_get_linearized(data->partitions[matched]->dn),
600                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
601                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
602         }
603
604         return partition_replicate(module, req, req->op.rename.olddn);
605 }
606
607 /* start a transaction */
608 static int partition_start_trans(struct ldb_module *module)
609 {
610         int i, ret;
611         struct partition_private_data *data = talloc_get_type(module->private_data, 
612                                                               struct partition_private_data);
613         /* Look at base DN */
614         /* Figure out which partition it is under */
615         /* Skip the lot if 'data' isn't here yet (initialization) */
616         ret = ldb_next_start_trans(module);
617         if (ret != LDB_SUCCESS) {
618                 return ret;
619         }
620
621         for (i=0; data && data->partitions && data->partitions[i]; i++) {
622                 struct ldb_module *next = data->partitions[i]->module;
623                 PARTITION_FIND_OP(next, start_transaction);
624
625                 ret = next->ops->start_transaction(next);
626                 if (ret != LDB_SUCCESS) {
627                         /* Back it out, if it fails on one */
628                         for (i--; i >= 0; i--) {
629                                 next = data->partitions[i]->module;
630                                 PARTITION_FIND_OP(next, del_transaction);
631
632                                 next->ops->del_transaction(next);
633                         }
634                         ldb_next_del_trans(module);
635                         return ret;
636                 }
637         }
638         return LDB_SUCCESS;
639 }
640
641 /* end a transaction */
642 static int partition_end_trans(struct ldb_module *module)
643 {
644         int i, ret;
645         struct partition_private_data *data = talloc_get_type(module->private_data, 
646                                                               struct partition_private_data);
647         ret = ldb_next_end_trans(module);
648         if (ret != LDB_SUCCESS) {
649                 return ret;
650         }
651
652         /* Look at base DN */
653         /* Figure out which partition it is under */
654         /* Skip the lot if 'data' isn't here yet (initialistion) */
655         for (i=0; data && data->partitions && data->partitions[i]; i++) {
656                 struct ldb_module *next = data->partitions[i]->module;
657                 PARTITION_FIND_OP(next, end_transaction);
658
659                 ret = next->ops->end_transaction(next);
660                 if (ret != LDB_SUCCESS) {
661                         /* Back it out, if it fails on one */
662                         for (i--; i >= 0; i--) {
663                                 next = data->partitions[i]->module;
664                                 PARTITION_FIND_OP(next, del_transaction);
665
666                                 next->ops->del_transaction(next);
667                         }
668                         ldb_next_del_trans(module);
669                         return ret;
670                 }
671         }
672
673         return LDB_SUCCESS;
674 }
675
676 /* delete a transaction */
677 static int partition_del_trans(struct ldb_module *module)
678 {
679         int i, ret, ret2 = LDB_SUCCESS;
680         struct partition_private_data *data = talloc_get_type(module->private_data, 
681                                                               struct partition_private_data);
682         ret = ldb_next_del_trans(module);
683         if (ret != LDB_SUCCESS) {
684                 ret2 = ret;
685         }
686
687         /* Look at base DN */
688         /* Figure out which partition it is under */
689         /* Skip the lot if 'data' isn't here yet (initialistion) */
690         for (i=0; data && data->partitions && data->partitions[i]; i++) {
691                 struct ldb_module *next = data->partitions[i]->module;
692                 PARTITION_FIND_OP(next, del_transaction);
693
694                 ret = next->ops->del_transaction(next);
695                 if (ret != LDB_SUCCESS) {
696                         ret2 = ret;
697                 }
698         }
699         return ret2;
700 }
701
702 /* NOTE: ldb_sequence_number is still a completely SYNCHRONOUS call
703  * implemented only in ldb_rdb. It does not require ldb_wait() to be
704  * called after a request is made */
705 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
706 {
707         int i, ret;
708         uint64_t seq_number = 0;
709         uint64_t timestamp_sequence = 0;
710         uint64_t timestamp = 0;
711         struct partition_private_data *data = talloc_get_type(module->private_data, 
712                                                               struct partition_private_data);
713
714         switch (req->op.seq_num.type) {
715         case LDB_SEQ_NEXT:
716         case LDB_SEQ_HIGHEST_SEQ:
717                 ret = ldb_next_request(module, req);
718                 if (ret != LDB_SUCCESS) {
719                         return ret;
720                 }
721                 if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
722                         timestamp_sequence = req->op.seq_num.seq_num;
723                 } else {
724                         seq_number = seq_number + req->op.seq_num.seq_num;
725                 }
726
727                 /* gross hack part1 */
728                 ret = ldb_request_add_control(req,
729                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
730                                         false, NULL);
731                 if (ret != LDB_SUCCESS) {
732                         return ret;
733                 }
734
735                 /* Look at base DN */
736                 /* Figure out which partition it is under */
737                 /* Skip the lot if 'data' isn't here yet (initialistion) */
738                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
739
740                         /* gross hack part2 */
741                         int j;
742                         for (j=0; req->controls[j]; j++) {
743                                 if (strcmp(req->controls[j]->oid, DSDB_CONTROL_CURRENT_PARTITION_OID) == 0) {
744                                         req->controls[j]->data = data->partitions[i];
745                                         break;
746                                 }
747                         }
748
749                         ret = partition_request(data->partitions[i]->module, req);
750                         if (ret != LDB_SUCCESS) {
751                                 return ret;
752                         }
753                         if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
754                                 timestamp_sequence = MAX(timestamp_sequence, req->op.seq_num.seq_num);
755                         } else {
756                                 seq_number = seq_number + req->op.seq_num.seq_num;
757                         }
758                 }
759                 /* fall though */
760         case LDB_SEQ_HIGHEST_TIMESTAMP:
761         {
762                 struct ldb_request *date_req = talloc(req, struct ldb_request);
763                 if (!date_req) {
764                         return LDB_ERR_OPERATIONS_ERROR;
765                 }
766                 *date_req = *req;
767                 date_req->op.seq_num.flags = LDB_SEQ_HIGHEST_TIMESTAMP;
768
769                 ret = ldb_next_request(module, date_req);
770                 if (ret != LDB_SUCCESS) {
771                         return ret;
772                 }
773                 timestamp = date_req->op.seq_num.seq_num;
774
775                 /* Look at base DN */
776                 /* Figure out which partition it is under */
777                 /* Skip the lot if 'data' isn't here yet (initialistion) */
778                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
779
780                         ret = partition_request(data->partitions[i]->module, req);
781                         if (ret != LDB_SUCCESS) {
782                                 return ret;
783                         }
784                         timestamp = MAX(timestamp, date_req->op.seq_num.seq_num);
785                 }
786                 break;
787         }
788         }
789
790         switch (req->op.seq_num.flags) {
791         case LDB_SEQ_NEXT:
792         case LDB_SEQ_HIGHEST_SEQ:
793
794                 req->op.seq_num.flags = 0;
795
796                 /* Has someone above set a timebase sequence? */
797                 if (timestamp_sequence) {
798                         req->op.seq_num.seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
799                 } else {
800                         req->op.seq_num.seq_num = seq_number;
801                 }
802
803                 if (timestamp_sequence > req->op.seq_num.seq_num) {
804                         req->op.seq_num.seq_num = timestamp_sequence;
805                         req->op.seq_num.flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
806                 }
807
808                 req->op.seq_num.flags |= LDB_SEQ_GLOBAL_SEQUENCE;
809                 break;
810         case LDB_SEQ_HIGHEST_TIMESTAMP:
811                 req->op.seq_num.seq_num = timestamp;
812                 break;
813         }
814
815         switch (req->op.seq_num.flags) {
816         case LDB_SEQ_NEXT:
817                 req->op.seq_num.seq_num++;
818         }
819         return LDB_SUCCESS;
820 }
821
822 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
823 {
824         struct dsdb_extended_replicated_objects *ext;
825
826         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
827         if (!ext) {
828                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
829                 return LDB_ERR_PROTOCOL_ERROR;
830         }
831
832         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
833                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
834                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
835                 return LDB_ERR_PROTOCOL_ERROR;
836         }
837
838         return partition_replicate(module, req, ext->partition_dn);
839 }
840
841 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
842 {
843         struct dsdb_control_current_partition *partition;
844         struct partition_private_data *data;
845         struct ldb_dn *schema_dn;
846         struct partition_context *ac;
847         int ret;
848
849         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
850         if (!schema_dn) {
851                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
852                 return LDB_ERR_PROTOCOL_ERROR;
853         }
854
855         data = talloc_get_type(module->private_data, struct partition_private_data);
856         if (!data) {
857                 return LDB_ERR_OPERATIONS_ERROR;
858         }
859         
860         partition = find_partition( data, schema_dn );
861         if (!partition) {
862                 return ldb_next_request(module, req);
863         }
864
865         ac = partition_init_ctx(module, req);
866         if (!ac) {
867                 return LDB_ERR_OPERATIONS_ERROR;
868         }
869
870         /* we need to add a control but we never touch the original request */
871         ret = partition_prep_request(ac, partition);
872         if (ret != LDB_SUCCESS) {
873                 return ret;
874         }
875
876         /* fire the first one */
877         return partition_call_first(ac);
878 }
879
880
881 /* extended */
882 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
883 {
884         struct partition_private_data *data;
885         struct partition_context *ac;
886
887         data = talloc_get_type(module->private_data, struct partition_private_data);
888         if (!data || !data->partitions) {
889                 return ldb_next_request(module, req);
890         }
891
892         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
893                 return partition_extended_replicated_objects(module, req);
894         }
895
896         /* forward schemaUpdateNow operation to schema_fsmo module*/
897         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
898                 return partition_extended_schema_update_now( module, req );
899         }       
900
901         /* 
902          * as the extended operation has no dn
903          * we need to send it to all partitions
904          */
905
906         ac = partition_init_ctx(module, req);
907         if (!ac) {
908                 return LDB_ERR_OPERATIONS_ERROR;
909         }
910
911         return partition_send_all(module, ac, req);
912 }
913
914 static int partition_sort_compare(const void *v1, const void *v2)
915 {
916         struct dsdb_control_current_partition *p1;
917         struct dsdb_control_current_partition *p2;
918
919         p1 = *((struct dsdb_control_current_partition **)v1);
920         p2 = *((struct dsdb_control_current_partition **)v2);
921
922         return ldb_dn_compare(p1->dn, p2->dn);
923 }
924
925 static int partition_init(struct ldb_module *module)
926 {
927         int ret, i;
928         TALLOC_CTX *mem_ctx = talloc_new(module);
929         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
930         struct ldb_result *res;
931         struct ldb_message *msg;
932         struct ldb_message_element *partition_attributes;
933         struct ldb_message_element *replicate_attributes;
934         struct ldb_message_element *modules_attributes;
935
936         struct partition_private_data *data;
937
938         if (!mem_ctx) {
939                 return LDB_ERR_OPERATIONS_ERROR;
940         }
941
942         data = talloc(mem_ctx, struct partition_private_data);
943         if (data == NULL) {
944                 return LDB_ERR_OPERATIONS_ERROR;
945         }
946
947         ret = ldb_search(module->ldb, mem_ctx, &res,
948                          ldb_dn_new(mem_ctx, module->ldb, "@PARTITION"),
949                          LDB_SCOPE_BASE, attrs, NULL);
950         if (ret != LDB_SUCCESS) {
951                 talloc_free(mem_ctx);
952                 return ret;
953         }
954         if (res->count == 0) {
955                 talloc_free(mem_ctx);
956                 return ldb_next_init(module);
957         }
958
959         if (res->count > 1) {
960                 talloc_free(mem_ctx);
961                 return LDB_ERR_CONSTRAINT_VIOLATION;
962         }
963
964         msg = res->msgs[0];
965
966         partition_attributes = ldb_msg_find_element(msg, "partition");
967         if (!partition_attributes) {
968                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
969                 talloc_free(mem_ctx);
970                 return LDB_ERR_CONSTRAINT_VIOLATION;
971         }
972         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
973         if (!data->partitions) {
974                 talloc_free(mem_ctx);
975                 return LDB_ERR_OPERATIONS_ERROR;
976         }
977         for (i=0; i < partition_attributes->num_values; i++) {
978                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
979                 char *p = strchr(base, ':');
980                 if (!p) {
981                         ldb_asprintf_errstring(module->ldb, 
982                                                 "partition_init: "
983                                                 "invalid form for partition record (missing ':'): %s", base);
984                         talloc_free(mem_ctx);
985                         return LDB_ERR_CONSTRAINT_VIOLATION;
986                 }
987                 p[0] = '\0';
988                 p++;
989                 if (!p[0]) {
990                         ldb_asprintf_errstring(module->ldb, 
991                                                 "partition_init: "
992                                                 "invalid form for partition record (missing backend database): %s", base);
993                         talloc_free(mem_ctx);
994                         return LDB_ERR_CONSTRAINT_VIOLATION;
995                 }
996                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
997                 if (!data->partitions[i]) {
998                         talloc_free(mem_ctx);
999                         return LDB_ERR_OPERATIONS_ERROR;
1000                 }
1001                 data->partitions[i]->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1002
1003                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], module->ldb, base);
1004                 if (!data->partitions[i]->dn) {
1005                         ldb_asprintf_errstring(module->ldb, 
1006                                                 "partition_init: invalid DN in partition record: %s", base);
1007                         talloc_free(mem_ctx);
1008                         return LDB_ERR_CONSTRAINT_VIOLATION;
1009                 }
1010
1011                 data->partitions[i]->backend = samdb_relative_path(module->ldb, 
1012                                                                    data->partitions[i], 
1013                                                                    p);
1014                 if (!data->partitions[i]->backend) {
1015                         ldb_asprintf_errstring(module->ldb, 
1016                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1017                         talloc_free(mem_ctx);                   
1018                 }
1019                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
1020                 if (ret != LDB_SUCCESS) {
1021                         talloc_free(mem_ctx);
1022                         return ret;
1023                 }
1024         }
1025         data->partitions[i] = NULL;
1026
1027         /* sort these into order, most to least specific */
1028         qsort(data->partitions, partition_attributes->num_values,
1029               sizeof(*data->partitions), partition_sort_compare);
1030
1031         for (i=0; data->partitions[i]; i++) {
1032                 struct ldb_request *req;
1033                 req = talloc_zero(mem_ctx, struct ldb_request);
1034                 if (req == NULL) {
1035                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1036                         talloc_free(mem_ctx);
1037                         return LDB_ERR_OPERATIONS_ERROR;
1038                 }
1039                 
1040                 req->operation = LDB_REQ_REGISTER_PARTITION;
1041                 req->op.reg_partition.dn = data->partitions[i]->dn;
1042                 req->callback = ldb_op_default_callback;
1043
1044                 ldb_set_timeout(module->ldb, req, 0);
1045
1046                 req->handle = ldb_handle_new(req, module->ldb);
1047                 if (req->handle == NULL) {
1048                         return LDB_ERR_OPERATIONS_ERROR;
1049                 }
1050                 
1051                 ret = ldb_request(module->ldb, req);
1052                 if (ret == LDB_SUCCESS) {
1053                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1054                 }
1055                 if (ret != LDB_SUCCESS) {
1056                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1057                         talloc_free(mem_ctx);
1058                         return LDB_ERR_OTHER;
1059                 }
1060                 talloc_free(req);
1061         }
1062
1063         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1064         if (!replicate_attributes) {
1065                 data->replicate = NULL;
1066         } else {
1067                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1068                 if (!data->replicate) {
1069                         talloc_free(mem_ctx);
1070                         return LDB_ERR_OPERATIONS_ERROR;
1071                 }
1072
1073                 for (i=0; i < replicate_attributes->num_values; i++) {
1074                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, module->ldb, &replicate_attributes->values[i]);
1075                         if (!ldb_dn_validate(data->replicate[i])) {
1076                                 ldb_asprintf_errstring(module->ldb, 
1077                                                         "partition_init: "
1078                                                         "invalid DN in partition replicate record: %s", 
1079                                                         replicate_attributes->values[i].data);
1080                                 talloc_free(mem_ctx);
1081                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1082                         }
1083                 }
1084                 data->replicate[i] = NULL;
1085         }
1086
1087         /* Make the private data available to any searches the modules may trigger in initialisation */
1088         module->private_data = data;
1089         talloc_steal(module, data);
1090         
1091         modules_attributes = ldb_msg_find_element(msg, "modules");
1092         if (modules_attributes) {
1093                 for (i=0; i < modules_attributes->num_values; i++) {
1094                         struct ldb_dn *base_dn;
1095                         int partition_idx;
1096                         struct dsdb_control_current_partition *partition = NULL;
1097                         const char **modules = NULL;
1098
1099                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1100                         char *p = strchr(base, ':');
1101                         if (!p) {
1102                                 ldb_asprintf_errstring(module->ldb, 
1103                                                         "partition_init: "
1104                                                         "invalid form for partition module record (missing ':'): %s", base);
1105                                 talloc_free(mem_ctx);
1106                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1107                         }
1108                         p[0] = '\0';
1109                         p++;
1110                         if (!p[0]) {
1111                                 ldb_asprintf_errstring(module->ldb, 
1112                                                         "partition_init: "
1113                                                         "invalid form for partition module record (missing backend database): %s", base);
1114                                 talloc_free(mem_ctx);
1115                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1116                         }
1117
1118                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
1119                                                                p);
1120                         
1121                         base_dn = ldb_dn_new(mem_ctx, module->ldb, base);
1122                         if (!ldb_dn_validate(base_dn)) {
1123                                 talloc_free(mem_ctx);
1124                                 return LDB_ERR_OPERATIONS_ERROR;
1125                         }
1126                         
1127                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1128                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
1129                                         partition = data->partitions[partition_idx];
1130                                         break;
1131                                 }
1132                         }
1133                         
1134                         if (!partition) {
1135                                 ldb_asprintf_errstring(module->ldb, 
1136                                                         "partition_init: "
1137                                                         "invalid form for partition module record (no such partition): %s", base);
1138                                 talloc_free(mem_ctx);
1139                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1140                         }
1141                         
1142                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
1143                         if (ret != LDB_SUCCESS) {
1144                                 ldb_asprintf_errstring(module->ldb, 
1145                                                        "partition_init: "
1146                                                        "loading backend for %s failed: %s", 
1147                                                        base, ldb_errstring(module->ldb));
1148                                 talloc_free(mem_ctx);
1149                                 return ret;
1150                         }
1151                         ret = ldb_init_module_chain(module->ldb, partition->module);
1152                         if (ret != LDB_SUCCESS) {
1153                                 ldb_asprintf_errstring(module->ldb, 
1154                                                        "partition_init: "
1155                                                        "initialising backend for %s failed: %s", 
1156                                                        base, ldb_errstring(module->ldb));
1157                                 talloc_free(mem_ctx);
1158                                 return ret;
1159                         }
1160                 }
1161         }
1162
1163         talloc_free(mem_ctx);
1164         return ldb_next_init(module);
1165 }
1166
1167 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1168         .name              = "partition",
1169         .init_context      = partition_init,
1170         .search            = partition_search,
1171         .add               = partition_add,
1172         .modify            = partition_modify,
1173         .del               = partition_delete,
1174         .rename            = partition_rename,
1175         .extended          = partition_extended,
1176         .sequence_number   = partition_sequence_number,
1177         .start_transaction = partition_start_trans,
1178         .end_transaction   = partition_end_trans,
1179         .del_transaction   = partition_del_trans,
1180 };