4fb151622aa12f19a78884bbc52678cb890c2d45
[sfrench/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 return 0;
123         }
124         ltdb->read_lock_count--;
125         return 0;
126 }
127
128
129 /*
130   form a TDB_DATA for a record key
131   caller frees
132
133   note that the key for a record can depend on whether the
134   dn refers to a case sensitive index record or not
135 */
136 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
137 {
138         struct ldb_context *ldb = ldb_module_get_ctx(module);
139         TDB_DATA key;
140         char *key_str = NULL;
141         const char *dn_folded = NULL;
142
143         /*
144           most DNs are case insensitive. The exception is index DNs for
145           case sensitive attributes
146
147           there are 3 cases dealt with in this code:
148
149           1) if the dn doesn't start with @ then uppercase the attribute
150              names and the attributes values of case insensitive attributes
151           2) if the dn starts with @ then leave it alone -
152              the indexing code handles the rest
153         */
154
155         dn_folded = ldb_dn_get_casefold(dn);
156         if (!dn_folded) {
157                 goto failed;
158         }
159
160         key_str = talloc_strdup(ldb, "DN=");
161         if (!key_str) {
162                 goto failed;
163         }
164
165         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
166         if (!key_str) {
167                 goto failed;
168         }
169
170         key.dptr = (uint8_t *)key_str;
171         key.dsize = strlen(key_str) + 1;
172
173         return key;
174
175 failed:
176         errno = ENOMEM;
177         key.dptr = NULL;
178         key.dsize = 0;
179         return key;
180 }
181
182 /*
183   check special dn's have valid attributes
184   currently only @ATTRIBUTES is checked
185 */
186 static int ltdb_check_special_dn(struct ldb_module *module,
187                                  const struct ldb_message *msg)
188 {
189         struct ldb_context *ldb = ldb_module_get_ctx(module);
190         unsigned int i, j;
191
192         if (! ldb_dn_is_special(msg->dn) ||
193             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
194                 return LDB_SUCCESS;
195         }
196
197         /* we have @ATTRIBUTES, let's check attributes are fine */
198         /* should we check that we deny multivalued attributes ? */
199         for (i = 0; i < msg->num_elements; i++) {
200                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
201
202                 for (j = 0; j < msg->elements[i].num_values; j++) {
203                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
204                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
205                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
206                         }
207                 }
208         }
209
210         return LDB_SUCCESS;
211 }
212
213
214 /*
215   we've made a modification to a dn - possibly reindex and
216   update sequence number
217 */
218 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
219 {
220         int ret = LDB_SUCCESS;
221         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
222
223         /* only allow modifies inside a transaction, otherwise the
224          * ldb is unsafe */
225         if (ltdb->in_transaction == 0) {
226                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         if (ldb_dn_is_special(dn) &&
231             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
232              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
233                 ret = ltdb_reindex(module);
234         }
235
236         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
237         if (ret == LDB_SUCCESS &&
238             !(ldb_dn_is_special(dn) &&
239               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
240                 ret = ltdb_increase_sequence_number(module);
241         }
242
243         /* If the modify was to @OPTIONS, reload the cache */
244         if (ret == LDB_SUCCESS &&
245             ldb_dn_is_special(dn) &&
246             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
247                 ret = ltdb_cache_reload(module);
248         }
249
250         return ret;
251 }
252
253 /*
254   store a record into the db
255 */
256 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
257 {
258         void *data = ldb_module_get_private(module);
259         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
260         TDB_DATA tdb_key, tdb_data;
261         int ret = LDB_SUCCESS;
262
263         tdb_key = ltdb_key(module, msg->dn);
264         if (tdb_key.dptr == NULL) {
265                 return LDB_ERR_OTHER;
266         }
267
268         ret = ldb_pack_data(ldb_module_get_ctx(module),
269                             msg, (struct ldb_val *)&tdb_data);
270         if (ret == -1) {
271                 talloc_free(tdb_key.dptr);
272                 return LDB_ERR_OTHER;
273         }
274
275         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
276         if (ret != 0) {
277                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
278                 goto done;
279         }
280
281 done:
282         talloc_free(tdb_key.dptr);
283         talloc_free(tdb_data.dptr);
284
285         return ret;
286 }
287
288
289 /*
290   check if a attribute is a single valued, for a given element
291  */
292 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
293                                   struct ldb_message_element *el)
294 {
295         if (!a) return false;
296         if (el != NULL) {
297                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
298                         /* override from a ldb module, for example
299                            used for the description field, which is
300                            marked multi-valued in the schema but which
301                            should not actually accept multiple
302                            values */
303                         return true;
304                 }
305                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
306                         /* override from a ldb module, for example used for
307                            deleted linked attribute entries */
308                         return false;
309                 }
310         }
311         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
312                 return true;
313         }
314         return false;
315 }
316
317 static int ltdb_add_internal(struct ldb_module *module,
318                              const struct ldb_message *msg,
319                              bool check_single_value)
320 {
321         struct ldb_context *ldb = ldb_module_get_ctx(module);
322         int ret = LDB_SUCCESS;
323         unsigned int i, j;
324
325         for (i=0;i<msg->num_elements;i++) {
326                 struct ldb_message_element *el = &msg->elements[i];
327                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
328
329                 if (el->num_values == 0) {
330                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
331                                                el->name, ldb_dn_get_linearized(msg->dn));
332                         return LDB_ERR_CONSTRAINT_VIOLATION;
333                 }
334                 if (check_single_value &&
335                                 el->num_values > 1 &&
336                                 ldb_tdb_single_valued(a, el)) {
337                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
338                                                el->name, ldb_dn_get_linearized(msg->dn));
339                         return LDB_ERR_CONSTRAINT_VIOLATION;
340                 }
341
342                 /* Do not check "@ATTRIBUTES" for duplicated values */
343                 if (ldb_dn_is_special(msg->dn) &&
344                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
345                         continue;
346                 }
347
348                 /* TODO: This is O(n^2) - replace with more efficient check */
349                 for (j=0; j<el->num_values; j++) {
350                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
351                                 ldb_asprintf_errstring(ldb,
352                                                        "attribute '%s': value #%u on '%s' provided more than once",
353                                                        el->name, j, ldb_dn_get_linearized(msg->dn));
354                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
355                         }
356                 }
357         }
358
359         ret = ltdb_store(module, msg, TDB_INSERT);
360         if (ret != LDB_SUCCESS) {
361                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
362                         ldb_asprintf_errstring(ldb,
363                                                "Entry %s already exists",
364                                                ldb_dn_get_linearized(msg->dn));
365                 }
366                 return ret;
367         }
368
369         ret = ltdb_index_add_new(module, msg);
370         if (ret != LDB_SUCCESS) {
371                 return ret;
372         }
373
374         ret = ltdb_modified(module, msg->dn);
375
376         return ret;
377 }
378
379 /*
380   add a record to the database
381 */
382 static int ltdb_add(struct ltdb_context *ctx)
383 {
384         struct ldb_module *module = ctx->module;
385         struct ldb_request *req = ctx->req;
386         int ret = LDB_SUCCESS;
387
388         ret = ltdb_check_special_dn(module, req->op.add.message);
389         if (ret != LDB_SUCCESS) {
390                 return ret;
391         }
392
393         ldb_request_set_state(req, LDB_ASYNC_PENDING);
394
395         if (ltdb_cache_load(module) != 0) {
396                 return LDB_ERR_OPERATIONS_ERROR;
397         }
398
399         ret = ltdb_add_internal(module, req->op.add.message, true);
400
401         return ret;
402 }
403
404 /*
405   delete a record from the database, not updating indexes (used for deleting
406   index records)
407 */
408 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
409 {
410         void *data = ldb_module_get_private(module);
411         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
412         TDB_DATA tdb_key;
413         int ret;
414
415         tdb_key = ltdb_key(module, dn);
416         if (!tdb_key.dptr) {
417                 return LDB_ERR_OTHER;
418         }
419
420         ret = tdb_delete(ltdb->tdb, tdb_key);
421         talloc_free(tdb_key.dptr);
422
423         if (ret != 0) {
424                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
425         }
426
427         return ret;
428 }
429
430 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
431 {
432         struct ldb_message *msg;
433         int ret = LDB_SUCCESS;
434
435         msg = ldb_msg_new(module);
436         if (msg == NULL) {
437                 return LDB_ERR_OPERATIONS_ERROR;
438         }
439
440         /* in case any attribute of the message was indexed, we need
441            to fetch the old record */
442         ret = ltdb_search_dn1(module, dn, msg);
443         if (ret != LDB_SUCCESS) {
444                 /* not finding the old record is an error */
445                 goto done;
446         }
447
448         ret = ltdb_delete_noindex(module, dn);
449         if (ret != LDB_SUCCESS) {
450                 goto done;
451         }
452
453         /* remove any indexed attributes */
454         ret = ltdb_index_delete(module, msg);
455         if (ret != LDB_SUCCESS) {
456                 goto done;
457         }
458
459         ret = ltdb_modified(module, dn);
460         if (ret != LDB_SUCCESS) {
461                 goto done;
462         }
463
464 done:
465         talloc_free(msg);
466         return ret;
467 }
468
469 /*
470   delete a record from the database
471 */
472 static int ltdb_delete(struct ltdb_context *ctx)
473 {
474         struct ldb_module *module = ctx->module;
475         struct ldb_request *req = ctx->req;
476         int ret = LDB_SUCCESS;
477
478         ldb_request_set_state(req, LDB_ASYNC_PENDING);
479
480         if (ltdb_cache_load(module) != 0) {
481                 return LDB_ERR_OPERATIONS_ERROR;
482         }
483
484         ret = ltdb_delete_internal(module, req->op.del.dn);
485
486         return ret;
487 }
488
489 /*
490   find an element by attribute name. At the moment this does a linear search,
491   it should be re-coded to use a binary search once all places that modify
492   records guarantee sorted order
493
494   return the index of the first matching element if found, otherwise -1
495 */
496 static int find_element(const struct ldb_message *msg, const char *name)
497 {
498         unsigned int i;
499         for (i=0;i<msg->num_elements;i++) {
500                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
501                         return i;
502                 }
503         }
504         return -1;
505 }
506
507
508 /*
509   add an element to an existing record. Assumes a elements array that we
510   can call re-alloc on, and assumed that we can re-use the data pointers from
511   the passed in additional values. Use with care!
512
513   returns 0 on success, -1 on failure (and sets errno)
514 */
515 static int ltdb_msg_add_element(struct ldb_context *ldb,
516                                 struct ldb_message *msg,
517                                 struct ldb_message_element *el)
518 {
519         struct ldb_message_element *e2;
520         unsigned int i;
521
522         if (el->num_values == 0) {
523                 /* nothing to do here - we don't add empty elements */
524                 return 0;
525         }
526
527         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
528                               msg->num_elements+1);
529         if (!e2) {
530                 errno = ENOMEM;
531                 return -1;
532         }
533
534         msg->elements = e2;
535
536         e2 = &msg->elements[msg->num_elements];
537
538         e2->name = el->name;
539         e2->flags = el->flags;
540         e2->values = talloc_array(msg->elements,
541                                   struct ldb_val, el->num_values);
542         if (!e2->values) {
543                 errno = ENOMEM;
544                 return -1;
545         }
546         for (i=0;i<el->num_values;i++) {
547                 e2->values[i] = el->values[i];
548         }
549         e2->num_values = el->num_values;
550
551         ++msg->num_elements;
552
553         return 0;
554 }
555
556 /*
557   delete all elements having a specified attribute name
558 */
559 static int msg_delete_attribute(struct ldb_module *module,
560                                 struct ldb_context *ldb,
561                                 struct ldb_message *msg, const char *name)
562 {
563         unsigned int i;
564         int ret;
565         struct ldb_message_element *el;
566
567         el = ldb_msg_find_element(msg, name);
568         if (el == NULL) {
569                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
570         }
571         i = el - msg->elements;
572
573         ret = ltdb_index_del_element(module, msg->dn, el);
574         if (ret != LDB_SUCCESS) {
575                 return ret;
576         }
577
578         talloc_free(el->values);
579         if (msg->num_elements > (i+1)) {
580                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
581         }
582         msg->num_elements--;
583         msg->elements = talloc_realloc(msg, msg->elements,
584                                        struct ldb_message_element,
585                                        msg->num_elements);
586         return LDB_SUCCESS;
587 }
588
589 /*
590   delete all elements matching an attribute name/value
591
592   return LDB Error on failure
593 */
594 static int msg_delete_element(struct ldb_module *module,
595                               struct ldb_message *msg,
596                               const char *name,
597                               const struct ldb_val *val)
598 {
599         struct ldb_context *ldb = ldb_module_get_ctx(module);
600         unsigned int i;
601         int found, ret;
602         struct ldb_message_element *el;
603         const struct ldb_schema_attribute *a;
604
605         found = find_element(msg, name);
606         if (found == -1) {
607                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
608         }
609
610         i = (unsigned int) found;
611         el = &(msg->elements[i]);
612
613         a = ldb_schema_attribute_by_name(ldb, el->name);
614
615         for (i=0;i<el->num_values;i++) {
616                 bool matched;
617                 if (a->syntax->operator_fn) {
618                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
619                                                      &el->values[i], val, &matched);
620                         if (ret != LDB_SUCCESS) return ret;
621                 } else {
622                         matched = (a->syntax->comparison_fn(ldb, ldb,
623                                                             &el->values[i], val) == 0);
624                 }
625                 if (matched) {
626                         if (el->num_values == 1) {
627                                 return msg_delete_attribute(module, ldb, msg, name);
628                         }
629
630                         ret = ltdb_index_del_value(module, msg->dn, el, i);
631                         if (ret != LDB_SUCCESS) {
632                                 return ret;
633                         }
634
635                         if (i<el->num_values-1) {
636                                 memmove(&el->values[i], &el->values[i+1],
637                                         sizeof(el->values[i])*
638                                                 (el->num_values-(i+1)));
639                         }
640                         el->num_values--;
641
642                         /* per definition we find in a canonicalised message an
643                            attribute value only once. So we are finished here */
644                         return LDB_SUCCESS;
645                 }
646         }
647
648         /* Not found */
649         return LDB_ERR_NO_SUCH_ATTRIBUTE;
650 }
651
652
653 /*
654   modify a record - internal interface
655
656   yuck - this is O(n^2). Luckily n is usually small so we probably
657   get away with it, but if we ever have really large attribute lists
658   then we'll need to look at this again
659
660   'req' is optional, and is used to specify controls if supplied
661 */
662 int ltdb_modify_internal(struct ldb_module *module,
663                          const struct ldb_message *msg,
664                          struct ldb_request *req)
665 {
666         struct ldb_context *ldb = ldb_module_get_ctx(module);
667         void *data = ldb_module_get_private(module);
668         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
669         TDB_DATA tdb_key, tdb_data;
670         struct ldb_message *msg2;
671         unsigned int i, j, k;
672         int ret = LDB_SUCCESS, idx;
673         struct ldb_control *control_permissive = NULL;
674
675         if (req) {
676                 control_permissive = ldb_request_get_control(req,
677                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
678         }
679
680         tdb_key = ltdb_key(module, msg->dn);
681         if (!tdb_key.dptr) {
682                 return LDB_ERR_OTHER;
683         }
684
685         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
686         if (!tdb_data.dptr) {
687                 talloc_free(tdb_key.dptr);
688                 return ltdb_err_map(tdb_error(ltdb->tdb));
689         }
690
691         msg2 = ldb_msg_new(tdb_key.dptr);
692         if (msg2 == NULL) {
693                 free(tdb_data.dptr);
694                 ret = LDB_ERR_OTHER;
695                 goto done;
696         }
697
698         ret = ldb_unpack_data(ldb_module_get_ctx(module), (struct ldb_val *)&tdb_data, msg2);
699         free(tdb_data.dptr);
700         if (ret == -1) {
701                 ret = LDB_ERR_OTHER;
702                 goto done;
703         }
704
705         if (!msg2->dn) {
706                 msg2->dn = msg->dn;
707         }
708
709         for (i=0; i<msg->num_elements; i++) {
710                 struct ldb_message_element *el = &msg->elements[i], *el2;
711                 struct ldb_val *vals;
712                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
713                 const char *dn;
714
715                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
716                 case LDB_FLAG_MOD_ADD:
717
718                         if (el->num_values == 0) {
719                                 ldb_asprintf_errstring(ldb,
720                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
721                                                        el->name, ldb_dn_get_linearized(msg2->dn));
722                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
723                                 goto done;
724                         }
725
726                         /* make a copy of the array so that a permissive
727                          * control can remove duplicates without changing the
728                          * original values, but do not copy data as we do not
729                          * need to keep it around once the operation is
730                          * finished */
731                         if (control_permissive) {
732                                 el = talloc(msg2, struct ldb_message_element);
733                                 if (!el) {
734                                         ret = LDB_ERR_OTHER;
735                                         goto done;
736                                 }
737                                 *el = msg->elements[i];
738                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
739                                 if (el->values == NULL) {
740                                         ret = LDB_ERR_OTHER;
741                                         goto done;
742                                 }
743                                 for (j = 0; j < el->num_values; j++) {
744                                         el->values[j] = msg->elements[i].values[j];
745                                 }
746                         }
747
748                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
749                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
750                                                        el->name, ldb_dn_get_linearized(msg2->dn));
751                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
752                                 goto done;
753                         }
754
755                         /* Checks if element already exists */
756                         idx = find_element(msg2, el->name);
757                         if (idx == -1) {
758                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
759                                         ret = LDB_ERR_OTHER;
760                                         goto done;
761                                 }
762                                 ret = ltdb_index_add_element(module, msg2->dn,
763                                                              el);
764                                 if (ret != LDB_SUCCESS) {
765                                         goto done;
766                                 }
767                         } else {
768                                 j = (unsigned int) idx;
769                                 el2 = &(msg2->elements[j]);
770
771                                 /* We cannot add another value on a existing one
772                                    if the attribute is single-valued */
773                                 if (ldb_tdb_single_valued(a, el)) {
774                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
775                                                                el->name, ldb_dn_get_linearized(msg2->dn));
776                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
777                                         goto done;
778                                 }
779
780                                 /* Check that values don't exist yet on multi-
781                                    valued attributes or aren't provided twice */
782                                 /* TODO: This is O(n^2) - replace with more efficient check */
783                                 for (j = 0; j < el->num_values; j++) {
784                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
785                                                 if (control_permissive) {
786                                                         /* remove this one as if it was never added */
787                                                         el->num_values--;
788                                                         for (k = j; k < el->num_values; k++) {
789                                                                 el->values[k] = el->values[k + 1];
790                                                         }
791                                                         j--; /* rewind */
792
793                                                         continue;
794                                                 }
795
796                                                 ldb_asprintf_errstring(ldb,
797                                                                        "attribute '%s': value #%u on '%s' already exists",
798                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
799                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
800                                                 goto done;
801                                         }
802                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
803                                                 ldb_asprintf_errstring(ldb,
804                                                                        "attribute '%s': value #%u on '%s' provided more than once",
805                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
806                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
807                                                 goto done;
808                                         }
809                                 }
810
811                                 /* Now combine existing and new values to a new
812                                    attribute record */
813                                 vals = talloc_realloc(msg2->elements,
814                                                       el2->values, struct ldb_val,
815                                                       el2->num_values + el->num_values);
816                                 if (vals == NULL) {
817                                         ldb_oom(ldb);
818                                         ret = LDB_ERR_OTHER;
819                                         goto done;
820                                 }
821
822                                 for (j=0; j<el->num_values; j++) {
823                                         vals[el2->num_values + j] =
824                                                 ldb_val_dup(vals, &el->values[j]);
825                                 }
826
827                                 el2->values = vals;
828                                 el2->num_values += el->num_values;
829
830                                 ret = ltdb_index_add_element(module, msg2->dn, el);
831                                 if (ret != LDB_SUCCESS) {
832                                         goto done;
833                                 }
834                         }
835
836                         break;
837
838                 case LDB_FLAG_MOD_REPLACE:
839
840                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
841                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
842                                                        el->name, ldb_dn_get_linearized(msg2->dn));
843                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
844                                 goto done;
845                         }
846
847                         /* TODO: This is O(n^2) - replace with more efficient check */
848                         for (j=0; j<el->num_values; j++) {
849                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
850                                         ldb_asprintf_errstring(ldb,
851                                                                "attribute '%s': value #%u on '%s' provided more than once",
852                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
853                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
854                                         goto done;
855                                 }
856                         }
857
858                         /* Checks if element already exists */
859                         idx = find_element(msg2, el->name);
860                         if (idx != -1) {
861                                 j = (unsigned int) idx;
862                                 el2 = &(msg2->elements[j]);
863
864                                 /* we consider two elements to be
865                                  * equal only if the order
866                                  * matches. This allows dbcheck to
867                                  * fix the ordering on attributes
868                                  * where order matters, such as
869                                  * objectClass
870                                  */
871                                 if (ldb_msg_element_equal_ordered(el, el2)) {
872                                         continue;
873                                 }
874
875                                 /* Delete the attribute if it exists in the DB */
876                                 if (msg_delete_attribute(module, ldb, msg2,
877                                                          el->name) != 0) {
878                                         ret = LDB_ERR_OTHER;
879                                         goto done;
880                                 }
881                         }
882
883                         /* Recreate it with the new values */
884                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
885                                 ret = LDB_ERR_OTHER;
886                                 goto done;
887                         }
888
889                         ret = ltdb_index_add_element(module, msg2->dn, el);
890                         if (ret != LDB_SUCCESS) {
891                                 goto done;
892                         }
893
894                         break;
895
896                 case LDB_FLAG_MOD_DELETE:
897                         dn = ldb_dn_get_linearized(msg2->dn);
898                         if (dn == NULL) {
899                                 ret = LDB_ERR_OTHER;
900                                 goto done;
901                         }
902
903                         if (msg->elements[i].num_values == 0) {
904                                 /* Delete the whole attribute */
905                                 ret = msg_delete_attribute(module, ldb, msg2,
906                                                            msg->elements[i].name);
907                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
908                                     control_permissive) {
909                                         ret = LDB_SUCCESS;
910                                 } else {
911                                         ldb_asprintf_errstring(ldb,
912                                                                "attribute '%s': no such attribute for delete on '%s'",
913                                                                msg->elements[i].name, dn);
914                                 }
915                                 if (ret != LDB_SUCCESS) {
916                                         goto done;
917                                 }
918                         } else {
919                                 /* Delete specified values from an attribute */
920                                 for (j=0; j < msg->elements[i].num_values; j++) {
921                                         ret = msg_delete_element(module,
922                                                                  msg2,
923                                                                  msg->elements[i].name,
924                                                                  &msg->elements[i].values[j]);
925                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
926                                             control_permissive) {
927                                                 ret = LDB_SUCCESS;
928                                         } else {
929                                                 ldb_asprintf_errstring(ldb,
930                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
931                                                                        msg->elements[i].name, dn);
932                                         }
933                                         if (ret != LDB_SUCCESS) {
934                                                 goto done;
935                                         }
936                                 }
937                         }
938                         break;
939                 default:
940                         ldb_asprintf_errstring(ldb,
941                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
942                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
943                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
944                         ret = LDB_ERR_PROTOCOL_ERROR;
945                         goto done;
946                 }
947         }
948
949         ret = ltdb_store(module, msg2, TDB_MODIFY);
950         if (ret != LDB_SUCCESS) {
951                 goto done;
952         }
953
954         ret = ltdb_modified(module, msg2->dn);
955         if (ret != LDB_SUCCESS) {
956                 goto done;
957         }
958
959 done:
960         talloc_free(tdb_key.dptr);
961         return ret;
962 }
963
964 /*
965   modify a record
966 */
967 static int ltdb_modify(struct ltdb_context *ctx)
968 {
969         struct ldb_module *module = ctx->module;
970         struct ldb_request *req = ctx->req;
971         int ret = LDB_SUCCESS;
972
973         ret = ltdb_check_special_dn(module, req->op.mod.message);
974         if (ret != LDB_SUCCESS) {
975                 return ret;
976         }
977
978         ldb_request_set_state(req, LDB_ASYNC_PENDING);
979
980         if (ltdb_cache_load(module) != 0) {
981                 return LDB_ERR_OPERATIONS_ERROR;
982         }
983
984         ret = ltdb_modify_internal(module, req->op.mod.message, req);
985
986         return ret;
987 }
988
989 /*
990   rename a record
991 */
992 static int ltdb_rename(struct ltdb_context *ctx)
993 {
994         struct ldb_module *module = ctx->module;
995         void *data = ldb_module_get_private(module);
996         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
997         struct ldb_request *req = ctx->req;
998         struct ldb_message *msg;
999         int ret = LDB_SUCCESS;
1000         TDB_DATA tdb_key, tdb_key_old;
1001
1002         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1003
1004         if (ltdb_cache_load(ctx->module) != 0) {
1005                 return LDB_ERR_OPERATIONS_ERROR;
1006         }
1007
1008         msg = ldb_msg_new(ctx);
1009         if (msg == NULL) {
1010                 return LDB_ERR_OPERATIONS_ERROR;
1011         }
1012
1013         /* we need to fetch the old record to re-add under the new name */
1014         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
1015         if (ret != LDB_SUCCESS) {
1016                 /* not finding the old record is an error */
1017                 return ret;
1018         }
1019
1020         /* We need to, before changing the DB, check if the new DN
1021          * exists, so we can return this error to the caller with an
1022          * unmodified DB */
1023         tdb_key = ltdb_key(module, req->op.rename.newdn);
1024         if (!tdb_key.dptr) {
1025                 talloc_free(msg);
1026                 return LDB_ERR_OPERATIONS_ERROR;
1027         }
1028
1029         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1030         if (!tdb_key_old.dptr) {
1031                 talloc_free(msg);
1032                 talloc_free(tdb_key.dptr);
1033                 return LDB_ERR_OPERATIONS_ERROR;
1034         }
1035
1036         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1037         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1038                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1039                         talloc_free(tdb_key_old.dptr);
1040                         talloc_free(tdb_key.dptr);
1041                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1042                                                "Entry %s already exists",
1043                                                ldb_dn_get_linearized(msg->dn));
1044                         /* finding the new record already in the DB is an error */
1045                         talloc_free(msg);
1046                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1047                 }
1048         }
1049         talloc_free(tdb_key_old.dptr);
1050         talloc_free(tdb_key.dptr);
1051
1052         /* Always delete first then add, to avoid conflicts with
1053          * unique indexes. We rely on the transaction to make this
1054          * atomic
1055          */
1056         ret = ltdb_delete_internal(module, msg->dn);
1057         if (ret != LDB_SUCCESS) {
1058                 talloc_free(msg);
1059                 return ret;
1060         }
1061
1062         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1063         if (msg->dn == NULL) {
1064                 talloc_free(msg);
1065                 return LDB_ERR_OPERATIONS_ERROR;
1066         }
1067
1068         /* We don't check single value as we can have more than 1 with
1069          * deleted attributes. We could go through all elements but that's
1070          * maybe not the most efficient way
1071          */
1072         ret = ltdb_add_internal(module, msg, false);
1073
1074         talloc_free(msg);
1075
1076         return ret;
1077 }
1078
1079 static int ltdb_start_trans(struct ldb_module *module)
1080 {
1081         void *data = ldb_module_get_private(module);
1082         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1083
1084         if (tdb_transaction_start(ltdb->tdb) != 0) {
1085                 return ltdb_err_map(tdb_error(ltdb->tdb));
1086         }
1087
1088         ltdb->in_transaction++;
1089
1090         ltdb_index_transaction_start(module);
1091
1092         return LDB_SUCCESS;
1093 }
1094
1095 static int ltdb_prepare_commit(struct ldb_module *module)
1096 {
1097         void *data = ldb_module_get_private(module);
1098         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1099
1100         if (ltdb->in_transaction != 1) {
1101                 return LDB_SUCCESS;
1102         }
1103
1104         if (ltdb_index_transaction_commit(module) != 0) {
1105                 tdb_transaction_cancel(ltdb->tdb);
1106                 ltdb->in_transaction--;
1107                 return ltdb_err_map(tdb_error(ltdb->tdb));
1108         }
1109
1110         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1111                 ltdb->in_transaction--;
1112                 return ltdb_err_map(tdb_error(ltdb->tdb));
1113         }
1114
1115         ltdb->prepared_commit = true;
1116
1117         return LDB_SUCCESS;
1118 }
1119
1120 static int ltdb_end_trans(struct ldb_module *module)
1121 {
1122         void *data = ldb_module_get_private(module);
1123         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1124
1125         if (!ltdb->prepared_commit) {
1126                 int ret = ltdb_prepare_commit(module);
1127                 if (ret != LDB_SUCCESS) {
1128                         return ret;
1129                 }
1130         }
1131
1132         ltdb->in_transaction--;
1133         ltdb->prepared_commit = false;
1134
1135         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1136                 return ltdb_err_map(tdb_error(ltdb->tdb));
1137         }
1138
1139         return LDB_SUCCESS;
1140 }
1141
1142 static int ltdb_del_trans(struct ldb_module *module)
1143 {
1144         void *data = ldb_module_get_private(module);
1145         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1146
1147         ltdb->in_transaction--;
1148
1149         if (ltdb_index_transaction_cancel(module) != 0) {
1150                 tdb_transaction_cancel(ltdb->tdb);
1151                 return ltdb_err_map(tdb_error(ltdb->tdb));
1152         }
1153
1154         tdb_transaction_cancel(ltdb->tdb);
1155         return LDB_SUCCESS;
1156 }
1157
1158 /*
1159   return sequenceNumber from @BASEINFO
1160 */
1161 static int ltdb_sequence_number(struct ltdb_context *ctx,
1162                                 struct ldb_extended **ext)
1163 {
1164         struct ldb_context *ldb;
1165         struct ldb_module *module = ctx->module;
1166         struct ldb_request *req = ctx->req;
1167         TALLOC_CTX *tmp_ctx = NULL;
1168         struct ldb_seqnum_request *seq;
1169         struct ldb_seqnum_result *res;
1170         struct ldb_message *msg = NULL;
1171         struct ldb_dn *dn;
1172         const char *date;
1173         int ret = LDB_SUCCESS;
1174
1175         ldb = ldb_module_get_ctx(module);
1176
1177         seq = talloc_get_type(req->op.extended.data,
1178                                 struct ldb_seqnum_request);
1179         if (seq == NULL) {
1180                 return LDB_ERR_OPERATIONS_ERROR;
1181         }
1182
1183         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1184
1185         if (ltdb_lock_read(module) != 0) {
1186                 return LDB_ERR_OPERATIONS_ERROR;
1187         }
1188
1189         res = talloc_zero(req, struct ldb_seqnum_result);
1190         if (res == NULL) {
1191                 ret = LDB_ERR_OPERATIONS_ERROR;
1192                 goto done;
1193         }
1194
1195         tmp_ctx = talloc_new(req);
1196         if (tmp_ctx == NULL) {
1197                 ret = LDB_ERR_OPERATIONS_ERROR;
1198                 goto done;
1199         }
1200
1201         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1202         if (dn == NULL) {
1203                 ret = LDB_ERR_OPERATIONS_ERROR;
1204                 goto done;
1205         }
1206
1207         msg = ldb_msg_new(tmp_ctx);
1208         if (msg == NULL) {
1209                 ret = LDB_ERR_OPERATIONS_ERROR;
1210                 goto done;
1211         }
1212
1213         ret = ltdb_search_dn1(module, dn, msg);
1214         if (ret != LDB_SUCCESS) {
1215                 goto done;
1216         }
1217
1218         switch (seq->type) {
1219         case LDB_SEQ_HIGHEST_SEQ:
1220                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1221                 break;
1222         case LDB_SEQ_NEXT:
1223                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1224                 res->seq_num++;
1225                 break;
1226         case LDB_SEQ_HIGHEST_TIMESTAMP:
1227                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1228                 if (date) {
1229                         res->seq_num = ldb_string_to_time(date);
1230                 } else {
1231                         res->seq_num = 0;
1232                         /* zero is as good as anything when we don't know */
1233                 }
1234                 break;
1235         }
1236
1237         *ext = talloc_zero(req, struct ldb_extended);
1238         if (*ext == NULL) {
1239                 ret = LDB_ERR_OPERATIONS_ERROR;
1240                 goto done;
1241         }
1242         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1243         (*ext)->data = talloc_steal(*ext, res);
1244
1245 done:
1246         talloc_free(tmp_ctx);
1247         ltdb_unlock_read(module);
1248         return ret;
1249 }
1250
1251 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1252 {
1253         struct ldb_context *ldb;
1254         struct ldb_request *req;
1255         struct ldb_reply *ares;
1256
1257         ldb = ldb_module_get_ctx(ctx->module);
1258         req = ctx->req;
1259
1260         /* if we already returned an error just return */
1261         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1262                 return;
1263         }
1264
1265         ares = talloc_zero(req, struct ldb_reply);
1266         if (!ares) {
1267                 ldb_oom(ldb);
1268                 req->callback(req, NULL);
1269                 return;
1270         }
1271         ares->type = LDB_REPLY_DONE;
1272         ares->error = error;
1273
1274         req->callback(req, ares);
1275 }
1276
1277 static void ltdb_timeout(struct tevent_context *ev,
1278                           struct tevent_timer *te,
1279                           struct timeval t,
1280                           void *private_data)
1281 {
1282         struct ltdb_context *ctx;
1283         ctx = talloc_get_type(private_data, struct ltdb_context);
1284
1285         if (!ctx->request_terminated) {
1286                 /* request is done now */
1287                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1288         }
1289
1290         if (ctx->spy) {
1291                 /* neutralize the spy */
1292                 ctx->spy->ctx = NULL;
1293                 ctx->spy = NULL;
1294         }
1295         talloc_free(ctx);
1296 }
1297
1298 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1299                                         struct ldb_extended *ext,
1300                                         int error)
1301 {
1302         struct ldb_context *ldb;
1303         struct ldb_request *req;
1304         struct ldb_reply *ares;
1305
1306         ldb = ldb_module_get_ctx(ctx->module);
1307         req = ctx->req;
1308
1309         /* if we already returned an error just return */
1310         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1311                 return;
1312         }
1313
1314         ares = talloc_zero(req, struct ldb_reply);
1315         if (!ares) {
1316                 ldb_oom(ldb);
1317                 req->callback(req, NULL);
1318                 return;
1319         }
1320         ares->type = LDB_REPLY_DONE;
1321         ares->response = ext;
1322         ares->error = error;
1323
1324         req->callback(req, ares);
1325 }
1326
1327 static void ltdb_handle_extended(struct ltdb_context *ctx)
1328 {
1329         struct ldb_extended *ext = NULL;
1330         int ret;
1331
1332         if (strcmp(ctx->req->op.extended.oid,
1333                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1334                 /* get sequence number */
1335                 ret = ltdb_sequence_number(ctx, &ext);
1336         } else {
1337                 /* not recognized */
1338                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1339         }
1340
1341         ltdb_request_extended_done(ctx, ext, ret);
1342 }
1343
1344 static void ltdb_callback(struct tevent_context *ev,
1345                           struct tevent_timer *te,
1346                           struct timeval t,
1347                           void *private_data)
1348 {
1349         struct ltdb_context *ctx;
1350         int ret;
1351
1352         ctx = talloc_get_type(private_data, struct ltdb_context);
1353
1354         if (ctx->request_terminated) {
1355                 goto done;
1356         }
1357
1358         switch (ctx->req->operation) {
1359         case LDB_SEARCH:
1360                 ret = ltdb_search(ctx);
1361                 break;
1362         case LDB_ADD:
1363                 ret = ltdb_add(ctx);
1364                 break;
1365         case LDB_MODIFY:
1366                 ret = ltdb_modify(ctx);
1367                 break;
1368         case LDB_DELETE:
1369                 ret = ltdb_delete(ctx);
1370                 break;
1371         case LDB_RENAME:
1372                 ret = ltdb_rename(ctx);
1373                 break;
1374         case LDB_EXTENDED:
1375                 ltdb_handle_extended(ctx);
1376                 goto done;
1377         default:
1378                 /* no other op supported */
1379                 ret = LDB_ERR_PROTOCOL_ERROR;
1380         }
1381
1382         if (!ctx->request_terminated) {
1383                 /* request is done now */
1384                 ltdb_request_done(ctx, ret);
1385         }
1386
1387 done:
1388         if (ctx->spy) {
1389                 /* neutralize the spy */
1390                 ctx->spy->ctx = NULL;
1391                 ctx->spy = NULL;
1392         }
1393         talloc_free(ctx);
1394 }
1395
1396 static int ltdb_request_destructor(void *ptr)
1397 {
1398         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1399
1400         if (spy->ctx != NULL) {
1401                 spy->ctx->spy = NULL;
1402                 spy->ctx->request_terminated = true;
1403                 spy->ctx = NULL;
1404         }
1405
1406         return 0;
1407 }
1408
1409 static int ltdb_handle_request(struct ldb_module *module,
1410                                struct ldb_request *req)
1411 {
1412         struct ldb_control *control_permissive;
1413         struct ldb_context *ldb;
1414         struct tevent_context *ev;
1415         struct ltdb_context *ac;
1416         struct tevent_timer *te;
1417         struct timeval tv;
1418         unsigned int i;
1419
1420         ldb = ldb_module_get_ctx(module);
1421
1422         control_permissive = ldb_request_get_control(req,
1423                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1424
1425         for (i = 0; req->controls && req->controls[i]; i++) {
1426                 if (req->controls[i]->critical &&
1427                     req->controls[i] != control_permissive) {
1428                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1429                                                req->controls[i]->oid);
1430                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1431                 }
1432         }
1433
1434         if (req->starttime == 0 || req->timeout == 0) {
1435                 ldb_set_errstring(ldb, "Invalid timeout settings");
1436                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1437         }
1438
1439         ev = ldb_get_event_context(ldb);
1440
1441         ac = talloc_zero(ldb, struct ltdb_context);
1442         if (ac == NULL) {
1443                 ldb_oom(ldb);
1444                 return LDB_ERR_OPERATIONS_ERROR;
1445         }
1446
1447         ac->module = module;
1448         ac->req = req;
1449
1450         tv.tv_sec = 0;
1451         tv.tv_usec = 0;
1452         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1453         if (NULL == te) {
1454                 talloc_free(ac);
1455                 return LDB_ERR_OPERATIONS_ERROR;
1456         }
1457
1458         tv.tv_sec = req->starttime + req->timeout;
1459         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1460         if (NULL == ac->timeout_event) {
1461                 talloc_free(ac);
1462                 return LDB_ERR_OPERATIONS_ERROR;
1463         }
1464
1465         /* set a spy so that we do not try to use the request context
1466          * if it is freed before ltdb_callback fires */
1467         ac->spy = talloc(req, struct ltdb_req_spy);
1468         if (NULL == ac->spy) {
1469                 talloc_free(ac);
1470                 return LDB_ERR_OPERATIONS_ERROR;
1471         }
1472         ac->spy->ctx = ac;
1473
1474         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1475
1476         return LDB_SUCCESS;
1477 }
1478
1479 static int ltdb_init_rootdse(struct ldb_module *module)
1480 {
1481         /* ignore errors on this - we expect it for non-sam databases */
1482         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1483
1484         /* there can be no module beyond the backend, just return */
1485         return LDB_SUCCESS;
1486 }
1487
1488 static const struct ldb_module_ops ltdb_ops = {
1489         .name              = "tdb",
1490         .init_context      = ltdb_init_rootdse,
1491         .search            = ltdb_handle_request,
1492         .add               = ltdb_handle_request,
1493         .modify            = ltdb_handle_request,
1494         .del               = ltdb_handle_request,
1495         .rename            = ltdb_handle_request,
1496         .extended          = ltdb_handle_request,
1497         .start_transaction = ltdb_start_trans,
1498         .end_transaction   = ltdb_end_trans,
1499         .prepare_commit    = ltdb_prepare_commit,
1500         .del_transaction   = ltdb_del_trans,
1501 };
1502
1503 /*
1504   connect to the database
1505 */
1506 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1507                         unsigned int flags, const char *options[],
1508                         struct ldb_module **_module)
1509 {
1510         struct ldb_module *module;
1511         const char *path;
1512         int tdb_flags, open_flags;
1513         struct ltdb_private *ltdb;
1514
1515         /* parse the url */
1516         if (strchr(url, ':')) {
1517                 if (strncmp(url, "tdb://", 6) != 0) {
1518                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1519                                   "Invalid tdb URL '%s'", url);
1520                         return LDB_ERR_OPERATIONS_ERROR;
1521                 }
1522                 path = url+6;
1523         } else {
1524                 path = url;
1525         }
1526
1527         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1528
1529         /* check for the 'nosync' option */
1530         if (flags & LDB_FLG_NOSYNC) {
1531                 tdb_flags |= TDB_NOSYNC;
1532         }
1533
1534         /* and nommap option */
1535         if (flags & LDB_FLG_NOMMAP) {
1536                 tdb_flags |= TDB_NOMMAP;
1537         }
1538
1539         if (flags & LDB_FLG_RDONLY) {
1540                 open_flags = O_RDONLY;
1541         } else {
1542                 open_flags = O_CREAT | O_RDWR;
1543         }
1544
1545         ltdb = talloc_zero(ldb, struct ltdb_private);
1546         if (!ltdb) {
1547                 ldb_oom(ldb);
1548                 return LDB_ERR_OPERATIONS_ERROR;
1549         }
1550
1551         /* note that we use quite a large default hash size */
1552         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1553                                    tdb_flags, open_flags,
1554                                    ldb_get_create_perms(ldb), ldb);
1555         if (!ltdb->tdb) {
1556                 ldb_asprintf_errstring(ldb,
1557                                        "Unable to open tdb '%s'", path);
1558                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1559                           "Unable to open tdb '%s'", path);
1560                 talloc_free(ltdb);
1561                 return LDB_ERR_OPERATIONS_ERROR;
1562         }
1563
1564         if (getenv("LDB_WARN_UNINDEXED")) {
1565                 ltdb->warn_unindexed = true;
1566         }
1567
1568         ltdb->sequence_number = 0;
1569
1570         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1571         if (!module) {
1572                 ldb_oom(ldb);
1573                 talloc_free(ltdb);
1574                 return LDB_ERR_OPERATIONS_ERROR;
1575         }
1576         ldb_module_set_private(module, ltdb);
1577         talloc_steal(module, ltdb);
1578
1579         if (ltdb_cache_load(module) != 0) {
1580                 ldb_asprintf_errstring(ldb,
1581                                        "Unable to load ltdb cache records of tdb '%s'", path);
1582                 talloc_free(module);
1583                 return LDB_ERR_OPERATIONS_ERROR;
1584         }
1585
1586         *_module = module;
1587         return LDB_SUCCESS;
1588 }
1589
1590 int ldb_tdb_init(const char *version)
1591 {
1592         LDB_MODULE_CHECK_VERSION(version);
1593         return ldb_register_backend("tdb", ltdb_connect, false);
1594 }