Fix sequence number generation against OpenLDAP
[kai/samba.git] / source4 / dsdb / samdb / ldb_modules / partition.c
1 /* 
2    Partitions ldb module
3
4    Copyright (C) Andrew Bartlett <abartlet@samba.org> 2006
5    Copyright (C) Stefan Metzmacher <metze@samba.org> 2007
6
7    * NOTICE: this module is NOT released under the GNU LGPL license as
8    * other ldb code. This module is released under the GNU GPL v3 or
9    * later license.
10
11    This program is free software; you can redistribute it and/or modify
12    it under the terms of the GNU General Public License as published by
13    the Free Software Foundation; either version 3 of the License, or
14    (at your option) any later version.
15    
16    This program is distributed in the hope that it will be useful,
17    but WITHOUT ANY WARRANTY; without even the implied warranty of
18    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19    GNU General Public License for more details.
20    
21    You should have received a copy of the GNU General Public License
22    along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 */
24
25 /*
26  *  Name: ldb
27  *
28  *  Component: ldb partitions module
29  *
30  *  Description: Implement LDAP partitions
31  *
32  *  Author: Andrew Bartlett
33  *  Author: Stefan Metzmacher
34  */
35
36 #include "includes.h"
37 #include "ldb/include/ldb_includes.h"
38 #include "dsdb/samdb/samdb.h"
39
/*
 * Private state of the partition module.
 */
struct partition_private_data {
        /* NULL-terminated list of partitions; each entry carries the
         * partition's base DN and the module to send its requests to */
        struct dsdb_control_current_partition **partitions;

        /* NULL-terminated list of DNs; non-search requests aimed at one of
         * them are sent to every backend (may be NULL) */
        struct ldb_dn **replicate;
};
44
/*
 * One sub-request prepared for a backend: the request itself and the
 * module it has to be handed to.
 */
struct part_request {
        /* module at which the search for the operation handler starts */
        struct ldb_module *module;

        /* copy of the caller's request, built by partition_prep_request() */
        struct ldb_request *req;
};
49
/*
 * Per-request state, allocated as a talloc child of the caller's request.
 */
struct partition_context {
        /* the partition module instance handling the request */
        struct ldb_module *module;

        /* the caller's original request */
        struct ldb_request *req;

        /* becomes true once any sub-request finished with LDB_SUCCESS */
        bool got_success;

        /* array of prepared sub-requests, num_requests entries long */
        struct part_request *part_req;
        int num_requests;

        /* number of sub-requests that have completed; also the index of
         * the next sub-request to fire */
        int finished_requests;
};
59
60 static struct partition_context *partition_init_ctx(struct ldb_module *module, struct ldb_request *req)
61 {
62         struct partition_context *ac;
63
64         ac = talloc_zero(req, struct partition_context);
65         if (ac == NULL) {
66                 ldb_set_errstring(module->ldb, "Out of Memory");
67                 return NULL;
68         }
69
70         ac->module = module;
71         ac->req = req;
72
73         return ac;
74 }
75
/*
 * Walk 'module' along its ->next chain until a module implementing 'op'
 * is reached.  When the chain runs out, an error string is set and the
 * ENCLOSING FUNCTION returns LDB_ERR_OPERATIONS_ERROR.
 *
 * 'module' must be a modifiable lvalue: it is updated in place and is
 * evaluated more than once.  It is dereferenced (to fetch the ldb
 * context) before it is tested against NULL.
 */
#define PARTITION_FIND_OP(module, op) do { \
        struct ldb_context *ldbctx = module->ldb; \
        for (; module != NULL; module = module->next) { \
                if (module->ops->op != NULL) break; \
        } \
        if (!module) { \
                ldb_asprintf_errstring(ldbctx, \
                        "Unable to find backend operation for " #op ); \
                return LDB_ERR_OPERATIONS_ERROR; \
        } \
} while (0)
85
86 /*
87  *    helper functions to call the next module in chain
88  *    */
89
90 static int partition_request(struct ldb_module *module, struct ldb_request *request)
91 {
92         int ret;
93         switch (request->operation) {
94         case LDB_SEARCH:
95                 PARTITION_FIND_OP(module, search);
96                 ret = module->ops->search(module, request);
97                 break;
98         case LDB_ADD:
99                 PARTITION_FIND_OP(module, add);
100                 ret = module->ops->add(module, request);
101                 break;
102         case LDB_MODIFY:
103                 PARTITION_FIND_OP(module, modify);
104                 ret = module->ops->modify(module, request);
105                 break;
106         case LDB_DELETE:
107                 PARTITION_FIND_OP(module, del);
108                 ret = module->ops->del(module, request);
109                 break;
110         case LDB_RENAME:
111                 PARTITION_FIND_OP(module, rename);
112                 ret = module->ops->rename(module, request);
113                 break;
114         case LDB_EXTENDED:
115                 PARTITION_FIND_OP(module, extended);
116                 ret = module->ops->extended(module, request);
117                 break;
118         default:
119                 PARTITION_FIND_OP(module, request);
120                 ret = module->ops->request(module, request);
121                 break;
122         }
123         if (ret == LDB_SUCCESS) {
124                 return ret;
125         }
126         if (!ldb_errstring(module->ldb)) {
127                 /* Set a default error string, to place the blame somewhere */
128                 ldb_asprintf_errstring(module->ldb,
129                                         "error in module %s: %s (%d)",
130                                         module->ops->name,
131                                         ldb_strerror(ret), ret);
132         }
133         return ret;
134 }
135
136 static struct dsdb_control_current_partition *find_partition(struct partition_private_data *data,
137                                                              struct ldb_dn *dn)
138 {
139         int i;
140
141         /* Look at base DN */
142         /* Figure out which partition it is under */
143         /* Skip the lot if 'data' isn't here yet (initialistion) */
144         for (i=0; data && data->partitions && data->partitions[i]; i++) {
145                 if (ldb_dn_compare_base(data->partitions[i]->dn, dn) == 0) {
146                         return data->partitions[i];
147                 }
148         }
149
150         return NULL;
151 };
152
153 /**
154  * fire the caller's callback for every entry, but only send 'done' once.
155  */
156 static int partition_req_callback(struct ldb_request *req,
157                                   struct ldb_reply *ares)
158 {
159         struct partition_context *ac;
160         struct ldb_module *module;
161         struct ldb_request *nreq;
162         int ret;
163
164         ac = talloc_get_type(req->context, struct partition_context);
165
166         if (!ares) {
167                 return ldb_module_done(ac->req, NULL, NULL,
168                                         LDB_ERR_OPERATIONS_ERROR);
169         }
170
171         if (ares->error != LDB_SUCCESS && !ac->got_success) {
172                 return ldb_module_done(ac->req, ares->controls,
173                                         ares->response, ares->error);
174         }
175
176         switch (ares->type) {
177         case LDB_REPLY_REFERRAL:
178                 /* ignore referrals for now */
179                 break;
180
181         case LDB_REPLY_ENTRY:
182                 if (ac->req->operation != LDB_SEARCH) {
183                         ldb_set_errstring(ac->module->ldb,
184                                 "partition_req_callback:"
185                                 " Unsupported reply type for this request");
186                         return ldb_module_done(ac->req, NULL, NULL,
187                                                 LDB_ERR_OPERATIONS_ERROR);
188                 }
189                 return ldb_module_send_entry(ac->req, ares->message);
190
191         case LDB_REPLY_DONE:
192                 if (ares->error == LDB_SUCCESS) {
193                         ac->got_success = true;
194                 }
195                 if (ac->req->operation == LDB_EXTENDED) {
196                         /* FIXME: check for ares->response, replmd does not fill it ! */
197                         if (ares->response) {
198                                 if (strcmp(ares->response->oid, LDB_EXTENDED_START_TLS_OID) != 0) {
199                                         ldb_set_errstring(ac->module->ldb,
200                                                           "partition_req_callback:"
201                                                           " Unknown extended reply, "
202                                                           "only supports START_TLS");
203                                         talloc_free(ares);
204                                         return ldb_module_done(ac->req, NULL, NULL,
205                                                                 LDB_ERR_OPERATIONS_ERROR);
206                                 }
207                         }
208                 }
209
210                 ac->finished_requests++;
211                 if (ac->finished_requests == ac->num_requests) {
212                         /* this was the last one, call callback */
213                         return ldb_module_done(ac->req, ares->controls,
214                                                ares->response, 
215                                                ac->got_success?LDB_SUCCESS:ares->error);
216                 }
217
218                 /* not the last, now call the next one */
219                 module = ac->part_req[ac->finished_requests].module;
220                 nreq = ac->part_req[ac->finished_requests].req;
221
222                 ret = partition_request(module, nreq);
223                 if (ret != LDB_SUCCESS) {
224                         talloc_free(ares);
225                         return ldb_module_done(ac->req, NULL, NULL, ret);
226                 }
227
228                 break;
229         }
230
231         talloc_free(ares);
232         return LDB_SUCCESS;
233 }
234
235 static int partition_prep_request(struct partition_context *ac,
236                                   struct dsdb_control_current_partition *partition)
237 {
238         int ret;
239         struct ldb_request *req;
240
241         ac->part_req = talloc_realloc(ac, ac->part_req,
242                                         struct part_request,
243                                         ac->num_requests + 1);
244         if (ac->part_req == NULL) {
245                 ldb_oom(ac->module->ldb);
246                 return LDB_ERR_OPERATIONS_ERROR;
247         }
248
249         switch (ac->req->operation) {
250         case LDB_SEARCH:
251                 ret = ldb_build_search_req_ex(&req, ac->module->ldb,
252                                         ac->part_req,
253                                         ac->req->op.search.base,
254                                         ac->req->op.search.scope,
255                                         ac->req->op.search.tree,
256                                         ac->req->op.search.attrs,
257                                         ac->req->controls,
258                                         ac, partition_req_callback,
259                                         ac->req);
260                 break;
261         case LDB_ADD:
262                 ret = ldb_build_add_req(&req, ac->module->ldb, ac->part_req,
263                                         ac->req->op.add.message,
264                                         ac->req->controls,
265                                         ac, partition_req_callback,
266                                         ac->req);
267                 break;
268         case LDB_MODIFY:
269                 ret = ldb_build_mod_req(&req, ac->module->ldb, ac->part_req,
270                                         ac->req->op.mod.message,
271                                         ac->req->controls,
272                                         ac, partition_req_callback,
273                                         ac->req);
274                 break;
275         case LDB_DELETE:
276                 ret = ldb_build_del_req(&req, ac->module->ldb, ac->part_req,
277                                         ac->req->op.del.dn,
278                                         ac->req->controls,
279                                         ac, partition_req_callback,
280                                         ac->req);
281                 break;
282         case LDB_RENAME:
283                 ret = ldb_build_rename_req(&req, ac->module->ldb, ac->part_req,
284                                         ac->req->op.rename.olddn,
285                                         ac->req->op.rename.newdn,
286                                         ac->req->controls,
287                                         ac, partition_req_callback,
288                                         ac->req);
289                 break;
290         case LDB_EXTENDED:
291                 ret = ldb_build_extended_req(&req, ac->module->ldb,
292                                         ac->part_req,
293                                         ac->req->op.extended.oid,
294                                         ac->req->op.extended.data,
295                                         ac->req->controls,
296                                         ac, partition_req_callback,
297                                         ac->req);
298                 break;
299         default:
300                 ldb_set_errstring(ac->module->ldb,
301                                   "Unsupported request type!");
302                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
303         }
304
305         if (ret != LDB_SUCCESS) {
306                 return ret;
307         }
308
309         ac->part_req[ac->num_requests].req = req;
310
311         if (ac->req->controls) {
312                 req->controls = talloc_memdup(req, ac->req->controls,
313                                         talloc_get_size(ac->req->controls));
314                 if (req->controls == NULL) {
315                         ldb_oom(ac->module->ldb);
316                         return LDB_ERR_OPERATIONS_ERROR;
317                 }
318         }
319
320         if (partition) {
321                 ac->part_req[ac->num_requests].module = partition->module;
322
323                 ret = ldb_request_add_control(req,
324                                         DSDB_CONTROL_CURRENT_PARTITION_OID,
325                                         false, partition);
326                 if (ret != LDB_SUCCESS) {
327                         return ret;
328                 }
329
330                 if (req->operation == LDB_SEARCH) {
331                         /* If the search is for 'more' than this partition,
332                          * then change the basedn, so a remote LDAP server
333                          * doesn't object */
334                         if (ldb_dn_compare_base(partition->dn,
335                                                 req->op.search.base) != 0) {
336                                 req->op.search.base = partition->dn;
337                         }
338                 }
339
340         } else {
341                 /* make sure you put the NEXT module here, or
342                  * partition_request() will simply loop forever on itself */
343                 ac->part_req[ac->num_requests].module = ac->module->next;
344         }
345
346         ac->num_requests++;
347
348         return LDB_SUCCESS;
349 }
350
351 static int partition_call_first(struct partition_context *ac)
352 {
353         return partition_request(ac->part_req[0].module, ac->part_req[0].req);
354 }
355
356 /**
357  * Send a request down to all the partitions
358  */
359 static int partition_send_all(struct ldb_module *module, 
360                               struct partition_context *ac, 
361                               struct ldb_request *req) 
362 {
363         int i;
364         struct partition_private_data *data = talloc_get_type(module->private_data, 
365                                                               struct partition_private_data);
366         int ret = partition_prep_request(ac, NULL);
367         if (ret != LDB_SUCCESS) {
368                 return ret;
369         }
370         for (i=0; data && data->partitions && data->partitions[i]; i++) {
371                 ret = partition_prep_request(ac, data->partitions[i]);
372                 if (ret != LDB_SUCCESS) {
373                         return ret;
374                 }
375         }
376
377         /* fire the first one */
378         return partition_call_first(ac);
379 }
380
381 /**
382  * Figure out which backend a request needs to be aimed at.  Some
383  * requests must be replicated to all backends
384  */
385 static int partition_replicate(struct ldb_module *module, struct ldb_request *req, struct ldb_dn *dn) 
386 {
387         struct partition_context *ac;
388         unsigned i;
389         int ret;
390         struct dsdb_control_current_partition *partition;
391         struct partition_private_data *data = talloc_get_type(module->private_data, 
392                                                               struct partition_private_data);
393         if (!data || !data->partitions) {
394                 return ldb_next_request(module, req);
395         }
396         
397         if (req->operation != LDB_SEARCH) {
398                 /* Is this a special DN, we need to replicate to every backend? */
399                 for (i=0; data->replicate && data->replicate[i]; i++) {
400                         if (ldb_dn_compare(data->replicate[i], 
401                                            dn) == 0) {
402                                 
403                                 ac = partition_init_ctx(module, req);
404                                 if (!ac) {
405                                         return LDB_ERR_OPERATIONS_ERROR;
406                                 }
407                                 
408                                 return partition_send_all(module, ac, req);
409                         }
410                 }
411         }
412
413         /* Otherwise, we need to find the partition to fire it to */
414
415         /* Find partition */
416         partition = find_partition(data, dn);
417         if (!partition) {
418                 /*
419                  * if we haven't found a matching partition
420                  * pass the request to the main ldb
421                  *
422                  * TODO: we should maybe return an error here
423                  *       if it's not a special dn
424                  */
425
426                 return ldb_next_request(module, req);
427         }
428
429         ac = partition_init_ctx(module, req);
430         if (!ac) {
431                 return LDB_ERR_OPERATIONS_ERROR;
432         }
433
434         /* we need to add a control but we never touch the original request */
435         ret = partition_prep_request(ac, partition);
436         if (ret != LDB_SUCCESS) {
437                 return ret;
438         }
439
440         /* fire the first one */
441         return partition_call_first(ac);
442 }
443
444 /* search */
445 static int partition_search(struct ldb_module *module, struct ldb_request *req)
446 {
447         struct ldb_control **saved_controls;
448         
449         /* Find backend */
450         struct partition_private_data *data = talloc_get_type(module->private_data, 
451                                                               struct partition_private_data);
452         /* issue request */
453
454         /* (later) consider if we should be searching multiple
455          * partitions (for 'invisible' partition behaviour */
456
457         struct ldb_control *search_control = ldb_request_get_control(req, LDB_CONTROL_SEARCH_OPTIONS_OID);
458         struct ldb_control *domain_scope_control = ldb_request_get_control(req, LDB_CONTROL_DOMAIN_SCOPE_OID);
459         
460         struct ldb_search_options_control *search_options = NULL;
461         if (search_control) {
462                 search_options = talloc_get_type(search_control->data, struct ldb_search_options_control);
463         }
464
465         /* Remove the domain_scope control, so we don't confuse a backend server */
466         if (domain_scope_control && !save_controls(domain_scope_control, req, &saved_controls)) {
467                 ldb_oom(module->ldb);
468                 return LDB_ERR_OPERATIONS_ERROR;
469         }
470
471         /*
472          * for now pass down the LDB_CONTROL_SEARCH_OPTIONS_OID control
473          * down as uncritical to make windows 2008 dcpromo happy.
474          */
475         if (search_control) {
476                 search_control->critical = 0;
477         }
478
479         /* TODO:
480            Generate referrals (look for a partition under this DN) if we don't have the above control specified
481         */
482         
483         if (search_options && (search_options->search_options & LDB_SEARCH_OPTION_PHANTOM_ROOT)) {
484                 int ret, i;
485                 struct partition_context *ac;
486                 if ((search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
487                         /* We have processed this flag, so we are done with this control now */
488
489                         /* Remove search control, so we don't confuse a backend server */
490                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
491                                 ldb_oom(module->ldb);
492                                 return LDB_ERR_OPERATIONS_ERROR;
493                         }
494                 }
495                 ac = partition_init_ctx(module, req);
496                 if (!ac) {
497                         return LDB_ERR_OPERATIONS_ERROR;
498                 }
499
500                 /* Search from the base DN */
501                 if (!req->op.search.base || ldb_dn_is_null(req->op.search.base)) {
502                         return partition_send_all(module, ac, req);
503                 }
504                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
505                         bool match = false, stop = false;
506                         /* Find all partitions under the search base 
507                            
508                            we match if:
509
510                               1) the DN we are looking for exactly matches the partition
511                              or
512                               2) the DN we are looking for is a parent of the partition and it isn't
513                                  a scope base search
514                              or
515                               3) the DN we are looking for is a child of the partition
516                          */
517                         if (ldb_dn_compare(data->partitions[i]->dn, req->op.search.base) == 0) {
518                                 match = true;
519                                 if (req->op.search.scope == LDB_SCOPE_BASE) {
520                                         stop = true;
521                                 }
522                         }
523                         if (!match && 
524                             (ldb_dn_compare_base(req->op.search.base, data->partitions[i]->dn) == 0 &&
525                              req->op.search.scope != LDB_SCOPE_BASE)) {
526                                 match = true;
527                         }
528                         if (!match &&
529                             ldb_dn_compare_base(data->partitions[i]->dn, req->op.search.base) == 0) {
530                                 match = true;
531                                 stop = true; /* note that this relies on partition ordering */
532                         }
533                         if (match) {
534                                 ret = partition_prep_request(ac, data->partitions[i]);
535                                 if (ret != LDB_SUCCESS) {
536                                         return ret;
537                                 }
538                         }
539                         if (stop) break;
540                 }
541
542                 /* Perhaps we didn't match any partitions.  Try the main partition, only */
543                 if (ac->num_requests == 0) {
544                         talloc_free(ac);
545                         return ldb_next_request(module, req);
546                 }
547
548                 /* fire the first one */
549                 return partition_call_first(ac);
550
551         } else {
552                 /* Handle this like all other requests */
553                 if (search_control && (search_options->search_options & ~LDB_SEARCH_OPTION_PHANTOM_ROOT) == 0) {
554                         /* We have processed this flag, so we are done with this control now */
555
556                         /* Remove search control, so we don't confuse a backend server */
557                         if (search_control && !save_controls(search_control, req, &saved_controls)) {
558                                 ldb_oom(module->ldb);
559                                 return LDB_ERR_OPERATIONS_ERROR;
560                         }
561                 }
562
563                 return partition_replicate(module, req, req->op.search.base);
564         }
565 }
566
567 /* add */
568 static int partition_add(struct ldb_module *module, struct ldb_request *req)
569 {
570         return partition_replicate(module, req, req->op.add.message->dn);
571 }
572
573 /* modify */
574 static int partition_modify(struct ldb_module *module, struct ldb_request *req)
575 {
576         return partition_replicate(module, req, req->op.mod.message->dn);
577 }
578
579 /* delete */
580 static int partition_delete(struct ldb_module *module, struct ldb_request *req)
581 {
582         return partition_replicate(module, req, req->op.del.dn);
583 }
584
585 /* rename */
586 static int partition_rename(struct ldb_module *module, struct ldb_request *req)
587 {
588         /* Find backend */
589         struct dsdb_control_current_partition *backend, *backend2;
590         
591         struct partition_private_data *data = talloc_get_type(module->private_data, 
592                                                               struct partition_private_data);
593
594         /* Skip the lot if 'data' isn't here yet (initialization) */
595         if (!data) {
596                 return LDB_ERR_OPERATIONS_ERROR;
597         }
598
599         backend = find_partition(data, req->op.rename.olddn);
600         backend2 = find_partition(data, req->op.rename.newdn);
601
602         if ((backend && !backend2) || (!backend && backend2)) {
603                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
604         }
605
606         if (backend != backend2) {
607                 ldb_asprintf_errstring(module->ldb, 
608                                        "Cannot rename from %s in %s to %s in %s: %s",
609                                        ldb_dn_get_linearized(req->op.rename.olddn),
610                                        ldb_dn_get_linearized(backend->dn),
611                                        ldb_dn_get_linearized(req->op.rename.newdn),
612                                        ldb_dn_get_linearized(backend2->dn),
613                                        ldb_strerror(LDB_ERR_AFFECTS_MULTIPLE_DSAS));
614                 return LDB_ERR_AFFECTS_MULTIPLE_DSAS;
615         }
616
617         return partition_replicate(module, req, req->op.rename.olddn);
618 }
619
620 /* start a transaction */
621 static int partition_start_trans(struct ldb_module *module)
622 {
623         int i, ret;
624         struct partition_private_data *data = talloc_get_type(module->private_data, 
625                                                               struct partition_private_data);
626         /* Look at base DN */
627         /* Figure out which partition it is under */
628         /* Skip the lot if 'data' isn't here yet (initialization) */
629         ret = ldb_next_start_trans(module);
630         if (ret != LDB_SUCCESS) {
631                 return ret;
632         }
633
634         for (i=0; data && data->partitions && data->partitions[i]; i++) {
635                 struct ldb_module *next = data->partitions[i]->module;
636                 PARTITION_FIND_OP(next, start_transaction);
637
638                 ret = next->ops->start_transaction(next);
639                 if (ret != LDB_SUCCESS) {
640                         /* Back it out, if it fails on one */
641                         for (i--; i >= 0; i--) {
642                                 next = data->partitions[i]->module;
643                                 PARTITION_FIND_OP(next, del_transaction);
644
645                                 next->ops->del_transaction(next);
646                         }
647                         ldb_next_del_trans(module);
648                         return ret;
649                 }
650         }
651         return LDB_SUCCESS;
652 }
653
654 /* end a transaction */
655 static int partition_end_trans(struct ldb_module *module)
656 {
657         int i, ret;
658         struct partition_private_data *data = talloc_get_type(module->private_data, 
659                                                               struct partition_private_data);
660         ret = ldb_next_end_trans(module);
661         if (ret != LDB_SUCCESS) {
662                 return ret;
663         }
664
665         /* Look at base DN */
666         /* Figure out which partition it is under */
667         /* Skip the lot if 'data' isn't here yet (initialistion) */
668         for (i=0; data && data->partitions && data->partitions[i]; i++) {
669                 struct ldb_module *next = data->partitions[i]->module;
670                 PARTITION_FIND_OP(next, end_transaction);
671
672                 ret = next->ops->end_transaction(next);
673                 if (ret != LDB_SUCCESS) {
674                         /* Back it out, if it fails on one */
675                         for (i--; i >= 0; i--) {
676                                 next = data->partitions[i]->module;
677                                 PARTITION_FIND_OP(next, del_transaction);
678
679                                 next->ops->del_transaction(next);
680                         }
681                         ldb_next_del_trans(module);
682                         return ret;
683                 }
684         }
685
686         return LDB_SUCCESS;
687 }
688
689 /* delete a transaction */
690 static int partition_del_trans(struct ldb_module *module)
691 {
692         int i, ret, ret2 = LDB_SUCCESS;
693         struct partition_private_data *data = talloc_get_type(module->private_data, 
694                                                               struct partition_private_data);
695         ret = ldb_next_del_trans(module);
696         if (ret != LDB_SUCCESS) {
697                 ret2 = ret;
698         }
699
700         /* Look at base DN */
701         /* Figure out which partition it is under */
702         /* Skip the lot if 'data' isn't here yet (initialistion) */
703         for (i=0; data && data->partitions && data->partitions[i]; i++) {
704                 struct ldb_module *next = data->partitions[i]->module;
705                 PARTITION_FIND_OP(next, del_transaction);
706
707                 ret = next->ops->del_transaction(next);
708                 if (ret != LDB_SUCCESS) {
709                         ret2 = ret;
710                 }
711         }
712         return ret2;
713 }
714
715
716 /* FIXME: This function is still semi-async */
717 static int partition_sequence_number(struct ldb_module *module, struct ldb_request *req)
718 {
719         int i, ret;
720         uint64_t seq_number = 0;
721         uint64_t timestamp_sequence = 0;
722         uint64_t timestamp = 0;
723         struct partition_private_data *data = talloc_get_type(module->private_data, 
724                                                               struct partition_private_data);
725         struct ldb_seqnum_request *seq;
726         struct ldb_seqnum_result *seqr;
727         struct ldb_request *treq;
728         struct ldb_seqnum_request *tseq;
729         struct ldb_seqnum_result *tseqr;
730         struct ldb_extended *ext;
731         struct ldb_result *res;
732
733         seq = talloc_get_type(req->op.extended.data, struct ldb_seqnum_request);
734
735         switch (seq->type) {
736         case LDB_SEQ_NEXT:
737         case LDB_SEQ_HIGHEST_SEQ:
738                 res = talloc_zero(req, struct ldb_result);
739                 if (res == NULL) {
740                         return LDB_ERR_OPERATIONS_ERROR;
741                 }
742                 tseq = talloc_zero(res, struct ldb_seqnum_request);
743                 if (tseq == NULL) {
744                         talloc_free(res);
745                         return LDB_ERR_OPERATIONS_ERROR;
746                 }
747                 tseq->type = seq->type;
748
749                 ret = ldb_build_extended_req(&treq, module->ldb, res,
750                                              LDB_EXTENDED_SEQUENCE_NUMBER,
751                                              tseq,
752                                              NULL,
753                                              res,
754                                              ldb_extended_default_callback,
755                                              NULL);
756                 ret = ldb_next_request(module, treq);
757                 if (ret == LDB_SUCCESS) {
758                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
759                 }
760                 if (ret != LDB_SUCCESS) {
761                         talloc_free(res);
762                         return ret;
763                 }
764                 seqr = talloc_get_type(res->extended->data,
765                                         struct ldb_seqnum_result);
766                 if (seqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
767                         timestamp_sequence = seqr->seq_num;
768                 } else {
769                         seq_number += seqr->seq_num;
770                 }
771                 talloc_free(res);
772
773                 /* Skip the lot if 'data' isn't here yet (initialistion) */
774                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
775
776                         res = talloc_zero(req, struct ldb_result);
777                         if (res == NULL) {
778                                 return LDB_ERR_OPERATIONS_ERROR;
779                         }
780                         tseq = talloc_zero(res, struct ldb_seqnum_request);
781                         if (tseq == NULL) {
782                                 talloc_free(res);
783                                 return LDB_ERR_OPERATIONS_ERROR;
784                         }
785                         tseq->type = seq->type;
786
787                         ret = ldb_build_extended_req(&treq, module->ldb, res,
788                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
789                                                      tseq,
790                                                      NULL,
791                                                      res,
792                                                      ldb_extended_default_callback,
793                                                      NULL);
794                         if (ret != LDB_SUCCESS) {
795                                 talloc_free(res);
796                                 return ret;
797                         }
798
799                         ret = ldb_request_add_control(treq,
800                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
801                                                       false, data->partitions[i]);
802                         if (ret != LDB_SUCCESS) {
803                                 talloc_free(res);
804                                 return ret;
805                         }
806
807                         ret = partition_request(data->partitions[i]->module, treq);
808                         if (ret != LDB_SUCCESS) {
809                                 talloc_free(res);
810                                 return ret;
811                         }
812                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
813                         if (ret != LDB_SUCCESS) {
814                                 talloc_free(res);
815                                 return ret;
816                         }
817                         tseqr = talloc_get_type(res->extended->data,
818                                                 struct ldb_seqnum_result);
819                         if (tseqr->flags & LDB_SEQ_TIMESTAMP_SEQUENCE) {
820                                 timestamp_sequence = MAX(timestamp_sequence,
821                                                          tseqr->seq_num);
822                         } else {
823                                 seq_number += tseqr->seq_num;
824                         }
825                         talloc_free(res);
826                 }
827                 /* fall through */
828         case LDB_SEQ_HIGHEST_TIMESTAMP:
829
830                 res = talloc_zero(req, struct ldb_result);
831                 if (res == NULL) {
832                         return LDB_ERR_OPERATIONS_ERROR;
833                 }
834
835                 tseq = talloc_zero(res, struct ldb_seqnum_request);
836                 if (tseq == NULL) {
837                         talloc_free(res);
838                         return LDB_ERR_OPERATIONS_ERROR;
839                 }
840                 tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
841
842                 ret = ldb_build_extended_req(&treq, module->ldb, res,
843                                              LDB_EXTENDED_SEQUENCE_NUMBER,
844                                              tseq,
845                                              NULL,
846                                              res,
847                                              ldb_extended_default_callback,
848                                              NULL);
849                 if (ret != LDB_SUCCESS) {
850                         talloc_free(res);
851                         return ret;
852                 }
853
854                 ret = ldb_next_request(module, treq);
855                 if (ret != LDB_SUCCESS) {
856                         talloc_free(res);
857                         return ret;
858                 }
859                 ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
860                 if (ret != LDB_SUCCESS) {
861                         talloc_free(res);
862                         return ret;
863                 }
864
865                 tseqr = talloc_get_type(res->extended->data,
866                                            struct ldb_seqnum_result);
867                 timestamp = tseqr->seq_num;
868
869                 talloc_free(res);
870
871                 /* Skip the lot if 'data' isn't here yet (initialistion) */
872                 for (i=0; data && data->partitions && data->partitions[i]; i++) {
873
874                         res = talloc_zero(req, struct ldb_result);
875                         if (res == NULL) {
876                                 return LDB_ERR_OPERATIONS_ERROR;
877                         }
878
879                         tseq = talloc_zero(res, struct ldb_seqnum_request);
880                         if (tseq == NULL) {
881                                 talloc_free(res);
882                                 return LDB_ERR_OPERATIONS_ERROR;
883                         }
884                         tseq->type = LDB_SEQ_HIGHEST_TIMESTAMP;
885
886                         ret = ldb_build_extended_req(&treq, module->ldb, res,
887                                                      LDB_EXTENDED_SEQUENCE_NUMBER,
888                                                      tseq,
889                                                      NULL,
890                                                      res,
891                                                      ldb_extended_default_callback,
892                                                      NULL);
893                         if (ret != LDB_SUCCESS) {
894                                 talloc_free(res);
895                                 return ret;
896                         }
897
898                         ret = ldb_request_add_control(treq,
899                                                       DSDB_CONTROL_CURRENT_PARTITION_OID,
900                                                       false, data->partitions[i]);
901                         if (ret != LDB_SUCCESS) {
902                                 talloc_free(res);
903                                 return ret;
904                         }
905
906                         ret = partition_request(data->partitions[i]->module, treq);
907                         if (ret != LDB_SUCCESS) {
908                                 talloc_free(res);
909                                 return ret;
910                         }
911                         ret = ldb_wait(treq->handle, LDB_WAIT_ALL);
912                         if (ret != LDB_SUCCESS) {
913                                 talloc_free(res);
914                                 return ret;
915                         }
916
917                         tseqr = talloc_get_type(res->extended->data,
918                                                   struct ldb_seqnum_result);
919                         timestamp = MAX(timestamp, tseqr->seq_num);
920
921                         talloc_free(res);
922                 }
923
924                 break;
925         }
926
927         ext = talloc_zero(req, struct ldb_extended);
928         if (!ext) {
929                 return LDB_ERR_OPERATIONS_ERROR;
930         }
931         seqr = talloc_zero(ext, struct ldb_seqnum_result);
932         if (seqr == NULL) {
933                 talloc_free(ext);
934                 return LDB_ERR_OPERATIONS_ERROR;
935         }
936         ext->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
937         ext->data = seqr;
938
939         switch (seq->type) {
940         case LDB_SEQ_NEXT:
941         case LDB_SEQ_HIGHEST_SEQ:
942
943                 /* Has someone above set a timebase sequence? */
944                 if (timestamp_sequence) {
945                         seqr->seq_num = (((unsigned long long)timestamp << 24) | (seq_number & 0xFFFFFF));
946                 } else {
947                         seqr->seq_num = seq_number;
948                 }
949
950                 if (timestamp_sequence > seqr->seq_num) {
951                         seqr->seq_num = timestamp_sequence;
952                         seqr->flags |= LDB_SEQ_TIMESTAMP_SEQUENCE;
953                 }
954
955                 seqr->flags |= LDB_SEQ_GLOBAL_SEQUENCE;
956                 break;
957         case LDB_SEQ_HIGHEST_TIMESTAMP:
958                 seqr->seq_num = timestamp;
959                 break;
960         }
961
962         if (seq->type == LDB_SEQ_NEXT) {
963                 seqr->seq_num++;
964         }
965
966         /* send request done */
967         return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
968 }
969
970 static int partition_extended_replicated_objects(struct ldb_module *module, struct ldb_request *req)
971 {
972         struct dsdb_extended_replicated_objects *ext;
973
974         ext = talloc_get_type(req->op.extended.data, struct dsdb_extended_replicated_objects);
975         if (!ext) {
976                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: invalid extended data\n");
977                 return LDB_ERR_PROTOCOL_ERROR;
978         }
979
980         if (ext->version != DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION) {
981                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended_replicated_objects: extended data invalid version [%u != %u]\n",
982                           ext->version, DSDB_EXTENDED_REPLICATED_OBJECTS_VERSION);
983                 return LDB_ERR_PROTOCOL_ERROR;
984         }
985
986         return partition_replicate(module, req, ext->partition_dn);
987 }
988
989 static int partition_extended_schema_update_now(struct ldb_module *module, struct ldb_request *req)
990 {
991         struct dsdb_control_current_partition *partition;
992         struct partition_private_data *data;
993         struct ldb_dn *schema_dn;
994         struct partition_context *ac;
995         int ret;
996
997         schema_dn = talloc_get_type(req->op.extended.data, struct ldb_dn);
998         if (!schema_dn) {
999                 ldb_debug(module->ldb, LDB_DEBUG_FATAL, "partition_extended: invalid extended data\n");
1000                 return LDB_ERR_PROTOCOL_ERROR;
1001         }
1002
1003         data = talloc_get_type(module->private_data, struct partition_private_data);
1004         if (!data) {
1005                 return LDB_ERR_OPERATIONS_ERROR;
1006         }
1007         
1008         partition = find_partition( data, schema_dn );
1009         if (!partition) {
1010                 return ldb_next_request(module, req);
1011         }
1012
1013         ac = partition_init_ctx(module, req);
1014         if (!ac) {
1015                 return LDB_ERR_OPERATIONS_ERROR;
1016         }
1017
1018         /* we need to add a control but we never touch the original request */
1019         ret = partition_prep_request(ac, partition);
1020         if (ret != LDB_SUCCESS) {
1021                 return ret;
1022         }
1023
1024         /* fire the first one */
1025         return partition_call_first(ac);
1026 }
1027
1028
1029 /* extended */
1030 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
1031 {
1032         struct partition_private_data *data;
1033         struct partition_context *ac;
1034
1035         data = talloc_get_type(module->private_data, struct partition_private_data);
1036         if (!data || !data->partitions) {
1037                 return ldb_next_request(module, req);
1038         }
1039
1040         if (strcmp(req->op.extended.oid, LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1041                 return partition_sequence_number(module, req);
1042         }
1043
1044         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_REPLICATED_OBJECTS_OID) == 0) {
1045                 return partition_extended_replicated_objects(module, req);
1046         }
1047
1048         /* forward schemaUpdateNow operation to schema_fsmo module*/
1049         if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
1050                 return partition_extended_schema_update_now( module, req );
1051         }       
1052
1053         /* 
1054          * as the extended operation has no dn
1055          * we need to send it to all partitions
1056          */
1057
1058         ac = partition_init_ctx(module, req);
1059         if (!ac) {
1060                 return LDB_ERR_OPERATIONS_ERROR;
1061         }
1062
1063         return partition_send_all(module, ac, req);
1064 }
1065
1066 static int partition_sort_compare(const void *v1, const void *v2)
1067 {
1068         struct dsdb_control_current_partition *p1;
1069         struct dsdb_control_current_partition *p2;
1070
1071         p1 = *((struct dsdb_control_current_partition **)v1);
1072         p2 = *((struct dsdb_control_current_partition **)v2);
1073
1074         return ldb_dn_compare(p1->dn, p2->dn);
1075 }
1076
1077 static int partition_init(struct ldb_module *module)
1078 {
1079         int ret, i;
1080         TALLOC_CTX *mem_ctx = talloc_new(module);
1081         const char *attrs[] = { "partition", "replicateEntries", "modules", NULL };
1082         struct ldb_result *res;
1083         struct ldb_message *msg;
1084         struct ldb_message_element *partition_attributes;
1085         struct ldb_message_element *replicate_attributes;
1086         struct ldb_message_element *modules_attributes;
1087
1088         struct partition_private_data *data;
1089
1090         if (!mem_ctx) {
1091                 return LDB_ERR_OPERATIONS_ERROR;
1092         }
1093
1094         data = talloc(mem_ctx, struct partition_private_data);
1095         if (data == NULL) {
1096                 return LDB_ERR_OPERATIONS_ERROR;
1097         }
1098
1099         ret = ldb_search(module->ldb, mem_ctx, &res,
1100                          ldb_dn_new(mem_ctx, module->ldb, "@PARTITION"),
1101                          LDB_SCOPE_BASE, attrs, NULL);
1102         if (ret != LDB_SUCCESS) {
1103                 talloc_free(mem_ctx);
1104                 return ret;
1105         }
1106         if (res->count == 0) {
1107                 talloc_free(mem_ctx);
1108                 return ldb_next_init(module);
1109         }
1110
1111         if (res->count > 1) {
1112                 talloc_free(mem_ctx);
1113                 return LDB_ERR_CONSTRAINT_VIOLATION;
1114         }
1115
1116         msg = res->msgs[0];
1117
1118         partition_attributes = ldb_msg_find_element(msg, "partition");
1119         if (!partition_attributes) {
1120                 ldb_set_errstring(module->ldb, "partition_init: no partitions specified");
1121                 talloc_free(mem_ctx);
1122                 return LDB_ERR_CONSTRAINT_VIOLATION;
1123         }
1124         data->partitions = talloc_array(data, struct dsdb_control_current_partition *, partition_attributes->num_values + 1);
1125         if (!data->partitions) {
1126                 talloc_free(mem_ctx);
1127                 return LDB_ERR_OPERATIONS_ERROR;
1128         }
1129         for (i=0; i < partition_attributes->num_values; i++) {
1130                 char *base = talloc_strdup(data->partitions, (char *)partition_attributes->values[i].data);
1131                 char *p = strchr(base, ':');
1132                 if (!p) {
1133                         ldb_asprintf_errstring(module->ldb, 
1134                                                 "partition_init: "
1135                                                 "invalid form for partition record (missing ':'): %s", base);
1136                         talloc_free(mem_ctx);
1137                         return LDB_ERR_CONSTRAINT_VIOLATION;
1138                 }
1139                 p[0] = '\0';
1140                 p++;
1141                 if (!p[0]) {
1142                         ldb_asprintf_errstring(module->ldb, 
1143                                                 "partition_init: "
1144                                                 "invalid form for partition record (missing backend database): %s", base);
1145                         talloc_free(mem_ctx);
1146                         return LDB_ERR_CONSTRAINT_VIOLATION;
1147                 }
1148                 data->partitions[i] = talloc(data->partitions, struct dsdb_control_current_partition);
1149                 if (!data->partitions[i]) {
1150                         talloc_free(mem_ctx);
1151                         return LDB_ERR_OPERATIONS_ERROR;
1152                 }
1153                 data->partitions[i]->version = DSDB_CONTROL_CURRENT_PARTITION_VERSION;
1154
1155                 data->partitions[i]->dn = ldb_dn_new(data->partitions[i], module->ldb, base);
1156                 if (!data->partitions[i]->dn) {
1157                         ldb_asprintf_errstring(module->ldb, 
1158                                                 "partition_init: invalid DN in partition record: %s", base);
1159                         talloc_free(mem_ctx);
1160                         return LDB_ERR_CONSTRAINT_VIOLATION;
1161                 }
1162
1163                 data->partitions[i]->backend = samdb_relative_path(module->ldb, 
1164                                                                    data->partitions[i], 
1165                                                                    p);
1166                 if (!data->partitions[i]->backend) {
1167                         ldb_asprintf_errstring(module->ldb, 
1168                                                 "partition_init: unable to determine an relative path for partition: %s", base);
1169                         talloc_free(mem_ctx);                   
1170                 }
1171                 ret = ldb_connect_backend(module->ldb, data->partitions[i]->backend, NULL, &data->partitions[i]->module);
1172                 if (ret != LDB_SUCCESS) {
1173                         talloc_free(mem_ctx);
1174                         return ret;
1175                 }
1176         }
1177         data->partitions[i] = NULL;
1178
1179         /* sort these into order, most to least specific */
1180         qsort(data->partitions, partition_attributes->num_values,
1181               sizeof(*data->partitions), partition_sort_compare);
1182
1183         for (i=0; data->partitions[i]; i++) {
1184                 struct ldb_request *req;
1185                 req = talloc_zero(mem_ctx, struct ldb_request);
1186                 if (req == NULL) {
1187                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Out of memory!\n");
1188                         talloc_free(mem_ctx);
1189                         return LDB_ERR_OPERATIONS_ERROR;
1190                 }
1191                 
1192                 req->operation = LDB_REQ_REGISTER_PARTITION;
1193                 req->op.reg_partition.dn = data->partitions[i]->dn;
1194                 req->callback = ldb_op_default_callback;
1195
1196                 ldb_set_timeout(module->ldb, req, 0);
1197
1198                 req->handle = ldb_handle_new(req, module->ldb);
1199                 if (req->handle == NULL) {
1200                         return LDB_ERR_OPERATIONS_ERROR;
1201                 }
1202                 
1203                 ret = ldb_request(module->ldb, req);
1204                 if (ret == LDB_SUCCESS) {
1205                         ret = ldb_wait(req->handle, LDB_WAIT_ALL);
1206                 }
1207                 if (ret != LDB_SUCCESS) {
1208                         ldb_debug(module->ldb, LDB_DEBUG_ERROR, "partition: Unable to register partition with rootdse!\n");
1209                         talloc_free(mem_ctx);
1210                         return LDB_ERR_OTHER;
1211                 }
1212                 talloc_free(req);
1213         }
1214
1215         replicate_attributes = ldb_msg_find_element(msg, "replicateEntries");
1216         if (!replicate_attributes) {
1217                 data->replicate = NULL;
1218         } else {
1219                 data->replicate = talloc_array(data, struct ldb_dn *, replicate_attributes->num_values + 1);
1220                 if (!data->replicate) {
1221                         talloc_free(mem_ctx);
1222                         return LDB_ERR_OPERATIONS_ERROR;
1223                 }
1224
1225                 for (i=0; i < replicate_attributes->num_values; i++) {
1226                         data->replicate[i] = ldb_dn_from_ldb_val(data->replicate, module->ldb, &replicate_attributes->values[i]);
1227                         if (!ldb_dn_validate(data->replicate[i])) {
1228                                 ldb_asprintf_errstring(module->ldb, 
1229                                                         "partition_init: "
1230                                                         "invalid DN in partition replicate record: %s", 
1231                                                         replicate_attributes->values[i].data);
1232                                 talloc_free(mem_ctx);
1233                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1234                         }
1235                 }
1236                 data->replicate[i] = NULL;
1237         }
1238
1239         /* Make the private data available to any searches the modules may trigger in initialisation */
1240         module->private_data = data;
1241         talloc_steal(module, data);
1242         
1243         modules_attributes = ldb_msg_find_element(msg, "modules");
1244         if (modules_attributes) {
1245                 for (i=0; i < modules_attributes->num_values; i++) {
1246                         struct ldb_dn *base_dn;
1247                         int partition_idx;
1248                         struct dsdb_control_current_partition *partition = NULL;
1249                         const char **modules = NULL;
1250
1251                         char *base = talloc_strdup(data->partitions, (char *)modules_attributes->values[i].data);
1252                         char *p = strchr(base, ':');
1253                         if (!p) {
1254                                 ldb_asprintf_errstring(module->ldb, 
1255                                                         "partition_init: "
1256                                                         "invalid form for partition module record (missing ':'): %s", base);
1257                                 talloc_free(mem_ctx);
1258                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1259                         }
1260                         p[0] = '\0';
1261                         p++;
1262                         if (!p[0]) {
1263                                 ldb_asprintf_errstring(module->ldb, 
1264                                                         "partition_init: "
1265                                                         "invalid form for partition module record (missing backend database): %s", base);
1266                                 talloc_free(mem_ctx);
1267                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1268                         }
1269
1270                         modules = ldb_modules_list_from_string(module->ldb, mem_ctx,
1271                                                                p);
1272                         
1273                         base_dn = ldb_dn_new(mem_ctx, module->ldb, base);
1274                         if (!ldb_dn_validate(base_dn)) {
1275                                 talloc_free(mem_ctx);
1276                                 return LDB_ERR_OPERATIONS_ERROR;
1277                         }
1278                         
1279                         for (partition_idx = 0; data->partitions[partition_idx]; partition_idx++) {
1280                                 if (ldb_dn_compare(data->partitions[partition_idx]->dn, base_dn) == 0) {
1281                                         partition = data->partitions[partition_idx];
1282                                         break;
1283                                 }
1284                         }
1285                         
1286                         if (!partition) {
1287                                 ldb_asprintf_errstring(module->ldb, 
1288                                                         "partition_init: "
1289                                                         "invalid form for partition module record (no such partition): %s", base);
1290                                 talloc_free(mem_ctx);
1291                                 return LDB_ERR_CONSTRAINT_VIOLATION;
1292                         }
1293                         
1294                         ret = ldb_load_modules_list(module->ldb, modules, partition->module, &partition->module);
1295                         if (ret != LDB_SUCCESS) {
1296                                 ldb_asprintf_errstring(module->ldb, 
1297                                                        "partition_init: "
1298                                                        "loading backend for %s failed: %s", 
1299                                                        base, ldb_errstring(module->ldb));
1300                                 talloc_free(mem_ctx);
1301                                 return ret;
1302                         }
1303                         ret = ldb_init_module_chain(module->ldb, partition->module);
1304                         if (ret != LDB_SUCCESS) {
1305                                 ldb_asprintf_errstring(module->ldb, 
1306                                                        "partition_init: "
1307                                                        "initialising backend for %s failed: %s", 
1308                                                        base, ldb_errstring(module->ldb));
1309                                 talloc_free(mem_ctx);
1310                                 return ret;
1311                         }
1312                 }
1313         }
1314
1315         ret = ldb_mod_register_control(module, LDB_CONTROL_DOMAIN_SCOPE_OID);
1316         if (ret != LDB_SUCCESS) {
1317                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1318                         "partition: Unable to register control with rootdse!\n");
1319                 return LDB_ERR_OPERATIONS_ERROR;
1320         }
1321
1322         ret = ldb_mod_register_control(module, LDB_CONTROL_SEARCH_OPTIONS_OID);
1323         if (ret != LDB_SUCCESS) {
1324                 ldb_debug(module->ldb, LDB_DEBUG_ERROR,
1325                         "partition: Unable to register control with rootdse!\n");
1326                 return LDB_ERR_OPERATIONS_ERROR;
1327         }
1328
1329         talloc_free(mem_ctx);
1330         return ldb_next_init(module);
1331 }
1332
1333 _PUBLIC_ const struct ldb_module_ops ldb_partition_module_ops = {
1334         .name              = "partition",
1335         .init_context      = partition_init,
1336         .search            = partition_search,
1337         .add               = partition_add,
1338         .modify            = partition_modify,
1339         .del               = partition_delete,
1340         .rename            = partition_rename,
1341         .extended          = partition_extended,
1342         .start_transaction = partition_start_trans,
1343         .end_transaction   = partition_end_trans,
1344         .del_transaction   = partition_del_trans,
1345 };