16e556d78d958ff67fb0e242f40afc0bb693757a
[nivanova/samba-autobuild/.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include <lib/tdb_compat/tdb_compat.h>
54
55
56 /*
57   map a tdb error code to a ldb error code
58 */
59 int ltdb_err_map(enum TDB_ERROR tdb_code)
60 {
61         switch (tdb_code) {
62         case TDB_SUCCESS:
63                 return LDB_SUCCESS;
64         case TDB_ERR_CORRUPT:
65         case TDB_ERR_OOM:
66         case TDB_ERR_EINVAL:
67                 return LDB_ERR_OPERATIONS_ERROR;
68         case TDB_ERR_IO:
69                 return LDB_ERR_PROTOCOL_ERROR;
70         case TDB_ERR_LOCK:
71         case TDB_ERR_NOLOCK:
72                 return LDB_ERR_BUSY;
73         case TDB_ERR_LOCK_TIMEOUT:
74                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
75         case TDB_ERR_EXISTS:
76                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
77         case TDB_ERR_NOEXIST:
78                 return LDB_ERR_NO_SUCH_OBJECT;
79         case TDB_ERR_RDONLY:
80                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
81         default:
82                 break;
83         }
84         return LDB_ERR_OTHER;
85 }
86
87 /*
88   lock the database for read - use by ltdb_search and ltdb_sequence_number
89 */
90 int ltdb_lock_read(struct ldb_module *module)
91 {
92         void *data = ldb_module_get_private(module);
93         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
94         int ret = 0;
95
96         if (ltdb->in_transaction == 0 &&
97             ltdb->read_lock_count == 0) {
98                 ret = tdb_lockall_read(ltdb->tdb);
99         }
100         if (ret == 0) {
101                 ltdb->read_lock_count++;
102         }
103         return ret;
104 }
105
106 /*
107   unlock the database after a ltdb_lock_read()
108 */
109 int ltdb_unlock_read(struct ldb_module *module)
110 {
111         void *data = ldb_module_get_private(module);
112         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
113         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
114                 tdb_unlockall_read(ltdb->tdb);
115                 return 0;
116         }
117         ltdb->read_lock_count--;
118         return 0;
119 }
120
121
122 /*
123   form a TDB_DATA for a record key
124   caller frees
125
126   note that the key for a record can depend on whether the
127   dn refers to a case sensitive index record or not
128 */
129 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
130 {
131         struct ldb_context *ldb = ldb_module_get_ctx(module);
132         TDB_DATA key;
133         char *key_str = NULL;
134         const char *dn_folded = NULL;
135
136         /*
137           most DNs are case insensitive. The exception is index DNs for
138           case sensitive attributes
139
140           there are 3 cases dealt with in this code:
141
142           1) if the dn doesn't start with @ then uppercase the attribute
143              names and the attributes values of case insensitive attributes
144           2) if the dn starts with @ then leave it alone -
145              the indexing code handles the rest
146         */
147
148         dn_folded = ldb_dn_get_casefold(dn);
149         if (!dn_folded) {
150                 goto failed;
151         }
152
153         key_str = talloc_strdup(ldb, "DN=");
154         if (!key_str) {
155                 goto failed;
156         }
157
158         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
159         if (!key_str) {
160                 goto failed;
161         }
162
163         key.dptr = (uint8_t *)key_str;
164         key.dsize = strlen(key_str) + 1;
165
166         return key;
167
168 failed:
169         errno = ENOMEM;
170         key.dptr = NULL;
171         key.dsize = 0;
172         return key;
173 }
174
175 /*
176   check special dn's have valid attributes
177   currently only @ATTRIBUTES is checked
178 */
179 static int ltdb_check_special_dn(struct ldb_module *module,
180                                  const struct ldb_message *msg)
181 {
182         struct ldb_context *ldb = ldb_module_get_ctx(module);
183         unsigned int i, j;
184
185         if (! ldb_dn_is_special(msg->dn) ||
186             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
187                 return LDB_SUCCESS;
188         }
189
190         /* we have @ATTRIBUTES, let's check attributes are fine */
191         /* should we check that we deny multivalued attributes ? */
192         for (i = 0; i < msg->num_elements; i++) {
193                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
194
195                 for (j = 0; j < msg->elements[i].num_values; j++) {
196                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
197                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
198                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
199                         }
200                 }
201         }
202
203         return LDB_SUCCESS;
204 }
205
206
207 /*
208   we've made a modification to a dn - possibly reindex and
209   update sequence number
210 */
211 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
212 {
213         int ret = LDB_SUCCESS;
214         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
215
216         /* only allow modifies inside a transaction, otherwise the
217          * ldb is unsafe */
218         if (ltdb->in_transaction == 0) {
219                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
220                 return LDB_ERR_OPERATIONS_ERROR;
221         }
222
223         if (ldb_dn_is_special(dn) &&
224             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
225              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
226                 ret = ltdb_reindex(module);
227         }
228
229         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
230         if (ret == LDB_SUCCESS &&
231             !(ldb_dn_is_special(dn) &&
232               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
233                 ret = ltdb_increase_sequence_number(module);
234         }
235
236         /* If the modify was to @OPTIONS, reload the cache */
237         if (ret == LDB_SUCCESS &&
238             ldb_dn_is_special(dn) &&
239             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
240                 ret = ltdb_cache_reload(module);
241         }
242
243         return ret;
244 }
245
246 /*
247   store a record into the db
248 */
249 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
250 {
251         void *data = ldb_module_get_private(module);
252         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
253         TDB_DATA tdb_key, tdb_data;
254         int ret = LDB_SUCCESS;
255
256         tdb_key = ltdb_key(module, msg->dn);
257         if (tdb_key.dptr == NULL) {
258                 return LDB_ERR_OTHER;
259         }
260
261         ret = ltdb_pack_data(module, msg, &tdb_data);
262         if (ret == -1) {
263                 talloc_free(tdb_key.dptr);
264                 return LDB_ERR_OTHER;
265         }
266
267         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
268         if (ret != 0) {
269                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
270                 goto done;
271         }
272
273 done:
274         talloc_free(tdb_key.dptr);
275         talloc_free(tdb_data.dptr);
276
277         return ret;
278 }
279
280
281 /*
282   check if a attribute is a single valued, for a given element
283  */
284 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
285                                   struct ldb_message_element *el)
286 {
287         if (!a) return false;
288         if (el != NULL) {
289                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
290                         /* override from a ldb module, for example
291                            used for the description field, which is
292                            marked multi-valued in the schema but which
293                            should not actually accept multiple
294                            values */
295                         return true;
296                 }
297                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
298                         /* override from a ldb module, for example used for
299                            deleted linked attribute entries */
300                         return false;
301                 }
302         }
303         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
304                 return true;
305         }
306         return false;
307 }
308
309 static int ltdb_add_internal(struct ldb_module *module,
310                              const struct ldb_message *msg)
311 {
312         struct ldb_context *ldb = ldb_module_get_ctx(module);
313         int ret = LDB_SUCCESS;
314         unsigned int i;
315
316         for (i=0;i<msg->num_elements;i++) {
317                 struct ldb_message_element *el = &msg->elements[i];
318                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
319
320                 if (el->num_values == 0) {
321                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
322                                                el->name, ldb_dn_get_linearized(msg->dn));
323                         return LDB_ERR_CONSTRAINT_VIOLATION;
324                 }
325                 if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
326                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
327                                                el->name, ldb_dn_get_linearized(msg->dn));
328                         return LDB_ERR_CONSTRAINT_VIOLATION;
329                 }
330         }
331
332         ret = ltdb_store(module, msg, TDB_INSERT);
333         if (ret != LDB_SUCCESS) {
334                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
335                         ldb_asprintf_errstring(ldb,
336                                                "Entry %s already exists",
337                                                ldb_dn_get_linearized(msg->dn));
338                 }
339                 return ret;
340         }
341
342         ret = ltdb_index_add_new(module, msg);
343         if (ret != LDB_SUCCESS) {
344                 return ret;
345         }
346
347         ret = ltdb_modified(module, msg->dn);
348
349         return ret;
350 }
351
352 /*
353   add a record to the database
354 */
355 static int ltdb_add(struct ltdb_context *ctx)
356 {
357         struct ldb_module *module = ctx->module;
358         struct ldb_request *req = ctx->req;
359         int ret = LDB_SUCCESS;
360
361         ret = ltdb_check_special_dn(module, req->op.add.message);
362         if (ret != LDB_SUCCESS) {
363                 return ret;
364         }
365
366         ldb_request_set_state(req, LDB_ASYNC_PENDING);
367
368         if (ltdb_cache_load(module) != 0) {
369                 return LDB_ERR_OPERATIONS_ERROR;
370         }
371
372         ret = ltdb_add_internal(module, req->op.add.message);
373
374         return ret;
375 }
376
377 /*
378   delete a record from the database, not updating indexes (used for deleting
379   index records)
380 */
381 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
382 {
383         void *data = ldb_module_get_private(module);
384         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
385         TDB_DATA tdb_key;
386         int ret;
387
388         tdb_key = ltdb_key(module, dn);
389         if (!tdb_key.dptr) {
390                 return LDB_ERR_OTHER;
391         }
392
393         ret = tdb_delete(ltdb->tdb, tdb_key);
394         talloc_free(tdb_key.dptr);
395
396         if (ret != 0) {
397                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
398         }
399
400         return ret;
401 }
402
403 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
404 {
405         struct ldb_message *msg;
406         int ret = LDB_SUCCESS;
407
408         msg = ldb_msg_new(module);
409         if (msg == NULL) {
410                 return LDB_ERR_OPERATIONS_ERROR;
411         }
412
413         /* in case any attribute of the message was indexed, we need
414            to fetch the old record */
415         ret = ltdb_search_dn1(module, dn, msg);
416         if (ret != LDB_SUCCESS) {
417                 /* not finding the old record is an error */
418                 goto done;
419         }
420
421         ret = ltdb_delete_noindex(module, dn);
422         if (ret != LDB_SUCCESS) {
423                 goto done;
424         }
425
426         /* remove any indexed attributes */
427         ret = ltdb_index_delete(module, msg);
428         if (ret != LDB_SUCCESS) {
429                 goto done;
430         }
431
432         ret = ltdb_modified(module, dn);
433         if (ret != LDB_SUCCESS) {
434                 goto done;
435         }
436
437 done:
438         talloc_free(msg);
439         return ret;
440 }
441
442 /*
443   delete a record from the database
444 */
445 static int ltdb_delete(struct ltdb_context *ctx)
446 {
447         struct ldb_module *module = ctx->module;
448         struct ldb_request *req = ctx->req;
449         int ret = LDB_SUCCESS;
450
451         ldb_request_set_state(req, LDB_ASYNC_PENDING);
452
453         if (ltdb_cache_load(module) != 0) {
454                 return LDB_ERR_OPERATIONS_ERROR;
455         }
456
457         ret = ltdb_delete_internal(module, req->op.del.dn);
458
459         return ret;
460 }
461
462 /*
463   find an element by attribute name. At the moment this does a linear search,
464   it should be re-coded to use a binary search once all places that modify
465   records guarantee sorted order
466
467   return the index of the first matching element if found, otherwise -1
468 */
469 static int find_element(const struct ldb_message *msg, const char *name)
470 {
471         unsigned int i;
472         for (i=0;i<msg->num_elements;i++) {
473                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
474                         return i;
475                 }
476         }
477         return -1;
478 }
479
480
481 /*
482   add an element to an existing record. Assumes a elements array that we
483   can call re-alloc on, and assumed that we can re-use the data pointers from
484   the passed in additional values. Use with care!
485
486   returns 0 on success, -1 on failure (and sets errno)
487 */
488 static int ltdb_msg_add_element(struct ldb_context *ldb,
489                                 struct ldb_message *msg,
490                                 struct ldb_message_element *el)
491 {
492         struct ldb_message_element *e2;
493         unsigned int i;
494
495         if (el->num_values == 0) {
496                 /* nothing to do here - we don't add empty elements */
497                 return 0;
498         }
499
500         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
501                               msg->num_elements+1);
502         if (!e2) {
503                 errno = ENOMEM;
504                 return -1;
505         }
506
507         msg->elements = e2;
508
509         e2 = &msg->elements[msg->num_elements];
510
511         e2->name = el->name;
512         e2->flags = el->flags;
513         e2->values = talloc_array(msg->elements,
514                                   struct ldb_val, el->num_values);
515         if (!e2->values) {
516                 errno = ENOMEM;
517                 return -1;
518         }
519         for (i=0;i<el->num_values;i++) {
520                 e2->values[i] = el->values[i];
521         }
522         e2->num_values = el->num_values;
523
524         ++msg->num_elements;
525
526         return 0;
527 }
528
529 /*
530   delete all elements having a specified attribute name
531 */
532 static int msg_delete_attribute(struct ldb_module *module,
533                                 struct ldb_context *ldb,
534                                 struct ldb_message *msg, const char *name)
535 {
536         unsigned int i;
537         int ret;
538         struct ldb_message_element *el;
539
540         el = ldb_msg_find_element(msg, name);
541         if (el == NULL) {
542                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
543         }
544         i = el - msg->elements;
545
546         ret = ltdb_index_del_element(module, msg->dn, el);
547         if (ret != LDB_SUCCESS) {
548                 return ret;
549         }
550
551         talloc_free(el->values);
552         if (msg->num_elements > (i+1)) {
553                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
554         }
555         msg->num_elements--;
556         msg->elements = talloc_realloc(msg, msg->elements,
557                                        struct ldb_message_element,
558                                        msg->num_elements);
559         return LDB_SUCCESS;
560 }
561
562 /*
563   delete all elements matching an attribute name/value
564
565   return LDB Error on failure
566 */
567 static int msg_delete_element(struct ldb_module *module,
568                               struct ldb_message *msg,
569                               const char *name,
570                               const struct ldb_val *val)
571 {
572         struct ldb_context *ldb = ldb_module_get_ctx(module);
573         unsigned int i;
574         int found, ret;
575         struct ldb_message_element *el;
576         const struct ldb_schema_attribute *a;
577
578         found = find_element(msg, name);
579         if (found == -1) {
580                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
581         }
582
583         i = (unsigned int) found;
584         el = &(msg->elements[i]);
585
586         a = ldb_schema_attribute_by_name(ldb, el->name);
587
588         for (i=0;i<el->num_values;i++) {
589                 bool matched;
590                 if (a->syntax->operator_fn) {
591                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
592                                                      &el->values[i], val, &matched);
593                         if (ret != LDB_SUCCESS) return ret;
594                 } else {
595                         matched = (a->syntax->comparison_fn(ldb, ldb,
596                                                             &el->values[i], val) == 0);
597                 }
598                 if (matched) {
599                         if (el->num_values == 1) {
600                                 return msg_delete_attribute(module, ldb, msg, name);
601                         }
602
603                         ret = ltdb_index_del_value(module, msg->dn, el, i);
604                         if (ret != LDB_SUCCESS) {
605                                 return ret;
606                         }
607
608                         if (i<el->num_values-1) {
609                                 memmove(&el->values[i], &el->values[i+1],
610                                         sizeof(el->values[i])*
611                                                 (el->num_values-(i+1)));
612                         }
613                         el->num_values--;
614
615                         /* per definition we find in a canonicalised message an
616                            attribute value only once. So we are finished here */
617                         return LDB_SUCCESS;
618                 }
619         }
620
621         /* Not found */
622         return LDB_ERR_NO_SUCH_ATTRIBUTE;
623 }
624
625
626 /*
627   modify a record - internal interface
628
629   yuck - this is O(n^2). Luckily n is usually small so we probably
630   get away with it, but if we ever have really large attribute lists
631   then we'll need to look at this again
632
633   'req' is optional, and is used to specify controls if supplied
634 */
635 int ltdb_modify_internal(struct ldb_module *module,
636                          const struct ldb_message *msg,
637                          struct ldb_request *req)
638 {
639         struct ldb_context *ldb = ldb_module_get_ctx(module);
640         void *data = ldb_module_get_private(module);
641         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
642         TDB_DATA tdb_key, tdb_data;
643         struct ldb_message *msg2;
644         unsigned int i, j, k;
645         int ret = LDB_SUCCESS, idx;
646         struct ldb_control *control_permissive = NULL;
647
648         if (req) {
649                 control_permissive = ldb_request_get_control(req,
650                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
651         }
652
653         tdb_key = ltdb_key(module, msg->dn);
654         if (!tdb_key.dptr) {
655                 return LDB_ERR_OTHER;
656         }
657
658         tdb_data = tdb_fetch_compat(ltdb->tdb, tdb_key);
659         if (!tdb_data.dptr) {
660                 talloc_free(tdb_key.dptr);
661                 return ltdb_err_map(tdb_error(ltdb->tdb));
662         }
663
664         msg2 = ldb_msg_new(tdb_key.dptr);
665         if (msg2 == NULL) {
666                 free(tdb_data.dptr);
667                 ret = LDB_ERR_OTHER;
668                 goto done;
669         }
670
671         ret = ltdb_unpack_data(module, &tdb_data, msg2);
672         free(tdb_data.dptr);
673         if (ret == -1) {
674                 ret = LDB_ERR_OTHER;
675                 goto done;
676         }
677
678         if (!msg2->dn) {
679                 msg2->dn = msg->dn;
680         }
681
682         for (i=0; i<msg->num_elements; i++) {
683                 struct ldb_message_element *el = &msg->elements[i], *el2;
684                 struct ldb_val *vals;
685                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
686                 const char *dn;
687
688                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
689                 case LDB_FLAG_MOD_ADD:
690
691                         if (el->num_values == 0) {
692                                 ldb_asprintf_errstring(ldb,
693                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
694                                                        el->name, ldb_dn_get_linearized(msg2->dn));
695                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
696                                 goto done;
697                         }
698
699                         /* make a copy of the array so that a permissive
700                          * control can remove duplicates without changing the
701                          * original values, but do not copy data as we do not
702                          * need to keep it around once the operation is
703                          * finished */
704                         if (control_permissive) {
705                                 el = talloc(msg2, struct ldb_message_element);
706                                 if (!el) {
707                                         ret = LDB_ERR_OTHER;
708                                         goto done;
709                                 }
710                                 *el = msg->elements[i];
711                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
712                                 if (el->values == NULL) {
713                                         ret = LDB_ERR_OTHER;
714                                         goto done;
715                                 }
716                                 for (j = 0; j < el->num_values; j++) {
717                                         el->values[j] = msg->elements[i].values[j];
718                                 }
719                         }
720
721                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
722                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
723                                                        el->name, ldb_dn_get_linearized(msg2->dn));
724                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
725                                 goto done;
726                         }
727
728                         /* Checks if element already exists */
729                         idx = find_element(msg2, el->name);
730                         if (idx == -1) {
731                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
732                                         ret = LDB_ERR_OTHER;
733                                         goto done;
734                                 }
735                                 ret = ltdb_index_add_element(module, msg2->dn,
736                                                              el);
737                                 if (ret != LDB_SUCCESS) {
738                                         goto done;
739                                 }
740                         } else {
741                                 j = (unsigned int) idx;
742                                 el2 = &(msg2->elements[j]);
743
744                                 /* We cannot add another value on a existing one
745                                    if the attribute is single-valued */
746                                 if (ldb_tdb_single_valued(a, el)) {
747                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
748                                                                el->name, ldb_dn_get_linearized(msg2->dn));
749                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
750                                         goto done;
751                                 }
752
753                                 /* Check that values don't exist yet on multi-
754                                    valued attributes or aren't provided twice */
755                                 for (j = 0; j < el->num_values; j++) {
756                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
757                                                 if (control_permissive) {
758                                                         /* remove this one as if it was never added */
759                                                         el->num_values--;
760                                                         for (k = j; k < el->num_values; k++) {
761                                                                 el->values[k] = el->values[k + 1];
762                                                         }
763                                                         j--; /* rewind */
764
765                                                         continue;
766                                                 }
767
768                                                 ldb_asprintf_errstring(ldb,
769                                                                        "attribute '%s': value #%u on '%s' already exists",
770                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
771                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
772                                                 goto done;
773                                         }
774                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
775                                                 ldb_asprintf_errstring(ldb,
776                                                                        "attribute '%s': value #%u on '%s' provided more than once",
777                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
778                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
779                                                 goto done;
780                                         }
781                                 }
782
783                                 /* Now combine existing and new values to a new
784                                    attribute record */
785                                 vals = talloc_realloc(msg2->elements,
786                                                       el2->values, struct ldb_val,
787                                                       el2->num_values + el->num_values);
788                                 if (vals == NULL) {
789                                         ldb_oom(ldb);
790                                         ret = LDB_ERR_OTHER;
791                                         goto done;
792                                 }
793
794                                 for (j=0; j<el->num_values; j++) {
795                                         vals[el2->num_values + j] =
796                                                 ldb_val_dup(vals, &el->values[j]);
797                                 }
798
799                                 el2->values = vals;
800                                 el2->num_values += el->num_values;
801
802                                 ret = ltdb_index_add_element(module, msg2->dn, el);
803                                 if (ret != LDB_SUCCESS) {
804                                         goto done;
805                                 }
806                         }
807
808                         break;
809
810                 case LDB_FLAG_MOD_REPLACE:
811
812                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
813                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
814                                                        el->name, ldb_dn_get_linearized(msg2->dn));
815                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
816                                 goto done;
817                         }
818
819                         /* TODO: This is O(n^2) - replace with more efficient check */
820                         for (j=0; j<el->num_values; j++) {
821                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
822                                         ldb_asprintf_errstring(ldb,
823                                                                "attribute '%s': value #%u on '%s' provided more than once",
824                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
825                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
826                                         goto done;
827                                 }
828                         }
829
830                         /* Checks if element already exists */
831                         idx = find_element(msg2, el->name);
832                         if (idx != -1) {
833                                 j = (unsigned int) idx;
834                                 el2 = &(msg2->elements[j]);
835                                 if (ldb_msg_element_compare(el, el2) == 0) {
836                                         /* we are replacing with the same values */
837                                         continue;
838                                 }
839                         
840                                 /* Delete the attribute if it exists in the DB */
841                                 if (msg_delete_attribute(module, ldb, msg2,
842                                                          el->name) != 0) {
843                                         ret = LDB_ERR_OTHER;
844                                         goto done;
845                                 }
846                         }
847
848                         /* Recreate it with the new values */
849                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
850                                 ret = LDB_ERR_OTHER;
851                                 goto done;
852                         }
853
854                         ret = ltdb_index_add_element(module, msg2->dn, el);
855                         if (ret != LDB_SUCCESS) {
856                                 goto done;
857                         }
858
859                         break;
860
861                 case LDB_FLAG_MOD_DELETE:
862                         dn = ldb_dn_get_linearized(msg2->dn);
863                         if (dn == NULL) {
864                                 ret = LDB_ERR_OTHER;
865                                 goto done;
866                         }
867
868                         if (msg->elements[i].num_values == 0) {
869                                 /* Delete the whole attribute */
870                                 ret = msg_delete_attribute(module, ldb, msg2,
871                                                            msg->elements[i].name);
872                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
873                                     control_permissive) {
874                                         ret = LDB_SUCCESS;
875                                 } else {
876                                         ldb_asprintf_errstring(ldb,
877                                                                "attribute '%s': no such attribute for delete on '%s'",
878                                                                msg->elements[i].name, dn);
879                                 }
880                                 if (ret != LDB_SUCCESS) {
881                                         goto done;
882                                 }
883                         } else {
884                                 /* Delete specified values from an attribute */
885                                 for (j=0; j < msg->elements[i].num_values; j++) {
886                                         ret = msg_delete_element(module,
887                                                                  msg2,
888                                                                  msg->elements[i].name,
889                                                                  &msg->elements[i].values[j]);
890                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
891                                             control_permissive) {
892                                                 ret = LDB_SUCCESS;
893                                         } else {
894                                                 ldb_asprintf_errstring(ldb,
895                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
896                                                                        msg->elements[i].name, dn);
897                                         }
898                                         if (ret != LDB_SUCCESS) {
899                                                 goto done;
900                                         }
901                                 }
902                         }
903                         break;
904                 default:
905                         ldb_asprintf_errstring(ldb,
906                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
907                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
908                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
909                         ret = LDB_ERR_PROTOCOL_ERROR;
910                         goto done;
911                 }
912         }
913
914         ret = ltdb_store(module, msg2, TDB_MODIFY);
915         if (ret != LDB_SUCCESS) {
916                 goto done;
917         }
918
919         ret = ltdb_modified(module, msg2->dn);
920         if (ret != LDB_SUCCESS) {
921                 goto done;
922         }
923
924 done:
925         talloc_free(tdb_key.dptr);
926         return ret;
927 }
928
929 /*
930   modify a record
931 */
932 static int ltdb_modify(struct ltdb_context *ctx)
933 {
934         struct ldb_module *module = ctx->module;
935         struct ldb_request *req = ctx->req;
936         int ret = LDB_SUCCESS;
937
938         ret = ltdb_check_special_dn(module, req->op.mod.message);
939         if (ret != LDB_SUCCESS) {
940                 return ret;
941         }
942
943         ldb_request_set_state(req, LDB_ASYNC_PENDING);
944
945         if (ltdb_cache_load(module) != 0) {
946                 return LDB_ERR_OPERATIONS_ERROR;
947         }
948
949         ret = ltdb_modify_internal(module, req->op.mod.message, req);
950
951         return ret;
952 }
953
954 /*
955   rename a record
956 */
957 static int ltdb_rename(struct ltdb_context *ctx)
958 {
959         struct ldb_module *module = ctx->module;
960         struct ldb_request *req = ctx->req;
961         struct ldb_message *msg;
962         int ret = LDB_SUCCESS;
963
964         ldb_request_set_state(req, LDB_ASYNC_PENDING);
965
966         if (ltdb_cache_load(ctx->module) != 0) {
967                 return LDB_ERR_OPERATIONS_ERROR;
968         }
969
970         msg = ldb_msg_new(ctx);
971         if (msg == NULL) {
972                 return LDB_ERR_OPERATIONS_ERROR;
973         }
974
975         /* in case any attribute of the message was indexed, we need
976            to fetch the old record */
977         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
978         if (ret != LDB_SUCCESS) {
979                 /* not finding the old record is an error */
980                 return ret;
981         }
982
983         /* Always delete first then add, to avoid conflicts with
984          * unique indexes. We rely on the transaction to make this
985          * atomic
986          */
987         ret = ltdb_delete_internal(module, msg->dn);
988         if (ret != LDB_SUCCESS) {
989                 return ret;
990         }
991
992         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
993         if (msg->dn == NULL) {
994                 return LDB_ERR_OPERATIONS_ERROR;
995         }
996
997         ret = ltdb_add_internal(module, msg);
998
999         return ret;
1000 }
1001
1002 static int ltdb_start_trans(struct ldb_module *module)
1003 {
1004         void *data = ldb_module_get_private(module);
1005         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1006
1007         if (tdb_transaction_start(ltdb->tdb) != 0) {
1008                 return ltdb_err_map(tdb_error(ltdb->tdb));
1009         }
1010
1011         ltdb->in_transaction++;
1012
1013         ltdb_index_transaction_start(module);
1014
1015         return LDB_SUCCESS;
1016 }
1017
1018 static int ltdb_prepare_commit(struct ldb_module *module)
1019 {
1020         void *data = ldb_module_get_private(module);
1021         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1022
1023         if (ltdb->in_transaction != 1) {
1024                 return LDB_SUCCESS;
1025         }
1026
1027         if (ltdb_index_transaction_commit(module) != 0) {
1028                 tdb_transaction_cancel(ltdb->tdb);
1029                 ltdb->in_transaction--;
1030                 return ltdb_err_map(tdb_error(ltdb->tdb));
1031         }
1032
1033         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1034                 ltdb->in_transaction--;
1035                 return ltdb_err_map(tdb_error(ltdb->tdb));
1036         }
1037
1038         ltdb->prepared_commit = true;
1039
1040         return LDB_SUCCESS;
1041 }
1042
1043 static int ltdb_end_trans(struct ldb_module *module)
1044 {
1045         void *data = ldb_module_get_private(module);
1046         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1047
1048         if (!ltdb->prepared_commit) {
1049                 int ret = ltdb_prepare_commit(module);
1050                 if (ret != LDB_SUCCESS) {
1051                         return ret;
1052                 }
1053         }
1054
1055         ltdb->in_transaction--;
1056         ltdb->prepared_commit = false;
1057
1058         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1059                 return ltdb_err_map(tdb_error(ltdb->tdb));
1060         }
1061
1062         return LDB_SUCCESS;
1063 }
1064
1065 static int ltdb_del_trans(struct ldb_module *module)
1066 {
1067         void *data = ldb_module_get_private(module);
1068         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1069
1070         ltdb->in_transaction--;
1071
1072         if (ltdb_index_transaction_cancel(module) != 0) {
1073                 tdb_transaction_cancel(ltdb->tdb);
1074                 return ltdb_err_map(tdb_error(ltdb->tdb));
1075         }
1076
1077         tdb_transaction_cancel(ltdb->tdb);
1078         return LDB_SUCCESS;
1079 }
1080
1081 /*
1082   return sequenceNumber from @BASEINFO
1083 */
1084 static int ltdb_sequence_number(struct ltdb_context *ctx,
1085                                 struct ldb_extended **ext)
1086 {
1087         struct ldb_context *ldb;
1088         struct ldb_module *module = ctx->module;
1089         struct ldb_request *req = ctx->req;
1090         TALLOC_CTX *tmp_ctx = NULL;
1091         struct ldb_seqnum_request *seq;
1092         struct ldb_seqnum_result *res;
1093         struct ldb_message *msg = NULL;
1094         struct ldb_dn *dn;
1095         const char *date;
1096         int ret = LDB_SUCCESS;
1097
1098         ldb = ldb_module_get_ctx(module);
1099
1100         seq = talloc_get_type(req->op.extended.data,
1101                                 struct ldb_seqnum_request);
1102         if (seq == NULL) {
1103                 return LDB_ERR_OPERATIONS_ERROR;
1104         }
1105
1106         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1107
1108         if (ltdb_lock_read(module) != 0) {
1109                 return LDB_ERR_OPERATIONS_ERROR;
1110         }
1111
1112         res = talloc_zero(req, struct ldb_seqnum_result);
1113         if (res == NULL) {
1114                 ret = LDB_ERR_OPERATIONS_ERROR;
1115                 goto done;
1116         }
1117
1118         tmp_ctx = talloc_new(req);
1119         if (tmp_ctx == NULL) {
1120                 ret = LDB_ERR_OPERATIONS_ERROR;
1121                 goto done;
1122         }
1123
1124         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1125         if (dn == NULL) {
1126                 ret = LDB_ERR_OPERATIONS_ERROR;
1127                 goto done;
1128         }
1129
1130         msg = ldb_msg_new(tmp_ctx);
1131         if (msg == NULL) {
1132                 ret = LDB_ERR_OPERATIONS_ERROR;
1133                 goto done;
1134         }
1135
1136         ret = ltdb_search_dn1(module, dn, msg);
1137         if (ret != LDB_SUCCESS) {
1138                 goto done;
1139         }
1140
1141         switch (seq->type) {
1142         case LDB_SEQ_HIGHEST_SEQ:
1143                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1144                 break;
1145         case LDB_SEQ_NEXT:
1146                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1147                 res->seq_num++;
1148                 break;
1149         case LDB_SEQ_HIGHEST_TIMESTAMP:
1150                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1151                 if (date) {
1152                         res->seq_num = ldb_string_to_time(date);
1153                 } else {
1154                         res->seq_num = 0;
1155                         /* zero is as good as anything when we don't know */
1156                 }
1157                 break;
1158         }
1159
1160         *ext = talloc_zero(req, struct ldb_extended);
1161         if (*ext == NULL) {
1162                 ret = LDB_ERR_OPERATIONS_ERROR;
1163                 goto done;
1164         }
1165         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1166         (*ext)->data = talloc_steal(*ext, res);
1167
1168 done:
1169         talloc_free(tmp_ctx);
1170         ltdb_unlock_read(module);
1171         return ret;
1172 }
1173
1174 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1175 {
1176         struct ldb_context *ldb;
1177         struct ldb_request *req;
1178         struct ldb_reply *ares;
1179
1180         ldb = ldb_module_get_ctx(ctx->module);
1181         req = ctx->req;
1182
1183         /* if we already returned an error just return */
1184         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1185                 return;
1186         }
1187
1188         ares = talloc_zero(req, struct ldb_reply);
1189         if (!ares) {
1190                 ldb_oom(ldb);
1191                 req->callback(req, NULL);
1192                 return;
1193         }
1194         ares->type = LDB_REPLY_DONE;
1195         ares->error = error;
1196
1197         req->callback(req, ares);
1198 }
1199
1200 static void ltdb_timeout(struct tevent_context *ev,
1201                           struct tevent_timer *te,
1202                           struct timeval t,
1203                           void *private_data)
1204 {
1205         struct ltdb_context *ctx;
1206         ctx = talloc_get_type(private_data, struct ltdb_context);
1207
1208         if (!ctx->request_terminated) {
1209                 /* request is done now */
1210                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1211         }
1212
1213         if (!ctx->request_terminated) {
1214                 /* neutralize the spy */
1215                 ctx->spy->ctx = NULL;
1216         }
1217         talloc_free(ctx);
1218 }
1219
1220 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1221                                         struct ldb_extended *ext,
1222                                         int error)
1223 {
1224         struct ldb_context *ldb;
1225         struct ldb_request *req;
1226         struct ldb_reply *ares;
1227
1228         ldb = ldb_module_get_ctx(ctx->module);
1229         req = ctx->req;
1230
1231         /* if we already returned an error just return */
1232         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1233                 return;
1234         }
1235
1236         ares = talloc_zero(req, struct ldb_reply);
1237         if (!ares) {
1238                 ldb_oom(ldb);
1239                 req->callback(req, NULL);
1240                 return;
1241         }
1242         ares->type = LDB_REPLY_DONE;
1243         ares->response = ext;
1244         ares->error = error;
1245
1246         req->callback(req, ares);
1247 }
1248
1249 static void ltdb_handle_extended(struct ltdb_context *ctx)
1250 {
1251         struct ldb_extended *ext = NULL;
1252         int ret;
1253
1254         if (strcmp(ctx->req->op.extended.oid,
1255                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1256                 /* get sequence number */
1257                 ret = ltdb_sequence_number(ctx, &ext);
1258         } else {
1259                 /* not recognized */
1260                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1261         }
1262
1263         ltdb_request_extended_done(ctx, ext, ret);
1264 }
1265
1266 static void ltdb_callback(struct tevent_context *ev,
1267                           struct tevent_timer *te,
1268                           struct timeval t,
1269                           void *private_data)
1270 {
1271         struct ltdb_context *ctx;
1272         int ret;
1273
1274         ctx = talloc_get_type(private_data, struct ltdb_context);
1275
1276         if (ctx->request_terminated) {
1277                 goto done;
1278         }
1279
1280         switch (ctx->req->operation) {
1281         case LDB_SEARCH:
1282                 ret = ltdb_search(ctx);
1283                 break;
1284         case LDB_ADD:
1285                 ret = ltdb_add(ctx);
1286                 break;
1287         case LDB_MODIFY:
1288                 ret = ltdb_modify(ctx);
1289                 break;
1290         case LDB_DELETE:
1291                 ret = ltdb_delete(ctx);
1292                 break;
1293         case LDB_RENAME:
1294                 ret = ltdb_rename(ctx);
1295                 break;
1296         case LDB_EXTENDED:
1297                 ltdb_handle_extended(ctx);
1298                 goto done;
1299         default:
1300                 /* no other op supported */
1301                 ret = LDB_ERR_PROTOCOL_ERROR;
1302         }
1303
1304         if (!ctx->request_terminated) {
1305                 /* request is done now */
1306                 ltdb_request_done(ctx, ret);
1307         }
1308
1309 done:
1310         if (!ctx->request_terminated) {
1311                 /* neutralize the spy */
1312                 ctx->spy->ctx = NULL;
1313         }
1314         talloc_free(ctx);
1315 }
1316
1317 static int ltdb_request_destructor(void *ptr)
1318 {
1319         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1320
1321         if (spy->ctx != NULL) {
1322                 spy->ctx->request_terminated = true;
1323         }
1324
1325         return 0;
1326 }
1327
1328 static int ltdb_handle_request(struct ldb_module *module,
1329                                struct ldb_request *req)
1330 {
1331         struct ldb_control *control_permissive;
1332         struct ldb_context *ldb;
1333         struct tevent_context *ev;
1334         struct ltdb_context *ac;
1335         struct tevent_timer *te;
1336         struct timeval tv;
1337         unsigned int i;
1338
1339         ldb = ldb_module_get_ctx(module);
1340
1341         control_permissive = ldb_request_get_control(req,
1342                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1343
1344         for (i = 0; req->controls && req->controls[i]; i++) {
1345                 if (req->controls[i]->critical &&
1346                     req->controls[i] != control_permissive) {
1347                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1348                                                req->controls[i]->oid);
1349                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1350                 }
1351         }
1352
1353         if (req->starttime == 0 || req->timeout == 0) {
1354                 ldb_set_errstring(ldb, "Invalid timeout settings");
1355                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1356         }
1357
1358         ev = ldb_get_event_context(ldb);
1359
1360         ac = talloc_zero(ldb, struct ltdb_context);
1361         if (ac == NULL) {
1362                 ldb_oom(ldb);
1363                 return LDB_ERR_OPERATIONS_ERROR;
1364         }
1365
1366         ac->module = module;
1367         ac->req = req;
1368
1369         tv.tv_sec = 0;
1370         tv.tv_usec = 0;
1371         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1372         if (NULL == te) {
1373                 talloc_free(ac);
1374                 return LDB_ERR_OPERATIONS_ERROR;
1375         }
1376
1377         tv.tv_sec = req->starttime + req->timeout;
1378         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1379         if (NULL == ac->timeout_event) {
1380                 talloc_free(ac);
1381                 return LDB_ERR_OPERATIONS_ERROR;
1382         }
1383
1384         /* set a spy so that we do not try to use the request context
1385          * if it is freed before ltdb_callback fires */
1386         ac->spy = talloc(req, struct ltdb_req_spy);
1387         if (NULL == ac->spy) {
1388                 talloc_free(ac);
1389                 return LDB_ERR_OPERATIONS_ERROR;
1390         }
1391         ac->spy->ctx = ac;
1392
1393         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1394
1395         return LDB_SUCCESS;
1396 }
1397
1398 static int ltdb_init_rootdse(struct ldb_module *module)
1399 {
1400         struct ldb_context *ldb;
1401         int ret;
1402
1403         ldb = ldb_module_get_ctx(module);
1404
1405         ret = ldb_mod_register_control(module,
1406                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1407         /* ignore errors on this - we expect it for non-sam databases */
1408
1409         /* there can be no module beyond the backend, just return */
1410         return LDB_SUCCESS;
1411 }
1412
1413 static const struct ldb_module_ops ltdb_ops = {
1414         .name              = "tdb",
1415         .init_context      = ltdb_init_rootdse,
1416         .search            = ltdb_handle_request,
1417         .add               = ltdb_handle_request,
1418         .modify            = ltdb_handle_request,
1419         .del               = ltdb_handle_request,
1420         .rename            = ltdb_handle_request,
1421         .extended          = ltdb_handle_request,
1422         .start_transaction = ltdb_start_trans,
1423         .end_transaction   = ltdb_end_trans,
1424         .prepare_commit    = ltdb_prepare_commit,
1425         .del_transaction   = ltdb_del_trans,
1426 };
1427
1428 /*
1429   connect to the database
1430 */
1431 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1432                         unsigned int flags, const char *options[],
1433                         struct ldb_module **_module)
1434 {
1435         struct ldb_module *module;
1436         const char *path;
1437         int tdb_flags, open_flags;
1438         struct ltdb_private *ltdb;
1439
1440         /* parse the url */
1441         if (strchr(url, ':')) {
1442                 if (strncmp(url, "tdb://", 6) != 0) {
1443                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1444                                   "Invalid tdb URL '%s'", url);
1445                         return LDB_ERR_OPERATIONS_ERROR;
1446                 }
1447                 path = url+6;
1448         } else {
1449                 path = url;
1450         }
1451
1452         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1453
1454         /* check for the 'nosync' option */
1455         if (flags & LDB_FLG_NOSYNC) {
1456                 tdb_flags |= TDB_NOSYNC;
1457         }
1458
1459         /* and nommap option */
1460         if (flags & LDB_FLG_NOMMAP) {
1461                 tdb_flags |= TDB_NOMMAP;
1462         }
1463
1464         if (flags & LDB_FLG_RDONLY) {
1465                 open_flags = O_RDONLY;
1466         } else {
1467                 open_flags = O_CREAT | O_RDWR;
1468         }
1469
1470         ltdb = talloc_zero(ldb, struct ltdb_private);
1471         if (!ltdb) {
1472                 ldb_oom(ldb);
1473                 return LDB_ERR_OPERATIONS_ERROR;
1474         }
1475
1476         /* note that we use quite a large default hash size */
1477         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1478                                    tdb_flags, open_flags,
1479                                    ldb_get_create_perms(ldb), ldb);
1480         if (!ltdb->tdb) {
1481                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1482                           "Unable to open tdb '%s'", path);
1483                 talloc_free(ltdb);
1484                 return LDB_ERR_OPERATIONS_ERROR;
1485         }
1486
1487         if (getenv("LDB_WARN_UNINDEXED")) {
1488                 ltdb->warn_unindexed = true;
1489         }
1490
1491         ltdb->sequence_number = 0;
1492
1493         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1494         if (!module) {
1495                 talloc_free(ltdb);
1496                 return LDB_ERR_OPERATIONS_ERROR;
1497         }
1498         ldb_module_set_private(module, ltdb);
1499         talloc_steal(module, ltdb);
1500
1501         if (ltdb_cache_load(module) != 0) {
1502                 talloc_free(module);
1503                 talloc_free(ltdb);
1504                 return LDB_ERR_OPERATIONS_ERROR;
1505         }
1506
1507         *_module = module;
1508         return LDB_SUCCESS;
1509 }
1510
1511 int ldb_tdb_init(const char *version)
1512 {
1513         LDB_MODULE_CHECK_VERSION(version);
1514         return ldb_register_backend("tdb", ltdb_connect, false);
1515 }