s4:dsdb Rework modules create new partitions at runtime
[ira/wip.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22  *  Name: ldb
23  *
24  *  Component: ldb partitions module
25  *
26  *  Description: Implement LDAP partitions
27  *
28  *  Author: Andrew Bartlett
29  *  Author: Stefan Metzmacher
30  */
31
32 #include "dsdb/samdb/ldb_modules/partition.h"
33
34 struct part_request {
35         struct ldb_module *module;
36         struct ldb_request *req;
37 };
38
39 struct partition_context {
40         struct ldb_module *module;
41         struct ldb_request *req;
42         bool got_success;
43
44         struct part_request *part_req;
45         int num_requests;
46         int finished_requests;
47 };
48
49 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
50 {
51         struct partition_context *ac;
52
53         ac = talloc_zero(req, struct partition_context);
54         if (ac == NULL) {
55                 ldb_set_errstring(ldb_module_get_ctx(module), "Out of Memory");
56                 return NULL;
57         }
58
59         ac->module = module;
60         ac->req = req;
61
62         return ac;
63 }
64
65 /*
66  *    helper functions to call the next module in chain
67  *    */
68
69 static int partition_request(struct ldb_module *module, struct ldb_request *request)
70 {
71         int ret;
72         switch (request->operation) {
73         case LDB_SEARCH:
74                 PARTITION_FIND_OP(module, search);
75                 ret = module->ops->search(module, request);
76                 break;
77         case LDB_ADD:
78                 PARTITION_FIND_OP(module, add);
79                 ret = module->ops->add(module, request);
80                 break;
81         case LDB_MODIFY:
82                 PARTITION_FIND_OP(module, modify);
83                 ret = module->ops->modify(module, request);
84                 break;
85         case LDB_DELETE:
86                 PARTITION_FIND_OP(module, del);
87                 ret = module->ops->del(module, request);
88                 break;
89         case LDB_RENAME:
90                 PARTITION_FIND_OP(module, rename);
91                 ret = module->ops->rename(module, request);
92                 break;
93         case LDB_EXTENDED:
94                 PARTITION_FIND_OP(module, extended);
95                 ret = module->ops->extended(module, request);
96                 break;
97         default:
98                 PARTITION_FIND_OP(module, request);
99                 ret = module->ops->request(module, request);
100                 break;
101         }
102         if (ret == LDB_SUCCESS) {
103                 return ret;
104         }
105         if (!ldb_errstring(ldb_module_get_ctx(module))) {
106                 /* Set a default error string, to place the blame somewhere */
107                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
108                                         "error in module %s: %s (%d)",
109                                         module->ops->name,
110                                         ldb_strerror(ret), ret);
111         }
112         return ret;
113 }
114
115 static struct dsdb_partition *find_partition(struct partition_private_data *data,
116                                              struct ldb_dn *dn,
117                                              struct ldb_request *req)
118 {
119         int i;
120         struct ldb_control *partition_ctrl;
121
122         /* see if the request has the partition DN specified in a
123          * control. The repl_meta_data module can specify this to
124          * ensure that replication happens to the right partition
125          */
126         partition_ctrl = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
127         if (partition_ctrl) {
128                 const struct dsdb_control_current_partition *partition;
129                 partition = talloc_get_type(partition_ctrl->data,
130                                             struct dsdb_control_current_partition);
131                 if (partition != NULL) {
132                         dn = partition->dn;
133                 }
134         }
135
136         if (dn == NULL) {
137                 return NULL;
138         }
139
140         /* Look at base DN */
141         /* Figure out which partition it is under */
142         /* Skip the lot if 'data' isn't here yet (initialisation) */
143         for (i=0; data && data->partitions && data->partitions[i]; i++) {
144                 if (ldb_dn_compare_base(data->partitions[i]->ctrl->dn, dn) == 0) {
145                         return data->partitions[i];
146                 }
147         }
148
149         return NULL;
150 }
151
152 /**
153  * fire the caller's callback for every entry, but only send 'done' once.
154  */
155 static int partition_req_callback(struct ldb_request *req,
156                                   struct ldb_reply *ares)
157 {
158         struct partition_context *ac;
159         struct ldb_module *module;
160         struct ldb_request *nreq;
161         int ret, i;
162         struct partition_private_data *data;
163
164         ac = talloc_get_type(req->context, struct partition_context);
165         data = talloc_get_type(ac->module->private_data, struct partition_private_data);
166
167         if (!ares) {
168                 return ldb_module_done(ac->req, NULL, NULL,
169                                         LDB_ERR_OPERATIONS_ERROR);
170         }
171
172         if (ares->error != LDB_SUCCESS && !ac->got_success) {
173                 return ldb_module_done(ac->req, ares->controls,
174                                         ares->response, ares->error);
175         }
176
177         switch (ares->type) {
178         case LDB_REPLY_REFERRAL:
179                 /* ignore referrals for now */
180                 break;
181
182         case LDB_REPLY_ENTRY:
183                 if (ac->req->operation != LDB_SEARCH) {
184                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
185                                 "partition_req_callback:"
186                                 " Unsupported reply type for this request");
187                         return ldb_module_done(ac->req, NULL, NULL,
188                                                 LDB_ERR_OPERATIONS_ERROR);
189                 }
190                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
191                         if (ldb_dn_compare(ares->message->dn, data->partitions[i]->ctrl->dn) == 0) {
192                                 struct ldb_control *part_control;
193                                 /* this is a partition root message - make
194                                    sure it isn't one of our fake root
195                                    entries from a parent partition */
196                                 part_control = ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID);
197                                 if (part_control && part_control->data != data->partitions[i]->ctrl) {
198                                         DEBUG(6,(__location__ ": Discarding partition mount object %s\n",
199                                                  ldb_dn_get_linearized(ares->message->dn)));
200                                         talloc_free(ares);
201                                         return LDB_SUCCESS;
202                                 }
203                         }
204                 }
205                 
206                 return ldb_module_send_entry(ac->req, ares->message, ares->controls);
207
208         case LDB_REPLY_DONE:
209                 if (ares->error == LDB_SUCCESS) {
210                         ac->got_success = true;
211                 }
212                 if (ac->req->operation == LDB_EXTENDED) {
213                         /* FIXME: check for ares->response, replmd does not fill it ! */
214                         if (ares->response) {
215                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
216                                         ldb_set_errstring(ldb_module_get_ctx(ac->module),
217                                                           "partition_req_callback:"
218                                                           " Unknown extended reply, "
219                                                           "only supports START_TLS");
220                                         talloc_free(ares);
221                                         return ldb_module_done(ac->req, NULL, NULL,
222                                                                 LDB_ERR_OPERATIONS_ERROR);
223                                 }
224                         }
225                 }
226
227                 ac->finished_requests++;
228                 if (ac->finished_requests == ac->num_requests) {
229                         /* this was the last one, call callback */
230                         return ldb_module_done(ac->req, ares->controls,
231                                                ares->response, 
232                                                ac->got_success?LDB_SUCCESS:ares->error);
233                 }
234
235                 /* not the last, now call the next one */
236                 module = ac->part_req[ac->finished_requests].module;
237                 nreq = ac->part_req[ac->finished_requests].req;
238
239                 ret = partition_request(module, nreq);
240                 if (ret != LDB_SUCCESS) {
241                         talloc_free(ares);
242                         return ldb_module_done(ac->req, NULL, NULL, ret);
243                 }
244
245                 break;
246         }
247
248         talloc_free(ares);
249         return LDB_SUCCESS;
250 }
251
252 static int partition_prep_request(struct partition_context *ac,
253                                   struct dsdb_partition *partition)
254 {
255         int ret;
256         struct ldb_request *req;
257
258         ac->part_req = talloc_realloc(ac, ac->part_req,
259                                         struct part_request,
260                                         ac->num_requests + 1);
261         if (ac->part_req == NULL) {
262                 ldb_oom(ldb_module_get_ctx(ac->module));
263                 return LDB_ERR_OPERATIONS_ERROR;
264         }
265
266         switch (ac->req->operation) {
267         case LDB_SEARCH:
268                 ret = ldb_build_search_req_ex(&req, ldb_module_get_ctx(ac->module),
269                                         ac->part_req,
270                                         ac->req->op.search.base,
271                                         ac->req->op.search.scope,
272                                         ac->req->op.search.tree,
273                                         ac->req->op.search.attrs,
274                                         ac->req->controls,
275                                         ac, partition_req_callback,
276                                         ac->req);
277                 break;
278         case LDB_ADD:
279                 ret = ldb_build_add_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
280                                         ac->req->op.add.message,
281                                         ac->req->controls,
282                                         ac, partition_req_callback,
283                                         ac->req);
284                 break;
285         case LDB_MODIFY:
286                 ret = ldb_build_mod_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
287                                         ac->req->op.mod.message,
288                                         ac->req->controls,
289                                         ac, partition_req_callback,
290                                         ac->req);
291                 break;
292         case LDB_DELETE:
293                 ret = ldb_build_del_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
294                                         ac->req->op.del.dn,
295                                         ac->req->controls,
296                                         ac, partition_req_callback,
297                                         ac->req);
298                 break;
299         case LDB_RENAME:
300                 ret = ldb_build_rename_req(&req, ldb_module_get_ctx(ac->module), ac->part_req,
301                                         ac->req->op.rename.olddn,
302                                         ac->req->op.rename.newdn,
303                                         ac->req->controls,
304                                         ac, partition_req_callback,
305                                         ac->req);
306                 break;
307         case LDB_EXTENDED:
308                 ret = ldb_build_extended_req(&req, ldb_module_get_ctx(ac->module),
309                                         ac->part_req,
310                                         ac->req->op.extended.oid,
311                                         ac->req->op.extended.data,
312                                         ac->req->controls,
313                                         ac, partition_req_callback,
314                                         ac->req);
315                 break;
316         default:
317                 ldb_set_errstring(ldb_module_get_ctx(ac->module),
318                                   "Unsupported request type!");
319                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
320         }
321
322         if (ret != LDB_SUCCESS) {
323                 return ret;
324         }
325
326         ac->part_req[ac->num_requests].req = req;
327
328         if (ac->req->controls) {
329                 req->controls = talloc_memdup(req, ac->req->controls,
330                                         talloc_get_size(ac->req->controls));
331                 if (req->controls == NULL) {
332                         ldb_oom(ldb_module_get_ctx(ac->module));
333                         return LDB_ERR_OPERATIONS_ERROR;
334                 }
335         }
336
337         if (partition) {
338                 ac->part_req[ac->num_requests].module = partition->module;
339
340                 if (!ldb_request_get_control(req, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
341                         ret = ldb_request_add_control(req,
342                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
343                                                       false, partition->ctrl);
344                         if (ret != LDB_SUCCESS) {
345                                 return ret;
346                         }
347                 }
348
349                 if (req->operation == LDB_SEARCH) {
350                         /* If the search is for 'more' than this partition,
351                          * then change the basedn, so a remote LDAP server
352                          * doesn't object */
353                         if (ldb_dn_compare_base(partition->ctrl->dn,
354                                                 req->op.search.base) != 0) {
355                                 req->op.search.base = partition->ctrl->dn;
356                         }
357                 }
358
359         } else {
360                 /* make sure you put the NEXT module here, or
361                  * partition_request() will simply loop forever on itself */
362                 ac->part_req[ac->num_requests].module = ac->module->next;
363         }
364
365         ac->num_requests++;
366
367         return LDB_SUCCESS;
368 }
369
370 static int partition_call_first(struct partition_context *ac)
371 {
372         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
373 }
374
375 /**
376  * Send a request down to all the partitions
377  */
378 static int partition_send_all(struct ldb_module *module, 
379                               struct partition_context *ac, 
380                               struct ldb_request *req) 
381 {
382         int i;
383         struct partition_private_data *data = talloc_get_type(module->private_data, 
384                                                               struct partition_private_data);
385         int ret = partition_prep_request(ac, NULL);
386         if (ret != LDB_SUCCESS) {
387                 return ret;
388         }
389         for (i=0; data && data->partitions && data->partitions[i]; i++) {
390                 ret = partition_prep_request(ac, data->partitions[i]);
391                 if (ret != LDB_SUCCESS) {
392                         return ret;
393                 }
394         }
395
396         /* fire the first one */
397         return partition_call_first(ac);
398 }
399
400 /**
401  * Figure out which backend a request needs to be aimed at.  Some
402  * requests must be replicated to all backends
403  */
404 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
405 {
406         struct partition_context *ac;
407         unsigned i;
408         int ret;
409         struct dsdb_partition *partition;
410         struct partition_private_data *data = talloc_get_type(module->private_data, 
411                                                               struct partition_private_data);
412         if (!data || !data->partitions) {
413                 return ldb_next_request(module, req);
414         }
415         
416         if (req->operation != LDB_SEARCH) {
417                 /* Is this a special DN, we need to replicate to every backend? */
418                 for (i=0; data->replicate && data->replicate[i]; i++) {
419                         if (ldb_dn_compare(data->replicate[i], 
420                                            dn) == 0) {
421                                 
422                                 ac = partition_init_ctx(module, req);
423                                 if (!ac) {
424                                         return LDB_ERR_OPERATIONS_ERROR;
425                                 }
426                                 
427                                 return partition_send_all(module, ac, req);
428                         }
429                 }
430         }
431
432         /* Otherwise, we need to find the partition to fire it to */
433
434         /* Find partition */
435         partition = find_partition(data, dn, req);
436         if (!partition) {
437                 /*
438                  * if we haven't found a matching partition
439                  * pass the request to the main ldb
440                  *
441                  * TODO: we should maybe return an error here
442                  *       if it's not a special dn
443                  */
444
445                 return ldb_next_request(module, req);
446         }
447
448         ac = partition_init_ctx(module, req);
449         if (!ac) {
450                 return LDB_ERR_OPERATIONS_ERROR;
451         }
452
453         /* we need to add a control but we never touch the original request */
454         ret = partition_prep_request(ac, partition);
455         if (ret != LDB_SUCCESS) {
456                 return ret;
457         }
458
459         /* fire the first one */
460         return partition_call_first(ac);
461 }
462
463 /* search */
464 static int partition_search(struct ldb_module *module, struct ldb_request *req)
465 {
466         struct ldb_control **saved_controls;
467         /* Find backend */
468         struct partition_private_data *data = talloc_get_type(module->private_data, 
469                                                               struct partition_private_data);
470
471         /* issue request */
472
473         /* (later) consider if we should be searching multiple
474          * partitions (for 'invisible' partition behaviour */
475
476         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
477         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
478         
479         struct ldb_search_options_control *search_options = NULL;
480         struct dsdb_partition *p;
481         
482         p = find_partition(data, NULL, req);
483         if (p != NULL) {
484                 /* the caller specified what partition they want the
485                  * search - just pass it on
486                  */
487                 return ldb_next_request(p->module, req);                
488         }
489
490
491         if (search_control) {
492                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
493         }
494
495         /* Remove the domain_scope control, so we don't confuse a backend server */
496         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
497                 ldb_oom(ldb_module_get_ctx(module));
498                 return LDB_ERR_OPERATIONS_ERROR;
499         }
500
501         /*
502          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
503          * down as uncritical to make windows 2008 dcpromo happy.
504          */
505         if (search_control) {
506                 search_control->critical = 0;
507         }
508
509         /* TODO:
510            Generate referrals (look for a partition under this DN) if we don't have the above control specified
511         */
512         
513         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
514                 int ret, i;
515                 struct partition_context *ac;
516                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
517                         /* We have processed this flag, so we are done with this control now */
518
519                         /* Remove search control, so we don't confuse a backend server */
520                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
521                                 ldb_oom(ldb_module_get_ctx(module));
522                                 return LDB_ERR_OPERATIONS_ERROR;
523                         }
524                 }
525                 ac = partition_init_ctx(module, req);
526                 if (!ac) {
527                         return LDB_ERR_OPERATIONS_ERROR;
528                 }
529
530                 /* Search from the base DN */
531                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
532                         return partition_send_all(module, ac, req);
533                 }
534                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
535                         bool match = false, stop = false;
536                         /* Find all partitions under the search base 
537                            
538                            we match if:
539
540                               1) the DN we are looking for exactly matches the partition
541                              or
542                               2) the DN we are looking for is a parent of the partition and it isn't
543                                  a scope base search
544                              or
545                               3) the DN we are looking for is a child of the partition
546                          */
547                         if (ldb_dn_compare(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
548                                 match = true;
549                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
550                                         stop = true;
551                                 }
552                         }
553                         if (!match && 
554                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->ctrl->dn) == 0 &&
555                              req->op.search.scope != LDB_SCOPE_BASE)) {
556                                 match = true;
557                         }
558                         if (!match &&
559                             ldb_dn_compare_base(data->partitions[i]->ctrl->dn, req->op.search.base) == 0) {
560                                 match = true;
561                                 stop = true; /* note that this relies on partition ordering */
562                         }
563                         if (match) {
564                                 ret = partition_prep_request(ac, data->partitions[i]);
565                                 if (ret != LDB_SUCCESS) {
566                                         return ret;
567                                 }
568                         }
569                         if (stop) break;
570                 }
571
572                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
573                 if (ac->num_requests == 0) {
574                         talloc_free(ac);
575                         return ldb_next_request(module, req);
576                 }
577
578                 /* fire the first one */
579                 return partition_call_first(ac);
580
581         } else {
582                 /* Handle this like all other requests */
583                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
584                         /* We have processed this flag, so we are done with this control now */
585
586                         /* Remove search control, so we don't confuse a backend server */
587                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
588                                 ldb_oom(ldb_module_get_ctx(module));
589                                 return LDB_ERR_OPERATIONS_ERROR;
590                         }
591                 }
592
593                 return partition_replicate(module, req, req->op.search.base);
594         }
595 }
596
597 /* add */
598 static int partition_add(struct ldb_module *module, struct ldb_request *req)
599 {
600         return partition_replicate(module, req, req->op.add.message->dn);
601 }
602
603 /* modify */
604 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
605 {
606         return partition_replicate(module, req, req->op.mod.message->dn);
607 }
608
609 /* delete */
610 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
611 {
612         return partition_replicate(module, req, req->op.del.dn);
613 }
614
615 /* rename */
616 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
617 {
618         /* Find backend */
619         struct dsdb_partition *backend, *backend2;
620         
621         struct partition_private_data *data = talloc_get_type(module->private_data, 
622                                                               struct partition_private_data);
623
624         /* Skip the lot if 'data' isn't here yet (initialisation) */
625         if (!data) {
626                 return LDB_ERR_OPERATIONS_ERROR;
627         }
628
629         backend = find_partition(data, req->op.rename.olddn, req);
630         backend2 = find_partition(data, req->op.rename.newdn, req);
631
632         if ((backend && !backend2) || (!backend && backend2)) {
633                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
634         }
635
636         if (backend != backend2) {
637                 ldb_asprintf_errstring(ldb_module_get_ctx(module), 
638                                        "Cannot rename from %s in %s to %s in %s: %s",
639                                        ldb_dn_get_linearized(req->op.rename.olddn),
640                                        ldb_dn_get_linearized(backend->ctrl->dn),
641                                        ldb_dn_get_linearized(req->op.rename.newdn),
642                                        ldb_dn_get_linearized(backend2->ctrl->dn),
643                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
644                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
645         }
646
647         return partition_replicate(module, req, req->op.rename.olddn);
648 }
649
650 /* start a transaction */
651 static int partition_start_trans(struct ldb_module *module)
652 {
653         int i, ret;
654         struct partition_private_data *data = talloc_get_type(module->private_data, 
655                                                               struct partition_private_data);
656         /* Look at base DN */
657         /* Figure out which partition it is under */
658         /* Skip the lot if 'data' isn't here yet (initialization) */
659         ret = ldb_next_start_trans(module);
660         if (ret != LDB_SUCCESS) {
661                 return ret;
662         }
663
664         for (i=0; data && data->partitions && data->partitions[i]; i++) {
665                 struct ldb_module *next = data->partitions[i]->module;
666                 PARTITION_FIND_OP(next, start_transaction);
667
668                 ret = next->ops->start_transaction(next);
669                 if (ret != LDB_SUCCESS) {
670                         /* Back it out, if it fails on one */
671                         for (i--; i >= 0; i--) {
672                                 next = data->partitions[i]->module;
673                                 PARTITION_FIND_OP(next, del_transaction);
674
675                                 next->ops->del_transaction(next);
676                         }
677                         ldb_next_del_trans(module);
678                         return ret;
679                 }
680         }
681         return LDB_SUCCESS;
682 }
683
684 /* prepare for a commit */
685 static int partition_prepare_commit(struct ldb_module *module)
686 {
687         int i;
688         struct partition_private_data *data = talloc_get_type(module->private_data, 
689                                                               struct partition_private_data);
690
691         for (i=0; data && data->partitions && data->partitions[i]; i++) {
692                 struct ldb_module *next_prepare = data->partitions[i]->module;
693                 int ret;
694
695                 PARTITION_FIND_OP_NOERROR(next_prepare, prepare_commit);
696                 if (next_prepare == NULL) {
697                         continue;
698                 }
699
700                 ret = next_prepare->ops->prepare_commit(next_prepare);
701                 if (ret != LDB_SUCCESS) {
702                         return ret;
703                 }
704         }
705
706         return ldb_next_prepare_commit(module);
707 }
708
709
710 /* end a transaction */
711 static int partition_end_trans(struct ldb_module *module)
712 {
713         int i;
714         struct partition_private_data *data = talloc_get_type(module->private_data, 
715                                                               struct partition_private_data);
716         for (i=0; data && data->partitions && data->partitions[i]; i++) {
717                 struct ldb_module *next_end = data->partitions[i]->module;
718                 int ret;
719
720                 PARTITION_FIND_OP(next_end, end_transaction);
721
722                 ret = next_end->ops->end_transaction(next_end);
723                 if (ret != LDB_SUCCESS) {
724                         return ret;
725                 }
726         }
727
728         return ldb_next_end_trans(module);
729 }
730
731 /* delete a transaction */
732 static int partition_del_trans(struct ldb_module *module)
733 {
734         int i, ret, final_ret = LDB_SUCCESS;
735         struct partition_private_data *data = talloc_get_type(module->private_data, 
736                                                               struct partition_private_data);
737         for (i=0; data && data->partitions && data->partitions[i]; i++) {
738                 struct ldb_module *next = data->partitions[i]->module;
739                 PARTITION_FIND_OP(next, del_transaction);
740
741                 ret = next->ops->del_transaction(next);
742                 if (ret != LDB_SUCCESS) {
743                         final_ret = ret;
744                 }
745         }       
746
747         ret = ldb_next_del_trans(module);
748         if (ret != LDB_SUCCESS) {
749                 final_ret = ret;
750         }
751         return final_ret;
752 }
753
754
755 /* FIXME: This function is still semi-async */
756 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
757 {
758         int i, ret;
759         uint64_t seq_number = 0;
760         uint64_t timestamp_sequence = 0;
761         uint64_t timestamp = 0;
762         struct partition_private_data *data = talloc_get_type(module->private_data, 
763                                                               struct partition_private_data);
764         struct ldb_seqnum_request *seq;
765         struct ldb_seqnum_result *seqr;
766         struct ldb_request *treq;
767         struct ldb_seqnum_request *tseq;
768         struct ldb_seqnum_result *tseqr;
769         struct ldb_extended *ext;
770         struct ldb_result *res;
771         struct dsdb_partition *p;
772
773         p = find_partition(data, NULL, req);
774         if (p != NULL) {
775                 /* the caller specified what partition they want the
776                  * sequence number operation on - just pass it on
777                  */
778                 return ldb_next_request(p->module, req);                
779         }
780
781         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
782
783         switch (seq->type) {
784         case LDB_SEQ_NEXT:
785         case LDB_SEQ_HIGHEST_SEQ:
786                 res = talloc_zero(req, struct ldb_result);
787                 if (res == NULL) {
788                         return LDB_ERR_OPERATIONS_ERROR;
789                 }
790                 tseq = talloc_zero(res, struct ldb_seqnum_request);
791                 if (tseq == NULL) {
792                         talloc_free(res);
793                         return LDB_ERR_OPERATIONS_ERROR;
794                 }
795                 tseq->type = seq->type;
796
797                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
798                                              LDB_EXTENDED_SEQUENCE_NUMBER,
799                                              tseq,
800                                              NULL,
801                                              res,
802                                              ldb_extended_default_callback,
803                                              NULL);
804                 if (ret != LDB_SUCCESS) {
805                         talloc_free(res);
806                         return ret;
807                 }
808
809                 ret = ldb_next_request(module, treq);
810                 if (ret != LDB_SUCCESS) {
811                         talloc_free(res);
812                         return ret;
813                 }
814                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
815                 if (ret != LDB_SUCCESS) {
816                         talloc_free(res);
817                         return ret;
818                 }
819
820                 seqr = talloc_get_type(res->extended->data,
821                                         struct ldb_seqnum_result);
822                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
823                         timestamp_sequence = seqr->seq_num;
824                 } else {
825                         seq_number += seqr->seq_num;
826                 }
827                 talloc_free(res);
828
829                 /* Skip the lot if 'data' isn't here yet (initialisation) */
830                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
831
832                         res = talloc_zero(req, struct ldb_result);
833                         if (res == NULL) {
834                                 return LDB_ERR_OPERATIONS_ERROR;
835                         }
836                         tseq = talloc_zero(res, struct ldb_seqnum_request);
837                         if (tseq == NULL) {
838                                 talloc_free(res);
839                                 return LDB_ERR_OPERATIONS_ERROR;
840                         }
841                         tseq->type = seq->type;
842
843                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
844                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
845                                                      tseq,
846                                                      NULL,
847                                                      res,
848                                                      ldb_extended_default_callback,
849                                                      NULL);
850                         if (ret != LDB_SUCCESS) {
851                                 talloc_free(res);
852                                 return ret;
853                         }
854
855                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
856                                 ret = ldb_request_add_control(treq,
857                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
858                                                               false, data->partitions[i]->ctrl);
859                                 if (ret != LDB_SUCCESS) {
860                                         talloc_free(res);
861                                         return ret;
862                                 }
863                         }
864
865                         ret = partition_request(data->partitions[i]->module, treq);
866                         if (ret != LDB_SUCCESS) {
867                                 talloc_free(res);
868                                 return ret;
869                         }
870                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
871                         if (ret != LDB_SUCCESS) {
872                                 talloc_free(res);
873                                 return ret;
874                         }
875                         tseqr = talloc_get_type(res->extended->data,
876                                                 struct ldb_seqnum_result);
877                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
878                                 timestamp_sequence = MAX(timestamp_sequence,
879                                                          tseqr->seq_num);
880                         } else {
881                                 seq_number += tseqr->seq_num;
882                         }
883                         talloc_free(res);
884                 }
885                 /* fall through */
886         case LDB_SEQ_HIGHEST_TIMESTAMP:
887
888                 res = talloc_zero(req, struct ldb_result);
889                 if (res == NULL) {
890                         return LDB_ERR_OPERATIONS_ERROR;
891                 }
892
893                 tseq = talloc_zero(res, struct ldb_seqnum_request);
894                 if (tseq == NULL) {
895                         talloc_free(res);
896                         return LDB_ERR_OPERATIONS_ERROR;
897                 }
898                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
899
900                 ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
901                                              LDB_EXTENDED_SEQUENCE_NUMBER,
902                                              tseq,
903                                              NULL,
904                                              res,
905                                              ldb_extended_default_callback,
906                                              NULL);
907                 if (ret != LDB_SUCCESS) {
908                         talloc_free(res);
909                         return ret;
910                 }
911
912                 ret = ldb_next_request(module, treq);
913                 if (ret != LDB_SUCCESS) {
914                         talloc_free(res);
915                         return ret;
916                 }
917                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
918                 if (ret != LDB_SUCCESS) {
919                         talloc_free(res);
920                         return ret;
921                 }
922
923                 tseqr = talloc_get_type(res->extended->data,
924                                            struct ldb_seqnum_result);
925                 timestamp = tseqr->seq_num;
926
927                 talloc_free(res);
928
929                 /* Skip the lot if 'data' isn't here yet (initialisation) */
930                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
931
932                         res = talloc_zero(req, struct ldb_result);
933                         if (res == NULL) {
934                                 return LDB_ERR_OPERATIONS_ERROR;
935                         }
936
937                         tseq = talloc_zero(res, struct ldb_seqnum_request);
938                         if (tseq == NULL) {
939                                 talloc_free(res);
940                                 return LDB_ERR_OPERATIONS_ERROR;
941                         }
942                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
943
944                         ret = ldb_build_extended_req(&treq, ldb_module_get_ctx(module), res,
945                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
946                                                      tseq,
947                                                      NULL,
948                                                      res,
949                                                      ldb_extended_default_callback,
950                                                      NULL);
951                         if (ret != LDB_SUCCESS) {
952                                 talloc_free(res);
953                                 return ret;
954                         }
955
956                         if (!ldb_request_get_control(treq, DSDB_CONTROL_CURRENT_PARTITION_OID)) {
957                                 ret = ldb_request_add_control(treq,
958                                                               DSDB_CONTROL_CURRENT_PARTITION_OID,
959                                                               false, data->partitions[i]->ctrl);
960                                 if (ret != LDB_SUCCESS) {
961                                         talloc_free(res);
962                                         return ret;
963                                 }
964                         }
965
966                         ret = partition_request(data->partitions[i]->module, treq);
967                         if (ret != LDB_SUCCESS) {
968                                 talloc_free(res);
969                                 return ret;
970                         }
971                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
972                         if (ret != LDB_SUCCESS) {
973                                 talloc_free(res);
974                                 return ret;
975                         }
976
977                         tseqr = talloc_get_type(res->extended->data,
978                                                   struct ldb_seqnum_result);
979                         timestamp = MAX(timestamp, tseqr->seq_num);
980
981                         talloc_free(res);
982                 }
983
984                 break;
985         }
986
987         ext = talloc_zero(req, struct ldb_extended);
988         if (!ext) {
989                 return LDB_ERR_OPERATIONS_ERROR;
990         }
991         seqr = talloc_zero(ext, struct ldb_seqnum_result);
992         if (seqr == NULL) {
993                 talloc_free(ext);
994                 return LDB_ERR_OPERATIONS_ERROR;
995         }
996         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
997         ext->data = seqr;
998
999         switch (seq->type) {
1000         case LDB_SEQ_NEXT:
1001         case LDB_SEQ_HIGHEST_SEQ:
1002
1003                 /* Has someone above set a timebase sequence? */
1004                 if (timestamp_sequence) {
1005                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
1006                 } else {
1007                         seqr->seq_num = seq_number;
1008                 }
1009
1010                 if (timestamp_sequence > seqr->seq_num) {
1011                         seqr->seq_num = timestamp_sequence;
1012                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
1013                 }
1014
1015                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
1016                 break;
1017         case LDB_SEQ_HIGHEST_TIMESTAMP:
1018                 seqr->seq_num = timestamp;
1019                 break;
1020         }
1021
1022         if (seq->type == LDB_SEQ_NEXT) {
1023                 seqr->seq_num++;
1024         }
1025
1026         /* send request done */
1027         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
1028 }
1029
1030 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
1031 {
1032         struct dsdb_partition *partition;
1033         struct partition_private_data *data;
1034         struct ldb_dn *schema_dn;
1035         struct partition_context *ac;
1036         int ret;
1037
1038         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
1039         if (!schema_dn) {
1040                 ldb_debug(ldb_module_get_ctx(module), LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1041                 return LDB_ERR_PROTOCOL_ERROR;
1042         }
1043
1044         data = talloc_get_type(module->private_data, struct partition_private_data);
1045         if (!data) {
1046                 return LDB_ERR_OPERATIONS_ERROR;
1047         }
1048         
1049         partition = find_partition( data, schema_dn, req);
1050         if (!partition) {
1051                 return ldb_next_request(module, req);
1052         }
1053
1054         ac = partition_init_ctx(module, req);
1055         if (!ac) {
1056                 return LDB_ERR_OPERATIONS_ERROR;
1057         }
1058
1059         /* we need to add a control but we never touch the original request */
1060         ret = partition_prep_request(ac, partition);
1061         if (ret != LDB_SUCCESS) {
1062                 return ret;
1063         }
1064
1065         /* fire the first one */
1066         ret = partition_call_first(ac);
1067
1068         if (ret != LDB_SUCCESS){
1069                 return ret;
1070         }
1071
1072         return ldb_request_done(req, ret);
1073 }
1074
1075
1076 /* extended */
1077 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1078 {
1079         struct partition_private_data *data;
1080         struct partition_context *ac;
1081
1082         data = talloc_get_type(module->private_data, struct partition_private_data);
1083         if (!data) {
1084                 return ldb_next_request(module, req);
1085         }
1086
1087         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1088                 return partition_sequence_number(module, req);
1089         }
1090
1091         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_CREATE_PARTITION_OID) == 0) {
1092                 return partition_create(module, req);
1093         }
1094
1095         /* forward schemaUpdateNow operation to schema_fsmo module*/
1096         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1097                 return partition_extended_schema_update_now( module, req );
1098         }       
1099
1100         /* 
1101          * as the extended operation has no dn
1102          * we need to send it to all partitions
1103          */
1104
1105         ac = partition_init_ctx(module, req);
1106         if (!ac) {
1107                 return LDB_ERR_OPERATIONS_ERROR;
1108         }
1109
1110         return partition_send_all(module, ac, req);
1111 }
1112
1113 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1114         .name              = "partition",
1115         .init_context      = partition_init,
1116         .search            = partition_search,
1117         .add               = partition_add,
1118         .modify            = partition_modify,
1119         .del               = partition_delete,
1120         .rename            = partition_rename,
1121         .extended          = partition_extended,
1122         .start_transaction = partition_start_trans,
1123         .prepare_commit    = partition_prepare_commit,
1124         .end_transaction   = partition_end_trans,
1125         .del_transaction   = partition_del_trans,
1126 };