ldb_tdb: Create a common ltdb_key_is_record() allowing multiple key forms
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 ltdb->read_lock_count--;
123                 return 0;
124         }
125         ltdb->read_lock_count--;
126         return 0;
127 }
128
129
130 /* 
131  * Determine if this key could hold a record.  We allow the new GUID
132  * index, the old DN index and a possible future ID=
133  */
134 bool ltdb_key_is_record(TDB_DATA key)
135 {
136         if (key.dsize < 4) {
137                 return false;
138         }
139
140         if (strncmp((char *)key.dptr, "DN=", 3) == 0) {
141                 return true;
142         }
143         
144         if (strncmp((char *)key.dptr, "ID=", 3) == 0) {
145                 return true;
146         }
147
148         if (key.dsize < 6) {
149                 return false;
150         }
151
152         if (strncmp((char *)key.dptr, "GUID=", 5) == 0) {
153                 return true;
154         }
155         
156         return false;
157 }
158
159 /*
160   form a TDB_DATA for a record key
161   caller frees
162
163   note that the key for a record can depend on whether the
164   dn refers to a case sensitive index record or not
165 */
166 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
167 {
168         struct ldb_context *ldb = ldb_module_get_ctx(module);
169         TDB_DATA key;
170         char *key_str = NULL;
171         const char *dn_folded = NULL;
172
173         /*
174           most DNs are case insensitive. The exception is index DNs for
175           case sensitive attributes
176
177           there are 3 cases dealt with in this code:
178
179           1) if the dn doesn't start with @ then uppercase the attribute
180              names and the attributes values of case insensitive attributes
181           2) if the dn starts with @ then leave it alone -
182              the indexing code handles the rest
183         */
184
185         dn_folded = ldb_dn_get_casefold(dn);
186         if (!dn_folded) {
187                 goto failed;
188         }
189
190         key_str = talloc_strdup(ldb, "DN=");
191         if (!key_str) {
192                 goto failed;
193         }
194
195         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
196         if (!key_str) {
197                 goto failed;
198         }
199
200         key.dptr = (uint8_t *)key_str;
201         key.dsize = strlen(key_str) + 1;
202
203         return key;
204
205 failed:
206         errno = ENOMEM;
207         key.dptr = NULL;
208         key.dsize = 0;
209         return key;
210 }
211
212 /*
213   check special dn's have valid attributes
214   currently only @ATTRIBUTES is checked
215 */
216 static int ltdb_check_special_dn(struct ldb_module *module,
217                                  const struct ldb_message *msg)
218 {
219         struct ldb_context *ldb = ldb_module_get_ctx(module);
220         unsigned int i, j;
221
222         if (! ldb_dn_is_special(msg->dn) ||
223             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
224                 return LDB_SUCCESS;
225         }
226
227         /* we have @ATTRIBUTES, let's check attributes are fine */
228         /* should we check that we deny multivalued attributes ? */
229         for (i = 0; i < msg->num_elements; i++) {
230                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
231
232                 for (j = 0; j < msg->elements[i].num_values; j++) {
233                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
234                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
235                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
236                         }
237                 }
238         }
239
240         return LDB_SUCCESS;
241 }
242
243
244 /*
245   we've made a modification to a dn - possibly reindex and
246   update sequence number
247 */
248 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
249 {
250         int ret = LDB_SUCCESS;
251         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
252
253         /* only allow modifies inside a transaction, otherwise the
254          * ldb is unsafe */
255         if (ltdb->in_transaction == 0) {
256                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
257                 return LDB_ERR_OPERATIONS_ERROR;
258         }
259
260         if (ldb_dn_is_special(dn) &&
261             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
262              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
263         {
264                 if (ltdb->warn_reindex) {
265                         ldb_debug(ldb_module_get_ctx(module),
266                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
267                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
268                 }
269                 ret = ltdb_reindex(module);
270         }
271
272         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
273         if (ret == LDB_SUCCESS &&
274             !(ldb_dn_is_special(dn) &&
275               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
276                 ret = ltdb_increase_sequence_number(module);
277         }
278
279         /* If the modify was to @OPTIONS, reload the cache */
280         if (ret == LDB_SUCCESS &&
281             ldb_dn_is_special(dn) &&
282             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
283                 ret = ltdb_cache_reload(module);
284         }
285
286         return ret;
287 }
288
289 /*
290   store a record into the db
291 */
292 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
293 {
294         void *data = ldb_module_get_private(module);
295         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
296         TDB_DATA tdb_key, tdb_data;
297         struct ldb_val ldb_data;
298         int ret = LDB_SUCCESS;
299
300         tdb_key = ltdb_key(module, msg->dn);
301         if (tdb_key.dptr == NULL) {
302                 return LDB_ERR_OTHER;
303         }
304
305         ret = ldb_pack_data(ldb_module_get_ctx(module),
306                             msg, &ldb_data);
307         if (ret == -1) {
308                 talloc_free(tdb_key.dptr);
309                 return LDB_ERR_OTHER;
310         }
311
312         tdb_data.dptr = ldb_data.data;
313         tdb_data.dsize = ldb_data.length;
314
315         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
316         if (ret != 0) {
317                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
318                 goto done;
319         }
320
321 done:
322         talloc_free(tdb_key.dptr);
323         talloc_free(ldb_data.data);
324
325         return ret;
326 }
327
328
329 /*
330   check if a attribute is a single valued, for a given element
331  */
332 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
333                                   struct ldb_message_element *el)
334 {
335         if (!a) return false;
336         if (el != NULL) {
337                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
338                         /* override from a ldb module, for example
339                            used for the description field, which is
340                            marked multi-valued in the schema but which
341                            should not actually accept multiple
342                            values */
343                         return true;
344                 }
345                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
346                         /* override from a ldb module, for example used for
347                            deleted linked attribute entries */
348                         return false;
349                 }
350         }
351         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
352                 return true;
353         }
354         return false;
355 }
356
357 static int ltdb_add_internal(struct ldb_module *module,
358                              const struct ldb_message *msg,
359                              bool check_single_value)
360 {
361         struct ldb_context *ldb = ldb_module_get_ctx(module);
362         int ret = LDB_SUCCESS;
363         unsigned int i;
364
365         for (i=0;i<msg->num_elements;i++) {
366                 struct ldb_message_element *el = &msg->elements[i];
367                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
368
369                 if (el->num_values == 0) {
370                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
371                                                el->name, ldb_dn_get_linearized(msg->dn));
372                         return LDB_ERR_CONSTRAINT_VIOLATION;
373                 }
374                 if (check_single_value &&
375                                 el->num_values > 1 &&
376                                 ldb_tdb_single_valued(a, el)) {
377                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
378                                                el->name, ldb_dn_get_linearized(msg->dn));
379                         return LDB_ERR_CONSTRAINT_VIOLATION;
380                 }
381
382                 /* Do not check "@ATTRIBUTES" for duplicated values */
383                 if (ldb_dn_is_special(msg->dn) &&
384                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
385                         continue;
386                 }
387
388                 if (check_single_value &&
389                     !(el->flags &
390                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
391                         struct ldb_val *duplicate = NULL;
392
393                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
394                                                          el, &duplicate, 0);
395                         if (ret != LDB_SUCCESS) {
396                                 return ret;
397                         }
398                         if (duplicate != NULL) {
399                                 ldb_asprintf_errstring(
400                                         ldb,
401                                         "attribute '%s': value '%.*s' on '%s' "
402                                         "provided more than once in ADD object",
403                                         el->name,
404                                         (int)duplicate->length,
405                                         duplicate->data,
406                                         ldb_dn_get_linearized(msg->dn));
407                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
408                         }
409                 }
410         }
411
412         ret = ltdb_store(module, msg, TDB_INSERT);
413         if (ret != LDB_SUCCESS) {
414                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
415                         ldb_asprintf_errstring(ldb,
416                                                "Entry %s already exists",
417                                                ldb_dn_get_linearized(msg->dn));
418                 }
419                 return ret;
420         }
421
422         ret = ltdb_index_add_new(module, msg);
423         if (ret != LDB_SUCCESS) {
424                 return ret;
425         }
426
427         ret = ltdb_modified(module, msg->dn);
428
429         return ret;
430 }
431
432 /*
433   add a record to the database
434 */
435 static int ltdb_add(struct ltdb_context *ctx)
436 {
437         struct ldb_module *module = ctx->module;
438         struct ldb_request *req = ctx->req;
439         int ret = LDB_SUCCESS;
440
441         ret = ltdb_check_special_dn(module, req->op.add.message);
442         if (ret != LDB_SUCCESS) {
443                 return ret;
444         }
445
446         ldb_request_set_state(req, LDB_ASYNC_PENDING);
447
448         if (ltdb_cache_load(module) != 0) {
449                 return LDB_ERR_OPERATIONS_ERROR;
450         }
451
452         ret = ltdb_add_internal(module, req->op.add.message, true);
453
454         return ret;
455 }
456
457 /*
458   delete a record from the database, not updating indexes (used for deleting
459   index records)
460 */
461 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
462 {
463         void *data = ldb_module_get_private(module);
464         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
465         TDB_DATA tdb_key;
466         int ret;
467
468         tdb_key = ltdb_key(module, dn);
469         if (!tdb_key.dptr) {
470                 return LDB_ERR_OTHER;
471         }
472
473         ret = tdb_delete(ltdb->tdb, tdb_key);
474         talloc_free(tdb_key.dptr);
475
476         if (ret != 0) {
477                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
478         }
479
480         return ret;
481 }
482
483 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
484 {
485         struct ldb_message *msg;
486         int ret = LDB_SUCCESS;
487
488         msg = ldb_msg_new(module);
489         if (msg == NULL) {
490                 return LDB_ERR_OPERATIONS_ERROR;
491         }
492
493         /* in case any attribute of the message was indexed, we need
494            to fetch the old record */
495         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
496         if (ret != LDB_SUCCESS) {
497                 /* not finding the old record is an error */
498                 goto done;
499         }
500
501         ret = ltdb_delete_noindex(module, dn);
502         if (ret != LDB_SUCCESS) {
503                 goto done;
504         }
505
506         /* remove any indexed attributes */
507         ret = ltdb_index_delete(module, msg);
508         if (ret != LDB_SUCCESS) {
509                 goto done;
510         }
511
512         ret = ltdb_modified(module, dn);
513         if (ret != LDB_SUCCESS) {
514                 goto done;
515         }
516
517 done:
518         talloc_free(msg);
519         return ret;
520 }
521
522 /*
523   delete a record from the database
524 */
525 static int ltdb_delete(struct ltdb_context *ctx)
526 {
527         struct ldb_module *module = ctx->module;
528         struct ldb_request *req = ctx->req;
529         int ret = LDB_SUCCESS;
530
531         ldb_request_set_state(req, LDB_ASYNC_PENDING);
532
533         if (ltdb_cache_load(module) != 0) {
534                 return LDB_ERR_OPERATIONS_ERROR;
535         }
536
537         ret = ltdb_delete_internal(module, req->op.del.dn);
538
539         return ret;
540 }
541
542 /*
543   find an element by attribute name. At the moment this does a linear search,
544   it should be re-coded to use a binary search once all places that modify
545   records guarantee sorted order
546
547   return the index of the first matching element if found, otherwise -1
548 */
549 static int find_element(const struct ldb_message *msg, const char *name)
550 {
551         unsigned int i;
552         for (i=0;i<msg->num_elements;i++) {
553                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
554                         return i;
555                 }
556         }
557         return -1;
558 }
559
560
561 /*
562   add an element to an existing record. Assumes a elements array that we
563   can call re-alloc on, and assumed that we can re-use the data pointers from
564   the passed in additional values. Use with care!
565
566   returns 0 on success, -1 on failure (and sets errno)
567 */
568 static int ltdb_msg_add_element(struct ldb_message *msg,
569                                 struct ldb_message_element *el)
570 {
571         struct ldb_message_element *e2;
572         unsigned int i;
573
574         if (el->num_values == 0) {
575                 /* nothing to do here - we don't add empty elements */
576                 return 0;
577         }
578
579         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
580                               msg->num_elements+1);
581         if (!e2) {
582                 errno = ENOMEM;
583                 return -1;
584         }
585
586         msg->elements = e2;
587
588         e2 = &msg->elements[msg->num_elements];
589
590         e2->name = el->name;
591         e2->flags = el->flags;
592         e2->values = talloc_array(msg->elements,
593                                   struct ldb_val, el->num_values);
594         if (!e2->values) {
595                 errno = ENOMEM;
596                 return -1;
597         }
598         for (i=0;i<el->num_values;i++) {
599                 e2->values[i] = el->values[i];
600         }
601         e2->num_values = el->num_values;
602
603         ++msg->num_elements;
604
605         return 0;
606 }
607
608 /*
609   delete all elements having a specified attribute name
610 */
611 static int msg_delete_attribute(struct ldb_module *module,
612                                 struct ldb_message *msg, const char *name)
613 {
614         unsigned int i;
615         int ret;
616         struct ldb_message_element *el;
617
618         el = ldb_msg_find_element(msg, name);
619         if (el == NULL) {
620                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
621         }
622         i = el - msg->elements;
623
624         ret = ltdb_index_del_element(module, msg->dn, el);
625         if (ret != LDB_SUCCESS) {
626                 return ret;
627         }
628
629         talloc_free(el->values);
630         if (msg->num_elements > (i+1)) {
631                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
632         }
633         msg->num_elements--;
634         msg->elements = talloc_realloc(msg, msg->elements,
635                                        struct ldb_message_element,
636                                        msg->num_elements);
637         return LDB_SUCCESS;
638 }
639
640 /*
641   delete all elements matching an attribute name/value
642
643   return LDB Error on failure
644 */
645 static int msg_delete_element(struct ldb_module *module,
646                               struct ldb_message *msg,
647                               const char *name,
648                               const struct ldb_val *val)
649 {
650         struct ldb_context *ldb = ldb_module_get_ctx(module);
651         unsigned int i;
652         int found, ret;
653         struct ldb_message_element *el;
654         const struct ldb_schema_attribute *a;
655
656         found = find_element(msg, name);
657         if (found == -1) {
658                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
659         }
660
661         i = (unsigned int) found;
662         el = &(msg->elements[i]);
663
664         a = ldb_schema_attribute_by_name(ldb, el->name);
665
666         for (i=0;i<el->num_values;i++) {
667                 bool matched;
668                 if (a->syntax->operator_fn) {
669                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
670                                                      &el->values[i], val, &matched);
671                         if (ret != LDB_SUCCESS) return ret;
672                 } else {
673                         matched = (a->syntax->comparison_fn(ldb, ldb,
674                                                             &el->values[i], val) == 0);
675                 }
676                 if (matched) {
677                         if (el->num_values == 1) {
678                                 return msg_delete_attribute(module, msg, name);
679                         }
680
681                         ret = ltdb_index_del_value(module, msg->dn, el, i);
682                         if (ret != LDB_SUCCESS) {
683                                 return ret;
684                         }
685
686                         if (i<el->num_values-1) {
687                                 memmove(&el->values[i], &el->values[i+1],
688                                         sizeof(el->values[i])*
689                                                 (el->num_values-(i+1)));
690                         }
691                         el->num_values--;
692
693                         /* per definition we find in a canonicalised message an
694                            attribute value only once. So we are finished here */
695                         return LDB_SUCCESS;
696                 }
697         }
698
699         /* Not found */
700         return LDB_ERR_NO_SUCH_ATTRIBUTE;
701 }
702
703
704 /*
705   modify a record - internal interface
706
707   yuck - this is O(n^2). Luckily n is usually small so we probably
708   get away with it, but if we ever have really large attribute lists
709   then we'll need to look at this again
710
711   'req' is optional, and is used to specify controls if supplied
712 */
713 int ltdb_modify_internal(struct ldb_module *module,
714                          const struct ldb_message *msg,
715                          struct ldb_request *req)
716 {
717         struct ldb_context *ldb = ldb_module_get_ctx(module);
718         struct ldb_message *msg2;
719         unsigned int i, j;
720         int ret = LDB_SUCCESS, idx;
721         struct ldb_control *control_permissive = NULL;
722         TALLOC_CTX *mem_ctx = talloc_new(req);
723
724         if (mem_ctx == NULL) {
725                 return ldb_module_oom(module);
726         }
727         
728         if (req) {
729                 control_permissive = ldb_request_get_control(req,
730                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
731         }
732
733         msg2 = ldb_msg_new(mem_ctx);
734         if (msg2 == NULL) {
735                 ret = LDB_ERR_OTHER;
736                 goto done;
737         }
738
739         ret = ltdb_search_dn1(module, msg->dn,
740                               msg2,
741                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
742         if (ret != LDB_SUCCESS) {
743                 goto done;
744         }
745
746         for (i=0; i<msg->num_elements; i++) {
747                 struct ldb_message_element *el = &msg->elements[i], *el2;
748                 struct ldb_val *vals;
749                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
750                 const char *dn;
751                 uint32_t options = 0;
752                 if (control_permissive != NULL) {
753                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
754                 }
755
756                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
757                 case LDB_FLAG_MOD_ADD:
758
759                         if (el->num_values == 0) {
760                                 ldb_asprintf_errstring(ldb,
761                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
762                                                        el->name, ldb_dn_get_linearized(msg2->dn));
763                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
764                                 goto done;
765                         }
766
767                         /* make a copy of the array so that a permissive
768                          * control can remove duplicates without changing the
769                          * original values, but do not copy data as we do not
770                          * need to keep it around once the operation is
771                          * finished */
772                         if (control_permissive) {
773                                 el = talloc(msg2, struct ldb_message_element);
774                                 if (!el) {
775                                         ret = LDB_ERR_OTHER;
776                                         goto done;
777                                 }
778                                 *el = msg->elements[i];
779                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
780                                 if (el->values == NULL) {
781                                         ret = LDB_ERR_OTHER;
782                                         goto done;
783                                 }
784                                 for (j = 0; j < el->num_values; j++) {
785                                         el->values[j] = msg->elements[i].values[j];
786                                 }
787                         }
788
789                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
790                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
791                                                        el->name, ldb_dn_get_linearized(msg2->dn));
792                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
793                                 goto done;
794                         }
795
796                         /* Checks if element already exists */
797                         idx = find_element(msg2, el->name);
798                         if (idx == -1) {
799                                 if (ltdb_msg_add_element(msg2, el) != 0) {
800                                         ret = LDB_ERR_OTHER;
801                                         goto done;
802                                 }
803                                 ret = ltdb_index_add_element(module, msg2->dn,
804                                                              el);
805                                 if (ret != LDB_SUCCESS) {
806                                         goto done;
807                                 }
808                         } else {
809                                 j = (unsigned int) idx;
810                                 el2 = &(msg2->elements[j]);
811
812                                 /* We cannot add another value on a existing one
813                                    if the attribute is single-valued */
814                                 if (ldb_tdb_single_valued(a, el)) {
815                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
816                                                                el->name, ldb_dn_get_linearized(msg2->dn));
817                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
818                                         goto done;
819                                 }
820
821                                 /* Check that values don't exist yet on multi-
822                                    valued attributes or aren't provided twice */
823                                 if (!(el->flags &
824                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
825                                         struct ldb_val *duplicate = NULL;
826                                         ret = ldb_msg_find_common_values(ldb,
827                                                                          msg2,
828                                                                          el,
829                                                                          el2,
830                                                                          options);
831
832                                         if (ret ==
833                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
834                                                 ldb_asprintf_errstring(ldb,
835                                                         "attribute '%s': value "
836                                                         "#%u on '%s' already "
837                                                         "exists", el->name, j,
838                                                         ldb_dn_get_linearized(msg2->dn));
839                                                 goto done;
840                                         } else if (ret != LDB_SUCCESS) {
841                                                 goto done;
842                                         }
843
844                                         ret = ldb_msg_find_duplicate_val(
845                                                 ldb, msg2, el, &duplicate, 0);
846                                         if (ret != LDB_SUCCESS) {
847                                                 goto done;
848                                         }
849                                         if (duplicate != NULL) {
850                                                 ldb_asprintf_errstring(
851                                                         ldb,
852                                                         "attribute '%s': value "
853                                                         "'%.*s' on '%s' "
854                                                         "provided more than "
855                                                         "once in ADD",
856                                                         el->name,
857                                                         (int)duplicate->length,
858                                                         duplicate->data,
859                                                         ldb_dn_get_linearized(msg->dn));
860                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
861                                                 goto done;
862                                         }
863                                 }
864
865                                 /* Now combine existing and new values to a new
866                                    attribute record */
867                                 vals = talloc_realloc(msg2->elements,
868                                                       el2->values, struct ldb_val,
869                                                       el2->num_values + el->num_values);
870                                 if (vals == NULL) {
871                                         ldb_oom(ldb);
872                                         ret = LDB_ERR_OTHER;
873                                         goto done;
874                                 }
875
876                                 for (j=0; j<el->num_values; j++) {
877                                         vals[el2->num_values + j] =
878                                                 ldb_val_dup(vals, &el->values[j]);
879                                 }
880
881                                 el2->values = vals;
882                                 el2->num_values += el->num_values;
883
884                                 ret = ltdb_index_add_element(module, msg2->dn, el);
885                                 if (ret != LDB_SUCCESS) {
886                                         goto done;
887                                 }
888                         }
889
890                         break;
891
892                 case LDB_FLAG_MOD_REPLACE:
893
894                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
895                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
896                                                        el->name, ldb_dn_get_linearized(msg2->dn));
897                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
898                                 goto done;
899                         }
900
901                         /*
902                          * We don't need to check this if we have been
903                          * pre-screened by the repl_meta_data module
904                          * in Samba, or someone else who can claim to
905                          * know what they are doing. 
906                          */
907                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
908                                 struct ldb_val *duplicate = NULL;
909
910                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
911                                                                  &duplicate, 0);
912                                 if (ret != LDB_SUCCESS) {
913                                         goto done;
914                                 }
915                                 if (duplicate != NULL) {
916                                         ldb_asprintf_errstring(
917                                                 ldb,
918                                                 "attribute '%s': value '%.*s' "
919                                                 "on '%s' provided more than "
920                                                 "once in REPLACE",
921                                                 el->name,
922                                                 (int)duplicate->length,
923                                                 duplicate->data,
924                                                 ldb_dn_get_linearized(msg2->dn));
925                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
926                                         goto done;
927                                 }
928                         }
929
930                         /* Checks if element already exists */
931                         idx = find_element(msg2, el->name);
932                         if (idx != -1) {
933                                 j = (unsigned int) idx;
934                                 el2 = &(msg2->elements[j]);
935
936                                 /* we consider two elements to be
937                                  * equal only if the order
938                                  * matches. This allows dbcheck to
939                                  * fix the ordering on attributes
940                                  * where order matters, such as
941                                  * objectClass
942                                  */
943                                 if (ldb_msg_element_equal_ordered(el, el2)) {
944                                         continue;
945                                 }
946
947                                 /* Delete the attribute if it exists in the DB */
948                                 if (msg_delete_attribute(module, msg2,
949                                                          el->name) != 0) {
950                                         ret = LDB_ERR_OTHER;
951                                         goto done;
952                                 }
953                         }
954
955                         /* Recreate it with the new values */
956                         if (ltdb_msg_add_element(msg2, el) != 0) {
957                                 ret = LDB_ERR_OTHER;
958                                 goto done;
959                         }
960
961                         ret = ltdb_index_add_element(module, msg2->dn, el);
962                         if (ret != LDB_SUCCESS) {
963                                 goto done;
964                         }
965
966                         break;
967
968                 case LDB_FLAG_MOD_DELETE:
969                         dn = ldb_dn_get_linearized(msg2->dn);
970                         if (dn == NULL) {
971                                 ret = LDB_ERR_OTHER;
972                                 goto done;
973                         }
974
975                         if (msg->elements[i].num_values == 0) {
976                                 /* Delete the whole attribute */
977                                 ret = msg_delete_attribute(module, msg2,
978                                                            msg->elements[i].name);
979                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
980                                     control_permissive) {
981                                         ret = LDB_SUCCESS;
982                                 } else {
983                                         ldb_asprintf_errstring(ldb,
984                                                                "attribute '%s': no such attribute for delete on '%s'",
985                                                                msg->elements[i].name, dn);
986                                 }
987                                 if (ret != LDB_SUCCESS) {
988                                         goto done;
989                                 }
990                         } else {
991                                 /* Delete specified values from an attribute */
992                                 for (j=0; j < msg->elements[i].num_values; j++) {
993                                         ret = msg_delete_element(module,
994                                                                  msg2,
995                                                                  msg->elements[i].name,
996                                                                  &msg->elements[i].values[j]);
997                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
998                                             control_permissive) {
999                                                 ret = LDB_SUCCESS;
1000                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1001                                                 ldb_asprintf_errstring(ldb,
1002                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1003                                                                        msg->elements[i].name, dn);
1004                                         }
1005                                         if (ret != LDB_SUCCESS) {
1006                                                 goto done;
1007                                         }
1008                                 }
1009                         }
1010                         break;
1011                 default:
1012                         ldb_asprintf_errstring(ldb,
1013                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1014                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1015                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1016                         ret = LDB_ERR_PROTOCOL_ERROR;
1017                         goto done;
1018                 }
1019         }
1020
1021         ret = ltdb_store(module, msg2, TDB_MODIFY);
1022         if (ret != LDB_SUCCESS) {
1023                 goto done;
1024         }
1025
1026         ret = ltdb_modified(module, msg2->dn);
1027         if (ret != LDB_SUCCESS) {
1028                 goto done;
1029         }
1030
1031 done:
1032         TALLOC_FREE(mem_ctx);
1033         return ret;
1034 }
1035
1036 /*
1037   modify a record
1038 */
1039 static int ltdb_modify(struct ltdb_context *ctx)
1040 {
1041         struct ldb_module *module = ctx->module;
1042         struct ldb_request *req = ctx->req;
1043         int ret = LDB_SUCCESS;
1044
1045         ret = ltdb_check_special_dn(module, req->op.mod.message);
1046         if (ret != LDB_SUCCESS) {
1047                 return ret;
1048         }
1049
1050         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1051
1052         if (ltdb_cache_load(module) != 0) {
1053                 return LDB_ERR_OPERATIONS_ERROR;
1054         }
1055
1056         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1057
1058         return ret;
1059 }
1060
1061 /*
1062   rename a record
1063 */
1064 static int ltdb_rename(struct ltdb_context *ctx)
1065 {
1066         struct ldb_module *module = ctx->module;
1067         void *data = ldb_module_get_private(module);
1068         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1069         struct ldb_request *req = ctx->req;
1070         struct ldb_message *msg;
1071         int ret = LDB_SUCCESS;
1072         TDB_DATA tdb_key, tdb_key_old;
1073
1074         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1075
1076         if (ltdb_cache_load(ctx->module) != 0) {
1077                 return LDB_ERR_OPERATIONS_ERROR;
1078         }
1079
1080         msg = ldb_msg_new(ctx);
1081         if (msg == NULL) {
1082                 return LDB_ERR_OPERATIONS_ERROR;
1083         }
1084
1085         /* we need to fetch the old record to re-add under the new name */
1086         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1087                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1088         if (ret != LDB_SUCCESS) {
1089                 /* not finding the old record is an error */
1090                 return ret;
1091         }
1092
1093         /* We need to, before changing the DB, check if the new DN
1094          * exists, so we can return this error to the caller with an
1095          * unmodified DB */
1096         tdb_key = ltdb_key(module, req->op.rename.newdn);
1097         if (!tdb_key.dptr) {
1098                 talloc_free(msg);
1099                 return LDB_ERR_OPERATIONS_ERROR;
1100         }
1101
1102         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1103         if (!tdb_key_old.dptr) {
1104                 talloc_free(msg);
1105                 talloc_free(tdb_key.dptr);
1106                 return LDB_ERR_OPERATIONS_ERROR;
1107         }
1108
1109         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1110         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1111                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1112                         talloc_free(tdb_key_old.dptr);
1113                         talloc_free(tdb_key.dptr);
1114                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1115                                                "Entry %s already exists",
1116                                                ldb_dn_get_linearized(req->op.rename.newdn));
1117                         /* finding the new record already in the DB is an error */
1118                         talloc_free(msg);
1119                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1120                 }
1121         }
1122         talloc_free(tdb_key_old.dptr);
1123         talloc_free(tdb_key.dptr);
1124
1125         /* Always delete first then add, to avoid conflicts with
1126          * unique indexes. We rely on the transaction to make this
1127          * atomic
1128          */
1129         ret = ltdb_delete_internal(module, msg->dn);
1130         if (ret != LDB_SUCCESS) {
1131                 talloc_free(msg);
1132                 return ret;
1133         }
1134
1135         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1136         if (msg->dn == NULL) {
1137                 talloc_free(msg);
1138                 return LDB_ERR_OPERATIONS_ERROR;
1139         }
1140
1141         /* We don't check single value as we can have more than 1 with
1142          * deleted attributes. We could go through all elements but that's
1143          * maybe not the most efficient way
1144          */
1145         ret = ltdb_add_internal(module, msg, false);
1146
1147         talloc_free(msg);
1148
1149         return ret;
1150 }
1151
1152 static int ltdb_start_trans(struct ldb_module *module)
1153 {
1154         void *data = ldb_module_get_private(module);
1155         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1156
1157         if (tdb_transaction_start(ltdb->tdb) != 0) {
1158                 return ltdb_err_map(tdb_error(ltdb->tdb));
1159         }
1160
1161         ltdb->in_transaction++;
1162
1163         ltdb_index_transaction_start(module);
1164
1165         return LDB_SUCCESS;
1166 }
1167
1168 static int ltdb_prepare_commit(struct ldb_module *module)
1169 {
1170         int ret;
1171         void *data = ldb_module_get_private(module);
1172         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1173
1174         if (ltdb->in_transaction != 1) {
1175                 return LDB_SUCCESS;
1176         }
1177
1178         ret = ltdb_index_transaction_commit(module);
1179         if (ret != LDB_SUCCESS) {
1180                 tdb_transaction_cancel(ltdb->tdb);
1181                 ltdb->in_transaction--;
1182                 return ret;
1183         }
1184
1185         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1186                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1187                 ltdb->in_transaction--;
1188                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1189                                        "Failure during tdb_transaction_prepare_commit(): %s -> %s",
1190                                        tdb_errorstr(ltdb->tdb),
1191                                        ldb_strerror(ret));
1192                 return ret;
1193         }
1194
1195         ltdb->prepared_commit = true;
1196
1197         return LDB_SUCCESS;
1198 }
1199
1200 static int ltdb_end_trans(struct ldb_module *module)
1201 {
1202         int ret;
1203         void *data = ldb_module_get_private(module);
1204         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1205
1206         if (!ltdb->prepared_commit) {
1207                 ret = ltdb_prepare_commit(module);
1208                 if (ret != LDB_SUCCESS) {
1209                         return ret;
1210                 }
1211         }
1212
1213         ltdb->in_transaction--;
1214         ltdb->prepared_commit = false;
1215
1216         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1217                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1218                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1219                                        "Failure during tdb_transaction_commit(): %s -> %s",
1220                                        tdb_errorstr(ltdb->tdb),
1221                                        ldb_strerror(ret));
1222                 return ret;
1223         }
1224
1225         return LDB_SUCCESS;
1226 }
1227
1228 static int ltdb_del_trans(struct ldb_module *module)
1229 {
1230         void *data = ldb_module_get_private(module);
1231         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1232
1233         ltdb->in_transaction--;
1234
1235         if (ltdb_index_transaction_cancel(module) != 0) {
1236                 tdb_transaction_cancel(ltdb->tdb);
1237                 return ltdb_err_map(tdb_error(ltdb->tdb));
1238         }
1239
1240         tdb_transaction_cancel(ltdb->tdb);
1241         return LDB_SUCCESS;
1242 }
1243
1244 /*
1245   return sequenceNumber from @BASEINFO
1246 */
1247 static int ltdb_sequence_number(struct ltdb_context *ctx,
1248                                 struct ldb_extended **ext)
1249 {
1250         struct ldb_context *ldb;
1251         struct ldb_module *module = ctx->module;
1252         struct ldb_request *req = ctx->req;
1253         TALLOC_CTX *tmp_ctx = NULL;
1254         struct ldb_seqnum_request *seq;
1255         struct ldb_seqnum_result *res;
1256         struct ldb_message *msg = NULL;
1257         struct ldb_dn *dn;
1258         const char *date;
1259         int ret = LDB_SUCCESS;
1260
1261         ldb = ldb_module_get_ctx(module);
1262
1263         seq = talloc_get_type(req->op.extended.data,
1264                                 struct ldb_seqnum_request);
1265         if (seq == NULL) {
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268
1269         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1270
1271         if (ltdb_lock_read(module) != 0) {
1272                 return LDB_ERR_OPERATIONS_ERROR;
1273         }
1274
1275         res = talloc_zero(req, struct ldb_seqnum_result);
1276         if (res == NULL) {
1277                 ret = LDB_ERR_OPERATIONS_ERROR;
1278                 goto done;
1279         }
1280
1281         tmp_ctx = talloc_new(req);
1282         if (tmp_ctx == NULL) {
1283                 ret = LDB_ERR_OPERATIONS_ERROR;
1284                 goto done;
1285         }
1286
1287         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1288         if (dn == NULL) {
1289                 ret = LDB_ERR_OPERATIONS_ERROR;
1290                 goto done;
1291         }
1292
1293         msg = ldb_msg_new(tmp_ctx);
1294         if (msg == NULL) {
1295                 ret = LDB_ERR_OPERATIONS_ERROR;
1296                 goto done;
1297         }
1298
1299         ret = ltdb_search_dn1(module, dn, msg, 0);
1300         if (ret != LDB_SUCCESS) {
1301                 goto done;
1302         }
1303
1304         switch (seq->type) {
1305         case LDB_SEQ_HIGHEST_SEQ:
1306                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1307                 break;
1308         case LDB_SEQ_NEXT:
1309                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1310                 res->seq_num++;
1311                 break;
1312         case LDB_SEQ_HIGHEST_TIMESTAMP:
1313                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1314                 if (date) {
1315                         res->seq_num = ldb_string_to_time(date);
1316                 } else {
1317                         res->seq_num = 0;
1318                         /* zero is as good as anything when we don't know */
1319                 }
1320                 break;
1321         }
1322
1323         *ext = talloc_zero(req, struct ldb_extended);
1324         if (*ext == NULL) {
1325                 ret = LDB_ERR_OPERATIONS_ERROR;
1326                 goto done;
1327         }
1328         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1329         (*ext)->data = talloc_steal(*ext, res);
1330
1331 done:
1332         talloc_free(tmp_ctx);
1333         ltdb_unlock_read(module);
1334         return ret;
1335 }
1336
1337 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1338 {
1339         struct ldb_context *ldb;
1340         struct ldb_request *req;
1341         struct ldb_reply *ares;
1342
1343         ldb = ldb_module_get_ctx(ctx->module);
1344         req = ctx->req;
1345
1346         /* if we already returned an error just return */
1347         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1348                 return;
1349         }
1350
1351         ares = talloc_zero(req, struct ldb_reply);
1352         if (!ares) {
1353                 ldb_oom(ldb);
1354                 req->callback(req, NULL);
1355                 return;
1356         }
1357         ares->type = LDB_REPLY_DONE;
1358         ares->error = error;
1359
1360         req->callback(req, ares);
1361 }
1362
1363 static void ltdb_timeout(struct tevent_context *ev,
1364                           struct tevent_timer *te,
1365                           struct timeval t,
1366                           void *private_data)
1367 {
1368         struct ltdb_context *ctx;
1369         ctx = talloc_get_type(private_data, struct ltdb_context);
1370
1371         if (!ctx->request_terminated) {
1372                 /* request is done now */
1373                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1374         }
1375
1376         if (ctx->spy) {
1377                 /* neutralize the spy */
1378                 ctx->spy->ctx = NULL;
1379                 ctx->spy = NULL;
1380         }
1381         talloc_free(ctx);
1382 }
1383
1384 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1385                                         struct ldb_extended *ext,
1386                                         int error)
1387 {
1388         struct ldb_context *ldb;
1389         struct ldb_request *req;
1390         struct ldb_reply *ares;
1391
1392         ldb = ldb_module_get_ctx(ctx->module);
1393         req = ctx->req;
1394
1395         /* if we already returned an error just return */
1396         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1397                 return;
1398         }
1399
1400         ares = talloc_zero(req, struct ldb_reply);
1401         if (!ares) {
1402                 ldb_oom(ldb);
1403                 req->callback(req, NULL);
1404                 return;
1405         }
1406         ares->type = LDB_REPLY_DONE;
1407         ares->response = ext;
1408         ares->error = error;
1409
1410         req->callback(req, ares);
1411 }
1412
1413 static void ltdb_handle_extended(struct ltdb_context *ctx)
1414 {
1415         struct ldb_extended *ext = NULL;
1416         int ret;
1417
1418         if (strcmp(ctx->req->op.extended.oid,
1419                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1420                 /* get sequence number */
1421                 ret = ltdb_sequence_number(ctx, &ext);
1422         } else {
1423                 /* not recognized */
1424                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1425         }
1426
1427         ltdb_request_extended_done(ctx, ext, ret);
1428 }
1429
1430 static void ltdb_callback(struct tevent_context *ev,
1431                           struct tevent_timer *te,
1432                           struct timeval t,
1433                           void *private_data)
1434 {
1435         struct ltdb_context *ctx;
1436         int ret;
1437
1438         ctx = talloc_get_type(private_data, struct ltdb_context);
1439
1440         if (ctx->request_terminated) {
1441                 goto done;
1442         }
1443
1444         switch (ctx->req->operation) {
1445         case LDB_SEARCH:
1446                 ret = ltdb_search(ctx);
1447                 break;
1448         case LDB_ADD:
1449                 ret = ltdb_add(ctx);
1450                 break;
1451         case LDB_MODIFY:
1452                 ret = ltdb_modify(ctx);
1453                 break;
1454         case LDB_DELETE:
1455                 ret = ltdb_delete(ctx);
1456                 break;
1457         case LDB_RENAME:
1458                 ret = ltdb_rename(ctx);
1459                 break;
1460         case LDB_EXTENDED:
1461                 ltdb_handle_extended(ctx);
1462                 goto done;
1463         default:
1464                 /* no other op supported */
1465                 ret = LDB_ERR_PROTOCOL_ERROR;
1466         }
1467
1468         if (!ctx->request_terminated) {
1469                 /* request is done now */
1470                 ltdb_request_done(ctx, ret);
1471         }
1472
1473 done:
1474         if (ctx->spy) {
1475                 /* neutralize the spy */
1476                 ctx->spy->ctx = NULL;
1477                 ctx->spy = NULL;
1478         }
1479         talloc_free(ctx);
1480 }
1481
1482 static int ltdb_request_destructor(void *ptr)
1483 {
1484         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1485
1486         if (spy->ctx != NULL) {
1487                 spy->ctx->spy = NULL;
1488                 spy->ctx->request_terminated = true;
1489                 spy->ctx = NULL;
1490         }
1491
1492         return 0;
1493 }
1494
1495 static int ltdb_handle_request(struct ldb_module *module,
1496                                struct ldb_request *req)
1497 {
1498         struct ldb_control *control_permissive;
1499         struct ldb_context *ldb;
1500         struct tevent_context *ev;
1501         struct ltdb_context *ac;
1502         struct tevent_timer *te;
1503         struct timeval tv;
1504         unsigned int i;
1505
1506         ldb = ldb_module_get_ctx(module);
1507
1508         control_permissive = ldb_request_get_control(req,
1509                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1510
1511         for (i = 0; req->controls && req->controls[i]; i++) {
1512                 if (req->controls[i]->critical &&
1513                     req->controls[i] != control_permissive) {
1514                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1515                                                req->controls[i]->oid);
1516                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1517                 }
1518         }
1519
1520         if (req->starttime == 0 || req->timeout == 0) {
1521                 ldb_set_errstring(ldb, "Invalid timeout settings");
1522                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1523         }
1524
1525         ev = ldb_handle_get_event_context(req->handle);
1526
1527         ac = talloc_zero(ldb, struct ltdb_context);
1528         if (ac == NULL) {
1529                 ldb_oom(ldb);
1530                 return LDB_ERR_OPERATIONS_ERROR;
1531         }
1532
1533         ac->module = module;
1534         ac->req = req;
1535
1536         tv.tv_sec = 0;
1537         tv.tv_usec = 0;
1538         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1539         if (NULL == te) {
1540                 talloc_free(ac);
1541                 return LDB_ERR_OPERATIONS_ERROR;
1542         }
1543
1544         if (req->timeout > 0) {
1545                 tv.tv_sec = req->starttime + req->timeout;
1546                 tv.tv_usec = 0;
1547                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1548                                                      ltdb_timeout, ac);
1549                 if (NULL == ac->timeout_event) {
1550                         talloc_free(ac);
1551                         return LDB_ERR_OPERATIONS_ERROR;
1552                 }
1553         }
1554
1555         /* set a spy so that we do not try to use the request context
1556          * if it is freed before ltdb_callback fires */
1557         ac->spy = talloc(req, struct ltdb_req_spy);
1558         if (NULL == ac->spy) {
1559                 talloc_free(ac);
1560                 return LDB_ERR_OPERATIONS_ERROR;
1561         }
1562         ac->spy->ctx = ac;
1563
1564         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1565
1566         return LDB_SUCCESS;
1567 }
1568
1569 static int ltdb_init_rootdse(struct ldb_module *module)
1570 {
1571         /* ignore errors on this - we expect it for non-sam databases */
1572         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1573
1574         /* there can be no module beyond the backend, just return */
1575         return LDB_SUCCESS;
1576 }
1577
1578 static const struct ldb_module_ops ltdb_ops = {
1579         .name              = "tdb",
1580         .init_context      = ltdb_init_rootdse,
1581         .search            = ltdb_handle_request,
1582         .add               = ltdb_handle_request,
1583         .modify            = ltdb_handle_request,
1584         .del               = ltdb_handle_request,
1585         .rename            = ltdb_handle_request,
1586         .extended          = ltdb_handle_request,
1587         .start_transaction = ltdb_start_trans,
1588         .end_transaction   = ltdb_end_trans,
1589         .prepare_commit    = ltdb_prepare_commit,
1590         .del_transaction   = ltdb_del_trans,
1591         .read_lock         = ltdb_lock_read,
1592         .read_unlock       = ltdb_unlock_read,
1593 };
1594
1595 /*
1596   connect to the database
1597 */
1598 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1599                         unsigned int flags, const char *options[],
1600                         struct ldb_module **_module)
1601 {
1602         struct ldb_module *module;
1603         const char *path;
1604         int tdb_flags, open_flags;
1605         struct ltdb_private *ltdb;
1606
1607         /*
1608          * We hold locks, so we must use a private event context
1609          * on each returned handle
1610          */
1611
1612         ldb_set_require_private_event_context(ldb);
1613
1614         /* parse the url */
1615         if (strchr(url, ':')) {
1616                 if (strncmp(url, "tdb://", 6) != 0) {
1617                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1618                                   "Invalid tdb URL '%s'", url);
1619                         return LDB_ERR_OPERATIONS_ERROR;
1620                 }
1621                 path = url+6;
1622         } else {
1623                 path = url;
1624         }
1625
1626         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1627
1628         /* check for the 'nosync' option */
1629         if (flags & LDB_FLG_NOSYNC) {
1630                 tdb_flags |= TDB_NOSYNC;
1631         }
1632
1633         /* and nommap option */
1634         if (flags & LDB_FLG_NOMMAP) {
1635                 tdb_flags |= TDB_NOMMAP;
1636         }
1637
1638         if (flags & LDB_FLG_RDONLY) {
1639                 open_flags = O_RDONLY;
1640         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1641                 open_flags = O_RDWR;
1642         } else {
1643                 open_flags = O_CREAT | O_RDWR;
1644         }
1645
1646         ltdb = talloc_zero(ldb, struct ltdb_private);
1647         if (!ltdb) {
1648                 ldb_oom(ldb);
1649                 return LDB_ERR_OPERATIONS_ERROR;
1650         }
1651
1652         /* note that we use quite a large default hash size */
1653         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1654                                    tdb_flags, open_flags,
1655                                    ldb_get_create_perms(ldb), ldb);
1656         if (!ltdb->tdb) {
1657                 ldb_asprintf_errstring(ldb,
1658                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1659                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1660                           "Unable to open tdb '%s': %s", path, strerror(errno));
1661                 talloc_free(ltdb);
1662                 if (errno == EACCES || errno == EPERM) {
1663                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1664                 }
1665                 return LDB_ERR_OPERATIONS_ERROR;
1666         }
1667
1668         if (getenv("LDB_WARN_UNINDEXED")) {
1669                 ltdb->warn_unindexed = true;
1670         }
1671
1672         if (getenv("LDB_WARN_REINDEX")) {
1673                 ltdb->warn_reindex = true;
1674         }
1675
1676         ltdb->sequence_number = 0;
1677
1678         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1679         if (!module) {
1680                 ldb_oom(ldb);
1681                 talloc_free(ltdb);
1682                 return LDB_ERR_OPERATIONS_ERROR;
1683         }
1684         ldb_module_set_private(module, ltdb);
1685         talloc_steal(module, ltdb);
1686
1687         if (ltdb_cache_load(module) != 0) {
1688                 ldb_asprintf_errstring(ldb,
1689                                        "Unable to load ltdb cache records of tdb '%s'", path);
1690                 talloc_free(module);
1691                 return LDB_ERR_OPERATIONS_ERROR;
1692         }
1693
1694         *_module = module;
1695         return LDB_SUCCESS;
1696 }
1697
1698 int ldb_tdb_init(const char *version)
1699 {
1700         LDB_MODULE_CHECK_VERSION(version);
1701         return ldb_register_backend("tdb", ltdb_connect, false);
1702 }