Merge commit 'master/master'
[samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is release under the GNU GPL v3 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 3 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  *  Author: Stefan Metzmacher
34  */
35
36 #include "includes.h"
37 #include "ldb/include/ldb_includes.h"
38 #include "dsdb/samdb/samdb.h"
39
/*
 * Private state of the partitions module (stored in module->private_data).
 */
struct partition_private_data {
	/* Known partitions; walked by this file as a NULL-terminated array */
	struct dsdb_control_current_partition **partitions;

	/* DNs whose non-search requests are sent to every backend;
	 * walked by this file as a NULL-terminated array */
	struct ldb_dn **replicate;
};
44
/*
 * One prepared sub-request: the request itself together with the module
 * at which partition_request() starts looking for a handler.
 */
struct part_request {
	/* module chain entry point for this sub-request */
	struct ldb_module *module;

	/* the request built for this backend */
	struct ldb_request *req;
};
49
/*
 * Per-request state used while one incoming request is fanned out to
 * one or more backends.
 */
struct partition_context {
	/* the partition module instance handling the request */
	struct ldb_module *module;

	/* the original request, as received from the caller */
	struct ldb_request *req;

	/* set once any sub-request has completed with LDB_SUCCESS */
	bool got_success;

	/* array of prepared sub-requests, num_requests entries long */
	struct part_request *part_req;
	int num_requests;

	/* number of sub-requests that have completed; also the index of
	 * the next sub-request to fire */
	int finished_requests;
};
59
60 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
61 {
62         struct partition_context *ac;
63
64         ac = talloc_zero(req, struct partition_context);
65         if (ac == NULL) {
66                 ldb_set_errstring(module->ldb, "Out of Memory");
67                 return NULL;
68         }
69
70         ac->module = module;
71         ac->req = req;
72
73         return ac;
74 }
75
/*
 * Advance 'module' (which must be a modifiable lvalue) along its ->next
 * chain until it points at a module whose ops table implements 'op'.
 *
 * If the chain is exhausted, or 'module' is NULL to begin with, this
 * macro makes the ENCLOSING FUNCTION return LDB_ERR_OPERATIONS_ERROR.
 * An error string is recorded when an ldb context is available; the
 * context is captured up front because 'module' is NULL by the time
 * the failure is detected.
 */
#define PARTITION_FIND_OP(module, op) do { \
	struct ldb_context *ldbctx = ((module) != NULL) ? (module)->ldb : NULL; \
	while ((module) != NULL && (module)->ops->op == NULL) { \
		(module) = (module)->next; \
	} \
	if ((module) == NULL) { \
		if (ldbctx != NULL) { \
			ldb_asprintf_errstring(ldbctx, \
				"Unable to find backend operation for " #op ); \
		} \
		return LDB_ERR_OPERATIONS_ERROR; \
	} \
} while (0)
85
86 /*
87  *    helper functions to call the next module in chain
88  *    */
89
90 int partition_request(struct ldb_module *module, struct ldb_request *request)
91 {
92         int ret;
93         switch (request->operation) {
94         case LDB_SEARCH:
95                 PARTITION_FIND_OP(module, search);
96                 ret = module->ops->search(module, request);
97                 break;
98         case LDB_ADD:
99                 PARTITION_FIND_OP(module, add);
100                 ret = module->ops->add(module, request);
101                 break;
102         case LDB_MODIFY:
103                 PARTITION_FIND_OP(module, modify);
104                 ret = module->ops->modify(module, request);
105                 break;
106         case LDB_DELETE:
107                 PARTITION_FIND_OP(module, del);
108                 ret = module->ops->del(module, request);
109                 break;
110         case LDB_RENAME:
111                 PARTITION_FIND_OP(module, rename);
112                 ret = module->ops->rename(module, request);
113                 break;
114         case LDB_EXTENDED:
115                 PARTITION_FIND_OP(module, extended);
116                 ret = module->ops->extended(module, request);
117                 break;
118         case LDB_SEQUENCE_NUMBER:
119                 PARTITION_FIND_OP(module, sequence_number);
120                 ret = module->ops->sequence_number(module, request);
121                 break;
122         default:
123                 PARTITION_FIND_OP(module, request);
124                 ret = module->ops->request(module, request);
125                 break;
126         }
127         if (ret == LDB_SUCCESS) {
128                 return ret;
129         }
130         if (!ldb_errstring(module->ldb)) {
131                 /* Set a default error string, to place the blame somewhere */
132                 ldb_asprintf_errstring(module->ldb,
133                                         "error in module %s: %s (%d)",
134                                         module->ops->name,
135                                         ldb_strerror(ret), ret);
136         }
137         return ret;
138 }
139
140 static struct dsdb_control_current_partition *find_partition(struct partition_private_data *data,
141                                                              struct ldb_dn *dn)
142 {
143         int i;
144
145         /* Look at base DN */
146         /* Figure out which partition it is under */
147         /* Skip the lot if 'data' isn't here yet (initialistion) */
148         for (i=0; data && data->partitions && data->partitions[i]; i++) {
149                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
150                         return data->partitions[i];
151                 }
152         }
153
154         return NULL;
155 };
156
157 /**
158  * fire the caller's callback for every entry, but only send 'done' once.
159  */
160 static int partition_req_callback(struct ldb_request *req,
161                                   struct ldb_reply *ares)
162 {
163         struct partition_context *ac;
164         struct ldb_module *module;
165         struct ldb_request *nreq;
166         int ret;
167
168         ac = talloc_get_type(req->context, struct partition_context);
169
170         if (!ares) {
171                 return ldb_module_done(ac->req, NULL, NULL,
172                                         LDB_ERR_OPERATIONS_ERROR);
173         }
174
175         if (ares->error != LDB_SUCCESS && !ac->got_success) {
176                 return ldb_module_done(ac->req, ares->controls,
177                                         ares->response, ares->error);
178         }
179
180         switch (ares->type) {
181         case LDB_REPLY_REFERRAL:
182                 /* ignore referrals for now */
183                 break;
184
185         case LDB_REPLY_ENTRY:
186                 if (ac->req->operation != LDB_SEARCH) {
187                         ldb_set_errstring(ac->module->ldb,
188                                 "partition_req_callback:"
189                                 " Unsupported reply type for this request");
190                         return ldb_module_done(ac->req, NULL, NULL,
191                                                 LDB_ERR_OPERATIONS_ERROR);
192                 }
193                 return ldb_module_send_entry(ac->req, ares->message);
194
195         case LDB_REPLY_DONE:
196                 if (ares->error == LDB_SUCCESS) {
197                         ac->got_success = true;
198                 }
199                 if (ac->req->operation == LDB_EXTENDED) {
200                         /* FIXME: check for ares->response, replmd does not fill it ! */
201                         if (ares->response) {
202                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
203                                         ldb_set_errstring(ac->module->ldb,
204                                                           "partition_req_callback:"
205                                                           " Unknown extended reply, "
206                                                           "only supports START_TLS");
207                                         talloc_free(ares);
208                                         return ldb_module_done(ac->req, NULL, NULL,
209                                                                 LDB_ERR_OPERATIONS_ERROR);
210                                 }
211                         }
212                 }
213
214                 ac->finished_requests++;
215                 if (ac->finished_requests == ac->num_requests) {
216                         /* this was the last one, call callback */
217                         return ldb_module_done(ac->req, ares->controls,
218                                                ares->response, 
219                                                ac->got_success?LDB_SUCCESS:ares->error);
220                 }
221
222                 /* not the last, now call the next one */
223                 module = ac->part_req[ac->finished_requests].module;
224                 nreq = ac->part_req[ac->finished_requests].req;
225
226                 ret = partition_request(module, nreq);
227                 if (ret != LDB_SUCCESS) {
228                         talloc_free(ares);
229                         return ldb_module_done(ac->req, NULL, NULL, ret);
230                 }
231
232                 break;
233         }
234
235         talloc_free(ares);
236         return LDB_SUCCESS;
237 }
238
239 static int partition_prep_request(struct partition_context *ac,
240                                   struct dsdb_control_current_partition *partition)
241 {
242         int ret;
243         struct ldb_request *req;
244
245         ac->part_req = talloc_realloc(ac, ac->part_req,
246                                         struct part_request,
247                                         ac->num_requests + 1);
248         if (ac->part_req == NULL) {
249                 ldb_oom(ac->module->ldb);
250                 return LDB_ERR_OPERATIONS_ERROR;
251         }
252
253         switch (ac->req->operation) {
254         case LDB_SEARCH:
255                 ret = ldb_build_search_req_ex(&req, ac->module->ldb,
256                                         ac->part_req,
257                                         ac->req->op.search.base,
258                                         ac->req->op.search.scope,
259                                         ac->req->op.search.tree,
260                                         ac->req->op.search.attrs,
261                                         ac->req->controls,
262                                         ac, partition_req_callback,
263                                         ac->req);
264                 break;
265         case LDB_ADD:
266                 ret = ldb_build_add_req(&req, ac->module->ldb, ac->part_req,
267                                         ac->req->op.add.message,
268                                         ac->req->controls,
269                                         ac, partition_req_callback,
270                                         ac->req);
271                 break;
272         case LDB_MODIFY:
273                 ret = ldb_build_mod_req(&req, ac->module->ldb, ac->part_req,
274                                         ac->req->op.mod.message,
275                                         ac->req->controls,
276                                         ac, partition_req_callback,
277                                         ac->req);
278                 break;
279         case LDB_DELETE:
280                 ret = ldb_build_del_req(&req, ac->module->ldb, ac->part_req,
281                                         ac->req->op.del.dn,
282                                         ac->req->controls,
283                                         ac, partition_req_callback,
284                                         ac->req);
285                 break;
286         case LDB_RENAME:
287                 ret = ldb_build_rename_req(&req, ac->module->ldb, ac->part_req,
288                                         ac->req->op.rename.olddn,
289                                         ac->req->op.rename.newdn,
290                                         ac->req->controls,
291                                         ac, partition_req_callback,
292                                         ac->req);
293                 break;
294         case LDB_EXTENDED:
295                 ret = ldb_build_extended_req(&req, ac->module->ldb,
296                                         ac->part_req,
297                                         ac->req->op.extended.oid,
298                                         ac->req->op.extended.data,
299                                         ac->req->controls,
300                                         ac, partition_req_callback,
301                                         ac->req);
302                 break;
303         default:
304                 ldb_set_errstring(ac->module->ldb,
305                                   "Unsupported request type!");
306                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
307         }
308
309         if (ret != LDB_SUCCESS) {
310                 return ret;
311         }
312
313         ac->part_req[ac->num_requests].req = req;
314
315         if (ac->req->controls) {
316                 req->controls = talloc_memdup(req, ac->req->controls,
317                                         talloc_get_size(ac->req->controls));
318                 if (req->controls == NULL) {
319                         ldb_oom(ac->module->ldb);
320                         return LDB_ERR_OPERATIONS_ERROR;
321                 }
322         }
323
324         if (partition) {
325                 ac->part_req[ac->num_requests].module = partition->module;
326
327                 ret = ldb_request_add_control(req,
328                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
329                                         false, partition);
330                 if (ret != LDB_SUCCESS) {
331                         return ret;
332                 }
333
334                 if (req->operation == LDB_SEARCH) {
335                         /* If the search is for 'more' than this partition,
336                          * then change the basedn, so a remote LDAP server
337                          * doesn't object */
338                         if (ldb_dn_compare_base(partition->dn,
339                                                 req->op.search.base) != 0) {
340                                 req->op.search.base = partition->dn;
341                         }
342                 }
343
344         } else {
345                 /* make sure you put the NEXT module here, or
346                  * partition_request() will simply loop forever on itself */
347                 ac->part_req[ac->num_requests].module = ac->module->next;
348         }
349
350         ac->num_requests++;
351
352         return LDB_SUCCESS;
353 }
354
355 static int partition_call_first(struct partition_context *ac)
356 {
357         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
358 }
359
360 /**
361  * Send a request down to all the partitions
362  */
363 static int partition_send_all(struct ldb_module *module, 
364                               struct partition_context *ac, 
365                               struct ldb_request *req) 
366 {
367         int i;
368         struct partition_private_data *data = talloc_get_type(module->private_data, 
369                                                               struct partition_private_data);
370         int ret = partition_prep_request(ac, NULL);
371         if (ret != LDB_SUCCESS) {
372                 return ret;
373         }
374         for (i=0; data && data->partitions && data->partitions[i]; i++) {
375                 ret = partition_prep_request(ac, data->partitions[i]);
376                 if (ret != LDB_SUCCESS) {
377                         return ret;
378                 }
379         }
380
381         /* fire the first one */
382         return partition_call_first(ac);
383 }
384
385 /**
386  * Figure out which backend a request needs to be aimed at.  Some
387  * requests must be replicated to all backends
388  */
389 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
390 {
391         struct partition_context *ac;
392         unsigned i;
393         int ret;
394         struct dsdb_control_current_partition *partition;
395         struct partition_private_data *data = talloc_get_type(module->private_data, 
396                                                               struct partition_private_data);
397         if (!data || !data->partitions) {
398                 return ldb_next_request(module, req);
399         }
400         
401         if (req->operation != LDB_SEARCH) {
402                 /* Is this a special DN, we need to replicate to every backend? */
403                 for (i=0; data->replicate && data->replicate[i]; i++) {
404                         if (ldb_dn_compare(data->replicate[i], 
405                                            dn) == 0) {
406                                 
407                                 ac = partition_init_ctx(module, req);
408                                 if (!ac) {
409                                         return LDB_ERR_OPERATIONS_ERROR;
410                                 }
411                                 
412                                 return partition_send_all(module, ac, req);
413                         }
414                 }
415         }
416
417         /* Otherwise, we need to find the partition to fire it to */
418
419         /* Find partition */
420         partition = find_partition(data, dn);
421         if (!partition) {
422                 /*
423                  * if we haven't found a matching partition
424                  * pass the request to the main ldb
425                  *
426                  * TODO: we should maybe return an error here
427                  *       if it's not a special dn
428                  */
429
430                 return ldb_next_request(module, req);
431         }
432
433         ac = partition_init_ctx(module, req);
434         if (!ac) {
435                 return LDB_ERR_OPERATIONS_ERROR;
436         }
437
438         /* we need to add a control but we never touch the original request */
439         ret = partition_prep_request(ac, partition);
440         if (ret != LDB_SUCCESS) {
441                 return ret;
442         }
443
444         /* fire the first one */
445         return partition_call_first(ac);
446 }
447
448 /* search */
449 static int partition_search(struct ldb_module *module, struct ldb_request *req)
450 {
451         struct ldb_control **saved_controls;
452         
453         /* Find backend */
454         struct partition_private_data *data = talloc_get_type(module->private_data, 
455                                                               struct partition_private_data);
456         /* issue request */
457
458         /* (later) consider if we should be searching multiple
459          * partitions (for 'invisible' partition behaviour */
460
461         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
462         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
463         
464         struct ldb_search_options_control *search_options = NULL;
465         if (search_control) {
466                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
467         }
468
469         /* Remove the domain_scope control, so we don't confuse a backend server */
470         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
471                 ldb_oom(module->ldb);
472                 return LDB_ERR_OPERATIONS_ERROR;
473         }
474
475         /*
476          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
477          * down as uncritical to make windows 2008 dcpromo happy.
478          */
479         if (search_control) {
480                 search_control->critical = 0;
481         }
482
483         /* TODO:
484            Generate referrals (look for a partition under this DN) if we don't have the above control specified
485         */
486         
487         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
488                 int ret, i;
489                 struct partition_context *ac;
490                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
491                         /* We have processed this flag, so we are done with this control now */
492
493                         /* Remove search control, so we don't confuse a backend server */
494                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
495                                 ldb_oom(module->ldb);
496                                 return LDB_ERR_OPERATIONS_ERROR;
497                         }
498                 }
499                 ac = partition_init_ctx(module, req);
500                 if (!ac) {
501                         return LDB_ERR_OPERATIONS_ERROR;
502                 }
503
504                 /* Search from the base DN */
505                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
506                         return partition_send_all(module, ac, req);
507                 }
508                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
509                         bool match = false, stop = false;
510                         /* Find all partitions under the search base 
511                            
512                            we match if:
513
514                               1) the DN we are looking for exactly matches the partition
515                              or
516                               2) the DN we are looking for is a parent of the partition and it isn't
517                                  a scope base search
518                              or
519                               3) the DN we are looking for is a child of the partition
520                          */
521                         if (ldb_dn_compare(data->partitions[i]->dn, req->op.search.base) == 0) {
522                                 match = true;
523                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
524                                         stop = true;
525                                 }
526                         }
527                         if (!match && 
528                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0 &&
529                              req->op.search.scope != LDB_SCOPE_BASE)) {
530                                 match = true;
531                         }
532                         if (!match &&
533                             ldb_dn_compare_base(data->partitions[i]->dn, req->op.search.base) == 0) {
534                                 match = true;
535                                 stop = true; /* note that this relies on partition ordering */
536                         }
537                         if (match) {
538                                 ret = partition_prep_request(ac, data->partitions[i]);
539                                 if (ret != LDB_SUCCESS) {
540                                         return ret;
541                                 }
542                         }
543                         if (stop) break;
544                 }
545
546                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
547                 if (ac->num_requests == 0) {
548                         talloc_free(ac);
549                         return ldb_next_request(module, req);
550                 }
551
552                 /* fire the first one */
553                 return partition_call_first(ac);
554
555         } else {
556                 /* Handle this like all other requests */
557                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
558                         /* We have processed this flag, so we are done with this control now */
559
560                         /* Remove search control, so we don't confuse a backend server */
561                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
562                                 ldb_oom(module->ldb);
563                                 return LDB_ERR_OPERATIONS_ERROR;
564                         }
565                 }
566
567                 return partition_replicate(module, req, req->op.search.base);
568         }
569 }
570
571 /* add */
572 static int partition_add(struct ldb_module *module, struct ldb_request *req)
573 {
574         return partition_replicate(module, req, req->op.add.message->dn);
575 }
576
577 /* modify */
578 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
579 {
580         return partition_replicate(module, req, req->op.mod.message->dn);
581 }
582
583 /* delete */
584 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
585 {
586         return partition_replicate(module, req, req->op.del.dn);
587 }
588
589 /* rename */
590 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
591 {
592         int i, matched = -1;
593         /* Find backend */
594         struct dsdb_control_current_partition *backend, *backend2;
595         
596         struct partition_private_data *data = talloc_get_type(module->private_data, 
597                                                               struct partition_private_data);
598
599         /* Skip the lot if 'data' isn't here yet (initialization) */
600         if (!data) {
601                 return LDB_ERR_OPERATIONS_ERROR;
602         }
603
604         backend = find_partition(data, req->op.rename.olddn);
605         backend2 = find_partition(data, req->op.rename.newdn);
606
607         if ((backend && !backend2) || (!backend && backend2)) {
608                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
609         }
610
611         if (backend != backend2) {
612                 ldb_asprintf_errstring(module->ldb, 
613                                        "Cannot rename from %s in %s to %s in %s: %s",
614                                        ldb_dn_get_linearized(req->op.rename.olddn),
615                                        ldb_dn_get_linearized(backend->dn),
616                                        ldb_dn_get_linearized(req->op.rename.newdn),
617                                        ldb_dn_get_linearized(backend2->dn),
618                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
619                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
620         }
621
622         for (i=0; data && data->partitions && data->partitions[i]; i++) {
623                 if (ldb_dn_compare_base(data->partitions[i]->dn, req->op.rename.olddn) == 0) {
624                         matched = i;
625                 }
626         }
627
628         if (matched > 0) {
629                 ldb_asprintf_errstring(module->ldb, 
630                                        "Cannot rename from %s to %s, subtree rename would cross partition %s: %s",
631                                        ldb_dn_get_linearized(req->op.rename.olddn),
632                                        ldb_dn_get_linearized(req->op.rename.newdn),
633                                        ldb_dn_get_linearized(data->partitions[matched]->dn),
634                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
635                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
636         }
637
638         return partition_replicate(module, req, req->op.rename.olddn);
639 }
640
641 /* start a transaction */
642 static int partition_start_trans(struct ldb_module *module)
643 {
644         int i, ret;
645         struct partition_private_data *data = talloc_get_type(module->private_data, 
646                                                               struct partition_private_data);
647         /* Look at base DN */
648         /* Figure out which partition it is under */
649         /* Skip the lot if 'data' isn't here yet (initialization) */
650         ret = ldb_next_start_trans(module);
651         if (ret != LDB_SUCCESS) {
652                 return ret;
653         }
654
655         for (i=0; data && data->partitions && data->partitions[i]; i++) {
656                 struct ldb_module *next = data->partitions[i]->module;
657                 PARTITION_FIND_OP(next, start_transaction);
658
659                 ret = next->ops->start_transaction(next);
660                 if (ret != LDB_SUCCESS) {
661                         /* Back it out, if it fails on one */
662                         for (i--; i >= 0; i--) {
663                                 next = data->partitions[i]->module;
664                                 PARTITION_FIND_OP(next, del_transaction);
665
666                                 next->ops->del_transaction(next);
667                         }
668                         ldb_next_del_trans(module);
669                         return ret;
670                 }
671         }
672         return LDB_SUCCESS;
673 }
674
675 /* end a transaction */
676 static int partition_end_trans(struct ldb_module *module)
677 {
678         int i, ret;
679         struct partition_private_data *data = talloc_get_type(module->private_data, 
680                                                               struct partition_private_data);
681         ret = ldb_next_end_trans(module);
682         if (ret != LDB_SUCCESS) {
683                 return ret;
684         }
685
686         /* Look at base DN */
687         /* Figure out which partition it is under */
688         /* Skip the lot if 'data' isn't here yet (initialistion) */
689         for (i=0; data && data->partitions && data->partitions[i]; i++) {
690                 struct ldb_module *next = data->partitions[i]->module;
691                 PARTITION_FIND_OP(next, end_transaction);
692
693                 ret = next->ops->end_transaction(next);
694                 if (ret != LDB_SUCCESS) {
695                         /* Back it out, if it fails on one */
696                         for (i--; i >= 0; i--) {
697                                 next = data->partitions[i]->module;
698                                 PARTITION_FIND_OP(next, del_transaction);
699
700                                 next->ops->del_transaction(next);
701                         }
702                         ldb_next_del_trans(module);
703                         return ret;
704                 }
705         }
706
707         return LDB_SUCCESS;
708 }
709
710 /* delete a transaction */
711 static int partition_del_trans(struct ldb_module *module)
712 {
713         int i, ret, ret2 = LDB_SUCCESS;
714         struct partition_private_data *data = talloc_get_type(module->private_data, 
715                                                               struct partition_private_data);
716         ret = ldb_next_del_trans(module);
717         if (ret != LDB_SUCCESS) {
718                 ret2 = ret;
719         }
720
721         /* Look at base DN */
722         /* Figure out which partition it is under */
723         /* Skip the lot if 'data' isn't here yet (initialistion) */
724         for (i=0; data && data->partitions && data->partitions[i]; i++) {
725                 struct ldb_module *next = data->partitions[i]->module;
726                 PARTITION_FIND_OP(next, del_transaction);
727
728                 ret = next->ops->del_transaction(next);
729                 if (ret != LDB_SUCCESS) {
730                         ret2 = ret;
731                 }
732         }
733         return ret2;
734 }
735
736 /* NOTE: ldb_sequence_number is still a completely SYNCHRONOUS call
737  * implemented only in ldb_rdb. It does not require ldb_wait() to be
738  * called after a request is made */
739 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
740 {
741         int i, ret;
742         uint64_t seq_number = 0;
743         uint64_t timestamp_sequence = 0;
744         uint64_t timestamp = 0;
745         struct partition_private_data *data = talloc_get_type(module->private_data, 
746                                                               struct partition_private_data);
747
748         switch (req->op.seq_num.type) {
749         case LDB_SEQ_NEXT:
750         case LDB_SEQ_HIGHEST_SEQ:
751                 ret = ldb_next_request(module, req);
752                 if (ret != LDB_SUCCESS) {
753                         return ret;
754                 }
755                 if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
756                         timestamp_sequence = req->op.seq_num.seq_num;
757                 } else {
758                         seq_number = seq_number + req->op.seq_num.seq_num;
759                 }
760
761                 /* gross hack part1 */
762                 ret = ldb_request_add_control(req,
763                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
764                                         false, NULL);
765                 if (ret != LDB_SUCCESS) {
766                         return ret;
767                 }
768
769                 /* Look at base DN */
770                 /* Figure out which partition it is under */
771                 /* Skip the lot if 'data' isn't here yet (initialistion) */
772                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
773
774                         /* gross hack part2 */
775                         int j;
776                         for (j=0; req->controls[j]; j++) {
777                                 if (strcmp(req->controls[j]->oid, DSDB_CONTROL_CURRENT_PARTITION_OID) == 0) {
778                                         req->controls[j]->data = data->partitions[i];
779                                         break;
780                                 }
781                         }
782
783                         ret = partition_request(data->partitions[i]->module, req);
784                         if (ret != LDB_SUCCESS) {
785                                 return ret;
786                         }
787                         if (req->op.seq_num.flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
788                                 timestamp_sequence = MAX(timestamp_sequence, req->op.seq_num.seq_num);
789                         } else {
790                                 seq_number = seq_number + req->op.seq_num.seq_num;
791                         }
792                 }
793                 /* fall though */
794         case LDB_SEQ_HIGHEST_TIMESTAMP:
795         {
796                 struct ldb_request *date_req = talloc(req, struct ldb_request);
797                 if (!date_req) {
798                         return LDB_ERR_OPERATIONS_ERROR;
799                 }
800                 *date_req = *req;
801                 date_req->op.seq_num.flags = LDB_SEQ_HIGHEST_TIMESTAMP;
802
803                 ret = ldb_next_request(module, date_req);
804                 if (ret != LDB_SUCCESS) {
805                         return ret;
806                 }
807                 timestamp = date_req->op.seq_num.seq_num;
808
809                 /* Look at base DN */
810                 /* Figure out which partition it is under */
811                 /* Skip the lot if 'data' isn't here yet (initialistion) */
812                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
813
814                         ret = partition_request(data->partitions[i]->module, req);
815                         if (ret != LDB_SUCCESS) {
816                                 return ret;
817                         }
818                         timestamp = MAX(timestamp, date_req->op.seq_num.seq_num);
819                 }
820                 break;
821         }
822         }
823
824         switch (req->op.seq_num.flags) {
825         case LDB_SEQ_NEXT:
826         case LDB_SEQ_HIGHEST_SEQ:
827
828                 req->op.seq_num.flags = 0;
829
830                 /* Has someone above set a timebase sequence? */
831                 if (timestamp_sequence) {
832                         req->op.seq_num.seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
833                 } else {
834                         req->op.seq_num.seq_num = seq_number;
835                 }
836
837                 if (timestamp_sequence > req->op.seq_num.seq_num) {
838                         req->op.seq_num.seq_num = timestamp_sequence;
839                         req->op.seq_num.flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
840                 }
841
842                 req->op.seq_num.flags |= LDB_SEQ_GLOBAL_SEQUENCE;
843                 break;
844         case LDB_SEQ_HIGHEST_TIMESTAMP:
845                 req->op.seq_num.seq_num = timestamp;
846                 break;
847         }
848
849         switch (req->op.seq_num.flags) {
850         case LDB_SEQ_NEXT:
851                 req->op.seq_num.seq_num++;
852         }
853         return LDB_SUCCESS;
854 }
855
856 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
857 {
858         struct dsdb_extended_replicated_objects *ext;
859
860         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
861         if (!ext) {
862                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
863                 return LDB_ERR_PROTOCOL_ERROR;
864         }
865
866         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
867                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
868                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
869                 return LDB_ERR_PROTOCOL_ERROR;
870         }
871
872         return partition_replicate(module, req, ext->partition_dn);
873 }
874
875 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
876 {
877         struct dsdb_control_current_partition *partition;
878         struct partition_private_data *data;
879         struct ldb_dn *schema_dn;
880         struct partition_context *ac;
881         int ret;
882
883         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
884         if (!schema_dn) {
885                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
886                 return LDB_ERR_PROTOCOL_ERROR;
887         }
888
889         data = talloc_get_type(module->private_data, struct partition_private_data);
890         if (!data) {
891                 return LDB_ERR_OPERATIONS_ERROR;
892         }
893         
894         partition = find_partition( data, schema_dn );
895         if (!partition) {
896                 return ldb_next_request(module, req);
897         }
898
899         ac = partition_init_ctx(module, req);
900         if (!ac) {
901                 return LDB_ERR_OPERATIONS_ERROR;
902         }
903
904         /* we need to add a control but we never touch the original request */
905         ret = partition_prep_request(ac, partition);
906         if (ret != LDB_SUCCESS) {
907                 return ret;
908         }
909
910         /* fire the first one */
911         return partition_call_first(ac);
912 }
913
914
915 /* extended */
916 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
917 {
918         struct partition_private_data *data;
919         struct partition_context *ac;
920
921         data = talloc_get_type(module->private_data, struct partition_private_data);
922         if (!data || !data->partitions) {
923                 return ldb_next_request(module, req);
924         }
925
926         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
927                 return partition_extended_replicated_objects(module, req);
928         }
929
930         /* forward schemaUpdateNow operation to schema_fsmo module*/
931         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
932                 return partition_extended_schema_update_now( module, req );
933         }       
934
935         /* 
936          * as the extended operation has no dn
937          * we need to send it to all partitions
938          */
939
940         ac = partition_init_ctx(module, req);
941         if (!ac) {
942                 return LDB_ERR_OPERATIONS_ERROR;
943         }
944
945         return partition_send_all(module, ac, req);
946 }
947
948 static int partition_sort_compare(const void *v1, const void *v2)
949 {
950         struct dsdb_control_current_partition *p1;
951         struct dsdb_control_current_partition *p2;
952
953         p1 = *((struct dsdb_control_current_partition **)v1);
954         p2 = *((struct dsdb_control_current_partition **)v2);
955
956         return ldb_dn_compare(p1->dn, p2->dn);
957 }
958
959 static int partition_init(struct ldb_module *module)
960 {
961         int ret, i;
962         TALLOC_CTX *mem_ctx = talloc_new(module);
963         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
964         struct ldb_result *res;
965         struct ldb_message *msg;
966         struct ldb_message_element *partition_attributes;
967         struct ldb_message_element *replicate_attributes;
968         struct ldb_message_element *modules_attributes;
969
970         struct partition_private_data *data;
971
972         if (!mem_ctx) {
973                 return LDB_ERR_OPERATIONS_ERROR;
974         }
975
976         data = talloc(mem_ctx, struct partition_private_data);
977         if (data == NULL) {
978                 return LDB_ERR_OPERATIONS_ERROR;
979         }
980
981         ret = ldb_search(module->ldb, mem_ctx, &res,
982                          ldb_dn_new(mem_ctx, module->ldb, "@PARTITION"),
983                          LDB_SCOPE_BASE, attrs, NULL);
984         if (ret != LDB_SUCCESS) {
985                 talloc_free(mem_ctx);
986                 return ret;
987         }
988         if (res->count == 0) {
989                 talloc_free(mem_ctx);
990                 return ldb_next_init(module);
991         }
992
993         if (res->count > 1) {
994                 talloc_free(mem_ctx);
995                 return LDB_ERR_CONSTRAINT_VIOLATION;
996         }
997
998         msg = res->msgs[0];
999
1000         partition_attributes = ldb_msg_find_element(msg, "partition");
1001         if (!partition_attributes) {
1002                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
1003                 talloc_free(mem_ctx);
1004                 return LDB_ERR_CONSTRAINT_VIOLATION;
1005         }
1006         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
1007         if (!data->partitions) {
1008                 talloc_free(mem_ctx);
1009                 return LDB_ERR_OPERATIONS_ERROR;
1010         }
1011         for (i=0; i < partition_attributes->num_values; i++) {
1012                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1013                 char *p = strchr(base, ':');
1014                 if (!p) {
1015                         ldb_asprintf_errstring(module->ldb, 
1016                                                 "partition_init: "
1017                                                 "invalid form for partition record (missing ':'): %s", base);
1018                         talloc_free(mem_ctx);
1019                         return LDB_ERR_CONSTRAINT_VIOLATION;
1020                 }
1021                 p[0] = '\0';
1022                 p++;
1023                 if (!p[0]) {
1024                         ldb_asprintf_errstring(module->ldb, 
1025                                                 "partition_init: "
1026                                                 "invalid form for partition record (missing backend database): %s", base);
1027                         talloc_free(mem_ctx);
1028                         return LDB_ERR_CONSTRAINT_VIOLATION;
1029                 }
1030                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
1031                 if (!data->partitions[i]) {
1032                         talloc_free(mem_ctx);
1033                         return LDB_ERR_OPERATIONS_ERROR;
1034                 }
1035                 data->partitions[i]->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1036
1037                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], module->ldb, base);
1038                 if (!data->partitions[i]->dn) {
1039                         ldb_asprintf_errstring(module->ldb, 
1040                                                 "partition_init: invalid DN in partition record: %s", base);
1041                         talloc_free(mem_ctx);
1042                         return LDB_ERR_CONSTRAINT_VIOLATION;
1043                 }
1044
1045                 data->partitions[i]->backend = samdb_relative_path(module->ldb, 
1046                                                                    data->partitions[i], 
1047                                                                    p);
1048                 if (!data->partitions[i]->backend) {
1049                         ldb_asprintf_errstring(module->ldb, 
1050                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1051                         talloc_free(mem_ctx);                   
1052                 }
1053                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
1054                 if (ret != LDB_SUCCESS) {
1055                         talloc_free(mem_ctx);
1056                         return ret;
1057                 }
1058         }
1059         data->partitions[i] = NULL;
1060
1061         /* sort these into order, most to least specific */
1062         qsort(data->partitions, partition_attributes->num_values,
1063               sizeof(*data->partitions), partition_sort_compare);
1064
1065         for (i=0; data->partitions[i]; i++) {
1066                 struct ldb_request *req;
1067                 req = talloc_zero(mem_ctx, struct ldb_request);
1068                 if (req == NULL) {
1069                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1070                         talloc_free(mem_ctx);
1071                         return LDB_ERR_OPERATIONS_ERROR;
1072                 }
1073                 
1074                 req->operation = LDB_REQ_REGISTER_PARTITION;
1075                 req->op.reg_partition.dn = data->partitions[i]->dn;
1076                 req->callback = ldb_op_default_callback;
1077
1078                 ldb_set_timeout(module->ldb, req, 0);
1079
1080                 req->handle = ldb_handle_new(req, module->ldb);
1081                 if (req->handle == NULL) {
1082                         return LDB_ERR_OPERATIONS_ERROR;
1083                 }
1084                 
1085                 ret = ldb_request(module->ldb, req);
1086                 if (ret == LDB_SUCCESS) {
1087                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1088                 }
1089                 if (ret != LDB_SUCCESS) {
1090                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1091                         talloc_free(mem_ctx);
1092                         return LDB_ERR_OTHER;
1093                 }
1094                 talloc_free(req);
1095         }
1096
1097         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1098         if (!replicate_attributes) {
1099                 data->replicate = NULL;
1100         } else {
1101                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1102                 if (!data->replicate) {
1103                         talloc_free(mem_ctx);
1104                         return LDB_ERR_OPERATIONS_ERROR;
1105                 }
1106
1107                 for (i=0; i < replicate_attributes->num_values; i++) {
1108                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, module->ldb, &replicate_attributes->values[i]);
1109                         if (!ldb_dn_validate(data->replicate[i])) {
1110                                 ldb_asprintf_errstring(module->ldb, 
1111                                                         "partition_init: "
1112                                                         "invalid DN in partition replicate record: %s", 
1113                                                         replicate_attributes->values[i].data);
1114                                 talloc_free(mem_ctx);
1115                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1116                         }
1117                 }
1118                 data->replicate[i] = NULL;
1119         }
1120
1121         /* Make the private data available to any searches the modules may trigger in initialisation */
1122         module->private_data = data;
1123         talloc_steal(module, data);
1124         
1125         modules_attributes = ldb_msg_find_element(msg, "modules");
1126         if (modules_attributes) {
1127                 for (i=0; i < modules_attributes->num_values; i++) {
1128                         struct ldb_dn *base_dn;
1129                         int partition_idx;
1130                         struct dsdb_control_current_partition *partition = NULL;
1131                         const char **modules = NULL;
1132
1133                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1134                         char *p = strchr(base, ':');
1135                         if (!p) {
1136                                 ldb_asprintf_errstring(module->ldb, 
1137                                                         "partition_init: "
1138                                                         "invalid form for partition module record (missing ':'): %s", base);
1139                                 talloc_free(mem_ctx);
1140                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1141                         }
1142                         p[0] = '\0';
1143                         p++;
1144                         if (!p[0]) {
1145                                 ldb_asprintf_errstring(module->ldb, 
1146                                                         "partition_init: "
1147                                                         "invalid form for partition module record (missing backend database): %s", base);
1148                                 talloc_free(mem_ctx);
1149                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1150                         }
1151
1152                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
1153                                                                p);
1154                         
1155                         base_dn = ldb_dn_new(mem_ctx, module->ldb, base);
1156                         if (!ldb_dn_validate(base_dn)) {
1157                                 talloc_free(mem_ctx);
1158                                 return LDB_ERR_OPERATIONS_ERROR;
1159                         }
1160                         
1161                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1162                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
1163                                         partition = data->partitions[partition_idx];
1164                                         break;
1165                                 }
1166                         }
1167                         
1168                         if (!partition) {
1169                                 ldb_asprintf_errstring(module->ldb, 
1170                                                         "partition_init: "
1171                                                         "invalid form for partition module record (no such partition): %s", base);
1172                                 talloc_free(mem_ctx);
1173                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1174                         }
1175                         
1176                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
1177                         if (ret != LDB_SUCCESS) {
1178                                 ldb_asprintf_errstring(module->ldb, 
1179                                                        "partition_init: "
1180                                                        "loading backend for %s failed: %s", 
1181                                                        base, ldb_errstring(module->ldb));
1182                                 talloc_free(mem_ctx);
1183                                 return ret;
1184                         }
1185                         ret = ldb_init_module_chain(module->ldb, partition->module);
1186                         if (ret != LDB_SUCCESS) {
1187                                 ldb_asprintf_errstring(module->ldb, 
1188                                                        "partition_init: "
1189                                                        "initialising backend for %s failed: %s", 
1190                                                        base, ldb_errstring(module->ldb));
1191                                 talloc_free(mem_ctx);
1192                                 return ret;
1193                         }
1194                 }
1195         }
1196
1197         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1198         if (ret != LDB_SUCCESS) {
1199                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1200                         "partition: Unable to register control with rootdse!\n");
1201                 return LDB_ERR_OPERATIONS_ERROR;
1202         }
1203
1204         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1205         if (ret != LDB_SUCCESS) {
1206                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1207                         "partition: Unable to register control with rootdse!\n");
1208                 return LDB_ERR_OPERATIONS_ERROR;
1209         }
1210
1211         talloc_free(mem_ctx);
1212         return ldb_next_init(module);
1213 }
1214
1215 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1216         .name              = "partition",
1217         .init_context      = partition_init,
1218         .search            = partition_search,
1219         .add               = partition_add,
1220         .modify            = partition_modify,
1221         .del               = partition_delete,
1222         .rename            = partition_rename,
1223         .extended          = partition_extended,
1224         .sequence_number   = partition_sequence_number,
1225         .start_transaction = partition_start_trans,
1226         .end_transaction   = partition_end_trans,
1227         .del_transaction   = partition_del_trans,
1228 };