ldb_tdb: Disallow TDB nested transactions and use tdb_transaction_active()
[samba.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 static int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (tdb_transaction_active(ltdb->tdb) == false &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 static int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (!tdb_transaction_active(ltdb->tdb) && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /* The caller is to provide a correctly sized key */
225 int ltdb_guid_to_key(struct ldb_module *module,
226                      struct ltdb_private *ltdb,
227                      const struct ldb_val *GUID_val,
228                      TDB_DATA *key)
229 {
230         const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
231         const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
232
233         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
238         memcpy(&key->dptr[GUID_prefix_len],
239                GUID_val->data, GUID_val->length);
240         return LDB_SUCCESS;
241 }
242
243 /*
244  * The caller is to provide a correctly sized key, used only in
245  * the GUID index mode
246  */
247 int ltdb_idx_to_key(struct ldb_module *module,
248                     struct ltdb_private *ltdb,
249                     TALLOC_CTX *mem_ctx,
250                     const struct ldb_val *idx_val,
251                     TDB_DATA *key)
252 {
253         struct ldb_context *ldb = ldb_module_get_ctx(module);
254         struct ldb_dn *dn;
255
256         if (ltdb->cache->GUID_index_attribute != NULL) {
257                 return ltdb_guid_to_key(module, ltdb,
258                                         idx_val, key);
259         }
260
261         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
262         if (dn == NULL) {
263                 /*
264                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
265                  * to the caller, as this in an invalid index value
266                  */
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269         /* form the key */
270         *key = ltdb_key_dn(module, mem_ctx, dn);
271         TALLOC_FREE(dn);
272         if (!key->dptr) {
273                 return ldb_module_oom(module);
274         }
275         return LDB_SUCCESS;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293         int ret;
294
295         if (ltdb->cache->GUID_index_attribute == NULL) {
296                 return ltdb_key_dn(module, mem_ctx, msg->dn);
297         }
298
299         if (ldb_dn_is_special(msg->dn)) {
300                 return ltdb_key_dn(module, mem_ctx, msg->dn);
301         }
302
303         guid_val = ldb_msg_find_ldb_val(msg,
304                                        ltdb->cache->GUID_index_attribute);
305         if (guid_val == NULL) {
306                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
307                                        "Did not find GUID attribute %s "
308                                        "in %s, required for TDB record "
309                                        "key in " LTDB_IDXGUID " mode.",
310                                        ltdb->cache->GUID_index_attribute,
311                                        ldb_dn_get_linearized(msg->dn));
312                 errno = EINVAL;
313                 key.dptr = NULL;
314                 key.dsize = 0;
315                 return key;
316         }
317
318         /* In this case, allocate with talloc */
319         key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
320         if (key.dptr == NULL) {
321                 errno = ENOMEM;
322                 key.dptr = NULL;
323                 key.dsize = 0;
324                 return key;
325         }
326         key.dsize = talloc_get_size(key.dptr);
327
328         ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
329
330         if (ret != LDB_SUCCESS) {
331                 errno = EINVAL;
332                 key.dptr = NULL;
333                 key.dsize = 0;
334                 return key;
335         }
336         return key;
337 }
338
339 /*
340   check special dn's have valid attributes
341   currently only @ATTRIBUTES is checked
342 */
343 static int ltdb_check_special_dn(struct ldb_module *module,
344                                  const struct ldb_message *msg)
345 {
346         struct ldb_context *ldb = ldb_module_get_ctx(module);
347         unsigned int i, j;
348
349         if (! ldb_dn_is_special(msg->dn) ||
350             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                 return LDB_SUCCESS;
352         }
353
354         /* we have @ATTRIBUTES, let's check attributes are fine */
355         /* should we check that we deny multivalued attributes ? */
356         for (i = 0; i < msg->num_elements; i++) {
357                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
358
359                 for (j = 0; j < msg->elements[i].num_values; j++) {
360                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
361                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
362                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
363                         }
364                 }
365         }
366
367         return LDB_SUCCESS;
368 }
369
370
371 /*
372   we've made a modification to a dn - possibly reindex and
373   update sequence number
374 */
375 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         int ret = LDB_SUCCESS;
378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
379
380         /* only allow modifies inside a transaction, otherwise the
381          * ldb is unsafe */
382         if (ltdb->kv_ops->transaction_active(ltdb) == false) {
383                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         if (ldb_dn_is_special(dn) &&
388             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
389              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
390         {
391                 if (ltdb->warn_reindex) {
392                         ldb_debug(ldb_module_get_ctx(module),
393                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
394                                 ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
395                 }
396                 ret = ltdb_reindex(module);
397         }
398
399         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
400         if (ret == LDB_SUCCESS &&
401             !(ldb_dn_is_special(dn) &&
402               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
403                 ret = ltdb_increase_sequence_number(module);
404         }
405
406         /* If the modify was to @OPTIONS, reload the cache */
407         if (ret == LDB_SUCCESS &&
408             ldb_dn_is_special(dn) &&
409             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
410                 ret = ltdb_cache_reload(module);
411         }
412
413         if (ret != LDB_SUCCESS) {
414                 ltdb->reindex_failed = true;
415         }
416
417         return ret;
418 }
419
420 static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
421                           struct ldb_val ldb_data, int flags)
422 {
423         TDB_DATA key = {
424                 .dptr = ldb_key.data,
425                 .dsize = ldb_key.length
426         };
427         TDB_DATA data = {
428                 .dptr = ldb_data.data,
429                 .dsize = ldb_data.length
430         };
431         bool transaction_active = tdb_transaction_active(ltdb->tdb);
432         if (transaction_active == false){
433                 return LDB_ERR_PROTOCOL_ERROR;
434         }
435         return tdb_store(ltdb->tdb, key, data, flags);
436 }
437
438 static int ltdb_error(struct ltdb_private *ltdb)
439 {
440         return ltdb_err_map(tdb_error(ltdb->tdb));
441 }
442
443 static const char *ltdb_errorstr(struct ltdb_private *ltdb)
444 {
445         return tdb_errorstr(ltdb->tdb);
446 }
447
448 /*
449   store a record into the db
450 */
451 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
452 {
453         void *data = ldb_module_get_private(module);
454         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
455         TDB_DATA tdb_key;
456         struct ldb_val ldb_key;
457         struct ldb_val ldb_data;
458         int ret = LDB_SUCCESS;
459         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
460
461         if (tdb_key_ctx == NULL) {
462                 return ldb_module_oom(module);
463         }
464
465         if (ltdb->read_only) {
466                 return LDB_ERR_UNWILLING_TO_PERFORM;
467         }
468
469         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
470         if (tdb_key.dptr == NULL) {
471                 TALLOC_FREE(tdb_key_ctx);
472                 return LDB_ERR_OTHER;
473         }
474
475         ret = ldb_pack_data(ldb_module_get_ctx(module),
476                             msg, &ldb_data);
477         if (ret == -1) {
478                 TALLOC_FREE(tdb_key_ctx);
479                 return LDB_ERR_OTHER;
480         }
481
482         ldb_key.data = tdb_key.dptr;
483         ldb_key.length = tdb_key.dsize;
484
485         ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
486         if (ret != 0) {
487                 bool is_special = ldb_dn_is_special(msg->dn);
488                 ret = ltdb->kv_ops->error(ltdb);
489
490                 /*
491                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
492                  * the GUID, so re-map
493                  */
494                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
495                     && !is_special
496                     && ltdb->cache->GUID_index_attribute != NULL) {
497                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
498                 }
499                 goto done;
500         }
501
502 done:
503         TALLOC_FREE(tdb_key_ctx);
504         talloc_free(ldb_data.data);
505
506         return ret;
507 }
508
509
510 /*
511   check if a attribute is a single valued, for a given element
512  */
513 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
514                                   struct ldb_message_element *el)
515 {
516         if (!a) return false;
517         if (el != NULL) {
518                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
519                         /* override from a ldb module, for example
520                            used for the description field, which is
521                            marked multi-valued in the schema but which
522                            should not actually accept multiple
523                            values */
524                         return true;
525                 }
526                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
527                         /* override from a ldb module, for example used for
528                            deleted linked attribute entries */
529                         return false;
530                 }
531         }
532         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
533                 return true;
534         }
535         return false;
536 }
537
538 static int ltdb_add_internal(struct ldb_module *module,
539                              struct ltdb_private *ltdb,
540                              const struct ldb_message *msg,
541                              bool check_single_value)
542 {
543         struct ldb_context *ldb = ldb_module_get_ctx(module);
544         int ret = LDB_SUCCESS;
545         unsigned int i;
546
547         for (i=0;i<msg->num_elements;i++) {
548                 struct ldb_message_element *el = &msg->elements[i];
549                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
550
551                 if (el->num_values == 0) {
552                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
553                                                el->name, ldb_dn_get_linearized(msg->dn));
554                         return LDB_ERR_CONSTRAINT_VIOLATION;
555                 }
556                 if (check_single_value &&
557                                 el->num_values > 1 &&
558                                 ldb_tdb_single_valued(a, el)) {
559                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
560                                                el->name, ldb_dn_get_linearized(msg->dn));
561                         return LDB_ERR_CONSTRAINT_VIOLATION;
562                 }
563
564                 /* Do not check "@ATTRIBUTES" for duplicated values */
565                 if (ldb_dn_is_special(msg->dn) &&
566                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
567                         continue;
568                 }
569
570                 if (check_single_value &&
571                     !(el->flags &
572                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
573                         struct ldb_val *duplicate = NULL;
574
575                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
576                                                          el, &duplicate, 0);
577                         if (ret != LDB_SUCCESS) {
578                                 return ret;
579                         }
580                         if (duplicate != NULL) {
581                                 ldb_asprintf_errstring(
582                                         ldb,
583                                         "attribute '%s': value '%.*s' on '%s' "
584                                         "provided more than once in ADD object",
585                                         el->name,
586                                         (int)duplicate->length,
587                                         duplicate->data,
588                                         ldb_dn_get_linearized(msg->dn));
589                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
590                         }
591                 }
592         }
593
594         ret = ltdb_store(module, msg, TDB_INSERT);
595         if (ret != LDB_SUCCESS) {
596                 /*
597                  * Try really hard to get the right error code for
598                  * a re-add situation, as this can matter!
599                  */
600                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
601                         int ret2;
602                         struct ldb_dn *dn2 = NULL;
603                         TALLOC_CTX *mem_ctx = talloc_new(module);
604                         if (mem_ctx == NULL) {
605                                 return ldb_module_operr(module);
606                         }
607                         ret2 = ltdb_search_base(module, module,
608                                                 msg->dn, &dn2);
609                         TALLOC_FREE(mem_ctx);
610                         if (ret2 == LDB_SUCCESS) {
611                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
612                         }
613                 }
614                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
615                         ldb_asprintf_errstring(ldb,
616                                                "Entry %s already exists",
617                                                ldb_dn_get_linearized(msg->dn));
618                 }
619                 return ret;
620         }
621
622         ret = ltdb_index_add_new(module, ltdb, msg);
623         if (ret != LDB_SUCCESS) {
624                 /*
625                  * If we failed to index, delete the message again.
626                  *
627                  * This is particularly important for the GUID index
628                  * case, which will only fail for a duplicate DN
629                  * in the index add.
630                  *
631                  * Note that the caller may not cancel the transation
632                  * and this means the above add might really show up!
633                  */
634                 ltdb_delete_noindex(module, msg);
635                 return ret;
636         }
637
638         ret = ltdb_modified(module, msg->dn);
639
640         return ret;
641 }
642
643 /*
644   add a record to the database
645 */
646 static int ltdb_add(struct ltdb_context *ctx)
647 {
648         struct ldb_module *module = ctx->module;
649         struct ldb_request *req = ctx->req;
650         void *data = ldb_module_get_private(module);
651         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
652         int ret = LDB_SUCCESS;
653
654         if (ltdb->max_key_length != 0 &&
655             ltdb->cache->GUID_index_attribute == NULL &&
656             !ldb_dn_is_special(req->op.add.message->dn))
657         {
658                 ldb_set_errstring(ldb_module_get_ctx(module),
659                                   "Must operate ldb_mdb in GUID "
660                                   "index mode, but " LTDB_IDXGUID " not set.");
661                 return LDB_ERR_UNWILLING_TO_PERFORM;
662         }
663
664         ret = ltdb_check_special_dn(module, req->op.add.message);
665         if (ret != LDB_SUCCESS) {
666                 return ret;
667         }
668
669         ldb_request_set_state(req, LDB_ASYNC_PENDING);
670
671         if (ltdb_cache_load(module) != 0) {
672                 return LDB_ERR_OPERATIONS_ERROR;
673         }
674
675         ret = ltdb_add_internal(module, ltdb,
676                                 req->op.add.message, true);
677
678         return ret;
679 }
680
681 static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
682 {
683         TDB_DATA tdb_key = {
684                 .dptr = ldb_key.data,
685                 .dsize = ldb_key.length
686         };
687         bool transaction_active = tdb_transaction_active(ltdb->tdb);
688         if (transaction_active == false){
689                 return LDB_ERR_PROTOCOL_ERROR;
690         }
691         return tdb_delete(ltdb->tdb, tdb_key);
692 }
693
694 /*
695   delete a record from the database, not updating indexes (used for deleting
696   index records)
697 */
698 int ltdb_delete_noindex(struct ldb_module *module,
699                         const struct ldb_message *msg)
700 {
701         void *data = ldb_module_get_private(module);
702         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
703         struct ldb_val ldb_key;
704         TDB_DATA tdb_key;
705         int ret;
706         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
707
708         if (tdb_key_ctx == NULL) {
709                 return ldb_module_oom(module);
710         }
711
712         if (ltdb->read_only) {
713                 return LDB_ERR_UNWILLING_TO_PERFORM;
714         }
715
716         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
717         if (!tdb_key.dptr) {
718                 TALLOC_FREE(tdb_key_ctx);
719                 return LDB_ERR_OTHER;
720         }
721
722         ldb_key.data = tdb_key.dptr;
723         ldb_key.length = tdb_key.dsize;
724
725         ret = ltdb->kv_ops->delete(ltdb, ldb_key);
726         TALLOC_FREE(tdb_key_ctx);
727
728         if (ret != 0) {
729                 ret = ltdb->kv_ops->error(ltdb);
730         }
731
732         return ret;
733 }
734
735 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
736 {
737         struct ldb_message *msg;
738         int ret = LDB_SUCCESS;
739
740         msg = ldb_msg_new(module);
741         if (msg == NULL) {
742                 return LDB_ERR_OPERATIONS_ERROR;
743         }
744
745         /* in case any attribute of the message was indexed, we need
746            to fetch the old record */
747         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
748         if (ret != LDB_SUCCESS) {
749                 /* not finding the old record is an error */
750                 goto done;
751         }
752
753         ret = ltdb_delete_noindex(module, msg);
754         if (ret != LDB_SUCCESS) {
755                 goto done;
756         }
757
758         /* remove any indexed attributes */
759         ret = ltdb_index_delete(module, msg);
760         if (ret != LDB_SUCCESS) {
761                 goto done;
762         }
763
764         ret = ltdb_modified(module, dn);
765         if (ret != LDB_SUCCESS) {
766                 goto done;
767         }
768
769 done:
770         talloc_free(msg);
771         return ret;
772 }
773
774 /*
775   delete a record from the database
776 */
777 static int ltdb_delete(struct ltdb_context *ctx)
778 {
779         struct ldb_module *module = ctx->module;
780         struct ldb_request *req = ctx->req;
781         int ret = LDB_SUCCESS;
782
783         ldb_request_set_state(req, LDB_ASYNC_PENDING);
784
785         if (ltdb_cache_load(module) != 0) {
786                 return LDB_ERR_OPERATIONS_ERROR;
787         }
788
789         ret = ltdb_delete_internal(module, req->op.del.dn);
790
791         return ret;
792 }
793
794 /*
795   find an element by attribute name. At the moment this does a linear search,
796   it should be re-coded to use a binary search once all places that modify
797   records guarantee sorted order
798
799   return the index of the first matching element if found, otherwise -1
800 */
801 static int find_element(const struct ldb_message *msg, const char *name)
802 {
803         unsigned int i;
804         for (i=0;i<msg->num_elements;i++) {
805                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
806                         return i;
807                 }
808         }
809         return -1;
810 }
811
812
813 /*
814   add an element to an existing record. Assumes a elements array that we
815   can call re-alloc on, and assumed that we can re-use the data pointers from
816   the passed in additional values. Use with care!
817
818   returns 0 on success, -1 on failure (and sets errno)
819 */
820 static int ltdb_msg_add_element(struct ldb_message *msg,
821                                 struct ldb_message_element *el)
822 {
823         struct ldb_message_element *e2;
824         unsigned int i;
825
826         if (el->num_values == 0) {
827                 /* nothing to do here - we don't add empty elements */
828                 return 0;
829         }
830
831         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
832                               msg->num_elements+1);
833         if (!e2) {
834                 errno = ENOMEM;
835                 return -1;
836         }
837
838         msg->elements = e2;
839
840         e2 = &msg->elements[msg->num_elements];
841
842         e2->name = el->name;
843         e2->flags = el->flags;
844         e2->values = talloc_array(msg->elements,
845                                   struct ldb_val, el->num_values);
846         if (!e2->values) {
847                 errno = ENOMEM;
848                 return -1;
849         }
850         for (i=0;i<el->num_values;i++) {
851                 e2->values[i] = el->values[i];
852         }
853         e2->num_values = el->num_values;
854
855         ++msg->num_elements;
856
857         return 0;
858 }
859
860 /*
861   delete all elements having a specified attribute name
862 */
863 static int msg_delete_attribute(struct ldb_module *module,
864                                 struct ltdb_private *ltdb,
865                                 struct ldb_message *msg, const char *name)
866 {
867         unsigned int i;
868         int ret;
869         struct ldb_message_element *el;
870         bool is_special = ldb_dn_is_special(msg->dn);
871
872         if (!is_special
873             && ltdb->cache->GUID_index_attribute != NULL
874             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
875                 struct ldb_context *ldb = ldb_module_get_ctx(module);
876                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
877                                        "attribute %s (used as DB index)",
878                                        ltdb->cache->GUID_index_attribute);
879                 return LDB_ERR_CONSTRAINT_VIOLATION;
880         }
881
882         el = ldb_msg_find_element(msg, name);
883         if (el == NULL) {
884                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
885         }
886         i = el - msg->elements;
887
888         ret = ltdb_index_del_element(module, ltdb, msg, el);
889         if (ret != LDB_SUCCESS) {
890                 return ret;
891         }
892
893         talloc_free(el->values);
894         if (msg->num_elements > (i+1)) {
895                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
896         }
897         msg->num_elements--;
898         msg->elements = talloc_realloc(msg, msg->elements,
899                                        struct ldb_message_element,
900                                        msg->num_elements);
901         return LDB_SUCCESS;
902 }
903
904 /*
905   delete all elements matching an attribute name/value
906
907   return LDB Error on failure
908 */
909 static int msg_delete_element(struct ldb_module *module,
910                               struct ltdb_private *ltdb,
911                               struct ldb_message *msg,
912                               const char *name,
913                               const struct ldb_val *val)
914 {
915         struct ldb_context *ldb = ldb_module_get_ctx(module);
916         unsigned int i;
917         int found, ret;
918         struct ldb_message_element *el;
919         const struct ldb_schema_attribute *a;
920
921         found = find_element(msg, name);
922         if (found == -1) {
923                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
924         }
925
926         i = (unsigned int) found;
927         el = &(msg->elements[i]);
928
929         a = ldb_schema_attribute_by_name(ldb, el->name);
930
931         for (i=0;i<el->num_values;i++) {
932                 bool matched;
933                 if (a->syntax->operator_fn) {
934                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
935                                                      &el->values[i], val, &matched);
936                         if (ret != LDB_SUCCESS) return ret;
937                 } else {
938                         matched = (a->syntax->comparison_fn(ldb, ldb,
939                                                             &el->values[i], val) == 0);
940                 }
941                 if (matched) {
942                         if (el->num_values == 1) {
943                                 return msg_delete_attribute(module,
944                                                             ltdb, msg, name);
945                         }
946
947                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
948                         if (ret != LDB_SUCCESS) {
949                                 return ret;
950                         }
951
952                         if (i<el->num_values-1) {
953                                 memmove(&el->values[i], &el->values[i+1],
954                                         sizeof(el->values[i])*
955                                                 (el->num_values-(i+1)));
956                         }
957                         el->num_values--;
958
959                         /* per definition we find in a canonicalised message an
960                            attribute value only once. So we are finished here */
961                         return LDB_SUCCESS;
962                 }
963         }
964
965         /* Not found */
966         return LDB_ERR_NO_SUCH_ATTRIBUTE;
967 }
968
969 /*
970   modify a record - internal interface
971
972   yuck - this is O(n^2). Luckily n is usually small so we probably
973   get away with it, but if we ever have really large attribute lists
974   then we'll need to look at this again
975
976   'req' is optional, and is used to specify controls if supplied
977 */
978 int ltdb_modify_internal(struct ldb_module *module,
979                          const struct ldb_message *msg,
980                          struct ldb_request *req)
981 {
982         struct ldb_context *ldb = ldb_module_get_ctx(module);
983         void *data = ldb_module_get_private(module);
984         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
985         struct ldb_message *msg2;
986         unsigned int i, j;
987         int ret = LDB_SUCCESS, idx;
988         struct ldb_control *control_permissive = NULL;
989         TALLOC_CTX *mem_ctx = talloc_new(req);
990
991         if (mem_ctx == NULL) {
992                 return ldb_module_oom(module);
993         }
994         
995         if (req) {
996                 control_permissive = ldb_request_get_control(req,
997                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
998         }
999
1000         msg2 = ldb_msg_new(mem_ctx);
1001         if (msg2 == NULL) {
1002                 ret = LDB_ERR_OTHER;
1003                 goto done;
1004         }
1005
1006         ret = ltdb_search_dn1(module, msg->dn,
1007                               msg2,
1008                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1009         if (ret != LDB_SUCCESS) {
1010                 goto done;
1011         }
1012
1013         for (i=0; i<msg->num_elements; i++) {
1014                 struct ldb_message_element *el = &msg->elements[i], *el2;
1015                 struct ldb_val *vals;
1016                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
1017                 const char *dn;
1018                 uint32_t options = 0;
1019                 if (control_permissive != NULL) {
1020                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
1021                 }
1022
1023                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
1024                 case LDB_FLAG_MOD_ADD:
1025
1026                         if (el->num_values == 0) {
1027                                 ldb_asprintf_errstring(ldb,
1028                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
1029                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1030                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
1031                                 goto done;
1032                         }
1033
1034                         /* make a copy of the array so that a permissive
1035                          * control can remove duplicates without changing the
1036                          * original values, but do not copy data as we do not
1037                          * need to keep it around once the operation is
1038                          * finished */
1039                         if (control_permissive) {
1040                                 el = talloc(msg2, struct ldb_message_element);
1041                                 if (!el) {
1042                                         ret = LDB_ERR_OTHER;
1043                                         goto done;
1044                                 }
1045                                 *el = msg->elements[i];
1046                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
1047                                 if (el->values == NULL) {
1048                                         ret = LDB_ERR_OTHER;
1049                                         goto done;
1050                                 }
1051                                 for (j = 0; j < el->num_values; j++) {
1052                                         el->values[j] = msg->elements[i].values[j];
1053                                 }
1054                         }
1055
1056                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1057                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1058                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1059                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1060                                 goto done;
1061                         }
1062
1063                         /* Checks if element already exists */
1064                         idx = find_element(msg2, el->name);
1065                         if (idx == -1) {
1066                                 if (ltdb_msg_add_element(msg2, el) != 0) {
1067                                         ret = LDB_ERR_OTHER;
1068                                         goto done;
1069                                 }
1070                                 ret = ltdb_index_add_element(module, ltdb,
1071                                                              msg2,
1072                                                              el);
1073                                 if (ret != LDB_SUCCESS) {
1074                                         goto done;
1075                                 }
1076                         } else {
1077                                 j = (unsigned int) idx;
1078                                 el2 = &(msg2->elements[j]);
1079
1080                                 /* We cannot add another value on a existing one
1081                                    if the attribute is single-valued */
1082                                 if (ldb_tdb_single_valued(a, el)) {
1083                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1084                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1085                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1086                                         goto done;
1087                                 }
1088
1089                                 /* Check that values don't exist yet on multi-
1090                                    valued attributes or aren't provided twice */
1091                                 if (!(el->flags &
1092                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1093                                         struct ldb_val *duplicate = NULL;
1094                                         ret = ldb_msg_find_common_values(ldb,
1095                                                                          msg2,
1096                                                                          el,
1097                                                                          el2,
1098                                                                          options);
1099
1100                                         if (ret ==
1101                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1102                                                 ldb_asprintf_errstring(ldb,
1103                                                         "attribute '%s': value "
1104                                                         "#%u on '%s' already "
1105                                                         "exists", el->name, j,
1106                                                         ldb_dn_get_linearized(msg2->dn));
1107                                                 goto done;
1108                                         } else if (ret != LDB_SUCCESS) {
1109                                                 goto done;
1110                                         }
1111
1112                                         ret = ldb_msg_find_duplicate_val(
1113                                                 ldb, msg2, el, &duplicate, 0);
1114                                         if (ret != LDB_SUCCESS) {
1115                                                 goto done;
1116                                         }
1117                                         if (duplicate != NULL) {
1118                                                 ldb_asprintf_errstring(
1119                                                         ldb,
1120                                                         "attribute '%s': value "
1121                                                         "'%.*s' on '%s' "
1122                                                         "provided more than "
1123                                                         "once in ADD",
1124                                                         el->name,
1125                                                         (int)duplicate->length,
1126                                                         duplicate->data,
1127                                                         ldb_dn_get_linearized(msg->dn));
1128                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1129                                                 goto done;
1130                                         }
1131                                 }
1132
1133                                 /* Now combine existing and new values to a new
1134                                    attribute record */
1135                                 vals = talloc_realloc(msg2->elements,
1136                                                       el2->values, struct ldb_val,
1137                                                       el2->num_values + el->num_values);
1138                                 if (vals == NULL) {
1139                                         ldb_oom(ldb);
1140                                         ret = LDB_ERR_OTHER;
1141                                         goto done;
1142                                 }
1143
1144                                 for (j=0; j<el->num_values; j++) {
1145                                         vals[el2->num_values + j] =
1146                                                 ldb_val_dup(vals, &el->values[j]);
1147                                 }
1148
1149                                 el2->values = vals;
1150                                 el2->num_values += el->num_values;
1151
1152                                 ret = ltdb_index_add_element(module, ltdb,
1153                                                              msg2, el);
1154                                 if (ret != LDB_SUCCESS) {
1155                                         goto done;
1156                                 }
1157                         }
1158
1159                         break;
1160
1161                 case LDB_FLAG_MOD_REPLACE:
1162
1163                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1164                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1165                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1166                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1167                                 goto done;
1168                         }
1169
1170                         /*
1171                          * We don't need to check this if we have been
1172                          * pre-screened by the repl_meta_data module
1173                          * in Samba, or someone else who can claim to
1174                          * know what they are doing. 
1175                          */
1176                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1177                                 struct ldb_val *duplicate = NULL;
1178
1179                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1180                                                                  &duplicate, 0);
1181                                 if (ret != LDB_SUCCESS) {
1182                                         goto done;
1183                                 }
1184                                 if (duplicate != NULL) {
1185                                         ldb_asprintf_errstring(
1186                                                 ldb,
1187                                                 "attribute '%s': value '%.*s' "
1188                                                 "on '%s' provided more than "
1189                                                 "once in REPLACE",
1190                                                 el->name,
1191                                                 (int)duplicate->length,
1192                                                 duplicate->data,
1193                                                 ldb_dn_get_linearized(msg2->dn));
1194                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1195                                         goto done;
1196                                 }
1197                         }
1198
1199                         /* Checks if element already exists */
1200                         idx = find_element(msg2, el->name);
1201                         if (idx != -1) {
1202                                 j = (unsigned int) idx;
1203                                 el2 = &(msg2->elements[j]);
1204
1205                                 /* we consider two elements to be
1206                                  * equal only if the order
1207                                  * matches. This allows dbcheck to
1208                                  * fix the ordering on attributes
1209                                  * where order matters, such as
1210                                  * objectClass
1211                                  */
1212                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1213                                         continue;
1214                                 }
1215
1216                                 /* Delete the attribute if it exists in the DB */
1217                                 if (msg_delete_attribute(module, ltdb,
1218                                                          msg2,
1219                                                          el->name) != 0) {
1220                                         ret = LDB_ERR_OTHER;
1221                                         goto done;
1222                                 }
1223                         }
1224
1225                         /* Recreate it with the new values */
1226                         if (ltdb_msg_add_element(msg2, el) != 0) {
1227                                 ret = LDB_ERR_OTHER;
1228                                 goto done;
1229                         }
1230
1231                         ret = ltdb_index_add_element(module, ltdb,
1232                                                      msg2, el);
1233                         if (ret != LDB_SUCCESS) {
1234                                 goto done;
1235                         }
1236
1237                         break;
1238
1239                 case LDB_FLAG_MOD_DELETE:
1240                         dn = ldb_dn_get_linearized(msg2->dn);
1241                         if (dn == NULL) {
1242                                 ret = LDB_ERR_OTHER;
1243                                 goto done;
1244                         }
1245
1246                         if (msg->elements[i].num_values == 0) {
1247                                 /* Delete the whole attribute */
1248                                 ret = msg_delete_attribute(module,
1249                                                            ltdb,
1250                                                            msg2,
1251                                                            msg->elements[i].name);
1252                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1253                                     control_permissive) {
1254                                         ret = LDB_SUCCESS;
1255                                 } else {
1256                                         ldb_asprintf_errstring(ldb,
1257                                                                "attribute '%s': no such attribute for delete on '%s'",
1258                                                                msg->elements[i].name, dn);
1259                                 }
1260                                 if (ret != LDB_SUCCESS) {
1261                                         goto done;
1262                                 }
1263                         } else {
1264                                 /* Delete specified values from an attribute */
1265                                 for (j=0; j < msg->elements[i].num_values; j++) {
1266                                         ret = msg_delete_element(module,
1267                                                                  ltdb,
1268                                                                  msg2,
1269                                                                  msg->elements[i].name,
1270                                                                  &msg->elements[i].values[j]);
1271                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1272                                             control_permissive) {
1273                                                 ret = LDB_SUCCESS;
1274                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1275                                                 ldb_asprintf_errstring(ldb,
1276                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1277                                                                        msg->elements[i].name, dn);
1278                                         }
1279                                         if (ret != LDB_SUCCESS) {
1280                                                 goto done;
1281                                         }
1282                                 }
1283                         }
1284                         break;
1285                 default:
1286                         ldb_asprintf_errstring(ldb,
1287                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1288                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1289                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1290                         ret = LDB_ERR_PROTOCOL_ERROR;
1291                         goto done;
1292                 }
1293         }
1294
1295         ret = ltdb_store(module, msg2, TDB_MODIFY);
1296         if (ret != LDB_SUCCESS) {
1297                 goto done;
1298         }
1299
1300         ret = ltdb_modified(module, msg2->dn);
1301         if (ret != LDB_SUCCESS) {
1302                 goto done;
1303         }
1304
1305 done:
1306         TALLOC_FREE(mem_ctx);
1307         return ret;
1308 }
1309
1310 /*
1311   modify a record
1312 */
1313 static int ltdb_modify(struct ltdb_context *ctx)
1314 {
1315         struct ldb_module *module = ctx->module;
1316         struct ldb_request *req = ctx->req;
1317         int ret = LDB_SUCCESS;
1318
1319         ret = ltdb_check_special_dn(module, req->op.mod.message);
1320         if (ret != LDB_SUCCESS) {
1321                 return ret;
1322         }
1323
1324         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1325
1326         if (ltdb_cache_load(module) != 0) {
1327                 return LDB_ERR_OPERATIONS_ERROR;
1328         }
1329
1330         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1331
1332         return ret;
1333 }
1334
1335 /*
1336   rename a record
1337 */
1338 static int ltdb_rename(struct ltdb_context *ctx)
1339 {
1340         struct ldb_module *module = ctx->module;
1341         void *data = ldb_module_get_private(module);
1342         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1343         struct ldb_request *req = ctx->req;
1344         struct ldb_message *msg;
1345         int ret = LDB_SUCCESS;
1346         TDB_DATA tdb_key, tdb_key_old;
1347         struct ldb_dn *db_dn;
1348
1349         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1350
1351         if (ltdb_cache_load(ctx->module) != 0) {
1352                 return LDB_ERR_OPERATIONS_ERROR;
1353         }
1354
1355         msg = ldb_msg_new(ctx);
1356         if (msg == NULL) {
1357                 return LDB_ERR_OPERATIONS_ERROR;
1358         }
1359
1360         /* we need to fetch the old record to re-add under the new name */
1361         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1362                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1363         if (ret != LDB_SUCCESS) {
1364                 /* not finding the old record is an error */
1365                 return ret;
1366         }
1367
1368         /* We need to, before changing the DB, check if the new DN
1369          * exists, so we can return this error to the caller with an
1370          * unmodified DB
1371          *
1372          * Even in GUID index mode we use ltdb_key_dn() as we are
1373          * trying to figure out if this is just a case rename
1374          */
1375         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1376         if (!tdb_key.dptr) {
1377                 talloc_free(msg);
1378                 return LDB_ERR_OPERATIONS_ERROR;
1379         }
1380
1381         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1382         if (!tdb_key_old.dptr) {
1383                 talloc_free(msg);
1384                 talloc_free(tdb_key.dptr);
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         /*
1389          * Only declare a conflict if the new DN already exists,
1390          * and it isn't a case change on the old DN
1391          */
1392         if (tdb_key_old.dsize != tdb_key.dsize
1393             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1394                 ret = ltdb_search_base(module, msg,
1395                                        req->op.rename.newdn,
1396                                        &db_dn);
1397                 if (ret == LDB_SUCCESS) {
1398                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1399                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1400                         ret = LDB_SUCCESS;
1401                 }
1402         }
1403
1404         /* finding the new record already in the DB is an error */
1405
1406         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1407                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1408                                        "Entry %s already exists",
1409                                        ldb_dn_get_linearized(req->op.rename.newdn));
1410         }
1411         if (ret != LDB_SUCCESS) {
1412                 talloc_free(tdb_key_old.dptr);
1413                 talloc_free(tdb_key.dptr);
1414                 talloc_free(msg);
1415                 return ret;
1416         }
1417
1418         talloc_free(tdb_key_old.dptr);
1419         talloc_free(tdb_key.dptr);
1420
1421         /* Always delete first then add, to avoid conflicts with
1422          * unique indexes. We rely on the transaction to make this
1423          * atomic
1424          */
1425         ret = ltdb_delete_internal(module, msg->dn);
1426         if (ret != LDB_SUCCESS) {
1427                 talloc_free(msg);
1428                 return ret;
1429         }
1430
1431         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1432         if (msg->dn == NULL) {
1433                 talloc_free(msg);
1434                 return LDB_ERR_OPERATIONS_ERROR;
1435         }
1436
1437         /* We don't check single value as we can have more than 1 with
1438          * deleted attributes. We could go through all elements but that's
1439          * maybe not the most efficient way
1440          */
1441         ret = ltdb_add_internal(module, ltdb, msg, false);
1442
1443         talloc_free(msg);
1444
1445         return ret;
1446 }
1447
1448 static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
1449 {
1450         return tdb_transaction_start(ltdb->tdb);
1451 }
1452
1453 static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
1454 {
1455         return tdb_transaction_cancel(ltdb->tdb);
1456 }
1457
1458 static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
1459 {
1460         return tdb_transaction_prepare_commit(ltdb->tdb);
1461 }
1462
1463 static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
1464 {
1465         return tdb_transaction_commit(ltdb->tdb);
1466 }
1467
1468 static int ltdb_start_trans(struct ldb_module *module)
1469 {
1470         void *data = ldb_module_get_private(module);
1471         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1472
1473         /* Do not take out the transaction lock on a read-only DB */
1474         if (ltdb->read_only) {
1475                 return LDB_ERR_UNWILLING_TO_PERFORM;
1476         }
1477
1478         if (ltdb->kv_ops->begin_write(ltdb) != 0) {
1479                 return ltdb->kv_ops->error(ltdb);
1480         }
1481
1482
1483         ltdb_index_transaction_start(module);
1484
1485         ltdb->reindex_failed = false;
1486
1487         return LDB_SUCCESS;
1488 }
1489
1490 /*
1491  * Forward declaration to allow prepare_commit to in fact abort the
1492  * transaction
1493  */
1494 static int ltdb_del_trans(struct ldb_module *module);
1495
1496 static int ltdb_prepare_commit(struct ldb_module *module)
1497 {
1498         int ret;
1499         void *data = ldb_module_get_private(module);
1500         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1501
1502         if (!ltdb->kv_ops->transaction_active(ltdb)) {
1503                 ldb_set_errstring(ldb_module_get_ctx(module),
1504                                   "ltdb_prepare_commit() called "
1505                                   "without transaction active");
1506                 return LDB_ERR_OPERATIONS_ERROR;
1507         }
1508
1509         /*
1510          * Check if the last re-index failed.
1511          *
1512          * This can happen if for example a duplicate value was marked
1513          * unique.  We must not write a partial re-index into the DB.
1514          */
1515         if (ltdb->reindex_failed) {
1516                 /*
1517                  * We must instead abort the transaction so we get the
1518                  * old values and old index back
1519                  */
1520                 ltdb_del_trans(module);
1521                 ldb_set_errstring(ldb_module_get_ctx(module),
1522                                   "Failure during re-index, so "
1523                                   "transaction must be aborted.");
1524                 return LDB_ERR_OPERATIONS_ERROR;
1525         }
1526
1527         ret = ltdb_index_transaction_commit(module);
1528         if (ret != LDB_SUCCESS) {
1529                 ltdb->kv_ops->abort_write(ltdb);
1530                 return ret;
1531         }
1532
1533         if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
1534                 ret = ltdb->kv_ops->error(ltdb);
1535                 ldb_debug_set(ldb_module_get_ctx(module),
1536                               LDB_DEBUG_FATAL,
1537                               "Failure during "
1538                               "prepare_write): %s -> %s",
1539                               ltdb->kv_ops->errorstr(ltdb),
1540                               ldb_strerror(ret));
1541                 return ret;
1542         }
1543
1544         ltdb->prepared_commit = true;
1545
1546         return LDB_SUCCESS;
1547 }
1548
1549 static int ltdb_end_trans(struct ldb_module *module)
1550 {
1551         int ret;
1552         void *data = ldb_module_get_private(module);
1553         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1554
1555         if (!ltdb->prepared_commit) {
1556                 ret = ltdb_prepare_commit(module);
1557                 if (ret != LDB_SUCCESS) {
1558                         return ret;
1559                 }
1560         }
1561
1562         ltdb->prepared_commit = false;
1563
1564         if (ltdb->kv_ops->finish_write(ltdb) != 0) {
1565                 ret = ltdb->kv_ops->error(ltdb);
1566                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1567                                        "Failure during tdb_transaction_commit(): %s -> %s",
1568                                        ltdb->kv_ops->errorstr(ltdb),
1569                                        ldb_strerror(ret));
1570                 return ret;
1571         }
1572
1573         return LDB_SUCCESS;
1574 }
1575
1576 static int ltdb_del_trans(struct ldb_module *module)
1577 {
1578         void *data = ldb_module_get_private(module);
1579         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1580
1581
1582         if (ltdb_index_transaction_cancel(module) != 0) {
1583                 ltdb->kv_ops->abort_write(ltdb);
1584                 return ltdb->kv_ops->error(ltdb);
1585         }
1586
1587         ltdb->kv_ops->abort_write(ltdb);
1588         return LDB_SUCCESS;
1589 }
1590
1591 /*
1592   return sequenceNumber from @BASEINFO
1593 */
1594 static int ltdb_sequence_number(struct ltdb_context *ctx,
1595                                 struct ldb_extended **ext)
1596 {
1597         struct ldb_context *ldb;
1598         struct ldb_module *module = ctx->module;
1599         struct ldb_request *req = ctx->req;
1600         void *data = ldb_module_get_private(module);
1601         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1602         TALLOC_CTX *tmp_ctx = NULL;
1603         struct ldb_seqnum_request *seq;
1604         struct ldb_seqnum_result *res;
1605         struct ldb_message *msg = NULL;
1606         struct ldb_dn *dn;
1607         const char *date;
1608         int ret = LDB_SUCCESS;
1609
1610         ldb = ldb_module_get_ctx(module);
1611
1612         seq = talloc_get_type(req->op.extended.data,
1613                                 struct ldb_seqnum_request);
1614         if (seq == NULL) {
1615                 return LDB_ERR_OPERATIONS_ERROR;
1616         }
1617
1618         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1619
1620         if (ltdb->kv_ops->lock_read(module) != 0) {
1621                 return LDB_ERR_OPERATIONS_ERROR;
1622         }
1623
1624         res = talloc_zero(req, struct ldb_seqnum_result);
1625         if (res == NULL) {
1626                 ret = LDB_ERR_OPERATIONS_ERROR;
1627                 goto done;
1628         }
1629
1630         tmp_ctx = talloc_new(req);
1631         if (tmp_ctx == NULL) {
1632                 ret = LDB_ERR_OPERATIONS_ERROR;
1633                 goto done;
1634         }
1635
1636         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1637         if (dn == NULL) {
1638                 ret = LDB_ERR_OPERATIONS_ERROR;
1639                 goto done;
1640         }
1641
1642         msg = ldb_msg_new(tmp_ctx);
1643         if (msg == NULL) {
1644                 ret = LDB_ERR_OPERATIONS_ERROR;
1645                 goto done;
1646         }
1647
1648         ret = ltdb_search_dn1(module, dn, msg, 0);
1649         if (ret != LDB_SUCCESS) {
1650                 goto done;
1651         }
1652
1653         switch (seq->type) {
1654         case LDB_SEQ_HIGHEST_SEQ:
1655                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1656                 break;
1657         case LDB_SEQ_NEXT:
1658                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1659                 res->seq_num++;
1660                 break;
1661         case LDB_SEQ_HIGHEST_TIMESTAMP:
1662                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1663                 if (date) {
1664                         res->seq_num = ldb_string_to_time(date);
1665                 } else {
1666                         res->seq_num = 0;
1667                         /* zero is as good as anything when we don't know */
1668                 }
1669                 break;
1670         }
1671
1672         *ext = talloc_zero(req, struct ldb_extended);
1673         if (*ext == NULL) {
1674                 ret = LDB_ERR_OPERATIONS_ERROR;
1675                 goto done;
1676         }
1677         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1678         (*ext)->data = talloc_steal(*ext, res);
1679
1680 done:
1681         talloc_free(tmp_ctx);
1682
1683         ltdb->kv_ops->unlock_read(module);
1684         return ret;
1685 }
1686
1687 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1688 {
1689         struct ldb_context *ldb;
1690         struct ldb_request *req;
1691         struct ldb_reply *ares;
1692
1693         ldb = ldb_module_get_ctx(ctx->module);
1694         req = ctx->req;
1695
1696         /* if we already returned an error just return */
1697         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1698                 return;
1699         }
1700
1701         ares = talloc_zero(req, struct ldb_reply);
1702         if (!ares) {
1703                 ldb_oom(ldb);
1704                 req->callback(req, NULL);
1705                 return;
1706         }
1707         ares->type = LDB_REPLY_DONE;
1708         ares->error = error;
1709
1710         req->callback(req, ares);
1711 }
1712
1713 static void ltdb_timeout(struct tevent_context *ev,
1714                           struct tevent_timer *te,
1715                           struct timeval t,
1716                           void *private_data)
1717 {
1718         struct ltdb_context *ctx;
1719         ctx = talloc_get_type(private_data, struct ltdb_context);
1720
1721         if (!ctx->request_terminated) {
1722                 /* request is done now */
1723                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1724         }
1725
1726         if (ctx->spy) {
1727                 /* neutralize the spy */
1728                 ctx->spy->ctx = NULL;
1729                 ctx->spy = NULL;
1730         }
1731         talloc_free(ctx);
1732 }
1733
1734 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1735                                         struct ldb_extended *ext,
1736                                         int error)
1737 {
1738         struct ldb_context *ldb;
1739         struct ldb_request *req;
1740         struct ldb_reply *ares;
1741
1742         ldb = ldb_module_get_ctx(ctx->module);
1743         req = ctx->req;
1744
1745         /* if we already returned an error just return */
1746         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1747                 return;
1748         }
1749
1750         ares = talloc_zero(req, struct ldb_reply);
1751         if (!ares) {
1752                 ldb_oom(ldb);
1753                 req->callback(req, NULL);
1754                 return;
1755         }
1756         ares->type = LDB_REPLY_DONE;
1757         ares->response = ext;
1758         ares->error = error;
1759
1760         req->callback(req, ares);
1761 }
1762
1763 static void ltdb_handle_extended(struct ltdb_context *ctx)
1764 {
1765         struct ldb_extended *ext = NULL;
1766         int ret;
1767
1768         if (strcmp(ctx->req->op.extended.oid,
1769                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1770                 /* get sequence number */
1771                 ret = ltdb_sequence_number(ctx, &ext);
1772         } else {
1773                 /* not recognized */
1774                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1775         }
1776
1777         ltdb_request_extended_done(ctx, ext, ret);
1778 }
1779
1780 struct kv_ctx {
1781         ldb_kv_traverse_fn kv_traverse_fn;
1782         void *ctx;
1783         struct ltdb_private *ltdb;
1784         int (*parser)(struct ldb_val key,
1785                       struct ldb_val data,
1786                       void *private_data);
1787 };
1788
1789 static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
1790 {
1791         struct kv_ctx *kv_ctx = ctx;
1792         struct ldb_val key = {
1793                 .length = tdb_key.dsize,
1794                 .data = tdb_key.dptr,
1795         };
1796         struct ldb_val data = {
1797                 .length = tdb_data.dsize,
1798                 .data = tdb_data.dptr,
1799         };
1800         return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
1801 }
1802
1803 static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
1804 {
1805         struct kv_ctx kv_ctx = {
1806                 .kv_traverse_fn = fn,
1807                 .ctx = ctx,
1808                 .ltdb = ltdb
1809         };
1810         if (tdb_transaction_active(ltdb->tdb)) {
1811                 return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1812         } else {
1813                 return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
1814         }
1815 }
1816
1817 static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
1818                                       struct ldb_val ldb_key,
1819                                       struct ldb_val ldb_key2,
1820                                       struct ldb_val ldb_data, void *state)
1821 {
1822         int tdb_ret;
1823         struct ldb_context *ldb;
1824         struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
1825         struct ldb_module *module = ctx->module;
1826         TDB_DATA key = {
1827                 .dptr = ldb_key.data,
1828                 .dsize = ldb_key.length
1829         };
1830         TDB_DATA key2 = {
1831                 .dptr = ldb_key2.data,
1832                 .dsize = ldb_key2.length
1833         };
1834         TDB_DATA data = {
1835                 .dptr = ldb_data.data,
1836                 .dsize = ldb_data.length
1837         };
1838
1839         ldb = ldb_module_get_ctx(module);
1840
1841         tdb_ret = tdb_delete(ltdb->tdb, key);
1842         if (tdb_ret != 0) {
1843                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1844                           "Failed to delete %*.*s "
1845                           "for rekey as %*.*s: %s",
1846                           (int)key.dsize, (int)key.dsize,
1847                           (const char *)key.dptr,
1848                           (int)key2.dsize, (int)key2.dsize,
1849                           (const char *)key.dptr,
1850                           tdb_errorstr(ltdb->tdb));
1851                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1852                 return -1;
1853         }
1854         tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
1855         if (tdb_ret != 0) {
1856                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1857                           "Failed to rekey %*.*s as %*.*s: %s",
1858                           (int)key.dsize, (int)key.dsize,
1859                           (const char *)key.dptr,
1860                           (int)key2.dsize, (int)key2.dsize,
1861                           (const char *)key.dptr,
1862                           tdb_errorstr(ltdb->tdb));
1863                 ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
1864                 return -1;
1865         }
1866         return tdb_ret;
1867 }
1868
1869 static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
1870                                          void *ctx)
1871 {
1872         struct kv_ctx *kv_ctx = ctx;
1873         struct ldb_val key = {
1874                 .length = tdb_key.dsize,
1875                 .data = tdb_key.dptr,
1876         };
1877         struct ldb_val data = {
1878                 .length = tdb_data.dsize,
1879                 .data = tdb_data.dptr,
1880         };
1881
1882         return kv_ctx->parser(key, data, kv_ctx->ctx);
1883 }
1884
1885 static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
1886                                  struct ldb_val ldb_key,
1887                                  int (*parser)(struct ldb_val key,
1888                                                struct ldb_val data,
1889                                                void *private_data),
1890                                  void *ctx)
1891 {
1892         struct kv_ctx kv_ctx = {
1893                 .parser = parser,
1894                 .ctx = ctx,
1895                 .ltdb = ltdb
1896         };
1897         TDB_DATA key = {
1898                 .dptr = ldb_key.data,
1899                 .dsize = ldb_key.length
1900         };
1901         int ret;
1902
1903         ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
1904                                &kv_ctx);
1905         if (ret == 0) {
1906                 return LDB_SUCCESS;
1907         }
1908         return ltdb_err_map(tdb_error(ltdb->tdb));
1909 }
1910
1911 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
1912 {
1913         return tdb_name(ltdb->tdb);
1914 }
1915
1916 static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
1917 {
1918         int seq = tdb_get_seqnum(ltdb->tdb);
1919         bool has_changed = (seq != ltdb->tdb_seqnum);
1920
1921         ltdb->tdb_seqnum = seq;
1922
1923         return has_changed;
1924 }
1925
1926 static bool ltdb_transaction_active(struct ltdb_private *ltdb)
1927 {
1928         return tdb_transaction_active(ltdb->tdb);
1929 }
1930
1931 static const struct kv_db_ops key_value_ops = {
1932         .store = ltdb_tdb_store,
1933         .delete = ltdb_tdb_delete,
1934         .iterate = ltdb_tdb_traverse_fn,
1935         .update_in_iterate = ltdb_tdb_update_in_iterate,
1936         .fetch_and_parse = ltdb_tdb_parse_record,
1937         .lock_read = ltdb_lock_read,
1938         .unlock_read = ltdb_unlock_read,
1939         .begin_write = ltdb_tdb_transaction_start,
1940         .prepare_write = ltdb_tdb_transaction_prepare_commit,
1941         .finish_write = ltdb_tdb_transaction_commit,
1942         .abort_write = ltdb_tdb_transaction_cancel,
1943         .error = ltdb_error,
1944         .errorstr = ltdb_errorstr,
1945         .name = ltdb_tdb_name,
1946         .has_changed = ltdb_tdb_changed,
1947         .transaction_active = ltdb_transaction_active,
1948 };
1949
1950 static void ltdb_callback(struct tevent_context *ev,
1951                           struct tevent_timer *te,
1952                           struct timeval t,
1953                           void *private_data)
1954 {
1955         struct ltdb_context *ctx;
1956         int ret;
1957
1958         ctx = talloc_get_type(private_data, struct ltdb_context);
1959
1960         if (ctx->request_terminated) {
1961                 goto done;
1962         }
1963
1964         switch (ctx->req->operation) {
1965         case LDB_SEARCH:
1966                 ret = ltdb_search(ctx);
1967                 break;
1968         case LDB_ADD:
1969                 ret = ltdb_add(ctx);
1970                 break;
1971         case LDB_MODIFY:
1972                 ret = ltdb_modify(ctx);
1973                 break;
1974         case LDB_DELETE:
1975                 ret = ltdb_delete(ctx);
1976                 break;
1977         case LDB_RENAME:
1978                 ret = ltdb_rename(ctx);
1979                 break;
1980         case LDB_EXTENDED:
1981                 ltdb_handle_extended(ctx);
1982                 goto done;
1983         default:
1984                 /* no other op supported */
1985                 ret = LDB_ERR_PROTOCOL_ERROR;
1986         }
1987
1988         if (!ctx->request_terminated) {
1989                 /* request is done now */
1990                 ltdb_request_done(ctx, ret);
1991         }
1992
1993 done:
1994         if (ctx->spy) {
1995                 /* neutralize the spy */
1996                 ctx->spy->ctx = NULL;
1997                 ctx->spy = NULL;
1998         }
1999         talloc_free(ctx);
2000 }
2001
2002 static int ltdb_request_destructor(void *ptr)
2003 {
2004         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
2005
2006         if (spy->ctx != NULL) {
2007                 spy->ctx->spy = NULL;
2008                 spy->ctx->request_terminated = true;
2009                 spy->ctx = NULL;
2010         }
2011
2012         return 0;
2013 }
2014
2015 static int ltdb_handle_request(struct ldb_module *module,
2016                                struct ldb_request *req)
2017 {
2018         struct ldb_control *control_permissive;
2019         struct ldb_context *ldb;
2020         struct tevent_context *ev;
2021         struct ltdb_context *ac;
2022         struct tevent_timer *te;
2023         struct timeval tv;
2024         unsigned int i;
2025
2026         ldb = ldb_module_get_ctx(module);
2027
2028         control_permissive = ldb_request_get_control(req,
2029                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2030
2031         for (i = 0; req->controls && req->controls[i]; i++) {
2032                 if (req->controls[i]->critical &&
2033                     req->controls[i] != control_permissive) {
2034                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
2035                                                req->controls[i]->oid);
2036                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
2037                 }
2038         }
2039
2040         if (req->starttime == 0 || req->timeout == 0) {
2041                 ldb_set_errstring(ldb, "Invalid timeout settings");
2042                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
2043         }
2044
2045         ev = ldb_handle_get_event_context(req->handle);
2046
2047         ac = talloc_zero(ldb, struct ltdb_context);
2048         if (ac == NULL) {
2049                 ldb_oom(ldb);
2050                 return LDB_ERR_OPERATIONS_ERROR;
2051         }
2052
2053         ac->module = module;
2054         ac->req = req;
2055
2056         tv.tv_sec = 0;
2057         tv.tv_usec = 0;
2058         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
2059         if (NULL == te) {
2060                 talloc_free(ac);
2061                 return LDB_ERR_OPERATIONS_ERROR;
2062         }
2063
2064         if (req->timeout > 0) {
2065                 tv.tv_sec = req->starttime + req->timeout;
2066                 tv.tv_usec = 0;
2067                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
2068                                                      ltdb_timeout, ac);
2069                 if (NULL == ac->timeout_event) {
2070                         talloc_free(ac);
2071                         return LDB_ERR_OPERATIONS_ERROR;
2072                 }
2073         }
2074
2075         /* set a spy so that we do not try to use the request context
2076          * if it is freed before ltdb_callback fires */
2077         ac->spy = talloc(req, struct ltdb_req_spy);
2078         if (NULL == ac->spy) {
2079                 talloc_free(ac);
2080                 return LDB_ERR_OPERATIONS_ERROR;
2081         }
2082         ac->spy->ctx = ac;
2083
2084         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
2085
2086         return LDB_SUCCESS;
2087 }
2088
2089 static int ltdb_init_rootdse(struct ldb_module *module)
2090 {
2091         /* ignore errors on this - we expect it for non-sam databases */
2092         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
2093
2094         /* there can be no module beyond the backend, just return */
2095         return LDB_SUCCESS;
2096 }
2097
2098
2099 static int generic_lock_read(struct ldb_module *module)
2100 {
2101         void *data = ldb_module_get_private(module);
2102         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2103         return ltdb->kv_ops->lock_read(module);
2104 }
2105
2106 static int generic_unlock_read(struct ldb_module *module)
2107 {
2108         void *data = ldb_module_get_private(module);
2109         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
2110         return ltdb->kv_ops->unlock_read(module);
2111 }
2112
2113 static const struct ldb_module_ops ltdb_ops = {
2114         .name              = "tdb",
2115         .init_context      = ltdb_init_rootdse,
2116         .search            = ltdb_handle_request,
2117         .add               = ltdb_handle_request,
2118         .modify            = ltdb_handle_request,
2119         .del               = ltdb_handle_request,
2120         .rename            = ltdb_handle_request,
2121         .extended          = ltdb_handle_request,
2122         .start_transaction = ltdb_start_trans,
2123         .end_transaction   = ltdb_end_trans,
2124         .prepare_commit    = ltdb_prepare_commit,
2125         .del_transaction   = ltdb_del_trans,
2126         .read_lock         = generic_lock_read,
2127         .read_unlock       = generic_unlock_read,
2128 };
2129
2130 int init_store(struct ltdb_private *ltdb,
2131                       const char *name,
2132                       struct ldb_context *ldb,
2133                       const char *options[],
2134                       struct ldb_module **_module)
2135 {
2136         struct ldb_module *module;
2137
2138         if (getenv("LDB_WARN_UNINDEXED")) {
2139                 ltdb->warn_unindexed = true;
2140         }
2141
2142         if (getenv("LDB_WARN_REINDEX")) {
2143                 ltdb->warn_reindex = true;
2144         }
2145
2146         ltdb->sequence_number = 0;
2147
2148         module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
2149         if (!module) {
2150                 ldb_oom(ldb);
2151                 talloc_free(ltdb);
2152                 return LDB_ERR_OPERATIONS_ERROR;
2153         }
2154         ldb_module_set_private(module, ltdb);
2155         talloc_steal(module, ltdb);
2156
2157         if (ltdb_cache_load(module) != 0) {
2158                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
2159                                        "records for backend '%s'", name);
2160                 talloc_free(module);
2161                 return LDB_ERR_OPERATIONS_ERROR;
2162         }
2163
2164         *_module = module;
2165         /*
2166          * Set or override the maximum key length
2167          *
2168          * The ldb_mdb code will have set this to 511, but our tests
2169          * set this even smaller (to make the tests more practical).
2170          *
2171          * This must only be used for the selftest as the length
2172          * becomes encoded in the index keys.
2173          */
2174         {
2175                 const char *len_str =
2176                         ldb_options_find(ldb, options,
2177                                          "max_key_len_for_self_test");
2178                 if (len_str != NULL) {
2179                         unsigned len = strtoul(len_str, NULL, 0);
2180                         ltdb->max_key_length = len;
2181                 }
2182         }
2183
2184         return LDB_SUCCESS;
2185 }
2186
2187 /*
2188   connect to the database
2189 */
2190 int ltdb_connect(struct ldb_context *ldb, const char *url,
2191                  unsigned int flags, const char *options[],
2192                  struct ldb_module **_module)
2193 {
2194         const char *path;
2195         int tdb_flags, open_flags;
2196         struct ltdb_private *ltdb;
2197
2198         /*
2199          * We hold locks, so we must use a private event context
2200          * on each returned handle
2201          */
2202         ldb_set_require_private_event_context(ldb);
2203
2204         /* parse the url */
2205         if (strchr(url, ':')) {
2206                 if (strncmp(url, "tdb://", 6) != 0) {
2207                         ldb_debug(ldb, LDB_DEBUG_ERROR,
2208                                   "Invalid tdb URL '%s'", url);
2209                         return LDB_ERR_OPERATIONS_ERROR;
2210                 }
2211                 path = url+6;
2212         } else {
2213                 path = url;
2214         }
2215
2216         tdb_flags = TDB_DEFAULT | TDB_SEQNUM | TDB_DISALLOW_NESTING;
2217
2218         /* check for the 'nosync' option */
2219         if (flags & LDB_FLG_NOSYNC) {
2220                 tdb_flags |= TDB_NOSYNC;
2221         }
2222
2223         /* and nommap option */
2224         if (flags & LDB_FLG_NOMMAP) {
2225                 tdb_flags |= TDB_NOMMAP;
2226         }
2227
2228         ltdb = talloc_zero(ldb, struct ltdb_private);
2229         if (!ltdb) {
2230                 ldb_oom(ldb);
2231                 return LDB_ERR_OPERATIONS_ERROR;
2232         }
2233
2234         if (flags & LDB_FLG_RDONLY) {
2235                 /*
2236                  * This is weird, but because we can only have one tdb
2237                  * in this process, and the other one could be
2238                  * read-write, we can't use the tdb readonly.  Plus a
2239                  * read only tdb prohibits the all-record lock.
2240                  */
2241                 open_flags = O_RDWR;
2242
2243                 ltdb->read_only = true;
2244
2245         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
2246                 /*
2247                  * This is used by ldbsearch to prevent creation of the database
2248                  * if the name is wrong
2249                  */
2250                 open_flags = O_RDWR;
2251         } else {
2252                 /*
2253                  * This is the normal case
2254                  */
2255                 open_flags = O_CREAT | O_RDWR;
2256         }
2257
2258         ltdb->kv_ops = &key_value_ops;
2259
2260         /* note that we use quite a large default hash size */
2261         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
2262                                    tdb_flags, open_flags,
2263                                    ldb_get_create_perms(ldb), ldb);
2264         if (!ltdb->tdb) {
2265                 ldb_asprintf_errstring(ldb,
2266                                        "Unable to open tdb '%s': %s", path, strerror(errno));
2267                 ldb_debug(ldb, LDB_DEBUG_ERROR,
2268                           "Unable to open tdb '%s': %s", path, strerror(errno));
2269                 talloc_free(ltdb);
2270                 if (errno == EACCES || errno == EPERM) {
2271                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
2272                 }
2273                 return LDB_ERR_OPERATIONS_ERROR;
2274         }
2275
2276         return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
2277 }