502afd9c3c81fab91b5880e740051aaca79756d6
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 return 0;
123         }
124         ltdb->read_lock_count--;
125         return 0;
126 }
127
128
129 /*
130   form a TDB_DATA for a record key
131   caller frees
132
133   note that the key for a record can depend on whether the
134   dn refers to a case sensitive index record or not
135 */
136 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
137 {
138         struct ldb_context *ldb = ldb_module_get_ctx(module);
139         TDB_DATA key;
140         char *key_str = NULL;
141         const char *dn_folded = NULL;
142
143         /*
144           most DNs are case insensitive. The exception is index DNs for
145           case sensitive attributes
146
147           there are 3 cases dealt with in this code:
148
149           1) if the dn doesn't start with @ then uppercase the attribute
150              names and the attributes values of case insensitive attributes
151           2) if the dn starts with @ then leave it alone -
152              the indexing code handles the rest
153         */
154
155         dn_folded = ldb_dn_get_casefold(dn);
156         if (!dn_folded) {
157                 goto failed;
158         }
159
160         key_str = talloc_strdup(ldb, "DN=");
161         if (!key_str) {
162                 goto failed;
163         }
164
165         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
166         if (!key_str) {
167                 goto failed;
168         }
169
170         key.dptr = (uint8_t *)key_str;
171         key.dsize = strlen(key_str) + 1;
172
173         return key;
174
175 failed:
176         errno = ENOMEM;
177         key.dptr = NULL;
178         key.dsize = 0;
179         return key;
180 }
181
182 /*
183   check special dn's have valid attributes
184   currently only @ATTRIBUTES is checked
185 */
186 static int ltdb_check_special_dn(struct ldb_module *module,
187                                  const struct ldb_message *msg)
188 {
189         struct ldb_context *ldb = ldb_module_get_ctx(module);
190         unsigned int i, j;
191
192         if (! ldb_dn_is_special(msg->dn) ||
193             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
194                 return LDB_SUCCESS;
195         }
196
197         /* we have @ATTRIBUTES, let's check attributes are fine */
198         /* should we check that we deny multivalued attributes ? */
199         for (i = 0; i < msg->num_elements; i++) {
200                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
201
202                 for (j = 0; j < msg->elements[i].num_values; j++) {
203                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
204                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
205                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
206                         }
207                 }
208         }
209
210         return LDB_SUCCESS;
211 }
212
213
214 /*
215   we've made a modification to a dn - possibly reindex and
216   update sequence number
217 */
218 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
219 {
220         int ret = LDB_SUCCESS;
221         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
222
223         /* only allow modifies inside a transaction, otherwise the
224          * ldb is unsafe */
225         if (ltdb->in_transaction == 0) {
226                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         if (ldb_dn_is_special(dn) &&
231             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
232              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
233         {
234                 if (ltdb->warn_reindex) {
235                         ldb_debug(ldb_module_get_ctx(module),
236                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
237                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
238                 }
239                 ret = ltdb_reindex(module);
240         }
241
242         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
243         if (ret == LDB_SUCCESS &&
244             !(ldb_dn_is_special(dn) &&
245               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
246                 ret = ltdb_increase_sequence_number(module);
247         }
248
249         /* If the modify was to @OPTIONS, reload the cache */
250         if (ret == LDB_SUCCESS &&
251             ldb_dn_is_special(dn) &&
252             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
253                 ret = ltdb_cache_reload(module);
254         }
255
256         return ret;
257 }
258
259 /*
260   store a record into the db
261 */
262 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
263 {
264         void *data = ldb_module_get_private(module);
265         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
266         TDB_DATA tdb_key, tdb_data;
267         struct ldb_val ldb_data;
268         int ret = LDB_SUCCESS;
269
270         tdb_key = ltdb_key(module, msg->dn);
271         if (tdb_key.dptr == NULL) {
272                 return LDB_ERR_OTHER;
273         }
274
275         ret = ldb_pack_data(ldb_module_get_ctx(module),
276                             msg, &ldb_data);
277         if (ret == -1) {
278                 talloc_free(tdb_key.dptr);
279                 return LDB_ERR_OTHER;
280         }
281
282         tdb_data.dptr = ldb_data.data;
283         tdb_data.dsize = ldb_data.length;
284
285         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
286         if (ret != 0) {
287                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
288                 goto done;
289         }
290
291 done:
292         talloc_free(tdb_key.dptr);
293         talloc_free(ldb_data.data);
294
295         return ret;
296 }
297
298
299 /*
300   check if a attribute is a single valued, for a given element
301  */
302 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
303                                   struct ldb_message_element *el)
304 {
305         if (!a) return false;
306         if (el != NULL) {
307                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
308                         /* override from a ldb module, for example
309                            used for the description field, which is
310                            marked multi-valued in the schema but which
311                            should not actually accept multiple
312                            values */
313                         return true;
314                 }
315                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
316                         /* override from a ldb module, for example used for
317                            deleted linked attribute entries */
318                         return false;
319                 }
320         }
321         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
322                 return true;
323         }
324         return false;
325 }
326
327 static int ltdb_add_internal(struct ldb_module *module,
328                              const struct ldb_message *msg,
329                              bool check_single_value)
330 {
331         struct ldb_context *ldb = ldb_module_get_ctx(module);
332         int ret = LDB_SUCCESS;
333         unsigned int i, j;
334
335         for (i=0;i<msg->num_elements;i++) {
336                 struct ldb_message_element *el = &msg->elements[i];
337                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
338
339                 if (el->num_values == 0) {
340                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
341                                                el->name, ldb_dn_get_linearized(msg->dn));
342                         return LDB_ERR_CONSTRAINT_VIOLATION;
343                 }
344                 if (check_single_value &&
345                                 el->num_values > 1 &&
346                                 ldb_tdb_single_valued(a, el)) {
347                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
348                                                el->name, ldb_dn_get_linearized(msg->dn));
349                         return LDB_ERR_CONSTRAINT_VIOLATION;
350                 }
351
352                 /* Do not check "@ATTRIBUTES" for duplicated values */
353                 if (ldb_dn_is_special(msg->dn) &&
354                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
355                         continue;
356                 }
357
358                 if (check_single_value) {
359                         /* TODO: This is O(n^2) - replace with more efficient check */
360                         for (j=0; j<el->num_values; j++) {
361                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
362                                         ldb_asprintf_errstring(ldb,
363                                                                "attribute '%s': value #%u on '%s' "
364                                                                "provided more than once",
365                                                                el->name, j, 
366                                                                ldb_dn_get_linearized(msg->dn));
367                                         return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
368                                 }
369                         }
370                 }
371         }
372
373         ret = ltdb_store(module, msg, TDB_INSERT);
374         if (ret != LDB_SUCCESS) {
375                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
376                         ldb_asprintf_errstring(ldb,
377                                                "Entry %s already exists",
378                                                ldb_dn_get_linearized(msg->dn));
379                 }
380                 return ret;
381         }
382
383         ret = ltdb_index_add_new(module, msg);
384         if (ret != LDB_SUCCESS) {
385                 return ret;
386         }
387
388         ret = ltdb_modified(module, msg->dn);
389
390         return ret;
391 }
392
393 /*
394   add a record to the database
395 */
396 static int ltdb_add(struct ltdb_context *ctx)
397 {
398         struct ldb_module *module = ctx->module;
399         struct ldb_request *req = ctx->req;
400         int ret = LDB_SUCCESS;
401
402         ret = ltdb_check_special_dn(module, req->op.add.message);
403         if (ret != LDB_SUCCESS) {
404                 return ret;
405         }
406
407         ldb_request_set_state(req, LDB_ASYNC_PENDING);
408
409         if (ltdb_cache_load(module) != 0) {
410                 return LDB_ERR_OPERATIONS_ERROR;
411         }
412
413         ret = ltdb_add_internal(module, req->op.add.message, true);
414
415         return ret;
416 }
417
418 /*
419   delete a record from the database, not updating indexes (used for deleting
420   index records)
421 */
422 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
423 {
424         void *data = ldb_module_get_private(module);
425         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
426         TDB_DATA tdb_key;
427         int ret;
428
429         tdb_key = ltdb_key(module, dn);
430         if (!tdb_key.dptr) {
431                 return LDB_ERR_OTHER;
432         }
433
434         ret = tdb_delete(ltdb->tdb, tdb_key);
435         talloc_free(tdb_key.dptr);
436
437         if (ret != 0) {
438                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
439         }
440
441         return ret;
442 }
443
444 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
445 {
446         struct ldb_message *msg;
447         int ret = LDB_SUCCESS;
448
449         msg = ldb_msg_new(module);
450         if (msg == NULL) {
451                 return LDB_ERR_OPERATIONS_ERROR;
452         }
453
454         /* in case any attribute of the message was indexed, we need
455            to fetch the old record */
456         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
457         if (ret != LDB_SUCCESS) {
458                 /* not finding the old record is an error */
459                 goto done;
460         }
461
462         ret = ltdb_delete_noindex(module, dn);
463         if (ret != LDB_SUCCESS) {
464                 goto done;
465         }
466
467         /* remove any indexed attributes */
468         ret = ltdb_index_delete(module, msg);
469         if (ret != LDB_SUCCESS) {
470                 goto done;
471         }
472
473         ret = ltdb_modified(module, dn);
474         if (ret != LDB_SUCCESS) {
475                 goto done;
476         }
477
478 done:
479         talloc_free(msg);
480         return ret;
481 }
482
483 /*
484   delete a record from the database
485 */
486 static int ltdb_delete(struct ltdb_context *ctx)
487 {
488         struct ldb_module *module = ctx->module;
489         struct ldb_request *req = ctx->req;
490         int ret = LDB_SUCCESS;
491
492         ldb_request_set_state(req, LDB_ASYNC_PENDING);
493
494         if (ltdb_cache_load(module) != 0) {
495                 return LDB_ERR_OPERATIONS_ERROR;
496         }
497
498         ret = ltdb_delete_internal(module, req->op.del.dn);
499
500         return ret;
501 }
502
503 /*
504   find an element by attribute name. At the moment this does a linear search,
505   it should be re-coded to use a binary search once all places that modify
506   records guarantee sorted order
507
508   return the index of the first matching element if found, otherwise -1
509 */
510 static int find_element(const struct ldb_message *msg, const char *name)
511 {
512         unsigned int i;
513         for (i=0;i<msg->num_elements;i++) {
514                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
515                         return i;
516                 }
517         }
518         return -1;
519 }
520
521
522 /*
523   add an element to an existing record. Assumes a elements array that we
524   can call re-alloc on, and assumed that we can re-use the data pointers from
525   the passed in additional values. Use with care!
526
527   returns 0 on success, -1 on failure (and sets errno)
528 */
529 static int ltdb_msg_add_element(struct ldb_context *ldb,
530                                 struct ldb_message *msg,
531                                 struct ldb_message_element *el)
532 {
533         struct ldb_message_element *e2;
534         unsigned int i;
535
536         if (el->num_values == 0) {
537                 /* nothing to do here - we don't add empty elements */
538                 return 0;
539         }
540
541         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
542                               msg->num_elements+1);
543         if (!e2) {
544                 errno = ENOMEM;
545                 return -1;
546         }
547
548         msg->elements = e2;
549
550         e2 = &msg->elements[msg->num_elements];
551
552         e2->name = el->name;
553         e2->flags = el->flags;
554         e2->values = talloc_array(msg->elements,
555                                   struct ldb_val, el->num_values);
556         if (!e2->values) {
557                 errno = ENOMEM;
558                 return -1;
559         }
560         for (i=0;i<el->num_values;i++) {
561                 e2->values[i] = el->values[i];
562         }
563         e2->num_values = el->num_values;
564
565         ++msg->num_elements;
566
567         return 0;
568 }
569
570 /*
571   delete all elements having a specified attribute name
572 */
573 static int msg_delete_attribute(struct ldb_module *module,
574                                 struct ldb_context *ldb,
575                                 struct ldb_message *msg, const char *name)
576 {
577         unsigned int i;
578         int ret;
579         struct ldb_message_element *el;
580
581         el = ldb_msg_find_element(msg, name);
582         if (el == NULL) {
583                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
584         }
585         i = el - msg->elements;
586
587         ret = ltdb_index_del_element(module, msg->dn, el);
588         if (ret != LDB_SUCCESS) {
589                 return ret;
590         }
591
592         talloc_free(el->values);
593         if (msg->num_elements > (i+1)) {
594                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
595         }
596         msg->num_elements--;
597         msg->elements = talloc_realloc(msg, msg->elements,
598                                        struct ldb_message_element,
599                                        msg->num_elements);
600         return LDB_SUCCESS;
601 }
602
603 /*
604   delete all elements matching an attribute name/value
605
606   return LDB Error on failure
607 */
608 static int msg_delete_element(struct ldb_module *module,
609                               struct ldb_message *msg,
610                               const char *name,
611                               const struct ldb_val *val)
612 {
613         struct ldb_context *ldb = ldb_module_get_ctx(module);
614         unsigned int i;
615         int found, ret;
616         struct ldb_message_element *el;
617         const struct ldb_schema_attribute *a;
618
619         found = find_element(msg, name);
620         if (found == -1) {
621                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
622         }
623
624         i = (unsigned int) found;
625         el = &(msg->elements[i]);
626
627         a = ldb_schema_attribute_by_name(ldb, el->name);
628
629         for (i=0;i<el->num_values;i++) {
630                 bool matched;
631                 if (a->syntax->operator_fn) {
632                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
633                                                      &el->values[i], val, &matched);
634                         if (ret != LDB_SUCCESS) return ret;
635                 } else {
636                         matched = (a->syntax->comparison_fn(ldb, ldb,
637                                                             &el->values[i], val) == 0);
638                 }
639                 if (matched) {
640                         if (el->num_values == 1) {
641                                 return msg_delete_attribute(module, ldb, msg, name);
642                         }
643
644                         ret = ltdb_index_del_value(module, msg->dn, el, i);
645                         if (ret != LDB_SUCCESS) {
646                                 return ret;
647                         }
648
649                         if (i<el->num_values-1) {
650                                 memmove(&el->values[i], &el->values[i+1],
651                                         sizeof(el->values[i])*
652                                                 (el->num_values-(i+1)));
653                         }
654                         el->num_values--;
655
656                         /* per definition we find in a canonicalised message an
657                            attribute value only once. So we are finished here */
658                         return LDB_SUCCESS;
659                 }
660         }
661
662         /* Not found */
663         return LDB_ERR_NO_SUCH_ATTRIBUTE;
664 }
665
666
667 /*
668   modify a record - internal interface
669
670   yuck - this is O(n^2). Luckily n is usually small so we probably
671   get away with it, but if we ever have really large attribute lists
672   then we'll need to look at this again
673
674   'req' is optional, and is used to specify controls if supplied
675 */
676 int ltdb_modify_internal(struct ldb_module *module,
677                          const struct ldb_message *msg,
678                          struct ldb_request *req)
679 {
680         struct ldb_context *ldb = ldb_module_get_ctx(module);
681         void *data = ldb_module_get_private(module);
682         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
683         TDB_DATA tdb_key, tdb_data;
684         struct ldb_val ldb_data;
685         struct ldb_message *msg2;
686         unsigned int i, j, k;
687         int ret = LDB_SUCCESS, idx;
688         struct ldb_control *control_permissive = NULL;
689
690         if (req) {
691                 control_permissive = ldb_request_get_control(req,
692                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
693         }
694
695         tdb_key = ltdb_key(module, msg->dn);
696         if (!tdb_key.dptr) {
697                 return LDB_ERR_OTHER;
698         }
699
700         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
701         if (!tdb_data.dptr) {
702                 talloc_free(tdb_key.dptr);
703                 return ltdb_err_map(tdb_error(ltdb->tdb));
704         }
705
706         msg2 = ldb_msg_new(tdb_key.dptr);
707         if (msg2 == NULL) {
708                 free(tdb_data.dptr);
709                 ret = LDB_ERR_OTHER;
710                 goto done;
711         }
712
713         ldb_data.data = tdb_data.dptr;
714         ldb_data.length = tdb_data.dsize;
715
716         ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
717         free(tdb_data.dptr);
718         if (ret == -1) {
719                 ret = LDB_ERR_OTHER;
720                 goto done;
721         }
722
723         if (!msg2->dn) {
724                 msg2->dn = msg->dn;
725         }
726
727         for (i=0; i<msg->num_elements; i++) {
728                 struct ldb_message_element *el = &msg->elements[i], *el2;
729                 struct ldb_val *vals;
730                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
731                 const char *dn;
732
733                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
734                 case LDB_FLAG_MOD_ADD:
735
736                         if (el->num_values == 0) {
737                                 ldb_asprintf_errstring(ldb,
738                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
739                                                        el->name, ldb_dn_get_linearized(msg2->dn));
740                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
741                                 goto done;
742                         }
743
744                         /* make a copy of the array so that a permissive
745                          * control can remove duplicates without changing the
746                          * original values, but do not copy data as we do not
747                          * need to keep it around once the operation is
748                          * finished */
749                         if (control_permissive) {
750                                 el = talloc(msg2, struct ldb_message_element);
751                                 if (!el) {
752                                         ret = LDB_ERR_OTHER;
753                                         goto done;
754                                 }
755                                 *el = msg->elements[i];
756                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
757                                 if (el->values == NULL) {
758                                         ret = LDB_ERR_OTHER;
759                                         goto done;
760                                 }
761                                 for (j = 0; j < el->num_values; j++) {
762                                         el->values[j] = msg->elements[i].values[j];
763                                 }
764                         }
765
766                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
767                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
768                                                        el->name, ldb_dn_get_linearized(msg2->dn));
769                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
770                                 goto done;
771                         }
772
773                         /* Checks if element already exists */
774                         idx = find_element(msg2, el->name);
775                         if (idx == -1) {
776                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
777                                         ret = LDB_ERR_OTHER;
778                                         goto done;
779                                 }
780                                 ret = ltdb_index_add_element(module, msg2->dn,
781                                                              el);
782                                 if (ret != LDB_SUCCESS) {
783                                         goto done;
784                                 }
785                         } else {
786                                 j = (unsigned int) idx;
787                                 el2 = &(msg2->elements[j]);
788
789                                 /* We cannot add another value on a existing one
790                                    if the attribute is single-valued */
791                                 if (ldb_tdb_single_valued(a, el)) {
792                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
793                                                                el->name, ldb_dn_get_linearized(msg2->dn));
794                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
795                                         goto done;
796                                 }
797
798                                 /* Check that values don't exist yet on multi-
799                                    valued attributes or aren't provided twice */
800                                 /* TODO: This is O(n^2) - replace with more efficient check */
801                                 if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
802                                         for (j = 0; j < el->num_values; j++) {
803                                                 if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
804                                                         if (control_permissive) {
805                                                                 /* remove this one as if it was never added */
806                                                                 el->num_values--;
807                                                                 for (k = j; k < el->num_values; k++) {
808                                                                         el->values[k] = el->values[k + 1];
809                                                                 }
810                                                                 j--; /* rewind */
811
812                                                                 continue;
813                                                         }
814
815                                                         ldb_asprintf_errstring(ldb,
816                                                                                "attribute '%s': value #%u on '%s' already exists",
817                                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
818                                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
819                                                         goto done;
820                                                 }
821                                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
822                                                         ldb_asprintf_errstring(ldb,
823                                                                                "attribute '%s': value #%u on '%s' provided more than once",
824                                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
825                                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
826                                                         goto done;
827                                                 }
828                                         }
829                                 }
830
831                                 /* Now combine existing and new values to a new
832                                    attribute record */
833                                 vals = talloc_realloc(msg2->elements,
834                                                       el2->values, struct ldb_val,
835                                                       el2->num_values + el->num_values);
836                                 if (vals == NULL) {
837                                         ldb_oom(ldb);
838                                         ret = LDB_ERR_OTHER;
839                                         goto done;
840                                 }
841
842                                 for (j=0; j<el->num_values; j++) {
843                                         vals[el2->num_values + j] =
844                                                 ldb_val_dup(vals, &el->values[j]);
845                                 }
846
847                                 el2->values = vals;
848                                 el2->num_values += el->num_values;
849
850                                 ret = ltdb_index_add_element(module, msg2->dn, el);
851                                 if (ret != LDB_SUCCESS) {
852                                         goto done;
853                                 }
854                         }
855
856                         break;
857
858                 case LDB_FLAG_MOD_REPLACE:
859
860                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
861                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
862                                                        el->name, ldb_dn_get_linearized(msg2->dn));
863                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
864                                 goto done;
865                         }
866
867                         /*
868                          * We don't need to check this if we have been
869                          * pre-screened by the repl_meta_data module
870                          * in Samba, or someone else who can claim to
871                          * know what they are doing. 
872                          */
873                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) { 
874                                 /* TODO: This is O(n^2) - replace with more efficient check */
875                                 for (j=0; j<el->num_values; j++) {
876                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
877                                                 ldb_asprintf_errstring(ldb,
878                                                                        "attribute '%s': value #%u on '%s' provided more than once",
879                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
880                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
881                                                 goto done;
882                                         }
883                                 }
884                         }
885
886                         /* Checks if element already exists */
887                         idx = find_element(msg2, el->name);
888                         if (idx != -1) {
889                                 j = (unsigned int) idx;
890                                 el2 = &(msg2->elements[j]);
891
892                                 /* we consider two elements to be
893                                  * equal only if the order
894                                  * matches. This allows dbcheck to
895                                  * fix the ordering on attributes
896                                  * where order matters, such as
897                                  * objectClass
898                                  */
899                                 if (ldb_msg_element_equal_ordered(el, el2)) {
900                                         continue;
901                                 }
902
903                                 /* Delete the attribute if it exists in the DB */
904                                 if (msg_delete_attribute(module, ldb, msg2,
905                                                          el->name) != 0) {
906                                         ret = LDB_ERR_OTHER;
907                                         goto done;
908                                 }
909                         }
910
911                         /* Recreate it with the new values */
912                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
913                                 ret = LDB_ERR_OTHER;
914                                 goto done;
915                         }
916
917                         ret = ltdb_index_add_element(module, msg2->dn, el);
918                         if (ret != LDB_SUCCESS) {
919                                 goto done;
920                         }
921
922                         break;
923
924                 case LDB_FLAG_MOD_DELETE:
925                         dn = ldb_dn_get_linearized(msg2->dn);
926                         if (dn == NULL) {
927                                 ret = LDB_ERR_OTHER;
928                                 goto done;
929                         }
930
931                         if (msg->elements[i].num_values == 0) {
932                                 /* Delete the whole attribute */
933                                 ret = msg_delete_attribute(module, ldb, msg2,
934                                                            msg->elements[i].name);
935                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
936                                     control_permissive) {
937                                         ret = LDB_SUCCESS;
938                                 } else {
939                                         ldb_asprintf_errstring(ldb,
940                                                                "attribute '%s': no such attribute for delete on '%s'",
941                                                                msg->elements[i].name, dn);
942                                 }
943                                 if (ret != LDB_SUCCESS) {
944                                         goto done;
945                                 }
946                         } else {
947                                 /* Delete specified values from an attribute */
948                                 for (j=0; j < msg->elements[i].num_values; j++) {
949                                         ret = msg_delete_element(module,
950                                                                  msg2,
951                                                                  msg->elements[i].name,
952                                                                  &msg->elements[i].values[j]);
953                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
954                                             control_permissive) {
955                                                 ret = LDB_SUCCESS;
956                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
957                                                 ldb_asprintf_errstring(ldb,
958                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
959                                                                        msg->elements[i].name, dn);
960                                         }
961                                         if (ret != LDB_SUCCESS) {
962                                                 goto done;
963                                         }
964                                 }
965                         }
966                         break;
967                 default:
968                         ldb_asprintf_errstring(ldb,
969                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
970                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
971                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
972                         ret = LDB_ERR_PROTOCOL_ERROR;
973                         goto done;
974                 }
975         }
976
977         ret = ltdb_store(module, msg2, TDB_MODIFY);
978         if (ret != LDB_SUCCESS) {
979                 goto done;
980         }
981
982         ret = ltdb_modified(module, msg2->dn);
983         if (ret != LDB_SUCCESS) {
984                 goto done;
985         }
986
987 done:
988         talloc_free(tdb_key.dptr);
989         return ret;
990 }
991
992 /*
993   modify a record
994 */
995 static int ltdb_modify(struct ltdb_context *ctx)
996 {
997         struct ldb_module *module = ctx->module;
998         struct ldb_request *req = ctx->req;
999         int ret = LDB_SUCCESS;
1000
1001         ret = ltdb_check_special_dn(module, req->op.mod.message);
1002         if (ret != LDB_SUCCESS) {
1003                 return ret;
1004         }
1005
1006         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1007
1008         if (ltdb_cache_load(module) != 0) {
1009                 return LDB_ERR_OPERATIONS_ERROR;
1010         }
1011
1012         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1013
1014         return ret;
1015 }
1016
1017 /*
1018   rename a record
1019 */
1020 static int ltdb_rename(struct ltdb_context *ctx)
1021 {
1022         struct ldb_module *module = ctx->module;
1023         void *data = ldb_module_get_private(module);
1024         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1025         struct ldb_request *req = ctx->req;
1026         struct ldb_message *msg;
1027         int ret = LDB_SUCCESS;
1028         TDB_DATA tdb_key, tdb_key_old;
1029
1030         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1031
1032         if (ltdb_cache_load(ctx->module) != 0) {
1033                 return LDB_ERR_OPERATIONS_ERROR;
1034         }
1035
1036         msg = ldb_msg_new(ctx);
1037         if (msg == NULL) {
1038                 return LDB_ERR_OPERATIONS_ERROR;
1039         }
1040
1041         /* we need to fetch the old record to re-add under the new name */
1042         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1043                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1044         if (ret != LDB_SUCCESS) {
1045                 /* not finding the old record is an error */
1046                 return ret;
1047         }
1048
1049         /* We need to, before changing the DB, check if the new DN
1050          * exists, so we can return this error to the caller with an
1051          * unmodified DB */
1052         tdb_key = ltdb_key(module, req->op.rename.newdn);
1053         if (!tdb_key.dptr) {
1054                 talloc_free(msg);
1055                 return LDB_ERR_OPERATIONS_ERROR;
1056         }
1057
1058         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1059         if (!tdb_key_old.dptr) {
1060                 talloc_free(msg);
1061                 talloc_free(tdb_key.dptr);
1062                 return LDB_ERR_OPERATIONS_ERROR;
1063         }
1064
1065         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1066         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1067                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1068                         talloc_free(tdb_key_old.dptr);
1069                         talloc_free(tdb_key.dptr);
1070                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1071                                                "Entry %s already exists",
1072                                                ldb_dn_get_linearized(req->op.rename.newdn));
1073                         /* finding the new record already in the DB is an error */
1074                         talloc_free(msg);
1075                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1076                 }
1077         }
1078         talloc_free(tdb_key_old.dptr);
1079         talloc_free(tdb_key.dptr);
1080
1081         /* Always delete first then add, to avoid conflicts with
1082          * unique indexes. We rely on the transaction to make this
1083          * atomic
1084          */
1085         ret = ltdb_delete_internal(module, msg->dn);
1086         if (ret != LDB_SUCCESS) {
1087                 talloc_free(msg);
1088                 return ret;
1089         }
1090
1091         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1092         if (msg->dn == NULL) {
1093                 talloc_free(msg);
1094                 return LDB_ERR_OPERATIONS_ERROR;
1095         }
1096
1097         /* We don't check single value as we can have more than 1 with
1098          * deleted attributes. We could go through all elements but that's
1099          * maybe not the most efficient way
1100          */
1101         ret = ltdb_add_internal(module, msg, false);
1102
1103         talloc_free(msg);
1104
1105         return ret;
1106 }
1107
1108 static int ltdb_start_trans(struct ldb_module *module)
1109 {
1110         void *data = ldb_module_get_private(module);
1111         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1112
1113         if (tdb_transaction_start(ltdb->tdb) != 0) {
1114                 return ltdb_err_map(tdb_error(ltdb->tdb));
1115         }
1116
1117         ltdb->in_transaction++;
1118
1119         ltdb_index_transaction_start(module);
1120
1121         return LDB_SUCCESS;
1122 }
1123
1124 static int ltdb_prepare_commit(struct ldb_module *module)
1125 {
1126         void *data = ldb_module_get_private(module);
1127         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1128
1129         if (ltdb->in_transaction != 1) {
1130                 return LDB_SUCCESS;
1131         }
1132
1133         if (ltdb_index_transaction_commit(module) != 0) {
1134                 tdb_transaction_cancel(ltdb->tdb);
1135                 ltdb->in_transaction--;
1136                 return ltdb_err_map(tdb_error(ltdb->tdb));
1137         }
1138
1139         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1140                 ltdb->in_transaction--;
1141                 return ltdb_err_map(tdb_error(ltdb->tdb));
1142         }
1143
1144         ltdb->prepared_commit = true;
1145
1146         return LDB_SUCCESS;
1147 }
1148
1149 static int ltdb_end_trans(struct ldb_module *module)
1150 {
1151         void *data = ldb_module_get_private(module);
1152         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1153
1154         if (!ltdb->prepared_commit) {
1155                 int ret = ltdb_prepare_commit(module);
1156                 if (ret != LDB_SUCCESS) {
1157                         return ret;
1158                 }
1159         }
1160
1161         ltdb->in_transaction--;
1162         ltdb->prepared_commit = false;
1163
1164         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1165                 return ltdb_err_map(tdb_error(ltdb->tdb));
1166         }
1167
1168         return LDB_SUCCESS;
1169 }
1170
1171 static int ltdb_del_trans(struct ldb_module *module)
1172 {
1173         void *data = ldb_module_get_private(module);
1174         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1175
1176         ltdb->in_transaction--;
1177
1178         if (ltdb_index_transaction_cancel(module) != 0) {
1179                 tdb_transaction_cancel(ltdb->tdb);
1180                 return ltdb_err_map(tdb_error(ltdb->tdb));
1181         }
1182
1183         tdb_transaction_cancel(ltdb->tdb);
1184         return LDB_SUCCESS;
1185 }
1186
1187 /*
1188   return sequenceNumber from @BASEINFO
1189 */
1190 static int ltdb_sequence_number(struct ltdb_context *ctx,
1191                                 struct ldb_extended **ext)
1192 {
1193         struct ldb_context *ldb;
1194         struct ldb_module *module = ctx->module;
1195         struct ldb_request *req = ctx->req;
1196         TALLOC_CTX *tmp_ctx = NULL;
1197         struct ldb_seqnum_request *seq;
1198         struct ldb_seqnum_result *res;
1199         struct ldb_message *msg = NULL;
1200         struct ldb_dn *dn;
1201         const char *date;
1202         int ret = LDB_SUCCESS;
1203
1204         ldb = ldb_module_get_ctx(module);
1205
1206         seq = talloc_get_type(req->op.extended.data,
1207                                 struct ldb_seqnum_request);
1208         if (seq == NULL) {
1209                 return LDB_ERR_OPERATIONS_ERROR;
1210         }
1211
1212         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1213
1214         if (ltdb_lock_read(module) != 0) {
1215                 return LDB_ERR_OPERATIONS_ERROR;
1216         }
1217
1218         res = talloc_zero(req, struct ldb_seqnum_result);
1219         if (res == NULL) {
1220                 ret = LDB_ERR_OPERATIONS_ERROR;
1221                 goto done;
1222         }
1223
1224         tmp_ctx = talloc_new(req);
1225         if (tmp_ctx == NULL) {
1226                 ret = LDB_ERR_OPERATIONS_ERROR;
1227                 goto done;
1228         }
1229
1230         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1231         if (dn == NULL) {
1232                 ret = LDB_ERR_OPERATIONS_ERROR;
1233                 goto done;
1234         }
1235
1236         msg = ldb_msg_new(tmp_ctx);
1237         if (msg == NULL) {
1238                 ret = LDB_ERR_OPERATIONS_ERROR;
1239                 goto done;
1240         }
1241
1242         ret = ltdb_search_dn1(module, dn, msg, 0);
1243         if (ret != LDB_SUCCESS) {
1244                 goto done;
1245         }
1246
1247         switch (seq->type) {
1248         case LDB_SEQ_HIGHEST_SEQ:
1249                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1250                 break;
1251         case LDB_SEQ_NEXT:
1252                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1253                 res->seq_num++;
1254                 break;
1255         case LDB_SEQ_HIGHEST_TIMESTAMP:
1256                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1257                 if (date) {
1258                         res->seq_num = ldb_string_to_time(date);
1259                 } else {
1260                         res->seq_num = 0;
1261                         /* zero is as good as anything when we don't know */
1262                 }
1263                 break;
1264         }
1265
1266         *ext = talloc_zero(req, struct ldb_extended);
1267         if (*ext == NULL) {
1268                 ret = LDB_ERR_OPERATIONS_ERROR;
1269                 goto done;
1270         }
1271         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1272         (*ext)->data = talloc_steal(*ext, res);
1273
1274 done:
1275         talloc_free(tmp_ctx);
1276         ltdb_unlock_read(module);
1277         return ret;
1278 }
1279
1280 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1281 {
1282         struct ldb_context *ldb;
1283         struct ldb_request *req;
1284         struct ldb_reply *ares;
1285
1286         ldb = ldb_module_get_ctx(ctx->module);
1287         req = ctx->req;
1288
1289         /* if we already returned an error just return */
1290         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1291                 return;
1292         }
1293
1294         ares = talloc_zero(req, struct ldb_reply);
1295         if (!ares) {
1296                 ldb_oom(ldb);
1297                 req->callback(req, NULL);
1298                 return;
1299         }
1300         ares->type = LDB_REPLY_DONE;
1301         ares->error = error;
1302
1303         req->callback(req, ares);
1304 }
1305
1306 static void ltdb_timeout(struct tevent_context *ev,
1307                           struct tevent_timer *te,
1308                           struct timeval t,
1309                           void *private_data)
1310 {
1311         struct ltdb_context *ctx;
1312         ctx = talloc_get_type(private_data, struct ltdb_context);
1313
1314         if (!ctx->request_terminated) {
1315                 /* request is done now */
1316                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1317         }
1318
1319         if (ctx->spy) {
1320                 /* neutralize the spy */
1321                 ctx->spy->ctx = NULL;
1322                 ctx->spy = NULL;
1323         }
1324         talloc_free(ctx);
1325 }
1326
1327 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1328                                         struct ldb_extended *ext,
1329                                         int error)
1330 {
1331         struct ldb_context *ldb;
1332         struct ldb_request *req;
1333         struct ldb_reply *ares;
1334
1335         ldb = ldb_module_get_ctx(ctx->module);
1336         req = ctx->req;
1337
1338         /* if we already returned an error just return */
1339         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1340                 return;
1341         }
1342
1343         ares = talloc_zero(req, struct ldb_reply);
1344         if (!ares) {
1345                 ldb_oom(ldb);
1346                 req->callback(req, NULL);
1347                 return;
1348         }
1349         ares->type = LDB_REPLY_DONE;
1350         ares->response = ext;
1351         ares->error = error;
1352
1353         req->callback(req, ares);
1354 }
1355
1356 static void ltdb_handle_extended(struct ltdb_context *ctx)
1357 {
1358         struct ldb_extended *ext = NULL;
1359         int ret;
1360
1361         if (strcmp(ctx->req->op.extended.oid,
1362                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1363                 /* get sequence number */
1364                 ret = ltdb_sequence_number(ctx, &ext);
1365         } else {
1366                 /* not recognized */
1367                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1368         }
1369
1370         ltdb_request_extended_done(ctx, ext, ret);
1371 }
1372
1373 static void ltdb_callback(struct tevent_context *ev,
1374                           struct tevent_timer *te,
1375                           struct timeval t,
1376                           void *private_data)
1377 {
1378         struct ltdb_context *ctx;
1379         int ret;
1380
1381         ctx = talloc_get_type(private_data, struct ltdb_context);
1382
1383         if (ctx->request_terminated) {
1384                 goto done;
1385         }
1386
1387         switch (ctx->req->operation) {
1388         case LDB_SEARCH:
1389                 ret = ltdb_search(ctx);
1390                 break;
1391         case LDB_ADD:
1392                 ret = ltdb_add(ctx);
1393                 break;
1394         case LDB_MODIFY:
1395                 ret = ltdb_modify(ctx);
1396                 break;
1397         case LDB_DELETE:
1398                 ret = ltdb_delete(ctx);
1399                 break;
1400         case LDB_RENAME:
1401                 ret = ltdb_rename(ctx);
1402                 break;
1403         case LDB_EXTENDED:
1404                 ltdb_handle_extended(ctx);
1405                 goto done;
1406         default:
1407                 /* no other op supported */
1408                 ret = LDB_ERR_PROTOCOL_ERROR;
1409         }
1410
1411         if (!ctx->request_terminated) {
1412                 /* request is done now */
1413                 ltdb_request_done(ctx, ret);
1414         }
1415
1416 done:
1417         if (ctx->spy) {
1418                 /* neutralize the spy */
1419                 ctx->spy->ctx = NULL;
1420                 ctx->spy = NULL;
1421         }
1422         talloc_free(ctx);
1423 }
1424
1425 static int ltdb_request_destructor(void *ptr)
1426 {
1427         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1428
1429         if (spy->ctx != NULL) {
1430                 spy->ctx->spy = NULL;
1431                 spy->ctx->request_terminated = true;
1432                 spy->ctx = NULL;
1433         }
1434
1435         return 0;
1436 }
1437
1438 static int ltdb_handle_request(struct ldb_module *module,
1439                                struct ldb_request *req)
1440 {
1441         struct ldb_control *control_permissive;
1442         struct ldb_context *ldb;
1443         struct tevent_context *ev;
1444         struct ltdb_context *ac;
1445         struct tevent_timer *te;
1446         struct timeval tv;
1447         unsigned int i;
1448
1449         ldb = ldb_module_get_ctx(module);
1450
1451         control_permissive = ldb_request_get_control(req,
1452                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1453
1454         for (i = 0; req->controls && req->controls[i]; i++) {
1455                 if (req->controls[i]->critical &&
1456                     req->controls[i] != control_permissive) {
1457                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1458                                                req->controls[i]->oid);
1459                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1460                 }
1461         }
1462
1463         if (req->starttime == 0 || req->timeout == 0) {
1464                 ldb_set_errstring(ldb, "Invalid timeout settings");
1465                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1466         }
1467
1468         ev = ldb_get_event_context(ldb);
1469
1470         ac = talloc_zero(ldb, struct ltdb_context);
1471         if (ac == NULL) {
1472                 ldb_oom(ldb);
1473                 return LDB_ERR_OPERATIONS_ERROR;
1474         }
1475
1476         ac->module = module;
1477         ac->req = req;
1478
1479         tv.tv_sec = 0;
1480         tv.tv_usec = 0;
1481         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1482         if (NULL == te) {
1483                 talloc_free(ac);
1484                 return LDB_ERR_OPERATIONS_ERROR;
1485         }
1486
1487         if (req->timeout > 0) {
1488                 tv.tv_sec = req->starttime + req->timeout;
1489                 tv.tv_usec = 0;
1490                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1491                                                      ltdb_timeout, ac);
1492                 if (NULL == ac->timeout_event) {
1493                         talloc_free(ac);
1494                         return LDB_ERR_OPERATIONS_ERROR;
1495                 }
1496         }
1497
1498         /* set a spy so that we do not try to use the request context
1499          * if it is freed before ltdb_callback fires */
1500         ac->spy = talloc(req, struct ltdb_req_spy);
1501         if (NULL == ac->spy) {
1502                 talloc_free(ac);
1503                 return LDB_ERR_OPERATIONS_ERROR;
1504         }
1505         ac->spy->ctx = ac;
1506
1507         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1508
1509         return LDB_SUCCESS;
1510 }
1511
1512 static int ltdb_init_rootdse(struct ldb_module *module)
1513 {
1514         /* ignore errors on this - we expect it for non-sam databases */
1515         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1516
1517         /* there can be no module beyond the backend, just return */
1518         return LDB_SUCCESS;
1519 }
1520
1521 static const struct ldb_module_ops ltdb_ops = {
1522         .name              = "tdb",
1523         .init_context      = ltdb_init_rootdse,
1524         .search            = ltdb_handle_request,
1525         .add               = ltdb_handle_request,
1526         .modify            = ltdb_handle_request,
1527         .del               = ltdb_handle_request,
1528         .rename            = ltdb_handle_request,
1529         .extended          = ltdb_handle_request,
1530         .start_transaction = ltdb_start_trans,
1531         .end_transaction   = ltdb_end_trans,
1532         .prepare_commit    = ltdb_prepare_commit,
1533         .del_transaction   = ltdb_del_trans,
1534 };
1535
1536 /*
1537   connect to the database
1538 */
1539 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1540                         unsigned int flags, const char *options[],
1541                         struct ldb_module **_module)
1542 {
1543         struct ldb_module *module;
1544         const char *path;
1545         int tdb_flags, open_flags;
1546         struct ltdb_private *ltdb;
1547
1548         /* parse the url */
1549         if (strchr(url, ':')) {
1550                 if (strncmp(url, "tdb://", 6) != 0) {
1551                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1552                                   "Invalid tdb URL '%s'", url);
1553                         return LDB_ERR_OPERATIONS_ERROR;
1554                 }
1555                 path = url+6;
1556         } else {
1557                 path = url;
1558         }
1559
1560         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1561
1562         /* check for the 'nosync' option */
1563         if (flags & LDB_FLG_NOSYNC) {
1564                 tdb_flags |= TDB_NOSYNC;
1565         }
1566
1567         /* and nommap option */
1568         if (flags & LDB_FLG_NOMMAP) {
1569                 tdb_flags |= TDB_NOMMAP;
1570         }
1571
1572         if (flags & LDB_FLG_RDONLY) {
1573                 open_flags = O_RDONLY;
1574         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1575                 open_flags = O_RDWR;
1576         } else {
1577                 open_flags = O_CREAT | O_RDWR;
1578         }
1579
1580         ltdb = talloc_zero(ldb, struct ltdb_private);
1581         if (!ltdb) {
1582                 ldb_oom(ldb);
1583                 return LDB_ERR_OPERATIONS_ERROR;
1584         }
1585
1586         /* note that we use quite a large default hash size */
1587         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1588                                    tdb_flags, open_flags,
1589                                    ldb_get_create_perms(ldb), ldb);
1590         if (!ltdb->tdb) {
1591                 ldb_asprintf_errstring(ldb,
1592                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1593                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1594                           "Unable to open tdb '%s': %s", path, strerror(errno));
1595                 talloc_free(ltdb);
1596                 if (errno == EACCES || errno == EPERM) {
1597                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1598                 }
1599                 return LDB_ERR_OPERATIONS_ERROR;
1600         }
1601
1602         if (getenv("LDB_WARN_UNINDEXED")) {
1603                 ltdb->warn_unindexed = true;
1604         }
1605
1606         if (getenv("LDB_WARN_REINDEX")) {
1607                 ltdb->warn_reindex = true;
1608         }
1609
1610         ltdb->sequence_number = 0;
1611
1612         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1613         if (!module) {
1614                 ldb_oom(ldb);
1615                 talloc_free(ltdb);
1616                 return LDB_ERR_OPERATIONS_ERROR;
1617         }
1618         ldb_module_set_private(module, ltdb);
1619         talloc_steal(module, ltdb);
1620
1621         if (ltdb_cache_load(module) != 0) {
1622                 ldb_asprintf_errstring(ldb,
1623                                        "Unable to load ltdb cache records of tdb '%s'", path);
1624                 talloc_free(module);
1625                 return LDB_ERR_OPERATIONS_ERROR;
1626         }
1627
1628         *_module = module;
1629         return LDB_SUCCESS;
1630 }
1631
1632 int ldb_tdb_init(const char *version)
1633 {
1634         LDB_MODULE_CHECK_VERSION(version);
1635         return ldb_register_backend("tdb", ltdb_connect, false);
1636 }