Merge commit 'release-4-0-0alpha15' into master4-tmp
[amitay/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include <lib/tdb_compat/tdb_compat.h>
54
55
56 /*
57   map a tdb error code to a ldb error code
58 */
59 int ltdb_err_map(enum TDB_ERROR tdb_code)
60 {
61         switch (tdb_code) {
62         case TDB_SUCCESS:
63                 return LDB_SUCCESS;
64         case TDB_ERR_CORRUPT:
65         case TDB_ERR_OOM:
66         case TDB_ERR_EINVAL:
67                 return LDB_ERR_OPERATIONS_ERROR;
68         case TDB_ERR_IO:
69                 return LDB_ERR_PROTOCOL_ERROR;
70         case TDB_ERR_LOCK:
71 #ifndef BUILD_TDB2
72         case TDB_ERR_NOLOCK:
73 #endif
74                 return LDB_ERR_BUSY;
75 #ifndef BUILD_TDB2
76         case TDB_ERR_LOCK_TIMEOUT:
77 #endif
78                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
79         case TDB_ERR_EXISTS:
80                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
81         case TDB_ERR_NOEXIST:
82                 return LDB_ERR_NO_SUCH_OBJECT;
83         case TDB_ERR_RDONLY:
84                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
85         default:
86                 break;
87         }
88         return LDB_ERR_OTHER;
89 }
90
91 /*
92   lock the database for read - use by ltdb_search and ltdb_sequence_number
93 */
94 int ltdb_lock_read(struct ldb_module *module)
95 {
96         void *data = ldb_module_get_private(module);
97         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
98         int ret = 0;
99
100         if (ltdb->in_transaction == 0 &&
101             ltdb->read_lock_count == 0) {
102                 ret = tdb_lockall_read(ltdb->tdb);
103         }
104         if (ret == 0) {
105                 ltdb->read_lock_count++;
106         }
107         return ret;
108 }
109
110 /*
111   unlock the database after a ltdb_lock_read()
112 */
113 int ltdb_unlock_read(struct ldb_module *module)
114 {
115         void *data = ldb_module_get_private(module);
116         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
117         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
118                 tdb_unlockall_read(ltdb->tdb);
119                 return 0;
120         }
121         ltdb->read_lock_count--;
122         return 0;
123 }
124
125
126 /*
127   form a TDB_DATA for a record key
128   caller frees
129
130   note that the key for a record can depend on whether the
131   dn refers to a case sensitive index record or not
132 */
133 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
134 {
135         struct ldb_context *ldb = ldb_module_get_ctx(module);
136         TDB_DATA key;
137         char *key_str = NULL;
138         const char *dn_folded = NULL;
139
140         /*
141           most DNs are case insensitive. The exception is index DNs for
142           case sensitive attributes
143
144           there are 3 cases dealt with in this code:
145
146           1) if the dn doesn't start with @ then uppercase the attribute
147              names and the attributes values of case insensitive attributes
148           2) if the dn starts with @ then leave it alone -
149              the indexing code handles the rest
150         */
151
152         dn_folded = ldb_dn_get_casefold(dn);
153         if (!dn_folded) {
154                 goto failed;
155         }
156
157         key_str = talloc_strdup(ldb, "DN=");
158         if (!key_str) {
159                 goto failed;
160         }
161
162         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
163         if (!key_str) {
164                 goto failed;
165         }
166
167         key.dptr = (uint8_t *)key_str;
168         key.dsize = strlen(key_str) + 1;
169
170         return key;
171
172 failed:
173         errno = ENOMEM;
174         key.dptr = NULL;
175         key.dsize = 0;
176         return key;
177 }
178
179 /*
180   check special dn's have valid attributes
181   currently only @ATTRIBUTES is checked
182 */
183 static int ltdb_check_special_dn(struct ldb_module *module,
184                                  const struct ldb_message *msg)
185 {
186         struct ldb_context *ldb = ldb_module_get_ctx(module);
187         unsigned int i, j;
188
189         if (! ldb_dn_is_special(msg->dn) ||
190             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
191                 return LDB_SUCCESS;
192         }
193
194         /* we have @ATTRIBUTES, let's check attributes are fine */
195         /* should we check that we deny multivalued attributes ? */
196         for (i = 0; i < msg->num_elements; i++) {
197                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
198
199                 for (j = 0; j < msg->elements[i].num_values; j++) {
200                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
201                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
202                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
203                         }
204                 }
205         }
206
207         return LDB_SUCCESS;
208 }
209
210
211 /*
212   we've made a modification to a dn - possibly reindex and
213   update sequence number
214 */
215 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
216 {
217         int ret = LDB_SUCCESS;
218         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
219
220         /* only allow modifies inside a transaction, otherwise the
221          * ldb is unsafe */
222         if (ltdb->in_transaction == 0) {
223                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
224                 return LDB_ERR_OPERATIONS_ERROR;
225         }
226
227         if (ldb_dn_is_special(dn) &&
228             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
229              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
230                 ret = ltdb_reindex(module);
231         }
232
233         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
234         if (ret == LDB_SUCCESS &&
235             !(ldb_dn_is_special(dn) &&
236               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
237                 ret = ltdb_increase_sequence_number(module);
238         }
239
240         /* If the modify was to @OPTIONS, reload the cache */
241         if (ret == LDB_SUCCESS &&
242             ldb_dn_is_special(dn) &&
243             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
244                 ret = ltdb_cache_reload(module);
245         }
246
247         return ret;
248 }
249
250 /*
251   store a record into the db
252 */
253 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
254 {
255         void *data = ldb_module_get_private(module);
256         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
257         TDB_DATA tdb_key, tdb_data;
258         int ret = LDB_SUCCESS;
259
260         tdb_key = ltdb_key(module, msg->dn);
261         if (tdb_key.dptr == NULL) {
262                 return LDB_ERR_OTHER;
263         }
264
265         ret = ltdb_pack_data(module, msg, &tdb_data);
266         if (ret == -1) {
267                 talloc_free(tdb_key.dptr);
268                 return LDB_ERR_OTHER;
269         }
270
271         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
272         if (ret != 0) {
273                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
274                 goto done;
275         }
276
277 done:
278         talloc_free(tdb_key.dptr);
279         talloc_free(tdb_data.dptr);
280
281         return ret;
282 }
283
284
285 /*
286   check if a attribute is a single valued, for a given element
287  */
288 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
289                                   struct ldb_message_element *el)
290 {
291         if (!a) return false;
292         if (el != NULL) {
293                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
294                         /* override from a ldb module, for example
295                            used for the description field, which is
296                            marked multi-valued in the schema but which
297                            should not actually accept multiple
298                            values */
299                         return true;
300                 }
301                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
302                         /* override from a ldb module, for example used for
303                            deleted linked attribute entries */
304                         return false;
305                 }
306         }
307         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
308                 return true;
309         }
310         return false;
311 }
312
313 static int ltdb_add_internal(struct ldb_module *module,
314                              const struct ldb_message *msg)
315 {
316         struct ldb_context *ldb = ldb_module_get_ctx(module);
317         int ret = LDB_SUCCESS;
318         unsigned int i;
319
320         for (i=0;i<msg->num_elements;i++) {
321                 struct ldb_message_element *el = &msg->elements[i];
322                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
323
324                 if (el->num_values == 0) {
325                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
326                                                el->name, ldb_dn_get_linearized(msg->dn));
327                         return LDB_ERR_CONSTRAINT_VIOLATION;
328                 }
329                 if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
330                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
331                                                el->name, ldb_dn_get_linearized(msg->dn));
332                         return LDB_ERR_CONSTRAINT_VIOLATION;
333                 }
334         }
335
336         ret = ltdb_store(module, msg, TDB_INSERT);
337         if (ret != LDB_SUCCESS) {
338                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
339                         ldb_asprintf_errstring(ldb,
340                                                "Entry %s already exists",
341                                                ldb_dn_get_linearized(msg->dn));
342                 }
343                 return ret;
344         }
345
346         ret = ltdb_index_add_new(module, msg);
347         if (ret != LDB_SUCCESS) {
348                 return ret;
349         }
350
351         ret = ltdb_modified(module, msg->dn);
352
353         return ret;
354 }
355
356 /*
357   add a record to the database
358 */
359 static int ltdb_add(struct ltdb_context *ctx)
360 {
361         struct ldb_module *module = ctx->module;
362         struct ldb_request *req = ctx->req;
363         int ret = LDB_SUCCESS;
364
365         ret = ltdb_check_special_dn(module, req->op.add.message);
366         if (ret != LDB_SUCCESS) {
367                 return ret;
368         }
369
370         ldb_request_set_state(req, LDB_ASYNC_PENDING);
371
372         if (ltdb_cache_load(module) != 0) {
373                 return LDB_ERR_OPERATIONS_ERROR;
374         }
375
376         ret = ltdb_add_internal(module, req->op.add.message);
377
378         return ret;
379 }
380
381 /*
382   delete a record from the database, not updating indexes (used for deleting
383   index records)
384 */
385 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
386 {
387         void *data = ldb_module_get_private(module);
388         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
389         TDB_DATA tdb_key;
390         int ret;
391
392         tdb_key = ltdb_key(module, dn);
393         if (!tdb_key.dptr) {
394                 return LDB_ERR_OTHER;
395         }
396
397         ret = tdb_delete(ltdb->tdb, tdb_key);
398         talloc_free(tdb_key.dptr);
399
400         if (ret != 0) {
401                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
402         }
403
404         return ret;
405 }
406
407 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
408 {
409         struct ldb_message *msg;
410         int ret = LDB_SUCCESS;
411
412         msg = ldb_msg_new(module);
413         if (msg == NULL) {
414                 return LDB_ERR_OPERATIONS_ERROR;
415         }
416
417         /* in case any attribute of the message was indexed, we need
418            to fetch the old record */
419         ret = ltdb_search_dn1(module, dn, msg);
420         if (ret != LDB_SUCCESS) {
421                 /* not finding the old record is an error */
422                 goto done;
423         }
424
425         ret = ltdb_delete_noindex(module, dn);
426         if (ret != LDB_SUCCESS) {
427                 goto done;
428         }
429
430         /* remove any indexed attributes */
431         ret = ltdb_index_delete(module, msg);
432         if (ret != LDB_SUCCESS) {
433                 goto done;
434         }
435
436         ret = ltdb_modified(module, dn);
437         if (ret != LDB_SUCCESS) {
438                 goto done;
439         }
440
441 done:
442         talloc_free(msg);
443         return ret;
444 }
445
446 /*
447   delete a record from the database
448 */
449 static int ltdb_delete(struct ltdb_context *ctx)
450 {
451         struct ldb_module *module = ctx->module;
452         struct ldb_request *req = ctx->req;
453         int ret = LDB_SUCCESS;
454
455         ldb_request_set_state(req, LDB_ASYNC_PENDING);
456
457         if (ltdb_cache_load(module) != 0) {
458                 return LDB_ERR_OPERATIONS_ERROR;
459         }
460
461         ret = ltdb_delete_internal(module, req->op.del.dn);
462
463         return ret;
464 }
465
466 /*
467   find an element by attribute name. At the moment this does a linear search,
468   it should be re-coded to use a binary search once all places that modify
469   records guarantee sorted order
470
471   return the index of the first matching element if found, otherwise -1
472 */
473 static int find_element(const struct ldb_message *msg, const char *name)
474 {
475         unsigned int i;
476         for (i=0;i<msg->num_elements;i++) {
477                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
478                         return i;
479                 }
480         }
481         return -1;
482 }
483
484
485 /*
486   add an element to an existing record. Assumes a elements array that we
487   can call re-alloc on, and assumed that we can re-use the data pointers from
488   the passed in additional values. Use with care!
489
490   returns 0 on success, -1 on failure (and sets errno)
491 */
492 static int ltdb_msg_add_element(struct ldb_context *ldb,
493                                 struct ldb_message *msg,
494                                 struct ldb_message_element *el)
495 {
496         struct ldb_message_element *e2;
497         unsigned int i;
498
499         if (el->num_values == 0) {
500                 /* nothing to do here - we don't add empty elements */
501                 return 0;
502         }
503
504         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
505                               msg->num_elements+1);
506         if (!e2) {
507                 errno = ENOMEM;
508                 return -1;
509         }
510
511         msg->elements = e2;
512
513         e2 = &msg->elements[msg->num_elements];
514
515         e2->name = el->name;
516         e2->flags = el->flags;
517         e2->values = talloc_array(msg->elements,
518                                   struct ldb_val, el->num_values);
519         if (!e2->values) {
520                 errno = ENOMEM;
521                 return -1;
522         }
523         for (i=0;i<el->num_values;i++) {
524                 e2->values[i] = el->values[i];
525         }
526         e2->num_values = el->num_values;
527
528         ++msg->num_elements;
529
530         return 0;
531 }
532
533 /*
534   delete all elements having a specified attribute name
535 */
536 static int msg_delete_attribute(struct ldb_module *module,
537                                 struct ldb_context *ldb,
538                                 struct ldb_message *msg, const char *name)
539 {
540         unsigned int i;
541         int ret;
542         struct ldb_message_element *el;
543
544         el = ldb_msg_find_element(msg, name);
545         if (el == NULL) {
546                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
547         }
548         i = el - msg->elements;
549
550         ret = ltdb_index_del_element(module, msg->dn, el);
551         if (ret != LDB_SUCCESS) {
552                 return ret;
553         }
554
555         talloc_free(el->values);
556         if (msg->num_elements > (i+1)) {
557                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
558         }
559         msg->num_elements--;
560         msg->elements = talloc_realloc(msg, msg->elements,
561                                        struct ldb_message_element,
562                                        msg->num_elements);
563         return LDB_SUCCESS;
564 }
565
566 /*
567   delete all elements matching an attribute name/value
568
569   return LDB Error on failure
570 */
571 static int msg_delete_element(struct ldb_module *module,
572                               struct ldb_message *msg,
573                               const char *name,
574                               const struct ldb_val *val)
575 {
576         struct ldb_context *ldb = ldb_module_get_ctx(module);
577         unsigned int i;
578         int found, ret;
579         struct ldb_message_element *el;
580         const struct ldb_schema_attribute *a;
581
582         found = find_element(msg, name);
583         if (found == -1) {
584                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
585         }
586
587         i = (unsigned int) found;
588         el = &(msg->elements[i]);
589
590         a = ldb_schema_attribute_by_name(ldb, el->name);
591
592         for (i=0;i<el->num_values;i++) {
593                 bool matched;
594                 if (a->syntax->operator_fn) {
595                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
596                                                      &el->values[i], val, &matched);
597                         if (ret != LDB_SUCCESS) return ret;
598                 } else {
599                         matched = (a->syntax->comparison_fn(ldb, ldb,
600                                                             &el->values[i], val) == 0);
601                 }
602                 if (matched) {
603                         if (el->num_values == 1) {
604                                 return msg_delete_attribute(module, ldb, msg, name);
605                         }
606
607                         ret = ltdb_index_del_value(module, msg->dn, el, i);
608                         if (ret != LDB_SUCCESS) {
609                                 return ret;
610                         }
611
612                         if (i<el->num_values-1) {
613                                 memmove(&el->values[i], &el->values[i+1],
614                                         sizeof(el->values[i])*
615                                                 (el->num_values-(i+1)));
616                         }
617                         el->num_values--;
618
619                         /* per definition we find in a canonicalised message an
620                            attribute value only once. So we are finished here */
621                         return LDB_SUCCESS;
622                 }
623         }
624
625         /* Not found */
626         return LDB_ERR_NO_SUCH_ATTRIBUTE;
627 }
628
629
630 /*
631   modify a record - internal interface
632
633   yuck - this is O(n^2). Luckily n is usually small so we probably
634   get away with it, but if we ever have really large attribute lists
635   then we'll need to look at this again
636
637   'req' is optional, and is used to specify controls if supplied
638 */
639 int ltdb_modify_internal(struct ldb_module *module,
640                          const struct ldb_message *msg,
641                          struct ldb_request *req)
642 {
643         struct ldb_context *ldb = ldb_module_get_ctx(module);
644         void *data = ldb_module_get_private(module);
645         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
646         TDB_DATA tdb_key, tdb_data;
647         struct ldb_message *msg2;
648         unsigned int i, j, k;
649         int ret = LDB_SUCCESS, idx;
650         struct ldb_control *control_permissive = NULL;
651
652         if (req) {
653                 control_permissive = ldb_request_get_control(req,
654                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
655         }
656
657         tdb_key = ltdb_key(module, msg->dn);
658         if (!tdb_key.dptr) {
659                 return LDB_ERR_OTHER;
660         }
661
662         tdb_data = tdb_fetch_compat(ltdb->tdb, tdb_key);
663         if (!tdb_data.dptr) {
664                 talloc_free(tdb_key.dptr);
665                 return ltdb_err_map(tdb_error(ltdb->tdb));
666         }
667
668         msg2 = ldb_msg_new(tdb_key.dptr);
669         if (msg2 == NULL) {
670                 free(tdb_data.dptr);
671                 ret = LDB_ERR_OTHER;
672                 goto done;
673         }
674
675         ret = ltdb_unpack_data(module, &tdb_data, msg2);
676         free(tdb_data.dptr);
677         if (ret == -1) {
678                 ret = LDB_ERR_OTHER;
679                 goto done;
680         }
681
682         if (!msg2->dn) {
683                 msg2->dn = msg->dn;
684         }
685
686         for (i=0; i<msg->num_elements; i++) {
687                 struct ldb_message_element *el = &msg->elements[i], *el2;
688                 struct ldb_val *vals;
689                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
690                 const char *dn;
691
692                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
693                 case LDB_FLAG_MOD_ADD:
694
695                         if (el->num_values == 0) {
696                                 ldb_asprintf_errstring(ldb,
697                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
698                                                        el->name, ldb_dn_get_linearized(msg2->dn));
699                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
700                                 goto done;
701                         }
702
703                         /* make a copy of the array so that a permissive
704                          * control can remove duplicates without changing the
705                          * original values, but do not copy data as we do not
706                          * need to keep it around once the operation is
707                          * finished */
708                         if (control_permissive) {
709                                 el = talloc(msg2, struct ldb_message_element);
710                                 if (!el) {
711                                         ret = LDB_ERR_OTHER;
712                                         goto done;
713                                 }
714                                 *el = msg->elements[i];
715                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
716                                 if (el->values == NULL) {
717                                         ret = LDB_ERR_OTHER;
718                                         goto done;
719                                 }
720                                 for (j = 0; j < el->num_values; j++) {
721                                         el->values[j] = msg->elements[i].values[j];
722                                 }
723                         }
724
725                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
726                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
727                                                        el->name, ldb_dn_get_linearized(msg2->dn));
728                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
729                                 goto done;
730                         }
731
732                         /* Checks if element already exists */
733                         idx = find_element(msg2, el->name);
734                         if (idx == -1) {
735                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
736                                         ret = LDB_ERR_OTHER;
737                                         goto done;
738                                 }
739                                 ret = ltdb_index_add_element(module, msg2->dn,
740                                                              el);
741                                 if (ret != LDB_SUCCESS) {
742                                         goto done;
743                                 }
744                         } else {
745                                 j = (unsigned int) idx;
746                                 el2 = &(msg2->elements[j]);
747
748                                 /* We cannot add another value on a existing one
749                                    if the attribute is single-valued */
750                                 if (ldb_tdb_single_valued(a, el)) {
751                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
752                                                                el->name, ldb_dn_get_linearized(msg2->dn));
753                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
754                                         goto done;
755                                 }
756
757                                 /* Check that values don't exist yet on multi-
758                                    valued attributes or aren't provided twice */
759                                 for (j = 0; j < el->num_values; j++) {
760                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
761                                                 if (control_permissive) {
762                                                         /* remove this one as if it was never added */
763                                                         el->num_values--;
764                                                         for (k = j; k < el->num_values; k++) {
765                                                                 el->values[k] = el->values[k + 1];
766                                                         }
767                                                         j--; /* rewind */
768
769                                                         continue;
770                                                 }
771
772                                                 ldb_asprintf_errstring(ldb,
773                                                                        "attribute '%s': value #%u on '%s' already exists",
774                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
775                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
776                                                 goto done;
777                                         }
778                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
779                                                 ldb_asprintf_errstring(ldb,
780                                                                        "attribute '%s': value #%u on '%s' provided more than once",
781                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
782                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
783                                                 goto done;
784                                         }
785                                 }
786
787                                 /* Now combine existing and new values to a new
788                                    attribute record */
789                                 vals = talloc_realloc(msg2->elements,
790                                                       el2->values, struct ldb_val,
791                                                       el2->num_values + el->num_values);
792                                 if (vals == NULL) {
793                                         ldb_oom(ldb);
794                                         ret = LDB_ERR_OTHER;
795                                         goto done;
796                                 }
797
798                                 for (j=0; j<el->num_values; j++) {
799                                         vals[el2->num_values + j] =
800                                                 ldb_val_dup(vals, &el->values[j]);
801                                 }
802
803                                 el2->values = vals;
804                                 el2->num_values += el->num_values;
805
806                                 ret = ltdb_index_add_element(module, msg2->dn, el);
807                                 if (ret != LDB_SUCCESS) {
808                                         goto done;
809                                 }
810                         }
811
812                         break;
813
814                 case LDB_FLAG_MOD_REPLACE:
815
816                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
817                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
818                                                        el->name, ldb_dn_get_linearized(msg2->dn));
819                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
820                                 goto done;
821                         }
822
823                         /* TODO: This is O(n^2) - replace with more efficient check */
824                         for (j=0; j<el->num_values; j++) {
825                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
826                                         ldb_asprintf_errstring(ldb,
827                                                                "attribute '%s': value #%u on '%s' provided more than once",
828                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
829                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
830                                         goto done;
831                                 }
832                         }
833
834                         /* Checks if element already exists */
835                         idx = find_element(msg2, el->name);
836                         if (idx != -1) {
837                                 j = (unsigned int) idx;
838                                 el2 = &(msg2->elements[j]);
839                                 if (ldb_msg_element_compare(el, el2) == 0) {
840                                         /* we are replacing with the same values */
841                                         continue;
842                                 }
843                         
844                                 /* Delete the attribute if it exists in the DB */
845                                 if (msg_delete_attribute(module, ldb, msg2,
846                                                          el->name) != 0) {
847                                         ret = LDB_ERR_OTHER;
848                                         goto done;
849                                 }
850                         }
851
852                         /* Recreate it with the new values */
853                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
854                                 ret = LDB_ERR_OTHER;
855                                 goto done;
856                         }
857
858                         ret = ltdb_index_add_element(module, msg2->dn, el);
859                         if (ret != LDB_SUCCESS) {
860                                 goto done;
861                         }
862
863                         break;
864
865                 case LDB_FLAG_MOD_DELETE:
866                         dn = ldb_dn_get_linearized(msg2->dn);
867                         if (dn == NULL) {
868                                 ret = LDB_ERR_OTHER;
869                                 goto done;
870                         }
871
872                         if (msg->elements[i].num_values == 0) {
873                                 /* Delete the whole attribute */
874                                 ret = msg_delete_attribute(module, ldb, msg2,
875                                                            msg->elements[i].name);
876                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
877                                     control_permissive) {
878                                         ret = LDB_SUCCESS;
879                                 } else {
880                                         ldb_asprintf_errstring(ldb,
881                                                                "attribute '%s': no such attribute for delete on '%s'",
882                                                                msg->elements[i].name, dn);
883                                 }
884                                 if (ret != LDB_SUCCESS) {
885                                         goto done;
886                                 }
887                         } else {
888                                 /* Delete specified values from an attribute */
889                                 for (j=0; j < msg->elements[i].num_values; j++) {
890                                         ret = msg_delete_element(module,
891                                                                  msg2,
892                                                                  msg->elements[i].name,
893                                                                  &msg->elements[i].values[j]);
894                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
895                                             control_permissive) {
896                                                 ret = LDB_SUCCESS;
897                                         } else {
898                                                 ldb_asprintf_errstring(ldb,
899                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
900                                                                        msg->elements[i].name, dn);
901                                         }
902                                         if (ret != LDB_SUCCESS) {
903                                                 goto done;
904                                         }
905                                 }
906                         }
907                         break;
908                 default:
909                         ldb_asprintf_errstring(ldb,
910                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
911                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
912                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
913                         ret = LDB_ERR_PROTOCOL_ERROR;
914                         goto done;
915                 }
916         }
917
918         ret = ltdb_store(module, msg2, TDB_MODIFY);
919         if (ret != LDB_SUCCESS) {
920                 goto done;
921         }
922
923         ret = ltdb_modified(module, msg2->dn);
924         if (ret != LDB_SUCCESS) {
925                 goto done;
926         }
927
928 done:
929         talloc_free(tdb_key.dptr);
930         return ret;
931 }
932
933 /*
934   modify a record
935 */
936 static int ltdb_modify(struct ltdb_context *ctx)
937 {
938         struct ldb_module *module = ctx->module;
939         struct ldb_request *req = ctx->req;
940         int ret = LDB_SUCCESS;
941
942         ret = ltdb_check_special_dn(module, req->op.mod.message);
943         if (ret != LDB_SUCCESS) {
944                 return ret;
945         }
946
947         ldb_request_set_state(req, LDB_ASYNC_PENDING);
948
949         if (ltdb_cache_load(module) != 0) {
950                 return LDB_ERR_OPERATIONS_ERROR;
951         }
952
953         ret = ltdb_modify_internal(module, req->op.mod.message, req);
954
955         return ret;
956 }
957
958 /*
959   rename a record
960 */
961 static int ltdb_rename(struct ltdb_context *ctx)
962 {
963         struct ldb_module *module = ctx->module;
964         struct ldb_request *req = ctx->req;
965         struct ldb_message *msg;
966         int ret = LDB_SUCCESS;
967
968         ldb_request_set_state(req, LDB_ASYNC_PENDING);
969
970         if (ltdb_cache_load(ctx->module) != 0) {
971                 return LDB_ERR_OPERATIONS_ERROR;
972         }
973
974         msg = ldb_msg_new(ctx);
975         if (msg == NULL) {
976                 return LDB_ERR_OPERATIONS_ERROR;
977         }
978
979         /* in case any attribute of the message was indexed, we need
980            to fetch the old record */
981         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
982         if (ret != LDB_SUCCESS) {
983                 /* not finding the old record is an error */
984                 return ret;
985         }
986
987         /* Always delete first then add, to avoid conflicts with
988          * unique indexes. We rely on the transaction to make this
989          * atomic
990          */
991         ret = ltdb_delete_internal(module, msg->dn);
992         if (ret != LDB_SUCCESS) {
993                 return ret;
994         }
995
996         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
997         if (msg->dn == NULL) {
998                 return LDB_ERR_OPERATIONS_ERROR;
999         }
1000
1001         ret = ltdb_add_internal(module, msg);
1002
1003         return ret;
1004 }
1005
1006 static int ltdb_start_trans(struct ldb_module *module)
1007 {
1008         void *data = ldb_module_get_private(module);
1009         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1010
1011         if (tdb_transaction_start(ltdb->tdb) != 0) {
1012                 return ltdb_err_map(tdb_error(ltdb->tdb));
1013         }
1014
1015         ltdb->in_transaction++;
1016
1017         ltdb_index_transaction_start(module);
1018
1019         return LDB_SUCCESS;
1020 }
1021
1022 static int ltdb_prepare_commit(struct ldb_module *module)
1023 {
1024         void *data = ldb_module_get_private(module);
1025         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1026
1027         if (ltdb->in_transaction != 1) {
1028                 return LDB_SUCCESS;
1029         }
1030
1031         if (ltdb_index_transaction_commit(module) != 0) {
1032                 tdb_transaction_cancel(ltdb->tdb);
1033                 ltdb->in_transaction--;
1034                 return ltdb_err_map(tdb_error(ltdb->tdb));
1035         }
1036
1037         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1038                 ltdb->in_transaction--;
1039                 return ltdb_err_map(tdb_error(ltdb->tdb));
1040         }
1041
1042         ltdb->prepared_commit = true;
1043
1044         return LDB_SUCCESS;
1045 }
1046
1047 static int ltdb_end_trans(struct ldb_module *module)
1048 {
1049         void *data = ldb_module_get_private(module);
1050         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1051
1052         if (!ltdb->prepared_commit) {
1053                 int ret = ltdb_prepare_commit(module);
1054                 if (ret != LDB_SUCCESS) {
1055                         return ret;
1056                 }
1057         }
1058
1059         ltdb->in_transaction--;
1060         ltdb->prepared_commit = false;
1061
1062         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1063                 return ltdb_err_map(tdb_error(ltdb->tdb));
1064         }
1065
1066         return LDB_SUCCESS;
1067 }
1068
1069 static int ltdb_del_trans(struct ldb_module *module)
1070 {
1071         void *data = ldb_module_get_private(module);
1072         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1073
1074         ltdb->in_transaction--;
1075
1076         if (ltdb_index_transaction_cancel(module) != 0) {
1077                 tdb_transaction_cancel(ltdb->tdb);
1078                 return ltdb_err_map(tdb_error(ltdb->tdb));
1079         }
1080
1081         tdb_transaction_cancel(ltdb->tdb);
1082         return LDB_SUCCESS;
1083 }
1084
1085 /*
1086   return sequenceNumber from @BASEINFO
1087 */
1088 static int ltdb_sequence_number(struct ltdb_context *ctx,
1089                                 struct ldb_extended **ext)
1090 {
1091         struct ldb_context *ldb;
1092         struct ldb_module *module = ctx->module;
1093         struct ldb_request *req = ctx->req;
1094         TALLOC_CTX *tmp_ctx = NULL;
1095         struct ldb_seqnum_request *seq;
1096         struct ldb_seqnum_result *res;
1097         struct ldb_message *msg = NULL;
1098         struct ldb_dn *dn;
1099         const char *date;
1100         int ret = LDB_SUCCESS;
1101
1102         ldb = ldb_module_get_ctx(module);
1103
1104         seq = talloc_get_type(req->op.extended.data,
1105                                 struct ldb_seqnum_request);
1106         if (seq == NULL) {
1107                 return LDB_ERR_OPERATIONS_ERROR;
1108         }
1109
1110         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1111
1112         if (ltdb_lock_read(module) != 0) {
1113                 return LDB_ERR_OPERATIONS_ERROR;
1114         }
1115
1116         res = talloc_zero(req, struct ldb_seqnum_result);
1117         if (res == NULL) {
1118                 ret = LDB_ERR_OPERATIONS_ERROR;
1119                 goto done;
1120         }
1121
1122         tmp_ctx = talloc_new(req);
1123         if (tmp_ctx == NULL) {
1124                 ret = LDB_ERR_OPERATIONS_ERROR;
1125                 goto done;
1126         }
1127
1128         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1129         if (dn == NULL) {
1130                 ret = LDB_ERR_OPERATIONS_ERROR;
1131                 goto done;
1132         }
1133
1134         msg = ldb_msg_new(tmp_ctx);
1135         if (msg == NULL) {
1136                 ret = LDB_ERR_OPERATIONS_ERROR;
1137                 goto done;
1138         }
1139
1140         ret = ltdb_search_dn1(module, dn, msg);
1141         if (ret != LDB_SUCCESS) {
1142                 goto done;
1143         }
1144
1145         switch (seq->type) {
1146         case LDB_SEQ_HIGHEST_SEQ:
1147                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1148                 break;
1149         case LDB_SEQ_NEXT:
1150                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1151                 res->seq_num++;
1152                 break;
1153         case LDB_SEQ_HIGHEST_TIMESTAMP:
1154                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1155                 if (date) {
1156                         res->seq_num = ldb_string_to_time(date);
1157                 } else {
1158                         res->seq_num = 0;
1159                         /* zero is as good as anything when we don't know */
1160                 }
1161                 break;
1162         }
1163
1164         *ext = talloc_zero(req, struct ldb_extended);
1165         if (*ext == NULL) {
1166                 ret = LDB_ERR_OPERATIONS_ERROR;
1167                 goto done;
1168         }
1169         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1170         (*ext)->data = talloc_steal(*ext, res);
1171
1172 done:
1173         talloc_free(tmp_ctx);
1174         ltdb_unlock_read(module);
1175         return ret;
1176 }
1177
1178 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1179 {
1180         struct ldb_context *ldb;
1181         struct ldb_request *req;
1182         struct ldb_reply *ares;
1183
1184         ldb = ldb_module_get_ctx(ctx->module);
1185         req = ctx->req;
1186
1187         /* if we already returned an error just return */
1188         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1189                 return;
1190         }
1191
1192         ares = talloc_zero(req, struct ldb_reply);
1193         if (!ares) {
1194                 ldb_oom(ldb);
1195                 req->callback(req, NULL);
1196                 return;
1197         }
1198         ares->type = LDB_REPLY_DONE;
1199         ares->error = error;
1200
1201         req->callback(req, ares);
1202 }
1203
1204 static void ltdb_timeout(struct tevent_context *ev,
1205                           struct tevent_timer *te,
1206                           struct timeval t,
1207                           void *private_data)
1208 {
1209         struct ltdb_context *ctx;
1210         ctx = talloc_get_type(private_data, struct ltdb_context);
1211
1212         if (!ctx->request_terminated) {
1213                 /* request is done now */
1214                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1215         }
1216
1217         if (!ctx->request_terminated) {
1218                 /* neutralize the spy */
1219                 ctx->spy->ctx = NULL;
1220         }
1221         talloc_free(ctx);
1222 }
1223
1224 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1225                                         struct ldb_extended *ext,
1226                                         int error)
1227 {
1228         struct ldb_context *ldb;
1229         struct ldb_request *req;
1230         struct ldb_reply *ares;
1231
1232         ldb = ldb_module_get_ctx(ctx->module);
1233         req = ctx->req;
1234
1235         /* if we already returned an error just return */
1236         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1237                 return;
1238         }
1239
1240         ares = talloc_zero(req, struct ldb_reply);
1241         if (!ares) {
1242                 ldb_oom(ldb);
1243                 req->callback(req, NULL);
1244                 return;
1245         }
1246         ares->type = LDB_REPLY_DONE;
1247         ares->response = ext;
1248         ares->error = error;
1249
1250         req->callback(req, ares);
1251 }
1252
1253 static void ltdb_handle_extended(struct ltdb_context *ctx)
1254 {
1255         struct ldb_extended *ext = NULL;
1256         int ret;
1257
1258         if (strcmp(ctx->req->op.extended.oid,
1259                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1260                 /* get sequence number */
1261                 ret = ltdb_sequence_number(ctx, &ext);
1262         } else {
1263                 /* not recognized */
1264                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1265         }
1266
1267         ltdb_request_extended_done(ctx, ext, ret);
1268 }
1269
1270 static void ltdb_callback(struct tevent_context *ev,
1271                           struct tevent_timer *te,
1272                           struct timeval t,
1273                           void *private_data)
1274 {
1275         struct ltdb_context *ctx;
1276         int ret;
1277
1278         ctx = talloc_get_type(private_data, struct ltdb_context);
1279
1280         if (ctx->request_terminated) {
1281                 goto done;
1282         }
1283
1284         switch (ctx->req->operation) {
1285         case LDB_SEARCH:
1286                 ret = ltdb_search(ctx);
1287                 break;
1288         case LDB_ADD:
1289                 ret = ltdb_add(ctx);
1290                 break;
1291         case LDB_MODIFY:
1292                 ret = ltdb_modify(ctx);
1293                 break;
1294         case LDB_DELETE:
1295                 ret = ltdb_delete(ctx);
1296                 break;
1297         case LDB_RENAME:
1298                 ret = ltdb_rename(ctx);
1299                 break;
1300         case LDB_EXTENDED:
1301                 ltdb_handle_extended(ctx);
1302                 goto done;
1303         default:
1304                 /* no other op supported */
1305                 ret = LDB_ERR_PROTOCOL_ERROR;
1306         }
1307
1308         if (!ctx->request_terminated) {
1309                 /* request is done now */
1310                 ltdb_request_done(ctx, ret);
1311         }
1312
1313 done:
1314         if (!ctx->request_terminated) {
1315                 /* neutralize the spy */
1316                 ctx->spy->ctx = NULL;
1317         }
1318         talloc_free(ctx);
1319 }
1320
1321 static int ltdb_request_destructor(void *ptr)
1322 {
1323         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1324
1325         if (spy->ctx != NULL) {
1326                 spy->ctx->request_terminated = true;
1327         }
1328
1329         return 0;
1330 }
1331
1332 static int ltdb_handle_request(struct ldb_module *module,
1333                                struct ldb_request *req)
1334 {
1335         struct ldb_control *control_permissive;
1336         struct ldb_context *ldb;
1337         struct tevent_context *ev;
1338         struct ltdb_context *ac;
1339         struct tevent_timer *te;
1340         struct timeval tv;
1341         unsigned int i;
1342
1343         ldb = ldb_module_get_ctx(module);
1344
1345         control_permissive = ldb_request_get_control(req,
1346                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1347
1348         for (i = 0; req->controls && req->controls[i]; i++) {
1349                 if (req->controls[i]->critical &&
1350                     req->controls[i] != control_permissive) {
1351                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1352                                                req->controls[i]->oid);
1353                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1354                 }
1355         }
1356
1357         if (req->starttime == 0 || req->timeout == 0) {
1358                 ldb_set_errstring(ldb, "Invalid timeout settings");
1359                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1360         }
1361
1362         ev = ldb_get_event_context(ldb);
1363
1364         ac = talloc_zero(ldb, struct ltdb_context);
1365         if (ac == NULL) {
1366                 ldb_oom(ldb);
1367                 return LDB_ERR_OPERATIONS_ERROR;
1368         }
1369
1370         ac->module = module;
1371         ac->req = req;
1372
1373         tv.tv_sec = 0;
1374         tv.tv_usec = 0;
1375         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1376         if (NULL == te) {
1377                 talloc_free(ac);
1378                 return LDB_ERR_OPERATIONS_ERROR;
1379         }
1380
1381         tv.tv_sec = req->starttime + req->timeout;
1382         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1383         if (NULL == ac->timeout_event) {
1384                 talloc_free(ac);
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         /* set a spy so that we do not try to use the request context
1389          * if it is freed before ltdb_callback fires */
1390         ac->spy = talloc(req, struct ltdb_req_spy);
1391         if (NULL == ac->spy) {
1392                 talloc_free(ac);
1393                 return LDB_ERR_OPERATIONS_ERROR;
1394         }
1395         ac->spy->ctx = ac;
1396
1397         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1398
1399         return LDB_SUCCESS;
1400 }
1401
1402 static int ltdb_init_rootdse(struct ldb_module *module)
1403 {
1404         struct ldb_context *ldb;
1405         int ret;
1406
1407         ldb = ldb_module_get_ctx(module);
1408
1409         ret = ldb_mod_register_control(module,
1410                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1411         /* ignore errors on this - we expect it for non-sam databases */
1412
1413         /* there can be no module beyond the backend, just return */
1414         return LDB_SUCCESS;
1415 }
1416
1417 static const struct ldb_module_ops ltdb_ops = {
1418         .name              = "tdb",
1419         .init_context      = ltdb_init_rootdse,
1420         .search            = ltdb_handle_request,
1421         .add               = ltdb_handle_request,
1422         .modify            = ltdb_handle_request,
1423         .del               = ltdb_handle_request,
1424         .rename            = ltdb_handle_request,
1425         .extended          = ltdb_handle_request,
1426         .start_transaction = ltdb_start_trans,
1427         .end_transaction   = ltdb_end_trans,
1428         .prepare_commit    = ltdb_prepare_commit,
1429         .del_transaction   = ltdb_del_trans,
1430 };
1431
1432 /*
1433   connect to the database
1434 */
1435 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1436                         unsigned int flags, const char *options[],
1437                         struct ldb_module **_module)
1438 {
1439         struct ldb_module *module;
1440         const char *path;
1441         int tdb_flags, open_flags;
1442         struct ltdb_private *ltdb;
1443
1444         /* parse the url */
1445         if (strchr(url, ':')) {
1446                 if (strncmp(url, "tdb://", 6) != 0) {
1447                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1448                                   "Invalid tdb URL '%s'", url);
1449                         return LDB_ERR_OPERATIONS_ERROR;
1450                 }
1451                 path = url+6;
1452         } else {
1453                 path = url;
1454         }
1455
1456         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1457
1458         /* check for the 'nosync' option */
1459         if (flags & LDB_FLG_NOSYNC) {
1460                 tdb_flags |= TDB_NOSYNC;
1461         }
1462
1463         /* and nommap option */
1464         if (flags & LDB_FLG_NOMMAP) {
1465                 tdb_flags |= TDB_NOMMAP;
1466         }
1467
1468         if (flags & LDB_FLG_RDONLY) {
1469                 open_flags = O_RDONLY;
1470         } else {
1471                 open_flags = O_CREAT | O_RDWR;
1472         }
1473
1474         ltdb = talloc_zero(ldb, struct ltdb_private);
1475         if (!ltdb) {
1476                 ldb_oom(ldb);
1477                 return LDB_ERR_OPERATIONS_ERROR;
1478         }
1479
1480         /* note that we use quite a large default hash size */
1481         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1482                                    tdb_flags, open_flags,
1483                                    ldb_get_create_perms(ldb), ldb);
1484         if (!ltdb->tdb) {
1485                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1486                           "Unable to open tdb '%s'", path);
1487                 talloc_free(ltdb);
1488                 return LDB_ERR_OPERATIONS_ERROR;
1489         }
1490
1491         if (getenv("LDB_WARN_UNINDEXED")) {
1492                 ltdb->warn_unindexed = true;
1493         }
1494
1495         ltdb->sequence_number = 0;
1496
1497         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1498         if (!module) {
1499                 talloc_free(ltdb);
1500                 return LDB_ERR_OPERATIONS_ERROR;
1501         }
1502         ldb_module_set_private(module, ltdb);
1503         talloc_steal(module, ltdb);
1504
1505         if (ltdb_cache_load(module) != 0) {
1506                 talloc_free(module);
1507                 talloc_free(ltdb);
1508                 return LDB_ERR_OPERATIONS_ERROR;
1509         }
1510
1511         *_module = module;
1512         return LDB_SUCCESS;
1513 }
1514
1515 int ldb_tdb_init(const char *version)
1516 {
1517         LDB_MODULE_CHECK_VERSION(version);
1518         return ldb_register_backend("tdb", ltdb_connect, false);
1519 }