replmd: check single values in replmd_add_fix_la
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 return 0;
123         }
124         ltdb->read_lock_count--;
125         return 0;
126 }
127
128
129 /*
130   form a TDB_DATA for a record key
131   caller frees
132
133   note that the key for a record can depend on whether the
134   dn refers to a case sensitive index record or not
135 */
136 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
137 {
138         struct ldb_context *ldb = ldb_module_get_ctx(module);
139         TDB_DATA key;
140         char *key_str = NULL;
141         const char *dn_folded = NULL;
142
143         /*
144           most DNs are case insensitive. The exception is index DNs for
145           case sensitive attributes
146
147           there are 3 cases dealt with in this code:
148
149           1) if the dn doesn't start with @ then uppercase the attribute
150              names and the attributes values of case insensitive attributes
151           2) if the dn starts with @ then leave it alone -
152              the indexing code handles the rest
153         */
154
155         dn_folded = ldb_dn_get_casefold(dn);
156         if (!dn_folded) {
157                 goto failed;
158         }
159
160         key_str = talloc_strdup(ldb, "DN=");
161         if (!key_str) {
162                 goto failed;
163         }
164
165         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
166         if (!key_str) {
167                 goto failed;
168         }
169
170         key.dptr = (uint8_t *)key_str;
171         key.dsize = strlen(key_str) + 1;
172
173         return key;
174
175 failed:
176         errno = ENOMEM;
177         key.dptr = NULL;
178         key.dsize = 0;
179         return key;
180 }
181
182 /*
183   check special dn's have valid attributes
184   currently only @ATTRIBUTES is checked
185 */
186 static int ltdb_check_special_dn(struct ldb_module *module,
187                                  const struct ldb_message *msg)
188 {
189         struct ldb_context *ldb = ldb_module_get_ctx(module);
190         unsigned int i, j;
191
192         if (! ldb_dn_is_special(msg->dn) ||
193             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
194                 return LDB_SUCCESS;
195         }
196
197         /* we have @ATTRIBUTES, let's check attributes are fine */
198         /* should we check that we deny multivalued attributes ? */
199         for (i = 0; i < msg->num_elements; i++) {
200                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
201
202                 for (j = 0; j < msg->elements[i].num_values; j++) {
203                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
204                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
205                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
206                         }
207                 }
208         }
209
210         return LDB_SUCCESS;
211 }
212
213
214 /*
215   we've made a modification to a dn - possibly reindex and
216   update sequence number
217 */
218 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
219 {
220         int ret = LDB_SUCCESS;
221         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
222
223         /* only allow modifies inside a transaction, otherwise the
224          * ldb is unsafe */
225         if (ltdb->in_transaction == 0) {
226                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         if (ldb_dn_is_special(dn) &&
231             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
232              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
233         {
234                 if (ltdb->warn_reindex) {
235                         ldb_debug(ldb_module_get_ctx(module),
236                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
237                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
238                 }
239                 ret = ltdb_reindex(module);
240         }
241
242         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
243         if (ret == LDB_SUCCESS &&
244             !(ldb_dn_is_special(dn) &&
245               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
246                 ret = ltdb_increase_sequence_number(module);
247         }
248
249         /* If the modify was to @OPTIONS, reload the cache */
250         if (ret == LDB_SUCCESS &&
251             ldb_dn_is_special(dn) &&
252             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
253                 ret = ltdb_cache_reload(module);
254         }
255
256         return ret;
257 }
258
259 /*
260   store a record into the db
261 */
262 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
263 {
264         void *data = ldb_module_get_private(module);
265         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
266         TDB_DATA tdb_key, tdb_data;
267         struct ldb_val ldb_data;
268         int ret = LDB_SUCCESS;
269
270         tdb_key = ltdb_key(module, msg->dn);
271         if (tdb_key.dptr == NULL) {
272                 return LDB_ERR_OTHER;
273         }
274
275         ret = ldb_pack_data(ldb_module_get_ctx(module),
276                             msg, &ldb_data);
277         if (ret == -1) {
278                 talloc_free(tdb_key.dptr);
279                 return LDB_ERR_OTHER;
280         }
281
282         tdb_data.dptr = ldb_data.data;
283         tdb_data.dsize = ldb_data.length;
284
285         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
286         if (ret != 0) {
287                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
288                 goto done;
289         }
290
291 done:
292         talloc_free(tdb_key.dptr);
293         talloc_free(ldb_data.data);
294
295         return ret;
296 }
297
298
299 /*
300   check if a attribute is a single valued, for a given element
301  */
302 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
303                                   struct ldb_message_element *el)
304 {
305         if (!a) return false;
306         if (el != NULL) {
307                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
308                         /* override from a ldb module, for example
309                            used for the description field, which is
310                            marked multi-valued in the schema but which
311                            should not actually accept multiple
312                            values */
313                         return true;
314                 }
315                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
316                         /* override from a ldb module, for example used for
317                            deleted linked attribute entries */
318                         return false;
319                 }
320         }
321         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
322                 return true;
323         }
324         return false;
325 }
326
327 static int ltdb_add_internal(struct ldb_module *module,
328                              const struct ldb_message *msg,
329                              bool check_single_value)
330 {
331         struct ldb_context *ldb = ldb_module_get_ctx(module);
332         int ret = LDB_SUCCESS;
333         unsigned int i;
334
335         for (i=0;i<msg->num_elements;i++) {
336                 struct ldb_message_element *el = &msg->elements[i];
337                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
338
339                 if (el->num_values == 0) {
340                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
341                                                el->name, ldb_dn_get_linearized(msg->dn));
342                         return LDB_ERR_CONSTRAINT_VIOLATION;
343                 }
344                 if (check_single_value &&
345                                 el->num_values > 1 &&
346                                 ldb_tdb_single_valued(a, el)) {
347                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
348                                                el->name, ldb_dn_get_linearized(msg->dn));
349                         return LDB_ERR_CONSTRAINT_VIOLATION;
350                 }
351
352                 /* Do not check "@ATTRIBUTES" for duplicated values */
353                 if (ldb_dn_is_special(msg->dn) &&
354                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
355                         continue;
356                 }
357
358                 if (check_single_value &&
359                     !(el->flags &
360                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
361                         struct ldb_val *duplicate = NULL;
362
363                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
364                                                          el, &duplicate, 0);
365                         if (ret != LDB_SUCCESS) {
366                                 return ret;
367                         }
368                         if (duplicate != NULL) {
369                                 ldb_asprintf_errstring(
370                                         ldb,
371                                         "attribute '%s': value '%.*s' on '%s' "
372                                         "provided more than once in ADD object",
373                                         el->name,
374                                         (int)duplicate->length,
375                                         duplicate->data,
376                                         ldb_dn_get_linearized(msg->dn));
377                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
378                         }
379                 }
380         }
381
382         ret = ltdb_store(module, msg, TDB_INSERT);
383         if (ret != LDB_SUCCESS) {
384                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
385                         ldb_asprintf_errstring(ldb,
386                                                "Entry %s already exists",
387                                                ldb_dn_get_linearized(msg->dn));
388                 }
389                 return ret;
390         }
391
392         ret = ltdb_index_add_new(module, msg);
393         if (ret != LDB_SUCCESS) {
394                 return ret;
395         }
396
397         ret = ltdb_modified(module, msg->dn);
398
399         return ret;
400 }
401
402 /*
403   add a record to the database
404 */
405 static int ltdb_add(struct ltdb_context *ctx)
406 {
407         struct ldb_module *module = ctx->module;
408         struct ldb_request *req = ctx->req;
409         int ret = LDB_SUCCESS;
410
411         ret = ltdb_check_special_dn(module, req->op.add.message);
412         if (ret != LDB_SUCCESS) {
413                 return ret;
414         }
415
416         ldb_request_set_state(req, LDB_ASYNC_PENDING);
417
418         if (ltdb_cache_load(module) != 0) {
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421
422         ret = ltdb_add_internal(module, req->op.add.message, true);
423
424         return ret;
425 }
426
427 /*
428   delete a record from the database, not updating indexes (used for deleting
429   index records)
430 */
431 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
432 {
433         void *data = ldb_module_get_private(module);
434         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
435         TDB_DATA tdb_key;
436         int ret;
437
438         tdb_key = ltdb_key(module, dn);
439         if (!tdb_key.dptr) {
440                 return LDB_ERR_OTHER;
441         }
442
443         ret = tdb_delete(ltdb->tdb, tdb_key);
444         talloc_free(tdb_key.dptr);
445
446         if (ret != 0) {
447                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
448         }
449
450         return ret;
451 }
452
453 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
454 {
455         struct ldb_message *msg;
456         int ret = LDB_SUCCESS;
457
458         msg = ldb_msg_new(module);
459         if (msg == NULL) {
460                 return LDB_ERR_OPERATIONS_ERROR;
461         }
462
463         /* in case any attribute of the message was indexed, we need
464            to fetch the old record */
465         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
466         if (ret != LDB_SUCCESS) {
467                 /* not finding the old record is an error */
468                 goto done;
469         }
470
471         ret = ltdb_delete_noindex(module, dn);
472         if (ret != LDB_SUCCESS) {
473                 goto done;
474         }
475
476         /* remove any indexed attributes */
477         ret = ltdb_index_delete(module, msg);
478         if (ret != LDB_SUCCESS) {
479                 goto done;
480         }
481
482         ret = ltdb_modified(module, dn);
483         if (ret != LDB_SUCCESS) {
484                 goto done;
485         }
486
487 done:
488         talloc_free(msg);
489         return ret;
490 }
491
492 /*
493   delete a record from the database
494 */
495 static int ltdb_delete(struct ltdb_context *ctx)
496 {
497         struct ldb_module *module = ctx->module;
498         struct ldb_request *req = ctx->req;
499         int ret = LDB_SUCCESS;
500
501         ldb_request_set_state(req, LDB_ASYNC_PENDING);
502
503         if (ltdb_cache_load(module) != 0) {
504                 return LDB_ERR_OPERATIONS_ERROR;
505         }
506
507         ret = ltdb_delete_internal(module, req->op.del.dn);
508
509         return ret;
510 }
511
512 /*
513   find an element by attribute name. At the moment this does a linear search,
514   it should be re-coded to use a binary search once all places that modify
515   records guarantee sorted order
516
517   return the index of the first matching element if found, otherwise -1
518 */
519 static int find_element(const struct ldb_message *msg, const char *name)
520 {
521         unsigned int i;
522         for (i=0;i<msg->num_elements;i++) {
523                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
524                         return i;
525                 }
526         }
527         return -1;
528 }
529
530
531 /*
532   add an element to an existing record. Assumes a elements array that we
533   can call re-alloc on, and assumed that we can re-use the data pointers from
534   the passed in additional values. Use with care!
535
536   returns 0 on success, -1 on failure (and sets errno)
537 */
538 static int ltdb_msg_add_element(struct ldb_message *msg,
539                                 struct ldb_message_element *el)
540 {
541         struct ldb_message_element *e2;
542         unsigned int i;
543
544         if (el->num_values == 0) {
545                 /* nothing to do here - we don't add empty elements */
546                 return 0;
547         }
548
549         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
550                               msg->num_elements+1);
551         if (!e2) {
552                 errno = ENOMEM;
553                 return -1;
554         }
555
556         msg->elements = e2;
557
558         e2 = &msg->elements[msg->num_elements];
559
560         e2->name = el->name;
561         e2->flags = el->flags;
562         e2->values = talloc_array(msg->elements,
563                                   struct ldb_val, el->num_values);
564         if (!e2->values) {
565                 errno = ENOMEM;
566                 return -1;
567         }
568         for (i=0;i<el->num_values;i++) {
569                 e2->values[i] = el->values[i];
570         }
571         e2->num_values = el->num_values;
572
573         ++msg->num_elements;
574
575         return 0;
576 }
577
578 /*
579   delete all elements having a specified attribute name
580 */
581 static int msg_delete_attribute(struct ldb_module *module,
582                                 struct ldb_message *msg, const char *name)
583 {
584         unsigned int i;
585         int ret;
586         struct ldb_message_element *el;
587
588         el = ldb_msg_find_element(msg, name);
589         if (el == NULL) {
590                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
591         }
592         i = el - msg->elements;
593
594         ret = ltdb_index_del_element(module, msg->dn, el);
595         if (ret != LDB_SUCCESS) {
596                 return ret;
597         }
598
599         talloc_free(el->values);
600         if (msg->num_elements > (i+1)) {
601                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
602         }
603         msg->num_elements--;
604         msg->elements = talloc_realloc(msg, msg->elements,
605                                        struct ldb_message_element,
606                                        msg->num_elements);
607         return LDB_SUCCESS;
608 }
609
610 /*
611   delete all elements matching an attribute name/value
612
613   return LDB Error on failure
614 */
615 static int msg_delete_element(struct ldb_module *module,
616                               struct ldb_message *msg,
617                               const char *name,
618                               const struct ldb_val *val)
619 {
620         struct ldb_context *ldb = ldb_module_get_ctx(module);
621         unsigned int i;
622         int found, ret;
623         struct ldb_message_element *el;
624         const struct ldb_schema_attribute *a;
625
626         found = find_element(msg, name);
627         if (found == -1) {
628                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
629         }
630
631         i = (unsigned int) found;
632         el = &(msg->elements[i]);
633
634         a = ldb_schema_attribute_by_name(ldb, el->name);
635
636         for (i=0;i<el->num_values;i++) {
637                 bool matched;
638                 if (a->syntax->operator_fn) {
639                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
640                                                      &el->values[i], val, &matched);
641                         if (ret != LDB_SUCCESS) return ret;
642                 } else {
643                         matched = (a->syntax->comparison_fn(ldb, ldb,
644                                                             &el->values[i], val) == 0);
645                 }
646                 if (matched) {
647                         if (el->num_values == 1) {
648                                 return msg_delete_attribute(module, msg, name);
649                         }
650
651                         ret = ltdb_index_del_value(module, msg->dn, el, i);
652                         if (ret != LDB_SUCCESS) {
653                                 return ret;
654                         }
655
656                         if (i<el->num_values-1) {
657                                 memmove(&el->values[i], &el->values[i+1],
658                                         sizeof(el->values[i])*
659                                                 (el->num_values-(i+1)));
660                         }
661                         el->num_values--;
662
663                         /* per definition we find in a canonicalised message an
664                            attribute value only once. So we are finished here */
665                         return LDB_SUCCESS;
666                 }
667         }
668
669         /* Not found */
670         return LDB_ERR_NO_SUCH_ATTRIBUTE;
671 }
672
673
674 /*
675   modify a record - internal interface
676
677   yuck - this is O(n^2). Luckily n is usually small so we probably
678   get away with it, but if we ever have really large attribute lists
679   then we'll need to look at this again
680
681   'req' is optional, and is used to specify controls if supplied
682 */
683 int ltdb_modify_internal(struct ldb_module *module,
684                          const struct ldb_message *msg,
685                          struct ldb_request *req)
686 {
687         struct ldb_context *ldb = ldb_module_get_ctx(module);
688         void *data = ldb_module_get_private(module);
689         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
690         TDB_DATA tdb_key, tdb_data;
691         struct ldb_val ldb_data;
692         struct ldb_message *msg2;
693         unsigned int i, j;
694         int ret = LDB_SUCCESS, idx;
695         struct ldb_control *control_permissive = NULL;
696
697         if (req) {
698                 control_permissive = ldb_request_get_control(req,
699                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
700         }
701
702         tdb_key = ltdb_key(module, msg->dn);
703         if (!tdb_key.dptr) {
704                 return LDB_ERR_OTHER;
705         }
706
707         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
708         if (!tdb_data.dptr) {
709                 talloc_free(tdb_key.dptr);
710                 return ltdb_err_map(tdb_error(ltdb->tdb));
711         }
712
713         msg2 = ldb_msg_new(tdb_key.dptr);
714         if (msg2 == NULL) {
715                 free(tdb_data.dptr);
716                 ret = LDB_ERR_OTHER;
717                 goto done;
718         }
719
720         ldb_data.data = tdb_data.dptr;
721         ldb_data.length = tdb_data.dsize;
722
723         ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
724         free(tdb_data.dptr);
725         if (ret == -1) {
726                 ret = LDB_ERR_OTHER;
727                 goto done;
728         }
729
730         if (!msg2->dn) {
731                 msg2->dn = msg->dn;
732         }
733
734         for (i=0; i<msg->num_elements; i++) {
735                 struct ldb_message_element *el = &msg->elements[i], *el2;
736                 struct ldb_val *vals;
737                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
738                 const char *dn;
739                 uint32_t options = 0;
740                 if (control_permissive != NULL) {
741                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
742                 }
743
744                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
745                 case LDB_FLAG_MOD_ADD:
746
747                         if (el->num_values == 0) {
748                                 ldb_asprintf_errstring(ldb,
749                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
750                                                        el->name, ldb_dn_get_linearized(msg2->dn));
751                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
752                                 goto done;
753                         }
754
755                         /* make a copy of the array so that a permissive
756                          * control can remove duplicates without changing the
757                          * original values, but do not copy data as we do not
758                          * need to keep it around once the operation is
759                          * finished */
760                         if (control_permissive) {
761                                 el = talloc(msg2, struct ldb_message_element);
762                                 if (!el) {
763                                         ret = LDB_ERR_OTHER;
764                                         goto done;
765                                 }
766                                 *el = msg->elements[i];
767                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
768                                 if (el->values == NULL) {
769                                         ret = LDB_ERR_OTHER;
770                                         goto done;
771                                 }
772                                 for (j = 0; j < el->num_values; j++) {
773                                         el->values[j] = msg->elements[i].values[j];
774                                 }
775                         }
776
777                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
778                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
779                                                        el->name, ldb_dn_get_linearized(msg2->dn));
780                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
781                                 goto done;
782                         }
783
784                         /* Checks if element already exists */
785                         idx = find_element(msg2, el->name);
786                         if (idx == -1) {
787                                 if (ltdb_msg_add_element(msg2, el) != 0) {
788                                         ret = LDB_ERR_OTHER;
789                                         goto done;
790                                 }
791                                 ret = ltdb_index_add_element(module, msg2->dn,
792                                                              el);
793                                 if (ret != LDB_SUCCESS) {
794                                         goto done;
795                                 }
796                         } else {
797                                 j = (unsigned int) idx;
798                                 el2 = &(msg2->elements[j]);
799
800                                 /* We cannot add another value on a existing one
801                                    if the attribute is single-valued */
802                                 if (ldb_tdb_single_valued(a, el)) {
803                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
804                                                                el->name, ldb_dn_get_linearized(msg2->dn));
805                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
806                                         goto done;
807                                 }
808
809                                 /* Check that values don't exist yet on multi-
810                                    valued attributes or aren't provided twice */
811                                 if (!(el->flags &
812                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
813                                         struct ldb_val *duplicate = NULL;
814                                         ret = ldb_msg_find_common_values(ldb,
815                                                                          msg2,
816                                                                          el,
817                                                                          el2,
818                                                                          options);
819
820                                         if (ret ==
821                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
822                                                 ldb_asprintf_errstring(ldb,
823                                                         "attribute '%s': value "
824                                                         "#%u on '%s' already "
825                                                         "exists", el->name, j,
826                                                         ldb_dn_get_linearized(msg2->dn));
827                                                 goto done;
828                                         } else if (ret != LDB_SUCCESS) {
829                                                 goto done;
830                                         }
831
832                                         ret = ldb_msg_find_duplicate_val(
833                                                 ldb, msg2, el, &duplicate, 0);
834                                         if (ret != LDB_SUCCESS) {
835                                                 goto done;
836                                         }
837                                         if (duplicate != NULL) {
838                                                 ldb_asprintf_errstring(
839                                                         ldb,
840                                                         "attribute '%s': value "
841                                                         "'%.*s' on '%s' "
842                                                         "provided more than "
843                                                         "once in ADD",
844                                                         el->name,
845                                                         (int)duplicate->length,
846                                                         duplicate->data,
847                                                         ldb_dn_get_linearized(msg->dn));
848                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
849                                                 goto done;
850                                         }
851                                 }
852
853                                 /* Now combine existing and new values to a new
854                                    attribute record */
855                                 vals = talloc_realloc(msg2->elements,
856                                                       el2->values, struct ldb_val,
857                                                       el2->num_values + el->num_values);
858                                 if (vals == NULL) {
859                                         ldb_oom(ldb);
860                                         ret = LDB_ERR_OTHER;
861                                         goto done;
862                                 }
863
864                                 for (j=0; j<el->num_values; j++) {
865                                         vals[el2->num_values + j] =
866                                                 ldb_val_dup(vals, &el->values[j]);
867                                 }
868
869                                 el2->values = vals;
870                                 el2->num_values += el->num_values;
871
872                                 ret = ltdb_index_add_element(module, msg2->dn, el);
873                                 if (ret != LDB_SUCCESS) {
874                                         goto done;
875                                 }
876                         }
877
878                         break;
879
880                 case LDB_FLAG_MOD_REPLACE:
881
882                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
883                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
884                                                        el->name, ldb_dn_get_linearized(msg2->dn));
885                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
886                                 goto done;
887                         }
888
889                         /*
890                          * We don't need to check this if we have been
891                          * pre-screened by the repl_meta_data module
892                          * in Samba, or someone else who can claim to
893                          * know what they are doing. 
894                          */
895                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
896                                 struct ldb_val *duplicate = NULL;
897
898                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
899                                                                  &duplicate, 0);
900                                 if (ret != LDB_SUCCESS) {
901                                         goto done;
902                                 }
903                                 if (duplicate != NULL) {
904                                         ldb_asprintf_errstring(
905                                                 ldb,
906                                                 "attribute '%s': value '%.*s' "
907                                                 "on '%s' provided more than "
908                                                 "once in REPLACE",
909                                                 el->name,
910                                                 (int)duplicate->length,
911                                                 duplicate->data,
912                                                 ldb_dn_get_linearized(msg2->dn));
913                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
914                                         goto done;
915                                 }
916                         }
917
918                         /* Checks if element already exists */
919                         idx = find_element(msg2, el->name);
920                         if (idx != -1) {
921                                 j = (unsigned int) idx;
922                                 el2 = &(msg2->elements[j]);
923
924                                 /* we consider two elements to be
925                                  * equal only if the order
926                                  * matches. This allows dbcheck to
927                                  * fix the ordering on attributes
928                                  * where order matters, such as
929                                  * objectClass
930                                  */
931                                 if (ldb_msg_element_equal_ordered(el, el2)) {
932                                         continue;
933                                 }
934
935                                 /* Delete the attribute if it exists in the DB */
936                                 if (msg_delete_attribute(module, msg2,
937                                                          el->name) != 0) {
938                                         ret = LDB_ERR_OTHER;
939                                         goto done;
940                                 }
941                         }
942
943                         /* Recreate it with the new values */
944                         if (ltdb_msg_add_element(msg2, el) != 0) {
945                                 ret = LDB_ERR_OTHER;
946                                 goto done;
947                         }
948
949                         ret = ltdb_index_add_element(module, msg2->dn, el);
950                         if (ret != LDB_SUCCESS) {
951                                 goto done;
952                         }
953
954                         break;
955
956                 case LDB_FLAG_MOD_DELETE:
957                         dn = ldb_dn_get_linearized(msg2->dn);
958                         if (dn == NULL) {
959                                 ret = LDB_ERR_OTHER;
960                                 goto done;
961                         }
962
963                         if (msg->elements[i].num_values == 0) {
964                                 /* Delete the whole attribute */
965                                 ret = msg_delete_attribute(module, msg2,
966                                                            msg->elements[i].name);
967                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
968                                     control_permissive) {
969                                         ret = LDB_SUCCESS;
970                                 } else {
971                                         ldb_asprintf_errstring(ldb,
972                                                                "attribute '%s': no such attribute for delete on '%s'",
973                                                                msg->elements[i].name, dn);
974                                 }
975                                 if (ret != LDB_SUCCESS) {
976                                         goto done;
977                                 }
978                         } else {
979                                 /* Delete specified values from an attribute */
980                                 for (j=0; j < msg->elements[i].num_values; j++) {
981                                         ret = msg_delete_element(module,
982                                                                  msg2,
983                                                                  msg->elements[i].name,
984                                                                  &msg->elements[i].values[j]);
985                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
986                                             control_permissive) {
987                                                 ret = LDB_SUCCESS;
988                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
989                                                 ldb_asprintf_errstring(ldb,
990                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
991                                                                        msg->elements[i].name, dn);
992                                         }
993                                         if (ret != LDB_SUCCESS) {
994                                                 goto done;
995                                         }
996                                 }
997                         }
998                         break;
999                 default:
1000                         ldb_asprintf_errstring(ldb,
1001                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1002                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1003                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1004                         ret = LDB_ERR_PROTOCOL_ERROR;
1005                         goto done;
1006                 }
1007         }
1008
1009         ret = ltdb_store(module, msg2, TDB_MODIFY);
1010         if (ret != LDB_SUCCESS) {
1011                 goto done;
1012         }
1013
1014         ret = ltdb_modified(module, msg2->dn);
1015         if (ret != LDB_SUCCESS) {
1016                 goto done;
1017         }
1018
1019 done:
1020         talloc_free(tdb_key.dptr);
1021         return ret;
1022 }
1023
1024 /*
1025   modify a record
1026 */
1027 static int ltdb_modify(struct ltdb_context *ctx)
1028 {
1029         struct ldb_module *module = ctx->module;
1030         struct ldb_request *req = ctx->req;
1031         int ret = LDB_SUCCESS;
1032
1033         ret = ltdb_check_special_dn(module, req->op.mod.message);
1034         if (ret != LDB_SUCCESS) {
1035                 return ret;
1036         }
1037
1038         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1039
1040         if (ltdb_cache_load(module) != 0) {
1041                 return LDB_ERR_OPERATIONS_ERROR;
1042         }
1043
1044         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1045
1046         return ret;
1047 }
1048
1049 /*
1050   rename a record
1051 */
1052 static int ltdb_rename(struct ltdb_context *ctx)
1053 {
1054         struct ldb_module *module = ctx->module;
1055         void *data = ldb_module_get_private(module);
1056         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1057         struct ldb_request *req = ctx->req;
1058         struct ldb_message *msg;
1059         int ret = LDB_SUCCESS;
1060         TDB_DATA tdb_key, tdb_key_old;
1061
1062         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1063
1064         if (ltdb_cache_load(ctx->module) != 0) {
1065                 return LDB_ERR_OPERATIONS_ERROR;
1066         }
1067
1068         msg = ldb_msg_new(ctx);
1069         if (msg == NULL) {
1070                 return LDB_ERR_OPERATIONS_ERROR;
1071         }
1072
1073         /* we need to fetch the old record to re-add under the new name */
1074         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1075                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1076         if (ret != LDB_SUCCESS) {
1077                 /* not finding the old record is an error */
1078                 return ret;
1079         }
1080
1081         /* We need to, before changing the DB, check if the new DN
1082          * exists, so we can return this error to the caller with an
1083          * unmodified DB */
1084         tdb_key = ltdb_key(module, req->op.rename.newdn);
1085         if (!tdb_key.dptr) {
1086                 talloc_free(msg);
1087                 return LDB_ERR_OPERATIONS_ERROR;
1088         }
1089
1090         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1091         if (!tdb_key_old.dptr) {
1092                 talloc_free(msg);
1093                 talloc_free(tdb_key.dptr);
1094                 return LDB_ERR_OPERATIONS_ERROR;
1095         }
1096
1097         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1098         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1099                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1100                         talloc_free(tdb_key_old.dptr);
1101                         talloc_free(tdb_key.dptr);
1102                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1103                                                "Entry %s already exists",
1104                                                ldb_dn_get_linearized(req->op.rename.newdn));
1105                         /* finding the new record already in the DB is an error */
1106                         talloc_free(msg);
1107                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1108                 }
1109         }
1110         talloc_free(tdb_key_old.dptr);
1111         talloc_free(tdb_key.dptr);
1112
1113         /* Always delete first then add, to avoid conflicts with
1114          * unique indexes. We rely on the transaction to make this
1115          * atomic
1116          */
1117         ret = ltdb_delete_internal(module, msg->dn);
1118         if (ret != LDB_SUCCESS) {
1119                 talloc_free(msg);
1120                 return ret;
1121         }
1122
1123         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1124         if (msg->dn == NULL) {
1125                 talloc_free(msg);
1126                 return LDB_ERR_OPERATIONS_ERROR;
1127         }
1128
1129         /* We don't check single value as we can have more than 1 with
1130          * deleted attributes. We could go through all elements but that's
1131          * maybe not the most efficient way
1132          */
1133         ret = ltdb_add_internal(module, msg, false);
1134
1135         talloc_free(msg);
1136
1137         return ret;
1138 }
1139
1140 static int ltdb_start_trans(struct ldb_module *module)
1141 {
1142         void *data = ldb_module_get_private(module);
1143         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1144
1145         if (tdb_transaction_start(ltdb->tdb) != 0) {
1146                 return ltdb_err_map(tdb_error(ltdb->tdb));
1147         }
1148
1149         ltdb->in_transaction++;
1150
1151         ltdb_index_transaction_start(module);
1152
1153         return LDB_SUCCESS;
1154 }
1155
1156 static int ltdb_prepare_commit(struct ldb_module *module)
1157 {
1158         int ret;
1159         void *data = ldb_module_get_private(module);
1160         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1161
1162         if (ltdb->in_transaction != 1) {
1163                 return LDB_SUCCESS;
1164         }
1165
1166         ret = ltdb_index_transaction_commit(module);
1167         if (ret != LDB_SUCCESS) {
1168                 tdb_transaction_cancel(ltdb->tdb);
1169                 ltdb->in_transaction--;
1170                 return ret;
1171         }
1172
1173         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1174                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1175                 ltdb->in_transaction--;
1176                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1177                                        "Failure during tdb_transaction_prepare_commit(): %s -> %s",
1178                                        tdb_errorstr(ltdb->tdb),
1179                                        ldb_strerror(ret));
1180                 return ret;
1181         }
1182
1183         ltdb->prepared_commit = true;
1184
1185         return LDB_SUCCESS;
1186 }
1187
1188 static int ltdb_end_trans(struct ldb_module *module)
1189 {
1190         int ret;
1191         void *data = ldb_module_get_private(module);
1192         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1193
1194         if (!ltdb->prepared_commit) {
1195                 ret = ltdb_prepare_commit(module);
1196                 if (ret != LDB_SUCCESS) {
1197                         return ret;
1198                 }
1199         }
1200
1201         ltdb->in_transaction--;
1202         ltdb->prepared_commit = false;
1203
1204         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1205                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1206                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1207                                        "Failure during tdb_transaction_commit(): %s -> %s",
1208                                        tdb_errorstr(ltdb->tdb),
1209                                        ldb_strerror(ret));
1210                 return ret;
1211         }
1212
1213         return LDB_SUCCESS;
1214 }
1215
1216 static int ltdb_del_trans(struct ldb_module *module)
1217 {
1218         void *data = ldb_module_get_private(module);
1219         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1220
1221         ltdb->in_transaction--;
1222
1223         if (ltdb_index_transaction_cancel(module) != 0) {
1224                 tdb_transaction_cancel(ltdb->tdb);
1225                 return ltdb_err_map(tdb_error(ltdb->tdb));
1226         }
1227
1228         tdb_transaction_cancel(ltdb->tdb);
1229         return LDB_SUCCESS;
1230 }
1231
1232 /*
1233   return sequenceNumber from @BASEINFO
1234 */
1235 static int ltdb_sequence_number(struct ltdb_context *ctx,
1236                                 struct ldb_extended **ext)
1237 {
1238         struct ldb_context *ldb;
1239         struct ldb_module *module = ctx->module;
1240         struct ldb_request *req = ctx->req;
1241         TALLOC_CTX *tmp_ctx = NULL;
1242         struct ldb_seqnum_request *seq;
1243         struct ldb_seqnum_result *res;
1244         struct ldb_message *msg = NULL;
1245         struct ldb_dn *dn;
1246         const char *date;
1247         int ret = LDB_SUCCESS;
1248
1249         ldb = ldb_module_get_ctx(module);
1250
1251         seq = talloc_get_type(req->op.extended.data,
1252                                 struct ldb_seqnum_request);
1253         if (seq == NULL) {
1254                 return LDB_ERR_OPERATIONS_ERROR;
1255         }
1256
1257         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1258
1259         if (ltdb_lock_read(module) != 0) {
1260                 return LDB_ERR_OPERATIONS_ERROR;
1261         }
1262
1263         res = talloc_zero(req, struct ldb_seqnum_result);
1264         if (res == NULL) {
1265                 ret = LDB_ERR_OPERATIONS_ERROR;
1266                 goto done;
1267         }
1268
1269         tmp_ctx = talloc_new(req);
1270         if (tmp_ctx == NULL) {
1271                 ret = LDB_ERR_OPERATIONS_ERROR;
1272                 goto done;
1273         }
1274
1275         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1276         if (dn == NULL) {
1277                 ret = LDB_ERR_OPERATIONS_ERROR;
1278                 goto done;
1279         }
1280
1281         msg = ldb_msg_new(tmp_ctx);
1282         if (msg == NULL) {
1283                 ret = LDB_ERR_OPERATIONS_ERROR;
1284                 goto done;
1285         }
1286
1287         ret = ltdb_search_dn1(module, dn, msg, 0);
1288         if (ret != LDB_SUCCESS) {
1289                 goto done;
1290         }
1291
1292         switch (seq->type) {
1293         case LDB_SEQ_HIGHEST_SEQ:
1294                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1295                 break;
1296         case LDB_SEQ_NEXT:
1297                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1298                 res->seq_num++;
1299                 break;
1300         case LDB_SEQ_HIGHEST_TIMESTAMP:
1301                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1302                 if (date) {
1303                         res->seq_num = ldb_string_to_time(date);
1304                 } else {
1305                         res->seq_num = 0;
1306                         /* zero is as good as anything when we don't know */
1307                 }
1308                 break;
1309         }
1310
1311         *ext = talloc_zero(req, struct ldb_extended);
1312         if (*ext == NULL) {
1313                 ret = LDB_ERR_OPERATIONS_ERROR;
1314                 goto done;
1315         }
1316         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1317         (*ext)->data = talloc_steal(*ext, res);
1318
1319 done:
1320         talloc_free(tmp_ctx);
1321         ltdb_unlock_read(module);
1322         return ret;
1323 }
1324
1325 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1326 {
1327         struct ldb_context *ldb;
1328         struct ldb_request *req;
1329         struct ldb_reply *ares;
1330
1331         ldb = ldb_module_get_ctx(ctx->module);
1332         req = ctx->req;
1333
1334         /* if we already returned an error just return */
1335         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1336                 return;
1337         }
1338
1339         ares = talloc_zero(req, struct ldb_reply);
1340         if (!ares) {
1341                 ldb_oom(ldb);
1342                 req->callback(req, NULL);
1343                 return;
1344         }
1345         ares->type = LDB_REPLY_DONE;
1346         ares->error = error;
1347
1348         req->callback(req, ares);
1349 }
1350
1351 static void ltdb_timeout(struct tevent_context *ev,
1352                           struct tevent_timer *te,
1353                           struct timeval t,
1354                           void *private_data)
1355 {
1356         struct ltdb_context *ctx;
1357         ctx = talloc_get_type(private_data, struct ltdb_context);
1358
1359         if (!ctx->request_terminated) {
1360                 /* request is done now */
1361                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1362         }
1363
1364         if (ctx->spy) {
1365                 /* neutralize the spy */
1366                 ctx->spy->ctx = NULL;
1367                 ctx->spy = NULL;
1368         }
1369         talloc_free(ctx);
1370 }
1371
1372 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1373                                         struct ldb_extended *ext,
1374                                         int error)
1375 {
1376         struct ldb_context *ldb;
1377         struct ldb_request *req;
1378         struct ldb_reply *ares;
1379
1380         ldb = ldb_module_get_ctx(ctx->module);
1381         req = ctx->req;
1382
1383         /* if we already returned an error just return */
1384         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1385                 return;
1386         }
1387
1388         ares = talloc_zero(req, struct ldb_reply);
1389         if (!ares) {
1390                 ldb_oom(ldb);
1391                 req->callback(req, NULL);
1392                 return;
1393         }
1394         ares->type = LDB_REPLY_DONE;
1395         ares->response = ext;
1396         ares->error = error;
1397
1398         req->callback(req, ares);
1399 }
1400
1401 static void ltdb_handle_extended(struct ltdb_context *ctx)
1402 {
1403         struct ldb_extended *ext = NULL;
1404         int ret;
1405
1406         if (strcmp(ctx->req->op.extended.oid,
1407                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1408                 /* get sequence number */
1409                 ret = ltdb_sequence_number(ctx, &ext);
1410         } else {
1411                 /* not recognized */
1412                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1413         }
1414
1415         ltdb_request_extended_done(ctx, ext, ret);
1416 }
1417
1418 static void ltdb_callback(struct tevent_context *ev,
1419                           struct tevent_timer *te,
1420                           struct timeval t,
1421                           void *private_data)
1422 {
1423         struct ltdb_context *ctx;
1424         int ret;
1425
1426         ctx = talloc_get_type(private_data, struct ltdb_context);
1427
1428         if (ctx->request_terminated) {
1429                 goto done;
1430         }
1431
1432         switch (ctx->req->operation) {
1433         case LDB_SEARCH:
1434                 ret = ltdb_search(ctx);
1435                 break;
1436         case LDB_ADD:
1437                 ret = ltdb_add(ctx);
1438                 break;
1439         case LDB_MODIFY:
1440                 ret = ltdb_modify(ctx);
1441                 break;
1442         case LDB_DELETE:
1443                 ret = ltdb_delete(ctx);
1444                 break;
1445         case LDB_RENAME:
1446                 ret = ltdb_rename(ctx);
1447                 break;
1448         case LDB_EXTENDED:
1449                 ltdb_handle_extended(ctx);
1450                 goto done;
1451         default:
1452                 /* no other op supported */
1453                 ret = LDB_ERR_PROTOCOL_ERROR;
1454         }
1455
1456         if (!ctx->request_terminated) {
1457                 /* request is done now */
1458                 ltdb_request_done(ctx, ret);
1459         }
1460
1461 done:
1462         if (ctx->spy) {
1463                 /* neutralize the spy */
1464                 ctx->spy->ctx = NULL;
1465                 ctx->spy = NULL;
1466         }
1467         talloc_free(ctx);
1468 }
1469
1470 static int ltdb_request_destructor(void *ptr)
1471 {
1472         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1473
1474         if (spy->ctx != NULL) {
1475                 spy->ctx->spy = NULL;
1476                 spy->ctx->request_terminated = true;
1477                 spy->ctx = NULL;
1478         }
1479
1480         return 0;
1481 }
1482
1483 static int ltdb_handle_request(struct ldb_module *module,
1484                                struct ldb_request *req)
1485 {
1486         struct ldb_control *control_permissive;
1487         struct ldb_context *ldb;
1488         struct tevent_context *ev;
1489         struct ltdb_context *ac;
1490         struct tevent_timer *te;
1491         struct timeval tv;
1492         unsigned int i;
1493
1494         ldb = ldb_module_get_ctx(module);
1495
1496         control_permissive = ldb_request_get_control(req,
1497                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1498
1499         for (i = 0; req->controls && req->controls[i]; i++) {
1500                 if (req->controls[i]->critical &&
1501                     req->controls[i] != control_permissive) {
1502                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1503                                                req->controls[i]->oid);
1504                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1505                 }
1506         }
1507
1508         if (req->starttime == 0 || req->timeout == 0) {
1509                 ldb_set_errstring(ldb, "Invalid timeout settings");
1510                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1511         }
1512
1513         ev = ldb_handle_get_event_context(req->handle);
1514
1515         ac = talloc_zero(ldb, struct ltdb_context);
1516         if (ac == NULL) {
1517                 ldb_oom(ldb);
1518                 return LDB_ERR_OPERATIONS_ERROR;
1519         }
1520
1521         ac->module = module;
1522         ac->req = req;
1523
1524         tv.tv_sec = 0;
1525         tv.tv_usec = 0;
1526         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1527         if (NULL == te) {
1528                 talloc_free(ac);
1529                 return LDB_ERR_OPERATIONS_ERROR;
1530         }
1531
1532         if (req->timeout > 0) {
1533                 tv.tv_sec = req->starttime + req->timeout;
1534                 tv.tv_usec = 0;
1535                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1536                                                      ltdb_timeout, ac);
1537                 if (NULL == ac->timeout_event) {
1538                         talloc_free(ac);
1539                         return LDB_ERR_OPERATIONS_ERROR;
1540                 }
1541         }
1542
1543         /* set a spy so that we do not try to use the request context
1544          * if it is freed before ltdb_callback fires */
1545         ac->spy = talloc(req, struct ltdb_req_spy);
1546         if (NULL == ac->spy) {
1547                 talloc_free(ac);
1548                 return LDB_ERR_OPERATIONS_ERROR;
1549         }
1550         ac->spy->ctx = ac;
1551
1552         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1553
1554         return LDB_SUCCESS;
1555 }
1556
1557 static int ltdb_init_rootdse(struct ldb_module *module)
1558 {
1559         /* ignore errors on this - we expect it for non-sam databases */
1560         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1561
1562         /* there can be no module beyond the backend, just return */
1563         return LDB_SUCCESS;
1564 }
1565
1566 static const struct ldb_module_ops ltdb_ops = {
1567         .name              = "tdb",
1568         .init_context      = ltdb_init_rootdse,
1569         .search            = ltdb_handle_request,
1570         .add               = ltdb_handle_request,
1571         .modify            = ltdb_handle_request,
1572         .del               = ltdb_handle_request,
1573         .rename            = ltdb_handle_request,
1574         .extended          = ltdb_handle_request,
1575         .start_transaction = ltdb_start_trans,
1576         .end_transaction   = ltdb_end_trans,
1577         .prepare_commit    = ltdb_prepare_commit,
1578         .del_transaction   = ltdb_del_trans,
1579 };
1580
1581 /*
1582   connect to the database
1583 */
1584 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1585                         unsigned int flags, const char *options[],
1586                         struct ldb_module **_module)
1587 {
1588         struct ldb_module *module;
1589         const char *path;
1590         int tdb_flags, open_flags;
1591         struct ltdb_private *ltdb;
1592
1593         /*
1594          * We hold locks, so we must use a private event context
1595          * on each returned handle
1596          */
1597
1598         ldb_set_require_private_event_context(ldb);
1599
1600         /* parse the url */
1601         if (strchr(url, ':')) {
1602                 if (strncmp(url, "tdb://", 6) != 0) {
1603                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1604                                   "Invalid tdb URL '%s'", url);
1605                         return LDB_ERR_OPERATIONS_ERROR;
1606                 }
1607                 path = url+6;
1608         } else {
1609                 path = url;
1610         }
1611
1612         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1613
1614         /* check for the 'nosync' option */
1615         if (flags & LDB_FLG_NOSYNC) {
1616                 tdb_flags |= TDB_NOSYNC;
1617         }
1618
1619         /* and nommap option */
1620         if (flags & LDB_FLG_NOMMAP) {
1621                 tdb_flags |= TDB_NOMMAP;
1622         }
1623
1624         if (flags & LDB_FLG_RDONLY) {
1625                 open_flags = O_RDONLY;
1626         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1627                 open_flags = O_RDWR;
1628         } else {
1629                 open_flags = O_CREAT | O_RDWR;
1630         }
1631
1632         ltdb = talloc_zero(ldb, struct ltdb_private);
1633         if (!ltdb) {
1634                 ldb_oom(ldb);
1635                 return LDB_ERR_OPERATIONS_ERROR;
1636         }
1637
1638         /* note that we use quite a large default hash size */
1639         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1640                                    tdb_flags, open_flags,
1641                                    ldb_get_create_perms(ldb), ldb);
1642         if (!ltdb->tdb) {
1643                 ldb_asprintf_errstring(ldb,
1644                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1645                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1646                           "Unable to open tdb '%s': %s", path, strerror(errno));
1647                 talloc_free(ltdb);
1648                 if (errno == EACCES || errno == EPERM) {
1649                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1650                 }
1651                 return LDB_ERR_OPERATIONS_ERROR;
1652         }
1653
1654         if (getenv("LDB_WARN_UNINDEXED")) {
1655                 ltdb->warn_unindexed = true;
1656         }
1657
1658         if (getenv("LDB_WARN_REINDEX")) {
1659                 ltdb->warn_reindex = true;
1660         }
1661
1662         ltdb->sequence_number = 0;
1663
1664         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1665         if (!module) {
1666                 ldb_oom(ldb);
1667                 talloc_free(ltdb);
1668                 return LDB_ERR_OPERATIONS_ERROR;
1669         }
1670         ldb_module_set_private(module, ltdb);
1671         talloc_steal(module, ltdb);
1672
1673         if (ltdb_cache_load(module) != 0) {
1674                 ldb_asprintf_errstring(ldb,
1675                                        "Unable to load ltdb cache records of tdb '%s'", path);
1676                 talloc_free(module);
1677                 return LDB_ERR_OPERATIONS_ERROR;
1678         }
1679
1680         *_module = module;
1681         return LDB_SUCCESS;
1682 }
1683
1684 int ldb_tdb_init(const char *version)
1685 {
1686         LDB_MODULE_CHECK_VERSION(version);
1687         return ldb_register_backend("tdb", ltdb_connect, false);
1688 }