20796f2162dd2180721f8bc5a31053ddd2f52f88
[sfrench/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 static int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 static int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /* The caller is to provide a correctly sized key */
225 int ltdb_guid_to_key(struct ldb_module *module,
226                      struct ltdb_private *ltdb,
227                      const struct ldb_val *GUID_val,
228                      TDB_DATA *key)
229 {
230         const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
231         const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
232
233         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
238         memcpy(&key->dptr[GUID_prefix_len],
239                GUID_val->data, GUID_val->length);
240         return LDB_SUCCESS;
241 }
242
243 /*
244  * The caller is to provide a correctly sized key, used only in
245  * the GUID index mode
246  */
247 int ltdb_idx_to_key(struct ldb_module *module,
248                     struct ltdb_private *ltdb,
249                     TALLOC_CTX *mem_ctx,
250                     const struct ldb_val *idx_val,
251                     TDB_DATA *key)
252 {
253         struct ldb_context *ldb = ldb_module_get_ctx(module);
254         struct ldb_dn *dn;
255
256         if (ltdb->cache->GUID_index_attribute != NULL) {
257                 return ltdb_guid_to_key(module, ltdb,
258                                         idx_val, key);
259         }
260
261         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
262         if (dn == NULL) {
263                 /*
264                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
265                  * to the caller, as this in an invalid index value
266                  */
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269         /* form the key */
270         *key = ltdb_key_dn(module, mem_ctx, dn);
271         TALLOC_FREE(dn);
272         if (!key->dptr) {
273                 return ldb_module_oom(module);
274         }
275         return LDB_SUCCESS;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293         int ret;
294
295         if (ltdb->cache->GUID_index_attribute == NULL) {
296                 return ltdb_key_dn(module, mem_ctx, msg->dn);
297         }
298
299         if (ldb_dn_is_special(msg->dn)) {
300                 return ltdb_key_dn(module, mem_ctx, msg->dn);
301         }
302
303         guid_val = ldb_msg_find_ldb_val(msg,
304                                        ltdb->cache->GUID_index_attribute);
305         if (guid_val == NULL) {
306                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
307                                        "Did not find GUID attribute %s "
308                                        "in %s, required for TDB record "
309                                        "key in " LTDB_IDXGUID " mode.",
310                                        ltdb->cache->GUID_index_attribute,
311                                        ldb_dn_get_linearized(msg->dn));
312                 errno = EINVAL;
313                 key.dptr = NULL;
314                 key.dsize = 0;
315                 return key;
316         }
317
318         /* In this case, allocate with talloc */
319         key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
320         if (key.dptr == NULL) {
321                 errno = ENOMEM;
322                 key.dptr = NULL;
323                 key.dsize = 0;
324                 return key;
325         }
326         key.dsize = talloc_get_size(key.dptr);
327
328         ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
329
330         if (ret != LDB_SUCCESS) {
331                 errno = EINVAL;
332                 key.dptr = NULL;
333                 key.dsize = 0;
334                 return key;
335         }
336         return key;
337 }
338
339 /*
340   check special dn's have valid attributes
341   currently only @ATTRIBUTES is checked
342 */
343 static int ltdb_check_special_dn(struct ldb_module *module,
344                                  const struct ldb_message *msg)
345 {
346         struct ldb_context *ldb = ldb_module_get_ctx(module);
347         unsigned int i, j;
348
349         if (! ldb_dn_is_special(msg->dn) ||
350             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                 return LDB_SUCCESS;
352         }
353
354         /* we have @ATTRIBUTES, let's check attributes are fine */
355         /* should we check that we deny multivalued attributes ? */
356         for (i = 0; i < msg->num_elements; i++) {
357                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
358
359                 for (j = 0; j < msg->elements[i].num_values; j++) {
360                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
361                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
362                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
363                         }
364                 }
365         }
366
367         return LDB_SUCCESS;
368 }
369
370
371 /*
372   we've made a modification to a dn - possibly reindex and
373   update sequence number
374 */
375 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         int ret = LDB_SUCCESS;
378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
379
380         /* only allow modifies inside a transaction, otherwise the
381          * ldb is unsafe */
382         if (ltdb->in_transaction == 0) {
383                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         if (ldb_dn_is_special(dn) &&
388             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
389              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
390         {
391                 if (ltdb->warn_reindex) {
392                         ldb_debug(ldb_module_get_ctx(module),
393                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
394                                 ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
395                 }
396                 ret = ltdb_reindex(module);
397         }
398
399         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
400         if (ret == LDB_SUCCESS &&
401             !(ldb_dn_is_special(dn) &&
402               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
403                 ret = ltdb_increase_sequence_number(module);
404         }
405
406         /* If the modify was to @OPTIONS, reload the cache */
407         if (ret == LDB_SUCCESS &&
408             ldb_dn_is_special(dn) &&
409             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
410                 ret = ltdb_cache_reload(module);
411         }
412
413         if (ret != LDB_SUCCESS) {
414                 ltdb->reindex_failed = true;
415         }
416
417         return ret;
418 }
419
420 static int ltdb_tdb_store(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags)
421 {
422         return tdb_store(ltdb->tdb, key, data, flags);
423 }
424
425 static int ltdb_error(struct ltdb_private *ltdb)
426 {
427         return ltdb_err_map(tdb_error(ltdb->tdb));
428 }
429
430 static const char *ltdb_errorstr(struct ltdb_private *ltdb)
431 {
432         return tdb_errorstr(ltdb->tdb);
433 }
434
435 /*
436   store a record into the db
437 */
438 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
439 {
440         void *data = ldb_module_get_private(module);
441         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
442         TDB_DATA tdb_key, tdb_data;
443         struct ldb_val ldb_data;
444         int ret = LDB_SUCCESS;
445         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
446
447         if (tdb_key_ctx == NULL) {
448                 return ldb_module_oom(module);
449         }
450
451         if (ltdb->read_only) {
452                 return LDB_ERR_UNWILLING_TO_PERFORM;
453         }
454
455         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
456         if (tdb_key.dptr == NULL) {
457                 TALLOC_FREE(tdb_key_ctx);
458                 return LDB_ERR_OTHER;
459         }
460
461         ret = ldb_pack_data(ldb_module_get_ctx(module),
462                             msg, &ldb_data);
463         if (ret == -1) {
464                 TALLOC_FREE(tdb_key_ctx);
465                 return LDB_ERR_OTHER;
466         }
467
468         tdb_data.dptr = ldb_data.data;
469         tdb_data.dsize = ldb_data.length;
470
471         ret = ltdb->kv_ops->store(ltdb, tdb_key, tdb_data, flgs);
472         if (ret != 0) {
473                 bool is_special = ldb_dn_is_special(msg->dn);
474                 ret = ltdb->kv_ops->error(ltdb);
475
476                 /*
477                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
478                  * the GUID, so re-map
479                  */
480                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
481                     && !is_special
482                     && ltdb->cache->GUID_index_attribute != NULL) {
483                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
484                 }
485                 goto done;
486         }
487
488 done:
489         TALLOC_FREE(tdb_key_ctx);
490         talloc_free(ldb_data.data);
491
492         return ret;
493 }
494
495
496 /*
497   check if a attribute is a single valued, for a given element
498  */
499 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
500                                   struct ldb_message_element *el)
501 {
502         if (!a) return false;
503         if (el != NULL) {
504                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
505                         /* override from a ldb module, for example
506                            used for the description field, which is
507                            marked multi-valued in the schema but which
508                            should not actually accept multiple
509                            values */
510                         return true;
511                 }
512                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
513                         /* override from a ldb module, for example used for
514                            deleted linked attribute entries */
515                         return false;
516                 }
517         }
518         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
519                 return true;
520         }
521         return false;
522 }
523
524 static int ltdb_add_internal(struct ldb_module *module,
525                              struct ltdb_private *ltdb,
526                              const struct ldb_message *msg,
527                              bool check_single_value)
528 {
529         struct ldb_context *ldb = ldb_module_get_ctx(module);
530         int ret = LDB_SUCCESS;
531         unsigned int i;
532
533         for (i=0;i<msg->num_elements;i++) {
534                 struct ldb_message_element *el = &msg->elements[i];
535                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
536
537                 if (el->num_values == 0) {
538                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
539                                                el->name, ldb_dn_get_linearized(msg->dn));
540                         return LDB_ERR_CONSTRAINT_VIOLATION;
541                 }
542                 if (check_single_value &&
543                                 el->num_values > 1 &&
544                                 ldb_tdb_single_valued(a, el)) {
545                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
546                                                el->name, ldb_dn_get_linearized(msg->dn));
547                         return LDB_ERR_CONSTRAINT_VIOLATION;
548                 }
549
550                 /* Do not check "@ATTRIBUTES" for duplicated values */
551                 if (ldb_dn_is_special(msg->dn) &&
552                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
553                         continue;
554                 }
555
556                 if (check_single_value &&
557                     !(el->flags &
558                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
559                         struct ldb_val *duplicate = NULL;
560
561                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
562                                                          el, &duplicate, 0);
563                         if (ret != LDB_SUCCESS) {
564                                 return ret;
565                         }
566                         if (duplicate != NULL) {
567                                 ldb_asprintf_errstring(
568                                         ldb,
569                                         "attribute '%s': value '%.*s' on '%s' "
570                                         "provided more than once in ADD object",
571                                         el->name,
572                                         (int)duplicate->length,
573                                         duplicate->data,
574                                         ldb_dn_get_linearized(msg->dn));
575                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
576                         }
577                 }
578         }
579
580         ret = ltdb_store(module, msg, TDB_INSERT);
581         if (ret != LDB_SUCCESS) {
582                 /*
583                  * Try really hard to get the right error code for
584                  * a re-add situation, as this can matter!
585                  */
586                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
587                         int ret2;
588                         struct ldb_dn *dn2 = NULL;
589                         TALLOC_CTX *mem_ctx = talloc_new(module);
590                         if (mem_ctx == NULL) {
591                                 return ldb_module_operr(module);
592                         }
593                         ret2 = ltdb_search_base(module, module,
594                                                 msg->dn, &dn2);
595                         TALLOC_FREE(mem_ctx);
596                         if (ret2 == LDB_SUCCESS) {
597                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
598                         }
599                 }
600                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
601                         ldb_asprintf_errstring(ldb,
602                                                "Entry %s already exists",
603                                                ldb_dn_get_linearized(msg->dn));
604                 }
605                 return ret;
606         }
607
608         ret = ltdb_index_add_new(module, ltdb, msg);
609         if (ret != LDB_SUCCESS) {
610                 /*
611                  * If we failed to index, delete the message again.
612                  *
613                  * This is particularly important for the GUID index
614                  * case, which will only fail for a duplicate DN
615                  * in the index add.
616                  *
617                  * Note that the caller may not cancel the transation
618                  * and this means the above add might really show up!
619                  */
620                 ltdb_delete_noindex(module, msg);
621                 return ret;
622         }
623
624         ret = ltdb_modified(module, msg->dn);
625
626         return ret;
627 }
628
629 /*
630   add a record to the database
631 */
632 static int ltdb_add(struct ltdb_context *ctx)
633 {
634         struct ldb_module *module = ctx->module;
635         struct ldb_request *req = ctx->req;
636         void *data = ldb_module_get_private(module);
637         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
638         int ret = LDB_SUCCESS;
639
640         ret = ltdb_check_special_dn(module, req->op.add.message);
641         if (ret != LDB_SUCCESS) {
642                 return ret;
643         }
644
645         ldb_request_set_state(req, LDB_ASYNC_PENDING);
646
647         if (ltdb_cache_load(module) != 0) {
648                 return LDB_ERR_OPERATIONS_ERROR;
649         }
650
651         ret = ltdb_add_internal(module, ltdb,
652                                 req->op.add.message, true);
653
654         return ret;
655 }
656
657 static int ltdb_tdb_delete(struct ltdb_private *ltdb, TDB_DATA tdb_key)
658 {
659         return tdb_delete(ltdb->tdb, tdb_key);
660 }
661
662 /*
663   delete a record from the database, not updating indexes (used for deleting
664   index records)
665 */
666 int ltdb_delete_noindex(struct ldb_module *module,
667                         const struct ldb_message *msg)
668 {
669         void *data = ldb_module_get_private(module);
670         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
671         TDB_DATA tdb_key;
672         int ret;
673         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
674
675         if (tdb_key_ctx == NULL) {
676                 return ldb_module_oom(module);
677         }
678
679         if (ltdb->read_only) {
680                 return LDB_ERR_UNWILLING_TO_PERFORM;
681         }
682
683         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
684         if (!tdb_key.dptr) {
685                 TALLOC_FREE(tdb_key_ctx);
686                 return LDB_ERR_OTHER;
687         }
688
689         ret = ltdb->kv_ops->delete(ltdb, tdb_key);
690         TALLOC_FREE(tdb_key_ctx);
691
692         if (ret != 0) {
693                 ret = ltdb->kv_ops->error(ltdb);
694         }
695
696         return ret;
697 }
698
699 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
700 {
701         struct ldb_message *msg;
702         int ret = LDB_SUCCESS;
703
704         msg = ldb_msg_new(module);
705         if (msg == NULL) {
706                 return LDB_ERR_OPERATIONS_ERROR;
707         }
708
709         /* in case any attribute of the message was indexed, we need
710            to fetch the old record */
711         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
712         if (ret != LDB_SUCCESS) {
713                 /* not finding the old record is an error */
714                 goto done;
715         }
716
717         ret = ltdb_delete_noindex(module, msg);
718         if (ret != LDB_SUCCESS) {
719                 goto done;
720         }
721
722         /* remove any indexed attributes */
723         ret = ltdb_index_delete(module, msg);
724         if (ret != LDB_SUCCESS) {
725                 goto done;
726         }
727
728         ret = ltdb_modified(module, dn);
729         if (ret != LDB_SUCCESS) {
730                 goto done;
731         }
732
733 done:
734         talloc_free(msg);
735         return ret;
736 }
737
738 /*
739   delete a record from the database
740 */
741 static int ltdb_delete(struct ltdb_context *ctx)
742 {
743         struct ldb_module *module = ctx->module;
744         struct ldb_request *req = ctx->req;
745         int ret = LDB_SUCCESS;
746
747         ldb_request_set_state(req, LDB_ASYNC_PENDING);
748
749         if (ltdb_cache_load(module) != 0) {
750                 return LDB_ERR_OPERATIONS_ERROR;
751         }
752
753         ret = ltdb_delete_internal(module, req->op.del.dn);
754
755         return ret;
756 }
757
758 /*
759   find an element by attribute name. At the moment this does a linear search,
760   it should be re-coded to use a binary search once all places that modify
761   records guarantee sorted order
762
763   return the index of the first matching element if found, otherwise -1
764 */
765 static int find_element(const struct ldb_message *msg, const char *name)
766 {
767         unsigned int i;
768         for (i=0;i<msg->num_elements;i++) {
769                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
770                         return i;
771                 }
772         }
773         return -1;
774 }
775
776
777 /*
778   add an element to an existing record. Assumes a elements array that we
779   can call re-alloc on, and assumed that we can re-use the data pointers from
780   the passed in additional values. Use with care!
781
782   returns 0 on success, -1 on failure (and sets errno)
783 */
784 static int ltdb_msg_add_element(struct ldb_message *msg,
785                                 struct ldb_message_element *el)
786 {
787         struct ldb_message_element *e2;
788         unsigned int i;
789
790         if (el->num_values == 0) {
791                 /* nothing to do here - we don't add empty elements */
792                 return 0;
793         }
794
795         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
796                               msg->num_elements+1);
797         if (!e2) {
798                 errno = ENOMEM;
799                 return -1;
800         }
801
802         msg->elements = e2;
803
804         e2 = &msg->elements[msg->num_elements];
805
806         e2->name = el->name;
807         e2->flags = el->flags;
808         e2->values = talloc_array(msg->elements,
809                                   struct ldb_val, el->num_values);
810         if (!e2->values) {
811                 errno = ENOMEM;
812                 return -1;
813         }
814         for (i=0;i<el->num_values;i++) {
815                 e2->values[i] = el->values[i];
816         }
817         e2->num_values = el->num_values;
818
819         ++msg->num_elements;
820
821         return 0;
822 }
823
824 /*
825   delete all elements having a specified attribute name
826 */
827 static int msg_delete_attribute(struct ldb_module *module,
828                                 struct ltdb_private *ltdb,
829                                 struct ldb_message *msg, const char *name)
830 {
831         unsigned int i;
832         int ret;
833         struct ldb_message_element *el;
834         bool is_special = ldb_dn_is_special(msg->dn);
835
836         if (!is_special
837             && ltdb->cache->GUID_index_attribute != NULL
838             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
839                 struct ldb_context *ldb = ldb_module_get_ctx(module);
840                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
841                                        "attribute %s (used as DB index)",
842                                        ltdb->cache->GUID_index_attribute);
843                 return LDB_ERR_CONSTRAINT_VIOLATION;
844         }
845
846         el = ldb_msg_find_element(msg, name);
847         if (el == NULL) {
848                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
849         }
850         i = el - msg->elements;
851
852         ret = ltdb_index_del_element(module, ltdb, msg, el);
853         if (ret != LDB_SUCCESS) {
854                 return ret;
855         }
856
857         talloc_free(el->values);
858         if (msg->num_elements > (i+1)) {
859                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
860         }
861         msg->num_elements--;
862         msg->elements = talloc_realloc(msg, msg->elements,
863                                        struct ldb_message_element,
864                                        msg->num_elements);
865         return LDB_SUCCESS;
866 }
867
868 /*
869   delete all elements matching an attribute name/value
870
871   return LDB Error on failure
872 */
873 static int msg_delete_element(struct ldb_module *module,
874                               struct ltdb_private *ltdb,
875                               struct ldb_message *msg,
876                               const char *name,
877                               const struct ldb_val *val)
878 {
879         struct ldb_context *ldb = ldb_module_get_ctx(module);
880         unsigned int i;
881         int found, ret;
882         struct ldb_message_element *el;
883         const struct ldb_schema_attribute *a;
884
885         found = find_element(msg, name);
886         if (found == -1) {
887                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
888         }
889
890         i = (unsigned int) found;
891         el = &(msg->elements[i]);
892
893         a = ldb_schema_attribute_by_name(ldb, el->name);
894
895         for (i=0;i<el->num_values;i++) {
896                 bool matched;
897                 if (a->syntax->operator_fn) {
898                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
899                                                      &el->values[i], val, &matched);
900                         if (ret != LDB_SUCCESS) return ret;
901                 } else {
902                         matched = (a->syntax->comparison_fn(ldb, ldb,
903                                                             &el->values[i], val) == 0);
904                 }
905                 if (matched) {
906                         if (el->num_values == 1) {
907                                 return msg_delete_attribute(module,
908                                                             ltdb, msg, name);
909                         }
910
911                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
912                         if (ret != LDB_SUCCESS) {
913                                 return ret;
914                         }
915
916                         if (i<el->num_values-1) {
917                                 memmove(&el->values[i], &el->values[i+1],
918                                         sizeof(el->values[i])*
919                                                 (el->num_values-(i+1)));
920                         }
921                         el->num_values--;
922
923                         /* per definition we find in a canonicalised message an
924                            attribute value only once. So we are finished here */
925                         return LDB_SUCCESS;
926                 }
927         }
928
929         /* Not found */
930         return LDB_ERR_NO_SUCH_ATTRIBUTE;
931 }
932
933 /*
934   modify a record - internal interface
935
936   yuck - this is O(n^2). Luckily n is usually small so we probably
937   get away with it, but if we ever have really large attribute lists
938   then we'll need to look at this again
939
940   'req' is optional, and is used to specify controls if supplied
941 */
942 int ltdb_modify_internal(struct ldb_module *module,
943                          const struct ldb_message *msg,
944                          struct ldb_request *req)
945 {
946         struct ldb_context *ldb = ldb_module_get_ctx(module);
947         void *data = ldb_module_get_private(module);
948         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
949         struct ldb_message *msg2;
950         unsigned int i, j;
951         int ret = LDB_SUCCESS, idx;
952         struct ldb_control *control_permissive = NULL;
953         TALLOC_CTX *mem_ctx = talloc_new(req);
954
955         if (mem_ctx == NULL) {
956                 return ldb_module_oom(module);
957         }
958         
959         if (req) {
960                 control_permissive = ldb_request_get_control(req,
961                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
962         }
963
964         msg2 = ldb_msg_new(mem_ctx);
965         if (msg2 == NULL) {
966                 ret = LDB_ERR_OTHER;
967                 goto done;
968         }
969
970         ret = ltdb_search_dn1(module, msg->dn,
971                               msg2,
972                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
973         if (ret != LDB_SUCCESS) {
974                 goto done;
975         }
976
977         for (i=0; i<msg->num_elements; i++) {
978                 struct ldb_message_element *el = &msg->elements[i], *el2;
979                 struct ldb_val *vals;
980                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
981                 const char *dn;
982                 uint32_t options = 0;
983                 if (control_permissive != NULL) {
984                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
985                 }
986
987                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
988                 case LDB_FLAG_MOD_ADD:
989
990                         if (el->num_values == 0) {
991                                 ldb_asprintf_errstring(ldb,
992                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
993                                                        el->name, ldb_dn_get_linearized(msg2->dn));
994                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
995                                 goto done;
996                         }
997
998                         /* make a copy of the array so that a permissive
999                          * control can remove duplicates without changing the
1000                          * original values, but do not copy data as we do not
1001                          * need to keep it around once the operation is
1002                          * finished */
1003                         if (control_permissive) {
1004                                 el = talloc(msg2, struct ldb_message_element);
1005                                 if (!el) {
1006                                         ret = LDB_ERR_OTHER;
1007                                         goto done;
1008                                 }
1009                                 *el = msg->elements[i];
1010                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
1011                                 if (el->values == NULL) {
1012                                         ret = LDB_ERR_OTHER;
1013                                         goto done;
1014                                 }
1015                                 for (j = 0; j < el->num_values; j++) {
1016                                         el->values[j] = msg->elements[i].values[j];
1017                                 }
1018                         }
1019
1020                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1021                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1022                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1023                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1024                                 goto done;
1025                         }
1026
1027                         /* Checks if element already exists */
1028                         idx = find_element(msg2, el->name);
1029                         if (idx == -1) {
1030                                 if (ltdb_msg_add_element(msg2, el) != 0) {
1031                                         ret = LDB_ERR_OTHER;
1032                                         goto done;
1033                                 }
1034                                 ret = ltdb_index_add_element(module, ltdb,
1035                                                              msg2,
1036                                                              el);
1037                                 if (ret != LDB_SUCCESS) {
1038                                         goto done;
1039                                 }
1040                         } else {
1041                                 j = (unsigned int) idx;
1042                                 el2 = &(msg2->elements[j]);
1043
1044                                 /* We cannot add another value on a existing one
1045                                    if the attribute is single-valued */
1046                                 if (ldb_tdb_single_valued(a, el)) {
1047                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1048                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1049                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1050                                         goto done;
1051                                 }
1052
1053                                 /* Check that values don't exist yet on multi-
1054                                    valued attributes or aren't provided twice */
1055                                 if (!(el->flags &
1056                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1057                                         struct ldb_val *duplicate = NULL;
1058                                         ret = ldb_msg_find_common_values(ldb,
1059                                                                          msg2,
1060                                                                          el,
1061                                                                          el2,
1062                                                                          options);
1063
1064                                         if (ret ==
1065                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1066                                                 ldb_asprintf_errstring(ldb,
1067                                                         "attribute '%s': value "
1068                                                         "#%u on '%s' already "
1069                                                         "exists", el->name, j,
1070                                                         ldb_dn_get_linearized(msg2->dn));
1071                                                 goto done;
1072                                         } else if (ret != LDB_SUCCESS) {
1073                                                 goto done;
1074                                         }
1075
1076                                         ret = ldb_msg_find_duplicate_val(
1077                                                 ldb, msg2, el, &duplicate, 0);
1078                                         if (ret != LDB_SUCCESS) {
1079                                                 goto done;
1080                                         }
1081                                         if (duplicate != NULL) {
1082                                                 ldb_asprintf_errstring(
1083                                                         ldb,
1084                                                         "attribute '%s': value "
1085                                                         "'%.*s' on '%s' "
1086                                                         "provided more than "
1087                                                         "once in ADD",
1088                                                         el->name,
1089                                                         (int)duplicate->length,
1090                                                         duplicate->data,
1091                                                         ldb_dn_get_linearized(msg->dn));
1092                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1093                                                 goto done;
1094                                         }
1095                                 }
1096
1097                                 /* Now combine existing and new values to a new
1098                                    attribute record */
1099                                 vals = talloc_realloc(msg2->elements,
1100                                                       el2->values, struct ldb_val,
1101                                                       el2->num_values + el->num_values);
1102                                 if (vals == NULL) {
1103                                         ldb_oom(ldb);
1104                                         ret = LDB_ERR_OTHER;
1105                                         goto done;
1106                                 }
1107
1108                                 for (j=0; j<el->num_values; j++) {
1109                                         vals[el2->num_values + j] =
1110                                                 ldb_val_dup(vals, &el->values[j]);
1111                                 }
1112
1113                                 el2->values = vals;
1114                                 el2->num_values += el->num_values;
1115
1116                                 ret = ltdb_index_add_element(module, ltdb,
1117                                                              msg2, el);
1118                                 if (ret != LDB_SUCCESS) {
1119                                         goto done;
1120                                 }
1121                         }
1122
1123                         break;
1124
1125                 case LDB_FLAG_MOD_REPLACE:
1126
1127                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1128                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1129                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1130                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1131                                 goto done;
1132                         }
1133
1134                         /*
1135                          * We don't need to check this if we have been
1136                          * pre-screened by the repl_meta_data module
1137                          * in Samba, or someone else who can claim to
1138                          * know what they are doing. 
1139                          */
1140                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1141                                 struct ldb_val *duplicate = NULL;
1142
1143                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1144                                                                  &duplicate, 0);
1145                                 if (ret != LDB_SUCCESS) {
1146                                         goto done;
1147                                 }
1148                                 if (duplicate != NULL) {
1149                                         ldb_asprintf_errstring(
1150                                                 ldb,
1151                                                 "attribute '%s': value '%.*s' "
1152                                                 "on '%s' provided more than "
1153                                                 "once in REPLACE",
1154                                                 el->name,
1155                                                 (int)duplicate->length,
1156                                                 duplicate->data,
1157                                                 ldb_dn_get_linearized(msg2->dn));
1158                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1159                                         goto done;
1160                                 }
1161                         }
1162
1163                         /* Checks if element already exists */
1164                         idx = find_element(msg2, el->name);
1165                         if (idx != -1) {
1166                                 j = (unsigned int) idx;
1167                                 el2 = &(msg2->elements[j]);
1168
1169                                 /* we consider two elements to be
1170                                  * equal only if the order
1171                                  * matches. This allows dbcheck to
1172                                  * fix the ordering on attributes
1173                                  * where order matters, such as
1174                                  * objectClass
1175                                  */
1176                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1177                                         continue;
1178                                 }
1179
1180                                 /* Delete the attribute if it exists in the DB */
1181                                 if (msg_delete_attribute(module, ltdb,
1182                                                          msg2,
1183                                                          el->name) != 0) {
1184                                         ret = LDB_ERR_OTHER;
1185                                         goto done;
1186                                 }
1187                         }
1188
1189                         /* Recreate it with the new values */
1190                         if (ltdb_msg_add_element(msg2, el) != 0) {
1191                                 ret = LDB_ERR_OTHER;
1192                                 goto done;
1193                         }
1194
1195                         ret = ltdb_index_add_element(module, ltdb,
1196                                                      msg2, el);
1197                         if (ret != LDB_SUCCESS) {
1198                                 goto done;
1199                         }
1200
1201                         break;
1202
1203                 case LDB_FLAG_MOD_DELETE:
1204                         dn = ldb_dn_get_linearized(msg2->dn);
1205                         if (dn == NULL) {
1206                                 ret = LDB_ERR_OTHER;
1207                                 goto done;
1208                         }
1209
1210                         if (msg->elements[i].num_values == 0) {
1211                                 /* Delete the whole attribute */
1212                                 ret = msg_delete_attribute(module,
1213                                                            ltdb,
1214                                                            msg2,
1215                                                            msg->elements[i].name);
1216                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1217                                     control_permissive) {
1218                                         ret = LDB_SUCCESS;
1219                                 } else {
1220                                         ldb_asprintf_errstring(ldb,
1221                                                                "attribute '%s': no such attribute for delete on '%s'",
1222                                                                msg->elements[i].name, dn);
1223                                 }
1224                                 if (ret != LDB_SUCCESS) {
1225                                         goto done;
1226                                 }
1227                         } else {
1228                                 /* Delete specified values from an attribute */
1229                                 for (j=0; j < msg->elements[i].num_values; j++) {
1230                                         ret = msg_delete_element(module,
1231                                                                  ltdb,
1232                                                                  msg2,
1233                                                                  msg->elements[i].name,
1234                                                                  &msg->elements[i].values[j]);
1235                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1236                                             control_permissive) {
1237                                                 ret = LDB_SUCCESS;
1238                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1239                                                 ldb_asprintf_errstring(ldb,
1240                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1241                                                                        msg->elements[i].name, dn);
1242                                         }
1243                                         if (ret != LDB_SUCCESS) {
1244                                                 goto done;
1245                                         }
1246                                 }
1247                         }
1248                         break;
1249                 default:
1250                         ldb_asprintf_errstring(ldb,
1251                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1252                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1253                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1254                         ret = LDB_ERR_PROTOCOL_ERROR;
1255                         goto done;
1256                 }
1257         }
1258
1259         ret = ltdb_store(module, msg2, TDB_MODIFY);
1260         if (ret != LDB_SUCCESS) {
1261                 goto done;
1262         }
1263
1264         ret = ltdb_modified(module, msg2->dn);
1265         if (ret != LDB_SUCCESS) {
1266                 goto done;
1267         }
1268
1269 done:
1270         TALLOC_FREE(mem_ctx);
1271         return ret;
1272 }
1273
1274 /*
1275   modify a record
1276 */
1277 static int ltdb_modify(struct ltdb_context *ctx)
1278 {
1279         struct ldb_module *module = ctx->module;
1280         struct ldb_request *req = ctx->req;
1281         int ret = LDB_SUCCESS;
1282
1283         ret = ltdb_check_special_dn(module, req->op.mod.message);
1284         if (ret != LDB_SUCCESS) {
1285                 return ret;
1286         }
1287
1288         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1289
1290         if (ltdb_cache_load(module) != 0) {
1291                 return LDB_ERR_OPERATIONS_ERROR;
1292         }
1293
1294         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1295
1296         return ret;
1297 }
1298
1299 /*
1300   rename a record
1301 */
1302 static int ltdb_rename(struct ltdb_context *ctx)
1303 {
1304         struct ldb_module *module = ctx->module;
1305         void *data = ldb_module_get_private(module);
1306         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1307         struct ldb_request *req = ctx->req;
1308         struct ldb_message *msg;
1309         int ret = LDB_SUCCESS;
1310         TDB_DATA tdb_key, tdb_key_old;
1311         struct ldb_dn *db_dn;
1312
1313         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1314
1315         if (ltdb_cache_load(ctx->module) != 0) {
1316                 return LDB_ERR_OPERATIONS_ERROR;
1317         }
1318
1319         msg = ldb_msg_new(ctx);
1320         if (msg == NULL) {
1321                 return LDB_ERR_OPERATIONS_ERROR;
1322         }
1323
1324         /* we need to fetch the old record to re-add under the new name */
1325         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1326                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1327         if (ret != LDB_SUCCESS) {
1328                 /* not finding the old record is an error */
1329                 return ret;
1330         }
1331
1332         /* We need to, before changing the DB, check if the new DN
1333          * exists, so we can return this error to the caller with an
1334          * unmodified DB
1335          *
1336          * Even in GUID index mode we use ltdb_key_dn() as we are
1337          * trying to figure out if this is just a case rename
1338          */
1339         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1340         if (!tdb_key.dptr) {
1341                 talloc_free(msg);
1342                 return LDB_ERR_OPERATIONS_ERROR;
1343         }
1344
1345         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1346         if (!tdb_key_old.dptr) {
1347                 talloc_free(msg);
1348                 talloc_free(tdb_key.dptr);
1349                 return LDB_ERR_OPERATIONS_ERROR;
1350         }
1351
1352         /*
1353          * Only declare a conflict if the new DN already exists,
1354          * and it isn't a case change on the old DN
1355          */
1356         if (tdb_key_old.dsize != tdb_key.dsize
1357             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1358                 ret = ltdb_search_base(module, msg,
1359                                        req->op.rename.newdn,
1360                                        &db_dn);
1361                 if (ret == LDB_SUCCESS) {
1362                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1363                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1364                         ret = LDB_SUCCESS;
1365                 }
1366         }
1367
1368         /* finding the new record already in the DB is an error */
1369
1370         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1371                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1372                                        "Entry %s already exists",
1373                                        ldb_dn_get_linearized(req->op.rename.newdn));
1374         }
1375         if (ret != LDB_SUCCESS) {
1376                 talloc_free(tdb_key_old.dptr);
1377                 talloc_free(tdb_key.dptr);
1378                 talloc_free(msg);
1379                 return ret;
1380         }
1381
1382         talloc_free(tdb_key_old.dptr);
1383         talloc_free(tdb_key.dptr);
1384
1385         /* Always delete first then add, to avoid conflicts with
1386          * unique indexes. We rely on the transaction to make this
1387          * atomic
1388          */
1389         ret = ltdb_delete_internal(module, msg->dn);
1390         if (ret != LDB_SUCCESS) {
1391                 talloc_free(msg);
1392                 return ret;
1393         }
1394
1395         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1396         if (msg->dn == NULL) {
1397                 talloc_free(msg);
1398                 return LDB_ERR_OPERATIONS_ERROR;
1399         }
1400
1401         /* We don't check single value as we can have more than 1 with
1402          * deleted attributes. We could go through all elements but that's
1403          * maybe not the most efficient way
1404          */
1405         ret = ltdb_add_internal(module, ltdb, msg, false);
1406
1407         talloc_free(msg);
1408
1409         return ret;
1410 }
1411
1412 static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
1413 {
1414         return tdb_transaction_start(ltdb->tdb);
1415 }
1416
1417 static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
1418 {
1419         return tdb_transaction_cancel(ltdb->tdb);
1420 }
1421
1422 static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
1423 {
1424         return tdb_transaction_prepare_commit(ltdb->tdb);
1425 }
1426
1427 static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
1428 {
1429         return tdb_transaction_commit(ltdb->tdb);
1430 }
1431
1432 static int ltdb_start_trans(struct ldb_module *module)
1433 {
1434         void *data = ldb_module_get_private(module);
1435         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1436
1437         /* Do not take out the transaction lock on a read-only DB */
1438         if (ltdb->read_only) {
1439                 return LDB_ERR_UNWILLING_TO_PERFORM;
1440         }
1441
1442         if (ltdb->kv_ops->begin_write(ltdb) != 0) {
1443                 return ltdb->kv_ops->error(ltdb);
1444         }
1445
1446         ltdb->in_transaction++;
1447
1448         ltdb_index_transaction_start(module);
1449
1450         ltdb->reindex_failed = false;
1451
1452         return LDB_SUCCESS;
1453 }
1454
1455 /*
1456  * Forward declaration to allow prepare_commit to in fact abort the
1457  * transaction
1458  */
1459 static int ltdb_del_trans(struct ldb_module *module);
1460
1461 static int ltdb_prepare_commit(struct ldb_module *module)
1462 {
1463         int ret;
1464         void *data = ldb_module_get_private(module);
1465         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1466
1467         if (ltdb->in_transaction != 1) {
1468                 return LDB_SUCCESS;
1469         }
1470
1471         /*
1472          * Check if the last re-index failed.
1473          *
1474          * This can happen if for example a duplicate value was marked
1475          * unique.  We must not write a partial re-index into the DB.
1476          */
1477         if (ltdb->reindex_failed) {
1478                 /*
1479                  * We must instead abort the transaction so we get the
1480                  * old values and old index back
1481                  */
1482                 ltdb_del_trans(module);
1483                 ldb_set_errstring(ldb_module_get_ctx(module),
1484                                   "Failure during re-index, so "
1485                                   "transaction must be aborted.");
1486                 return LDB_ERR_OPERATIONS_ERROR;
1487         }
1488
1489         ret = ltdb_index_transaction_commit(module);
1490         if (ret != LDB_SUCCESS) {
1491                 ltdb->kv_ops->abort_write(ltdb);
1492                 ltdb->in_transaction--;
1493                 return ret;
1494         }
1495
1496         if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
1497                 ret = ltdb->kv_ops->error(ltdb);
1498                 ltdb->in_transaction--;
1499                 ldb_debug_set(ldb_module_get_ctx(module),
1500                               LDB_DEBUG_FATAL,
1501                               "Failure during "
1502                               "prepare_write): %s -> %s",
1503                               ltdb->kv_ops->errorstr(ltdb),
1504                               ldb_strerror(ret));
1505                 return ret;
1506         }
1507
1508         ltdb->prepared_commit = true;
1509
1510         return LDB_SUCCESS;
1511 }
1512
1513 static int ltdb_end_trans(struct ldb_module *module)
1514 {
1515         int ret;
1516         void *data = ldb_module_get_private(module);
1517         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1518
1519         if (!ltdb->prepared_commit) {
1520                 ret = ltdb_prepare_commit(module);
1521                 if (ret != LDB_SUCCESS) {
1522                         return ret;
1523                 }
1524         }
1525
1526         ltdb->in_transaction--;
1527         ltdb->prepared_commit = false;
1528
1529         if (ltdb->kv_ops->finish_write(ltdb) != 0) {
1530                 ret = ltdb->kv_ops->error(ltdb);
1531                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1532                                        "Failure during tdb_transaction_commit(): %s -> %s",
1533                                        ltdb->kv_ops->errorstr(ltdb),
1534                                        ldb_strerror(ret));
1535                 return ret;
1536         }
1537
1538         return LDB_SUCCESS;
1539 }
1540
1541 static int ltdb_del_trans(struct ldb_module *module)
1542 {
1543         void *data = ldb_module_get_private(module);
1544         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1545
1546         ltdb->in_transaction--;
1547
1548         if (ltdb_index_transaction_cancel(module) != 0) {
1549                 ltdb->kv_ops->abort_write(ltdb);
1550                 return ltdb->kv_ops->error(ltdb);
1551         }
1552
1553         ltdb->kv_ops->abort_write(ltdb);
1554         return LDB_SUCCESS;
1555 }
1556
1557 /*
1558   return sequenceNumber from @BASEINFO
1559 */
1560 static int ltdb_sequence_number(struct ltdb_context *ctx,
1561                                 struct ldb_extended **ext)
1562 {
1563         struct ldb_context *ldb;
1564         struct ldb_module *module = ctx->module;
1565         struct ldb_request *req = ctx->req;
1566         void *data = ldb_module_get_private(module);
1567         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1568         TALLOC_CTX *tmp_ctx = NULL;
1569         struct ldb_seqnum_request *seq;
1570         struct ldb_seqnum_result *res;
1571         struct ldb_message *msg = NULL;
1572         struct ldb_dn *dn;
1573         const char *date;
1574         int ret = LDB_SUCCESS;
1575
1576         ldb = ldb_module_get_ctx(module);
1577
1578         seq = talloc_get_type(req->op.extended.data,
1579                                 struct ldb_seqnum_request);
1580         if (seq == NULL) {
1581                 return LDB_ERR_OPERATIONS_ERROR;
1582         }
1583
1584         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1585
1586         if (ltdb->kv_ops->lock_read(module) != 0) {
1587                 return LDB_ERR_OPERATIONS_ERROR;
1588         }
1589
1590         res = talloc_zero(req, struct ldb_seqnum_result);
1591         if (res == NULL) {
1592                 ret = LDB_ERR_OPERATIONS_ERROR;
1593                 goto done;
1594         }
1595
1596         tmp_ctx = talloc_new(req);
1597         if (tmp_ctx == NULL) {
1598                 ret = LDB_ERR_OPERATIONS_ERROR;
1599                 goto done;
1600         }
1601
1602         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1603         if (dn == NULL) {
1604                 ret = LDB_ERR_OPERATIONS_ERROR;
1605                 goto done;
1606         }
1607
1608         msg = ldb_msg_new(tmp_ctx);
1609         if (msg == NULL) {
1610                 ret = LDB_ERR_OPERATIONS_ERROR;
1611                 goto done;
1612         }
1613
1614         ret = ltdb_search_dn1(module, dn, msg, 0);
1615         if (ret != LDB_SUCCESS) {
1616                 goto done;
1617         }
1618
1619         switch (seq->type) {
1620         case LDB_SEQ_HIGHEST_SEQ:
1621                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1622                 break;
1623         case LDB_SEQ_NEXT:
1624                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1625                 res->seq_num++;
1626                 break;
1627         case LDB_SEQ_HIGHEST_TIMESTAMP:
1628                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1629                 if (date) {
1630                         res->seq_num = ldb_string_to_time(date);
1631                 } else {
1632                         res->seq_num = 0;
1633                         /* zero is as good as anything when we don't know */
1634                 }
1635                 break;
1636         }
1637
1638         *ext = talloc_zero(req, struct ldb_extended);
1639         if (*ext == NULL) {
1640                 ret = LDB_ERR_OPERATIONS_ERROR;
1641                 goto done;
1642         }
1643         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1644         (*ext)->data = talloc_steal(*ext, res);
1645
1646 done:
1647         talloc_free(tmp_ctx);
1648
1649         ltdb->kv_ops->unlock_read(module);
1650         return ret;
1651 }
1652
1653 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1654 {
1655         struct ldb_context *ldb;
1656         struct ldb_request *req;
1657         struct ldb_reply *ares;
1658
1659         ldb = ldb_module_get_ctx(ctx->module);
1660         req = ctx->req;
1661
1662         /* if we already returned an error just return */
1663         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1664                 return;
1665         }
1666
1667         ares = talloc_zero(req, struct ldb_reply);
1668         if (!ares) {
1669                 ldb_oom(ldb);
1670                 req->callback(req, NULL);
1671                 return;
1672         }
1673         ares->type = LDB_REPLY_DONE;
1674         ares->error = error;
1675
1676         req->callback(req, ares);
1677 }
1678
1679 static void ltdb_timeout(struct tevent_context *ev,
1680                           struct tevent_timer *te,
1681                           struct timeval t,
1682                           void *private_data)
1683 {
1684         struct ltdb_context *ctx;
1685         ctx = talloc_get_type(private_data, struct ltdb_context);
1686
1687         if (!ctx->request_terminated) {
1688                 /* request is done now */
1689                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1690         }
1691
1692         if (ctx->spy) {
1693                 /* neutralize the spy */
1694                 ctx->spy->ctx = NULL;
1695                 ctx->spy = NULL;
1696         }
1697         talloc_free(ctx);
1698 }
1699
1700 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1701                                         struct ldb_extended *ext,
1702                                         int error)
1703 {
1704         struct ldb_context *ldb;
1705         struct ldb_request *req;
1706         struct ldb_reply *ares;
1707
1708         ldb = ldb_module_get_ctx(ctx->module);
1709         req = ctx->req;
1710
1711         /* if we already returned an error just return */
1712         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1713                 return;
1714         }
1715
1716         ares = talloc_zero(req, struct ldb_reply);
1717         if (!ares) {
1718                 ldb_oom(ldb);
1719                 req->callback(req, NULL);
1720                 return;
1721         }
1722         ares->type = LDB_REPLY_DONE;
1723         ares->response = ext;
1724         ares->error = error;
1725
1726         req->callback(req, ares);
1727 }
1728
1729 static void ltdb_handle_extended(struct ltdb_context *ctx)
1730 {
1731         struct ldb_extended *ext = NULL;
1732         int ret;
1733
1734         if (strcmp(ctx->req->op.extended.oid,
1735                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1736                 /* get sequence number */
1737                 ret = ltdb_sequence_number(ctx, &ext);
1738         } else {
1739                 /* not recognized */
1740                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1741         }
1742
1743         ltdb_request_extended_done(ctx, ext, ret);
1744 }
1745
1746 struct kv_ctx {
1747         ldb_kv_traverse_fn kv_traverse_fn;
1748         void *ctx;
1749         struct ltdb_private *ltdb;
1750 };
1751
1752 static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
1753 {
1754         struct kv_ctx *kv_ctx = ctx;
1755         struct ldb_val key = {
1756                 .length = tdb_key.dsize,
1757                 .data = tdb_key.dptr,
1758         };
1759         struct ldb_val data = {
1760                 .length = tdb_data.dsize,
1761                 .data = tdb_data.dptr,
1762         };
1763         return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
1764 }
1765
1766 static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
1767 {
1768         struct kv_ctx kv_ctx = {
1769                 .kv_traverse_fn = fn,
1770                 .ctx = ctx,
1771                 .ltdb = ltdb
1772         };
1773         if (ltdb->in_transaction != 0) {
1774                 return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1775         } else {
1776                 return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1777         }
1778 }
1779
1780 static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA key2, TDB_DATA data, void *state)
1781 {
1782         int tdb_ret;
1783         struct ldb_context *ldb;
1784         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1785         struct ldb_module *module = ctx->module;
1786
1787         ldb = ldb_module_get_ctx(module);
1788
1789         tdb_ret = tdb_delete(ltdb->tdb, key);
1790         if (tdb_ret != 0) {
1791                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1792                           "Failed to delete %*.*s "
1793                           "for rekey as %*.*s: %s",
1794                           (int)key.dsize, (int)key.dsize,
1795                           (const char *)key.dptr,
1796                           (int)key2.dsize, (int)key2.dsize,
1797                           (const char *)key.dptr,
1798                           tdb_errorstr(ltdb->tdb));
1799                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1800                 return -1;
1801         }
1802         tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
1803         if (tdb_ret != 0) {
1804                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1805                           "Failed to rekey %*.*s as %*.*s: %s",
1806                           (int)key.dsize, (int)key.dsize,
1807                           (const char *)key.dptr,
1808                           (int)key2.dsize, (int)key2.dsize,
1809                           (const char *)key.dptr,
1810                           tdb_errorstr(ltdb->tdb));
1811                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1812                 return -1;
1813         }
1814         return tdb_ret;
1815 }
1816
1817 static int ltdb_tdb_parse_record(struct ltdb_private *ltdb, TDB_DATA key,
1818                                  int (*parser)(TDB_DATA key, TDB_DATA data,
1819                                                void *private_data),
1820                                  void *ctx)
1821 {
1822         return tdb_parse_record(ltdb->tdb, key, parser, ctx);
1823 }
1824
1825 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
1826 {
1827         return tdb_name(ltdb->tdb);
1828 }
1829
1830 static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
1831 {
1832         int seq = tdb_get_seqnum(ltdb->tdb);
1833         bool has_changed = (seq != ltdb->tdb_seqnum);
1834
1835         ltdb->tdb_seqnum = seq;
1836
1837         return has_changed;
1838 }
1839
1840 static const struct kv_db_ops key_value_ops = {
1841         .store = ltdb_tdb_store,
1842         .delete = ltdb_tdb_delete,
1843         .iterate = ltdb_tdb_traverse_fn,
1844         .update_in_iterate = ltdb_tdb_update_in_iterate,
1845         .fetch_and_parse = ltdb_tdb_parse_record,
1846         .lock_read = ltdb_lock_read,
1847         .unlock_read = ltdb_unlock_read,
1848         .begin_write = ltdb_tdb_transaction_start,
1849         .prepare_write = ltdb_tdb_transaction_prepare_commit,
1850         .finish_write = ltdb_tdb_transaction_commit,
1851         .abort_write = ltdb_tdb_transaction_cancel,
1852         .error = ltdb_error,
1853         .errorstr = ltdb_errorstr,
1854         .name = ltdb_tdb_name,
1855         .has_changed = ltdb_tdb_changed,
1856 };
1857
1858 static void ltdb_callback(struct tevent_context *ev,
1859                           struct tevent_timer *te,
1860                           struct timeval t,
1861                           void *private_data)
1862 {
1863         struct ltdb_context *ctx;
1864         int ret;
1865
1866         ctx = talloc_get_type(private_data, struct ltdb_context);
1867
1868         if (ctx->request_terminated) {
1869                 goto done;
1870         }
1871
1872         switch (ctx->req->operation) {
1873         case LDB_SEARCH:
1874                 ret = ltdb_search(ctx);
1875                 break;
1876         case LDB_ADD:
1877                 ret = ltdb_add(ctx);
1878                 break;
1879         case LDB_MODIFY:
1880                 ret = ltdb_modify(ctx);
1881                 break;
1882         case LDB_DELETE:
1883                 ret = ltdb_delete(ctx);
1884                 break;
1885         case LDB_RENAME:
1886                 ret = ltdb_rename(ctx);
1887                 break;
1888         case LDB_EXTENDED:
1889                 ltdb_handle_extended(ctx);
1890                 goto done;
1891         default:
1892                 /* no other op supported */
1893                 ret = LDB_ERR_PROTOCOL_ERROR;
1894         }
1895
1896         if (!ctx->request_terminated) {
1897                 /* request is done now */
1898                 ltdb_request_done(ctx, ret);
1899         }
1900
1901 done:
1902         if (ctx->spy) {
1903                 /* neutralize the spy */
1904                 ctx->spy->ctx = NULL;
1905                 ctx->spy = NULL;
1906         }
1907         talloc_free(ctx);
1908 }
1909
1910 static int ltdb_request_destructor(void *ptr)
1911 {
1912         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1913
1914         if (spy->ctx != NULL) {
1915                 spy->ctx->spy = NULL;
1916                 spy->ctx->request_terminated = true;
1917                 spy->ctx = NULL;
1918         }
1919
1920         return 0;
1921 }
1922
1923 static int ltdb_handle_request(struct ldb_module *module,
1924                                struct ldb_request *req)
1925 {
1926         struct ldb_control *control_permissive;
1927         struct ldb_context *ldb;
1928         struct tevent_context *ev;
1929         struct ltdb_context *ac;
1930         struct tevent_timer *te;
1931         struct timeval tv;
1932         unsigned int i;
1933
1934         ldb = ldb_module_get_ctx(module);
1935
1936         control_permissive = ldb_request_get_control(req,
1937                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1938
1939         for (i = 0; req->controls && req->controls[i]; i++) {
1940                 if (req->controls[i]->critical &&
1941                     req->controls[i] != control_permissive) {
1942                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1943                                                req->controls[i]->oid);
1944                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1945                 }
1946         }
1947
1948         if (req->starttime == 0 || req->timeout == 0) {
1949                 ldb_set_errstring(ldb, "Invalid timeout settings");
1950                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1951         }
1952
1953         ev = ldb_handle_get_event_context(req->handle);
1954
1955         ac = talloc_zero(ldb, struct ltdb_context);
1956         if (ac == NULL) {
1957                 ldb_oom(ldb);
1958                 return LDB_ERR_OPERATIONS_ERROR;
1959         }
1960
1961         ac->module = module;
1962         ac->req = req;
1963
1964         tv.tv_sec = 0;
1965         tv.tv_usec = 0;
1966         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1967         if (NULL == te) {
1968                 talloc_free(ac);
1969                 return LDB_ERR_OPERATIONS_ERROR;
1970         }
1971
1972         if (req->timeout > 0) {
1973                 tv.tv_sec = req->starttime + req->timeout;
1974                 tv.tv_usec = 0;
1975                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1976                                                      ltdb_timeout, ac);
1977                 if (NULL == ac->timeout_event) {
1978                         talloc_free(ac);
1979                         return LDB_ERR_OPERATIONS_ERROR;
1980                 }
1981         }
1982
1983         /* set a spy so that we do not try to use the request context
1984          * if it is freed before ltdb_callback fires */
1985         ac->spy = talloc(req, struct ltdb_req_spy);
1986         if (NULL == ac->spy) {
1987                 talloc_free(ac);
1988                 return LDB_ERR_OPERATIONS_ERROR;
1989         }
1990         ac->spy->ctx = ac;
1991
1992         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1993
1994         return LDB_SUCCESS;
1995 }
1996
1997 static int ltdb_init_rootdse(struct ldb_module *module)
1998 {
1999         /* ignore errors on this - we expect it for non-sam databases */
2000         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2001
2002         /* there can be no module beyond the backend, just return */
2003         return LDB_SUCCESS;
2004 }
2005
2006
2007 static int generic_lock_read(struct ldb_module *module)
2008 {
2009         void *data = ldb_module_get_private(module);
2010         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2011         return ltdb->kv_ops->lock_read(module);
2012 }
2013
2014 static int generic_unlock_read(struct ldb_module *module)
2015 {
2016         void *data = ldb_module_get_private(module);
2017         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2018         return ltdb->kv_ops->unlock_read(module);
2019 }
2020
2021 static const struct ldb_module_ops ltdb_ops = {
2022         .name              = "tdb",
2023         .init_context      = ltdb_init_rootdse,
2024         .search            = ltdb_handle_request,
2025         .add               = ltdb_handle_request,
2026         .modify            = ltdb_handle_request,
2027         .del               = ltdb_handle_request,
2028         .rename            = ltdb_handle_request,
2029         .extended          = ltdb_handle_request,
2030         .start_transaction = ltdb_start_trans,
2031         .end_transaction   = ltdb_end_trans,
2032         .prepare_commit    = ltdb_prepare_commit,
2033         .del_transaction   = ltdb_del_trans,
2034         .read_lock         = generic_lock_read,
2035         .read_unlock       = generic_unlock_read,
2036 };
2037
2038 int init_store(struct ltdb_private *ltdb,
2039                       const char *name,
2040                       struct ldb_context *ldb,
2041                       const char *options[],
2042                       struct ldb_module **_module)
2043 {
2044         struct ldb_module *module;
2045
2046         if (getenv("LDB_WARN_UNINDEXED")) {
2047                 ltdb->warn_unindexed = true;
2048         }
2049
2050         if (getenv("LDB_WARN_REINDEX")) {
2051                 ltdb->warn_reindex = true;
2052         }
2053
2054         ltdb->sequence_number = 0;
2055
2056         module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
2057         if (!module) {
2058                 ldb_oom(ldb);
2059                 talloc_free(ltdb);
2060                 return LDB_ERR_OPERATIONS_ERROR;
2061         }
2062         ldb_module_set_private(module, ltdb);
2063         talloc_steal(module, ltdb);
2064
2065         if (ltdb_cache_load(module) != 0) {
2066                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
2067                                        "records for backend '%s'", name);
2068                 talloc_free(module);
2069                 return LDB_ERR_OPERATIONS_ERROR;
2070         }
2071
2072         *_module = module;
2073         /*
2074          * Set the maximum key length
2075          */
2076         {
2077                 const char *len_str =
2078                         ldb_options_find(ldb, options,
2079                                          "max_key_len_for_self_test");
2080                 if (len_str != NULL) {
2081                         unsigned len = strtoul(len_str, NULL, 0);
2082                         ltdb->max_key_length = len;
2083                 }
2084         }
2085
2086         return LDB_SUCCESS;
2087 }
2088
2089 /*
2090   connect to the database
2091 */
2092 int ltdb_connect(struct ldb_context *ldb, const char *url,
2093                  unsigned int flags, const char *options[],
2094                  struct ldb_module **_module)
2095 {
2096         const char *path;
2097         int tdb_flags, open_flags;
2098         struct ltdb_private *ltdb;
2099
2100         /*
2101          * We hold locks, so we must use a private event context
2102          * on each returned handle
2103          */
2104         ldb_set_require_private_event_context(ldb);
2105
2106         /* parse the url */
2107         if (strchr(url, ':')) {
2108                 if (strncmp(url, "tdb://", 6) != 0) {
2109                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2110                                   "Invalid tdb URL '%s'", url);
2111                         return LDB_ERR_OPERATIONS_ERROR;
2112                 }
2113                 path = url+6;
2114         } else {
2115                 path = url;
2116         }
2117
2118         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
2119
2120         /* check for the 'nosync' option */
2121         if (flags & LDB_FLG_NOSYNC) {
2122                 tdb_flags |= TDB_NOSYNC;
2123         }
2124
2125         /* and nommap option */
2126         if (flags & LDB_FLG_NOMMAP) {
2127                 tdb_flags |= TDB_NOMMAP;
2128         }
2129
2130         ltdb = talloc_zero(ldb, struct ltdb_private);
2131         if (!ltdb) {
2132                 ldb_oom(ldb);
2133                 return LDB_ERR_OPERATIONS_ERROR;
2134         }
2135
2136         if (flags & LDB_FLG_RDONLY) {
2137                 /*
2138                  * This is weird, but because we can only have one tdb
2139                  * in this process, and the other one could be
2140                  * read-write, we can't use the tdb readonly.  Plus a
2141                  * read only tdb prohibits the all-record lock.
2142                  */
2143                 open_flags = O_RDWR;
2144
2145                 ltdb->read_only = true;
2146
2147         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
2148                 /*
2149                  * This is used by ldbsearch to prevent creation of the database
2150                  * if the name is wrong
2151                  */
2152                 open_flags = O_RDWR;
2153         } else {
2154                 /*
2155                  * This is the normal case
2156                  */
2157                 open_flags = O_CREAT | O_RDWR;
2158         }
2159
2160         ltdb->kv_ops = &key_value_ops;
2161
2162         /* note that we use quite a large default hash size */
2163         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
2164                                    tdb_flags, open_flags,
2165                                    ldb_get_create_perms(ldb), ldb);
2166         if (!ltdb->tdb) {
2167                 ldb_asprintf_errstring(ldb,
2168                                        "Unable to open tdb '%s': %s", path, strerror(errno));
2169                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2170                           "Unable to open tdb '%s': %s", path, strerror(errno));
2171                 talloc_free(ltdb);
2172                 if (errno == EACCES || errno == EPERM) {
2173                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
2174                 }
2175                 return LDB_ERR_OPERATIONS_ERROR;
2176         }
2177
2178         return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
2179 }