ldb_tdb: Give a good error message on add without an objectGUID
[metze/samba/wip.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < sizeof(LTDB_GUID_KEY_PREFIX)) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, LTDB_GUID_KEY_PREFIX,
164                    sizeof(LTDB_GUID_KEY_PREFIX) - 1) == 0) {
165                 return true;
166         }
167         
168         return false;
169 }
170
171 /*
172   form a TDB_DATA for a record key
173   caller frees
174
175   note that the key for a record can depend on whether the
176   dn refers to a case sensitive index record or not
177 */
178 TDB_DATA ltdb_key_dn(struct ldb_module *module, TALLOC_CTX *mem_ctx,
179                      struct ldb_dn *dn)
180 {
181         TDB_DATA key;
182         char *key_str = NULL;
183         const char *dn_folded = NULL;
184
185         /*
186           most DNs are case insensitive. The exception is index DNs for
187           case sensitive attributes
188
189           there are 3 cases dealt with in this code:
190
191           1) if the dn doesn't start with @ then uppercase the attribute
192              names and the attributes values of case insensitive attributes
193           2) if the dn starts with @ then leave it alone -
194              the indexing code handles the rest
195         */
196
197         dn_folded = ldb_dn_get_casefold(dn);
198         if (!dn_folded) {
199                 goto failed;
200         }
201
202         key_str = talloc_strdup(mem_ctx, "DN=");
203         if (!key_str) {
204                 goto failed;
205         }
206
207         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
208         if (!key_str) {
209                 goto failed;
210         }
211
212         key.dptr = (uint8_t *)key_str;
213         key.dsize = strlen(key_str) + 1;
214
215         return key;
216
217 failed:
218         errno = ENOMEM;
219         key.dptr = NULL;
220         key.dsize = 0;
221         return key;
222 }
223
224 /* The caller is to provide a correctly sized key */
225 int ltdb_guid_to_key(struct ldb_module *module,
226                      struct ltdb_private *ltdb,
227                      const struct ldb_val *GUID_val,
228                      TDB_DATA *key)
229 {
230         const char *GUID_prefix = LTDB_GUID_KEY_PREFIX;
231         const int GUID_prefix_len = sizeof(LTDB_GUID_KEY_PREFIX) - 1;
232
233         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
234                 return LDB_ERR_OPERATIONS_ERROR;
235         }
236
237         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
238         memcpy(&key->dptr[GUID_prefix_len],
239                GUID_val->data, GUID_val->length);
240         return LDB_SUCCESS;
241 }
242
243 /*
244  * The caller is to provide a correctly sized key, used only in
245  * the GUID index mode
246  */
247 int ltdb_idx_to_key(struct ldb_module *module,
248                     struct ltdb_private *ltdb,
249                     TALLOC_CTX *mem_ctx,
250                     const struct ldb_val *idx_val,
251                     TDB_DATA *key)
252 {
253         struct ldb_context *ldb = ldb_module_get_ctx(module);
254         struct ldb_dn *dn;
255
256         if (ltdb->cache->GUID_index_attribute != NULL) {
257                 return ltdb_guid_to_key(module, ltdb,
258                                         idx_val, key);
259         }
260
261         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
262         if (dn == NULL) {
263                 /*
264                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
265                  * to the caller, as this in an invalid index value
266                  */
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269         /* form the key */
270         *key = ltdb_key_dn(module, mem_ctx, dn);
271         TALLOC_FREE(dn);
272         if (!key->dptr) {
273                 return ldb_module_oom(module);
274         }
275         return LDB_SUCCESS;
276 }
277
278 /*
279   form a TDB_DATA for a record key
280   caller frees mem_ctx, which may or may not have the key
281   as a child.
282
283   note that the key for a record can depend on whether a
284   GUID index is in use, or the DN is used as the key
285 */
286 TDB_DATA ltdb_key_msg(struct ldb_module *module, TALLOC_CTX *mem_ctx,
287                       const struct ldb_message *msg)
288 {
289         void *data = ldb_module_get_private(module);
290         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
291         TDB_DATA key;
292         const struct ldb_val *guid_val;
293         int ret;
294
295         if (ltdb->cache->GUID_index_attribute == NULL) {
296                 return ltdb_key_dn(module, mem_ctx, msg->dn);
297         }
298
299         if (ldb_dn_is_special(msg->dn)) {
300                 return ltdb_key_dn(module, mem_ctx, msg->dn);
301         }
302
303         guid_val = ldb_msg_find_ldb_val(msg,
304                                        ltdb->cache->GUID_index_attribute);
305         if (guid_val == NULL) {
306                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
307                                        "Did not find GUID attribute %s "
308                                        "in %s, required for TDB record "
309                                        "key in " LTDB_IDXGUID " mode.",
310                                        ltdb->cache->GUID_index_attribute,
311                                        ldb_dn_get_linearized(msg->dn));
312                 errno = EINVAL;
313                 key.dptr = NULL;
314                 key.dsize = 0;
315                 return key;
316         }
317
318         /* In this case, allocate with talloc */
319         key.dptr = talloc_size(mem_ctx, LTDB_GUID_KEY_SIZE);
320         if (key.dptr == NULL) {
321                 errno = ENOMEM;
322                 key.dptr = NULL;
323                 key.dsize = 0;
324                 return key;
325         }
326         key.dsize = talloc_get_size(key.dptr);
327
328         ret = ltdb_guid_to_key(module, ltdb, guid_val, &key);
329
330         if (ret != LDB_SUCCESS) {
331                 errno = EINVAL;
332                 key.dptr = NULL;
333                 key.dsize = 0;
334                 return key;
335         }
336         return key;
337 }
338
339 /*
340   check special dn's have valid attributes
341   currently only @ATTRIBUTES is checked
342 */
343 static int ltdb_check_special_dn(struct ldb_module *module,
344                                  const struct ldb_message *msg)
345 {
346         struct ldb_context *ldb = ldb_module_get_ctx(module);
347         unsigned int i, j;
348
349         if (! ldb_dn_is_special(msg->dn) ||
350             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                 return LDB_SUCCESS;
352         }
353
354         /* we have @ATTRIBUTES, let's check attributes are fine */
355         /* should we check that we deny multivalued attributes ? */
356         for (i = 0; i < msg->num_elements; i++) {
357                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
358
359                 for (j = 0; j < msg->elements[i].num_values; j++) {
360                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
361                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
362                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
363                         }
364                 }
365         }
366
367         return LDB_SUCCESS;
368 }
369
370
371 /*
372   we've made a modification to a dn - possibly reindex and
373   update sequence number
374 */
375 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         int ret = LDB_SUCCESS;
378         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
379
380         /* only allow modifies inside a transaction, otherwise the
381          * ldb is unsafe */
382         if (ltdb->in_transaction == 0) {
383                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         if (ldb_dn_is_special(dn) &&
388             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
389              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
390         {
391                 if (ltdb->warn_reindex) {
392                         ldb_debug(ldb_module_get_ctx(module),
393                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
394                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
395                 }
396                 ret = ltdb_reindex(module);
397         }
398
399         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
400         if (ret == LDB_SUCCESS &&
401             !(ldb_dn_is_special(dn) &&
402               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
403                 ret = ltdb_increase_sequence_number(module);
404         }
405
406         /* If the modify was to @OPTIONS, reload the cache */
407         if (ret == LDB_SUCCESS &&
408             ldb_dn_is_special(dn) &&
409             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
410                 ret = ltdb_cache_reload(module);
411         }
412
413         return ret;
414 }
415
416 /*
417   store a record into the db
418 */
419 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
420 {
421         void *data = ldb_module_get_private(module);
422         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
423         TDB_DATA tdb_key, tdb_data;
424         struct ldb_val ldb_data;
425         int ret = LDB_SUCCESS;
426         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
427
428         if (tdb_key_ctx == NULL) {
429                 return ldb_module_oom(module);
430         }
431
432         if (ltdb->read_only) {
433                 return LDB_ERR_UNWILLING_TO_PERFORM;
434         }
435
436         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
437         if (tdb_key.dptr == NULL) {
438                 TALLOC_FREE(tdb_key_ctx);
439                 return LDB_ERR_OTHER;
440         }
441
442         ret = ldb_pack_data(ldb_module_get_ctx(module),
443                             msg, &ldb_data);
444         if (ret == -1) {
445                 TALLOC_FREE(tdb_key_ctx);
446                 return LDB_ERR_OTHER;
447         }
448
449         tdb_data.dptr = ldb_data.data;
450         tdb_data.dsize = ldb_data.length;
451
452         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
453         if (ret != 0) {
454                 bool is_special = ldb_dn_is_special(msg->dn);
455                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
456
457                 /*
458                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
459                  * the GUID, so re-map
460                  */
461                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS
462                     && !is_special
463                     && ltdb->cache->GUID_index_attribute != NULL) {
464                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
465                 }
466                 goto done;
467         }
468
469 done:
470         TALLOC_FREE(tdb_key_ctx);
471         talloc_free(ldb_data.data);
472
473         return ret;
474 }
475
476
477 /*
478   check if a attribute is a single valued, for a given element
479  */
480 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
481                                   struct ldb_message_element *el)
482 {
483         if (!a) return false;
484         if (el != NULL) {
485                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
486                         /* override from a ldb module, for example
487                            used for the description field, which is
488                            marked multi-valued in the schema but which
489                            should not actually accept multiple
490                            values */
491                         return true;
492                 }
493                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
494                         /* override from a ldb module, for example used for
495                            deleted linked attribute entries */
496                         return false;
497                 }
498         }
499         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
500                 return true;
501         }
502         return false;
503 }
504
505 static int ltdb_add_internal(struct ldb_module *module,
506                              struct ltdb_private *ltdb,
507                              const struct ldb_message *msg,
508                              bool check_single_value)
509 {
510         struct ldb_context *ldb = ldb_module_get_ctx(module);
511         int ret = LDB_SUCCESS;
512         unsigned int i;
513
514         for (i=0;i<msg->num_elements;i++) {
515                 struct ldb_message_element *el = &msg->elements[i];
516                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
517
518                 if (el->num_values == 0) {
519                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
520                                                el->name, ldb_dn_get_linearized(msg->dn));
521                         return LDB_ERR_CONSTRAINT_VIOLATION;
522                 }
523                 if (check_single_value &&
524                                 el->num_values > 1 &&
525                                 ldb_tdb_single_valued(a, el)) {
526                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
527                                                el->name, ldb_dn_get_linearized(msg->dn));
528                         return LDB_ERR_CONSTRAINT_VIOLATION;
529                 }
530
531                 /* Do not check "@ATTRIBUTES" for duplicated values */
532                 if (ldb_dn_is_special(msg->dn) &&
533                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
534                         continue;
535                 }
536
537                 if (check_single_value &&
538                     !(el->flags &
539                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
540                         struct ldb_val *duplicate = NULL;
541
542                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
543                                                          el, &duplicate, 0);
544                         if (ret != LDB_SUCCESS) {
545                                 return ret;
546                         }
547                         if (duplicate != NULL) {
548                                 ldb_asprintf_errstring(
549                                         ldb,
550                                         "attribute '%s': value '%.*s' on '%s' "
551                                         "provided more than once in ADD object",
552                                         el->name,
553                                         (int)duplicate->length,
554                                         duplicate->data,
555                                         ldb_dn_get_linearized(msg->dn));
556                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
557                         }
558                 }
559         }
560
561         ret = ltdb_store(module, msg, TDB_INSERT);
562         if (ret != LDB_SUCCESS) {
563                 /*
564                  * Try really hard to get the right error code for
565                  * a re-add situation, as this can matter!
566                  */
567                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
568                         int ret2;
569                         struct ldb_dn *dn2 = NULL;
570                         TALLOC_CTX *mem_ctx = talloc_new(module);
571                         if (mem_ctx == NULL) {
572                                 return ldb_module_operr(module);
573                         }
574                         ret2 = ltdb_search_base(module, module,
575                                                 msg->dn, &dn2);
576                         TALLOC_FREE(mem_ctx);
577                         if (ret2 == LDB_SUCCESS) {
578                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
579                         }
580                 }
581                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
582                         ldb_asprintf_errstring(ldb,
583                                                "Entry %s already exists",
584                                                ldb_dn_get_linearized(msg->dn));
585                 }
586                 return ret;
587         }
588
589         ret = ltdb_index_add_new(module, ltdb, msg);
590         if (ret != LDB_SUCCESS) {
591                 /*
592                  * If we failed to index, delete the message again.
593                  *
594                  * This is particularly important for the GUID index
595                  * case, which will only fail for a duplicate DN
596                  * in the index add.
597                  *
598                  * Note that the caller may not cancel the transation
599                  * and this means the above add might really show up!
600                  */
601                 ltdb_delete_noindex(module, msg);
602                 return ret;
603         }
604
605         ret = ltdb_modified(module, msg->dn);
606
607         return ret;
608 }
609
610 /*
611   add a record to the database
612 */
613 static int ltdb_add(struct ltdb_context *ctx)
614 {
615         struct ldb_module *module = ctx->module;
616         struct ldb_request *req = ctx->req;
617         void *data = ldb_module_get_private(module);
618         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
619         int ret = LDB_SUCCESS;
620
621         ret = ltdb_check_special_dn(module, req->op.add.message);
622         if (ret != LDB_SUCCESS) {
623                 return ret;
624         }
625
626         ldb_request_set_state(req, LDB_ASYNC_PENDING);
627
628         if (ltdb_cache_load(module) != 0) {
629                 return LDB_ERR_OPERATIONS_ERROR;
630         }
631
632         ret = ltdb_add_internal(module, ltdb,
633                                 req->op.add.message, true);
634
635         return ret;
636 }
637
638 /*
639   delete a record from the database, not updating indexes (used for deleting
640   index records)
641 */
642 int ltdb_delete_noindex(struct ldb_module *module,
643                         const struct ldb_message *msg)
644 {
645         void *data = ldb_module_get_private(module);
646         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
647         TDB_DATA tdb_key;
648         int ret;
649         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
650
651         if (tdb_key_ctx == NULL) {
652                 return ldb_module_oom(module);
653         }
654
655         if (ltdb->read_only) {
656                 return LDB_ERR_UNWILLING_TO_PERFORM;
657         }
658
659         tdb_key = ltdb_key_msg(module, tdb_key_ctx, msg);
660         if (!tdb_key.dptr) {
661                 TALLOC_FREE(tdb_key_ctx);
662                 return LDB_ERR_OTHER;
663         }
664
665         ret = tdb_delete(ltdb->tdb, tdb_key);
666         TALLOC_FREE(tdb_key_ctx);
667
668         if (ret != 0) {
669                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
670         }
671
672         return ret;
673 }
674
675 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
676 {
677         struct ldb_message *msg;
678         int ret = LDB_SUCCESS;
679
680         msg = ldb_msg_new(module);
681         if (msg == NULL) {
682                 return LDB_ERR_OPERATIONS_ERROR;
683         }
684
685         /* in case any attribute of the message was indexed, we need
686            to fetch the old record */
687         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
688         if (ret != LDB_SUCCESS) {
689                 /* not finding the old record is an error */
690                 goto done;
691         }
692
693         ret = ltdb_delete_noindex(module, msg);
694         if (ret != LDB_SUCCESS) {
695                 goto done;
696         }
697
698         /* remove any indexed attributes */
699         ret = ltdb_index_delete(module, msg);
700         if (ret != LDB_SUCCESS) {
701                 goto done;
702         }
703
704         ret = ltdb_modified(module, dn);
705         if (ret != LDB_SUCCESS) {
706                 goto done;
707         }
708
709 done:
710         talloc_free(msg);
711         return ret;
712 }
713
714 /*
715   delete a record from the database
716 */
717 static int ltdb_delete(struct ltdb_context *ctx)
718 {
719         struct ldb_module *module = ctx->module;
720         struct ldb_request *req = ctx->req;
721         int ret = LDB_SUCCESS;
722
723         ldb_request_set_state(req, LDB_ASYNC_PENDING);
724
725         if (ltdb_cache_load(module) != 0) {
726                 return LDB_ERR_OPERATIONS_ERROR;
727         }
728
729         ret = ltdb_delete_internal(module, req->op.del.dn);
730
731         return ret;
732 }
733
734 /*
735   find an element by attribute name. At the moment this does a linear search,
736   it should be re-coded to use a binary search once all places that modify
737   records guarantee sorted order
738
739   return the index of the first matching element if found, otherwise -1
740 */
741 static int find_element(const struct ldb_message *msg, const char *name)
742 {
743         unsigned int i;
744         for (i=0;i<msg->num_elements;i++) {
745                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
746                         return i;
747                 }
748         }
749         return -1;
750 }
751
752
753 /*
754   add an element to an existing record. Assumes a elements array that we
755   can call re-alloc on, and assumed that we can re-use the data pointers from
756   the passed in additional values. Use with care!
757
758   returns 0 on success, -1 on failure (and sets errno)
759 */
760 static int ltdb_msg_add_element(struct ldb_message *msg,
761                                 struct ldb_message_element *el)
762 {
763         struct ldb_message_element *e2;
764         unsigned int i;
765
766         if (el->num_values == 0) {
767                 /* nothing to do here - we don't add empty elements */
768                 return 0;
769         }
770
771         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
772                               msg->num_elements+1);
773         if (!e2) {
774                 errno = ENOMEM;
775                 return -1;
776         }
777
778         msg->elements = e2;
779
780         e2 = &msg->elements[msg->num_elements];
781
782         e2->name = el->name;
783         e2->flags = el->flags;
784         e2->values = talloc_array(msg->elements,
785                                   struct ldb_val, el->num_values);
786         if (!e2->values) {
787                 errno = ENOMEM;
788                 return -1;
789         }
790         for (i=0;i<el->num_values;i++) {
791                 e2->values[i] = el->values[i];
792         }
793         e2->num_values = el->num_values;
794
795         ++msg->num_elements;
796
797         return 0;
798 }
799
800 /*
801   delete all elements having a specified attribute name
802 */
803 static int msg_delete_attribute(struct ldb_module *module,
804                                 struct ltdb_private *ltdb,
805                                 struct ldb_message *msg, const char *name)
806 {
807         unsigned int i;
808         int ret;
809         struct ldb_message_element *el;
810         bool is_special = ldb_dn_is_special(msg->dn);
811
812         if (!is_special
813             && ltdb->cache->GUID_index_attribute != NULL
814             && ldb_attr_cmp(name, ltdb->cache->GUID_index_attribute) == 0) {
815                 struct ldb_context *ldb = ldb_module_get_ctx(module);
816                 ldb_asprintf_errstring(ldb, "Must not modify GUID "
817                                        "attribute %s (used as DB index)",
818                                        ltdb->cache->GUID_index_attribute);
819                 return LDB_ERR_CONSTRAINT_VIOLATION;
820         }
821
822         el = ldb_msg_find_element(msg, name);
823         if (el == NULL) {
824                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
825         }
826         i = el - msg->elements;
827
828         ret = ltdb_index_del_element(module, ltdb, msg, el);
829         if (ret != LDB_SUCCESS) {
830                 return ret;
831         }
832
833         talloc_free(el->values);
834         if (msg->num_elements > (i+1)) {
835                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
836         }
837         msg->num_elements--;
838         msg->elements = talloc_realloc(msg, msg->elements,
839                                        struct ldb_message_element,
840                                        msg->num_elements);
841         return LDB_SUCCESS;
842 }
843
844 /*
845   delete all elements matching an attribute name/value
846
847   return LDB Error on failure
848 */
849 static int msg_delete_element(struct ldb_module *module,
850                               struct ltdb_private *ltdb,
851                               struct ldb_message *msg,
852                               const char *name,
853                               const struct ldb_val *val)
854 {
855         struct ldb_context *ldb = ldb_module_get_ctx(module);
856         unsigned int i;
857         int found, ret;
858         struct ldb_message_element *el;
859         const struct ldb_schema_attribute *a;
860
861         found = find_element(msg, name);
862         if (found == -1) {
863                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
864         }
865
866         i = (unsigned int) found;
867         el = &(msg->elements[i]);
868
869         a = ldb_schema_attribute_by_name(ldb, el->name);
870
871         for (i=0;i<el->num_values;i++) {
872                 bool matched;
873                 if (a->syntax->operator_fn) {
874                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
875                                                      &el->values[i], val, &matched);
876                         if (ret != LDB_SUCCESS) return ret;
877                 } else {
878                         matched = (a->syntax->comparison_fn(ldb, ldb,
879                                                             &el->values[i], val) == 0);
880                 }
881                 if (matched) {
882                         if (el->num_values == 1) {
883                                 return msg_delete_attribute(module,
884                                                             ltdb, msg, name);
885                         }
886
887                         ret = ltdb_index_del_value(module, ltdb, msg, el, i);
888                         if (ret != LDB_SUCCESS) {
889                                 return ret;
890                         }
891
892                         if (i<el->num_values-1) {
893                                 memmove(&el->values[i], &el->values[i+1],
894                                         sizeof(el->values[i])*
895                                                 (el->num_values-(i+1)));
896                         }
897                         el->num_values--;
898
899                         /* per definition we find in a canonicalised message an
900                            attribute value only once. So we are finished here */
901                         return LDB_SUCCESS;
902                 }
903         }
904
905         /* Not found */
906         return LDB_ERR_NO_SUCH_ATTRIBUTE;
907 }
908
909
910 /*
911   modify a record - internal interface
912
913   yuck - this is O(n^2). Luckily n is usually small so we probably
914   get away with it, but if we ever have really large attribute lists
915   then we'll need to look at this again
916
917   'req' is optional, and is used to specify controls if supplied
918 */
919 int ltdb_modify_internal(struct ldb_module *module,
920                          const struct ldb_message *msg,
921                          struct ldb_request *req)
922 {
923         struct ldb_context *ldb = ldb_module_get_ctx(module);
924         void *data = ldb_module_get_private(module);
925         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
926         struct ldb_message *msg2;
927         unsigned int i, j;
928         int ret = LDB_SUCCESS, idx;
929         struct ldb_control *control_permissive = NULL;
930         TALLOC_CTX *mem_ctx = talloc_new(req);
931
932         if (mem_ctx == NULL) {
933                 return ldb_module_oom(module);
934         }
935         
936         if (req) {
937                 control_permissive = ldb_request_get_control(req,
938                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
939         }
940
941         msg2 = ldb_msg_new(mem_ctx);
942         if (msg2 == NULL) {
943                 ret = LDB_ERR_OTHER;
944                 goto done;
945         }
946
947         ret = ltdb_search_dn1(module, msg->dn,
948                               msg2,
949                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
950         if (ret != LDB_SUCCESS) {
951                 goto done;
952         }
953
954         for (i=0; i<msg->num_elements; i++) {
955                 struct ldb_message_element *el = &msg->elements[i], *el2;
956                 struct ldb_val *vals;
957                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
958                 const char *dn;
959                 uint32_t options = 0;
960                 if (control_permissive != NULL) {
961                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
962                 }
963
964                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
965                 case LDB_FLAG_MOD_ADD:
966
967                         if (el->num_values == 0) {
968                                 ldb_asprintf_errstring(ldb,
969                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
970                                                        el->name, ldb_dn_get_linearized(msg2->dn));
971                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
972                                 goto done;
973                         }
974
975                         /* make a copy of the array so that a permissive
976                          * control can remove duplicates without changing the
977                          * original values, but do not copy data as we do not
978                          * need to keep it around once the operation is
979                          * finished */
980                         if (control_permissive) {
981                                 el = talloc(msg2, struct ldb_message_element);
982                                 if (!el) {
983                                         ret = LDB_ERR_OTHER;
984                                         goto done;
985                                 }
986                                 *el = msg->elements[i];
987                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
988                                 if (el->values == NULL) {
989                                         ret = LDB_ERR_OTHER;
990                                         goto done;
991                                 }
992                                 for (j = 0; j < el->num_values; j++) {
993                                         el->values[j] = msg->elements[i].values[j];
994                                 }
995                         }
996
997                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
998                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
999                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1000                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1001                                 goto done;
1002                         }
1003
1004                         /* Checks if element already exists */
1005                         idx = find_element(msg2, el->name);
1006                         if (idx == -1) {
1007                                 if (ltdb_msg_add_element(msg2, el) != 0) {
1008                                         ret = LDB_ERR_OTHER;
1009                                         goto done;
1010                                 }
1011                                 ret = ltdb_index_add_element(module, ltdb,
1012                                                              msg2,
1013                                                              el);
1014                                 if (ret != LDB_SUCCESS) {
1015                                         goto done;
1016                                 }
1017                         } else {
1018                                 j = (unsigned int) idx;
1019                                 el2 = &(msg2->elements[j]);
1020
1021                                 /* We cannot add another value on a existing one
1022                                    if the attribute is single-valued */
1023                                 if (ldb_tdb_single_valued(a, el)) {
1024                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1025                                                                el->name, ldb_dn_get_linearized(msg2->dn));
1026                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1027                                         goto done;
1028                                 }
1029
1030                                 /* Check that values don't exist yet on multi-
1031                                    valued attributes or aren't provided twice */
1032                                 if (!(el->flags &
1033                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1034                                         struct ldb_val *duplicate = NULL;
1035                                         ret = ldb_msg_find_common_values(ldb,
1036                                                                          msg2,
1037                                                                          el,
1038                                                                          el2,
1039                                                                          options);
1040
1041                                         if (ret ==
1042                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
1043                                                 ldb_asprintf_errstring(ldb,
1044                                                         "attribute '%s': value "
1045                                                         "#%u on '%s' already "
1046                                                         "exists", el->name, j,
1047                                                         ldb_dn_get_linearized(msg2->dn));
1048                                                 goto done;
1049                                         } else if (ret != LDB_SUCCESS) {
1050                                                 goto done;
1051                                         }
1052
1053                                         ret = ldb_msg_find_duplicate_val(
1054                                                 ldb, msg2, el, &duplicate, 0);
1055                                         if (ret != LDB_SUCCESS) {
1056                                                 goto done;
1057                                         }
1058                                         if (duplicate != NULL) {
1059                                                 ldb_asprintf_errstring(
1060                                                         ldb,
1061                                                         "attribute '%s': value "
1062                                                         "'%.*s' on '%s' "
1063                                                         "provided more than "
1064                                                         "once in ADD",
1065                                                         el->name,
1066                                                         (int)duplicate->length,
1067                                                         duplicate->data,
1068                                                         ldb_dn_get_linearized(msg->dn));
1069                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1070                                                 goto done;
1071                                         }
1072                                 }
1073
1074                                 /* Now combine existing and new values to a new
1075                                    attribute record */
1076                                 vals = talloc_realloc(msg2->elements,
1077                                                       el2->values, struct ldb_val,
1078                                                       el2->num_values + el->num_values);
1079                                 if (vals == NULL) {
1080                                         ldb_oom(ldb);
1081                                         ret = LDB_ERR_OTHER;
1082                                         goto done;
1083                                 }
1084
1085                                 for (j=0; j<el->num_values; j++) {
1086                                         vals[el2->num_values + j] =
1087                                                 ldb_val_dup(vals, &el->values[j]);
1088                                 }
1089
1090                                 el2->values = vals;
1091                                 el2->num_values += el->num_values;
1092
1093                                 ret = ltdb_index_add_element(module, ltdb,
1094                                                              msg2, el);
1095                                 if (ret != LDB_SUCCESS) {
1096                                         goto done;
1097                                 }
1098                         }
1099
1100                         break;
1101
1102                 case LDB_FLAG_MOD_REPLACE:
1103
1104                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
1105                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1106                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1107                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1108                                 goto done;
1109                         }
1110
1111                         /*
1112                          * We don't need to check this if we have been
1113                          * pre-screened by the repl_meta_data module
1114                          * in Samba, or someone else who can claim to
1115                          * know what they are doing. 
1116                          */
1117                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1118                                 struct ldb_val *duplicate = NULL;
1119
1120                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1121                                                                  &duplicate, 0);
1122                                 if (ret != LDB_SUCCESS) {
1123                                         goto done;
1124                                 }
1125                                 if (duplicate != NULL) {
1126                                         ldb_asprintf_errstring(
1127                                                 ldb,
1128                                                 "attribute '%s': value '%.*s' "
1129                                                 "on '%s' provided more than "
1130                                                 "once in REPLACE",
1131                                                 el->name,
1132                                                 (int)duplicate->length,
1133                                                 duplicate->data,
1134                                                 ldb_dn_get_linearized(msg2->dn));
1135                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1136                                         goto done;
1137                                 }
1138                         }
1139
1140                         /* Checks if element already exists */
1141                         idx = find_element(msg2, el->name);
1142                         if (idx != -1) {
1143                                 j = (unsigned int) idx;
1144                                 el2 = &(msg2->elements[j]);
1145
1146                                 /* we consider two elements to be
1147                                  * equal only if the order
1148                                  * matches. This allows dbcheck to
1149                                  * fix the ordering on attributes
1150                                  * where order matters, such as
1151                                  * objectClass
1152                                  */
1153                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1154                                         continue;
1155                                 }
1156
1157                                 /* Delete the attribute if it exists in the DB */
1158                                 if (msg_delete_attribute(module, ltdb,
1159                                                          msg2,
1160                                                          el->name) != 0) {
1161                                         ret = LDB_ERR_OTHER;
1162                                         goto done;
1163                                 }
1164                         }
1165
1166                         /* Recreate it with the new values */
1167                         if (ltdb_msg_add_element(msg2, el) != 0) {
1168                                 ret = LDB_ERR_OTHER;
1169                                 goto done;
1170                         }
1171
1172                         ret = ltdb_index_add_element(module, ltdb,
1173                                                      msg2, el);
1174                         if (ret != LDB_SUCCESS) {
1175                                 goto done;
1176                         }
1177
1178                         break;
1179
1180                 case LDB_FLAG_MOD_DELETE:
1181                         dn = ldb_dn_get_linearized(msg2->dn);
1182                         if (dn == NULL) {
1183                                 ret = LDB_ERR_OTHER;
1184                                 goto done;
1185                         }
1186
1187                         if (msg->elements[i].num_values == 0) {
1188                                 /* Delete the whole attribute */
1189                                 ret = msg_delete_attribute(module,
1190                                                            ltdb,
1191                                                            msg2,
1192                                                            msg->elements[i].name);
1193                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1194                                     control_permissive) {
1195                                         ret = LDB_SUCCESS;
1196                                 } else {
1197                                         ldb_asprintf_errstring(ldb,
1198                                                                "attribute '%s': no such attribute for delete on '%s'",
1199                                                                msg->elements[i].name, dn);
1200                                 }
1201                                 if (ret != LDB_SUCCESS) {
1202                                         goto done;
1203                                 }
1204                         } else {
1205                                 /* Delete specified values from an attribute */
1206                                 for (j=0; j < msg->elements[i].num_values; j++) {
1207                                         ret = msg_delete_element(module,
1208                                                                  ltdb,
1209                                                                  msg2,
1210                                                                  msg->elements[i].name,
1211                                                                  &msg->elements[i].values[j]);
1212                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1213                                             control_permissive) {
1214                                                 ret = LDB_SUCCESS;
1215                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1216                                                 ldb_asprintf_errstring(ldb,
1217                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1218                                                                        msg->elements[i].name, dn);
1219                                         }
1220                                         if (ret != LDB_SUCCESS) {
1221                                                 goto done;
1222                                         }
1223                                 }
1224                         }
1225                         break;
1226                 default:
1227                         ldb_asprintf_errstring(ldb,
1228                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1229                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1230                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1231                         ret = LDB_ERR_PROTOCOL_ERROR;
1232                         goto done;
1233                 }
1234         }
1235
1236         ret = ltdb_store(module, msg2, TDB_MODIFY);
1237         if (ret != LDB_SUCCESS) {
1238                 goto done;
1239         }
1240
1241         ret = ltdb_modified(module, msg2->dn);
1242         if (ret != LDB_SUCCESS) {
1243                 goto done;
1244         }
1245
1246 done:
1247         TALLOC_FREE(mem_ctx);
1248         return ret;
1249 }
1250
1251 /*
1252   modify a record
1253 */
1254 static int ltdb_modify(struct ltdb_context *ctx)
1255 {
1256         struct ldb_module *module = ctx->module;
1257         struct ldb_request *req = ctx->req;
1258         int ret = LDB_SUCCESS;
1259
1260         ret = ltdb_check_special_dn(module, req->op.mod.message);
1261         if (ret != LDB_SUCCESS) {
1262                 return ret;
1263         }
1264
1265         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1266
1267         if (ltdb_cache_load(module) != 0) {
1268                 return LDB_ERR_OPERATIONS_ERROR;
1269         }
1270
1271         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1272
1273         return ret;
1274 }
1275
1276 /*
1277   rename a record
1278 */
1279 static int ltdb_rename(struct ltdb_context *ctx)
1280 {
1281         struct ldb_module *module = ctx->module;
1282         void *data = ldb_module_get_private(module);
1283         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1284         struct ldb_request *req = ctx->req;
1285         struct ldb_message *msg;
1286         int ret = LDB_SUCCESS;
1287         TDB_DATA tdb_key, tdb_key_old;
1288         struct ldb_dn *db_dn;
1289
1290         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1291
1292         if (ltdb_cache_load(ctx->module) != 0) {
1293                 return LDB_ERR_OPERATIONS_ERROR;
1294         }
1295
1296         msg = ldb_msg_new(ctx);
1297         if (msg == NULL) {
1298                 return LDB_ERR_OPERATIONS_ERROR;
1299         }
1300
1301         /* we need to fetch the old record to re-add under the new name */
1302         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1303                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1304         if (ret != LDB_SUCCESS) {
1305                 /* not finding the old record is an error */
1306                 return ret;
1307         }
1308
1309         /* We need to, before changing the DB, check if the new DN
1310          * exists, so we can return this error to the caller with an
1311          * unmodified DB
1312          *
1313          * Even in GUID index mode we use ltdb_key_dn() as we are
1314          * trying to figure out if this is just a case rename
1315          */
1316         tdb_key = ltdb_key_dn(module, msg, req->op.rename.newdn);
1317         if (!tdb_key.dptr) {
1318                 talloc_free(msg);
1319                 return LDB_ERR_OPERATIONS_ERROR;
1320         }
1321
1322         tdb_key_old = ltdb_key_dn(module, msg, req->op.rename.olddn);
1323         if (!tdb_key_old.dptr) {
1324                 talloc_free(msg);
1325                 talloc_free(tdb_key.dptr);
1326                 return LDB_ERR_OPERATIONS_ERROR;
1327         }
1328
1329         /*
1330          * Only declare a conflict if the new DN already exists,
1331          * and it isn't a case change on the old DN
1332          */
1333         if (tdb_key_old.dsize != tdb_key.dsize
1334             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1335                 ret = ltdb_search_base(module, msg,
1336                                        req->op.rename.newdn,
1337                                        &db_dn);
1338                 if (ret == LDB_SUCCESS) {
1339                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1340                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1341                         ret = LDB_SUCCESS;
1342                 }
1343         }
1344
1345         /* finding the new record already in the DB is an error */
1346
1347         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1348                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1349                                        "Entry %s already exists",
1350                                        ldb_dn_get_linearized(req->op.rename.newdn));
1351         }
1352         if (ret != LDB_SUCCESS) {
1353                 talloc_free(tdb_key_old.dptr);
1354                 talloc_free(tdb_key.dptr);
1355                 talloc_free(msg);
1356                 return ret;
1357         }
1358
1359         talloc_free(tdb_key_old.dptr);
1360         talloc_free(tdb_key.dptr);
1361
1362         /* Always delete first then add, to avoid conflicts with
1363          * unique indexes. We rely on the transaction to make this
1364          * atomic
1365          */
1366         ret = ltdb_delete_internal(module, msg->dn);
1367         if (ret != LDB_SUCCESS) {
1368                 talloc_free(msg);
1369                 return ret;
1370         }
1371
1372         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1373         if (msg->dn == NULL) {
1374                 talloc_free(msg);
1375                 return LDB_ERR_OPERATIONS_ERROR;
1376         }
1377
1378         /* We don't check single value as we can have more than 1 with
1379          * deleted attributes. We could go through all elements but that's
1380          * maybe not the most efficient way
1381          */
1382         ret = ltdb_add_internal(module, ltdb, msg, false);
1383
1384         talloc_free(msg);
1385
1386         return ret;
1387 }
1388
1389 static int ltdb_start_trans(struct ldb_module *module)
1390 {
1391         void *data = ldb_module_get_private(module);
1392         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1393
1394         /* Do not take out the transaction lock on a read-only DB */
1395         if (ltdb->read_only) {
1396                 return LDB_ERR_UNWILLING_TO_PERFORM;
1397         }
1398
1399         if (tdb_transaction_start(ltdb->tdb) != 0) {
1400                 return ltdb_err_map(tdb_error(ltdb->tdb));
1401         }
1402
1403         ltdb->in_transaction++;
1404
1405         ltdb_index_transaction_start(module);
1406
1407         return LDB_SUCCESS;
1408 }
1409
1410 static int ltdb_prepare_commit(struct ldb_module *module)
1411 {
1412         int ret;
1413         void *data = ldb_module_get_private(module);
1414         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1415
1416         if (ltdb->in_transaction != 1) {
1417                 return LDB_SUCCESS;
1418         }
1419
1420         ret = ltdb_index_transaction_commit(module);
1421         if (ret != LDB_SUCCESS) {
1422                 tdb_transaction_cancel(ltdb->tdb);
1423                 ltdb->in_transaction--;
1424                 return ret;
1425         }
1426
1427         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1428                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1429                 ltdb->in_transaction--;
1430                 ldb_debug_set(ldb_module_get_ctx(module),
1431                               LDB_DEBUG_FATAL,
1432                               "Failure during "
1433                               "tdb_transaction_prepare_commit(): %s -> %s",
1434                               tdb_errorstr(ltdb->tdb),
1435                               ldb_strerror(ret));
1436                 return ret;
1437         }
1438
1439         ltdb->prepared_commit = true;
1440
1441         return LDB_SUCCESS;
1442 }
1443
1444 static int ltdb_end_trans(struct ldb_module *module)
1445 {
1446         int ret;
1447         void *data = ldb_module_get_private(module);
1448         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1449
1450         if (!ltdb->prepared_commit) {
1451                 ret = ltdb_prepare_commit(module);
1452                 if (ret != LDB_SUCCESS) {
1453                         return ret;
1454                 }
1455         }
1456
1457         ltdb->in_transaction--;
1458         ltdb->prepared_commit = false;
1459
1460         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1461                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1462                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1463                                        "Failure during tdb_transaction_commit(): %s -> %s",
1464                                        tdb_errorstr(ltdb->tdb),
1465                                        ldb_strerror(ret));
1466                 return ret;
1467         }
1468
1469         return LDB_SUCCESS;
1470 }
1471
1472 static int ltdb_del_trans(struct ldb_module *module)
1473 {
1474         void *data = ldb_module_get_private(module);
1475         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1476
1477         ltdb->in_transaction--;
1478
1479         if (ltdb_index_transaction_cancel(module) != 0) {
1480                 tdb_transaction_cancel(ltdb->tdb);
1481                 return ltdb_err_map(tdb_error(ltdb->tdb));
1482         }
1483
1484         tdb_transaction_cancel(ltdb->tdb);
1485         return LDB_SUCCESS;
1486 }
1487
1488 /*
1489   return sequenceNumber from @BASEINFO
1490 */
1491 static int ltdb_sequence_number(struct ltdb_context *ctx,
1492                                 struct ldb_extended **ext)
1493 {
1494         struct ldb_context *ldb;
1495         struct ldb_module *module = ctx->module;
1496         struct ldb_request *req = ctx->req;
1497         TALLOC_CTX *tmp_ctx = NULL;
1498         struct ldb_seqnum_request *seq;
1499         struct ldb_seqnum_result *res;
1500         struct ldb_message *msg = NULL;
1501         struct ldb_dn *dn;
1502         const char *date;
1503         int ret = LDB_SUCCESS;
1504
1505         ldb = ldb_module_get_ctx(module);
1506
1507         seq = talloc_get_type(req->op.extended.data,
1508                                 struct ldb_seqnum_request);
1509         if (seq == NULL) {
1510                 return LDB_ERR_OPERATIONS_ERROR;
1511         }
1512
1513         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1514
1515         if (ltdb_lock_read(module) != 0) {
1516                 return LDB_ERR_OPERATIONS_ERROR;
1517         }
1518
1519         res = talloc_zero(req, struct ldb_seqnum_result);
1520         if (res == NULL) {
1521                 ret = LDB_ERR_OPERATIONS_ERROR;
1522                 goto done;
1523         }
1524
1525         tmp_ctx = talloc_new(req);
1526         if (tmp_ctx == NULL) {
1527                 ret = LDB_ERR_OPERATIONS_ERROR;
1528                 goto done;
1529         }
1530
1531         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1532         if (dn == NULL) {
1533                 ret = LDB_ERR_OPERATIONS_ERROR;
1534                 goto done;
1535         }
1536
1537         msg = ldb_msg_new(tmp_ctx);
1538         if (msg == NULL) {
1539                 ret = LDB_ERR_OPERATIONS_ERROR;
1540                 goto done;
1541         }
1542
1543         ret = ltdb_search_dn1(module, dn, msg, 0);
1544         if (ret != LDB_SUCCESS) {
1545                 goto done;
1546         }
1547
1548         switch (seq->type) {
1549         case LDB_SEQ_HIGHEST_SEQ:
1550                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1551                 break;
1552         case LDB_SEQ_NEXT:
1553                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1554                 res->seq_num++;
1555                 break;
1556         case LDB_SEQ_HIGHEST_TIMESTAMP:
1557                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1558                 if (date) {
1559                         res->seq_num = ldb_string_to_time(date);
1560                 } else {
1561                         res->seq_num = 0;
1562                         /* zero is as good as anything when we don't know */
1563                 }
1564                 break;
1565         }
1566
1567         *ext = talloc_zero(req, struct ldb_extended);
1568         if (*ext == NULL) {
1569                 ret = LDB_ERR_OPERATIONS_ERROR;
1570                 goto done;
1571         }
1572         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1573         (*ext)->data = talloc_steal(*ext, res);
1574
1575 done:
1576         talloc_free(tmp_ctx);
1577         ltdb_unlock_read(module);
1578         return ret;
1579 }
1580
1581 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1582 {
1583         struct ldb_context *ldb;
1584         struct ldb_request *req;
1585         struct ldb_reply *ares;
1586
1587         ldb = ldb_module_get_ctx(ctx->module);
1588         req = ctx->req;
1589
1590         /* if we already returned an error just return */
1591         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1592                 return;
1593         }
1594
1595         ares = talloc_zero(req, struct ldb_reply);
1596         if (!ares) {
1597                 ldb_oom(ldb);
1598                 req->callback(req, NULL);
1599                 return;
1600         }
1601         ares->type = LDB_REPLY_DONE;
1602         ares->error = error;
1603
1604         req->callback(req, ares);
1605 }
1606
1607 static void ltdb_timeout(struct tevent_context *ev,
1608                           struct tevent_timer *te,
1609                           struct timeval t,
1610                           void *private_data)
1611 {
1612         struct ltdb_context *ctx;
1613         ctx = talloc_get_type(private_data, struct ltdb_context);
1614
1615         if (!ctx->request_terminated) {
1616                 /* request is done now */
1617                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1618         }
1619
1620         if (ctx->spy) {
1621                 /* neutralize the spy */
1622                 ctx->spy->ctx = NULL;
1623                 ctx->spy = NULL;
1624         }
1625         talloc_free(ctx);
1626 }
1627
1628 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1629                                         struct ldb_extended *ext,
1630                                         int error)
1631 {
1632         struct ldb_context *ldb;
1633         struct ldb_request *req;
1634         struct ldb_reply *ares;
1635
1636         ldb = ldb_module_get_ctx(ctx->module);
1637         req = ctx->req;
1638
1639         /* if we already returned an error just return */
1640         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1641                 return;
1642         }
1643
1644         ares = talloc_zero(req, struct ldb_reply);
1645         if (!ares) {
1646                 ldb_oom(ldb);
1647                 req->callback(req, NULL);
1648                 return;
1649         }
1650         ares->type = LDB_REPLY_DONE;
1651         ares->response = ext;
1652         ares->error = error;
1653
1654         req->callback(req, ares);
1655 }
1656
1657 static void ltdb_handle_extended(struct ltdb_context *ctx)
1658 {
1659         struct ldb_extended *ext = NULL;
1660         int ret;
1661
1662         if (strcmp(ctx->req->op.extended.oid,
1663                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1664                 /* get sequence number */
1665                 ret = ltdb_sequence_number(ctx, &ext);
1666         } else {
1667                 /* not recognized */
1668                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1669         }
1670
1671         ltdb_request_extended_done(ctx, ext, ret);
1672 }
1673
1674 static void ltdb_callback(struct tevent_context *ev,
1675                           struct tevent_timer *te,
1676                           struct timeval t,
1677                           void *private_data)
1678 {
1679         struct ltdb_context *ctx;
1680         int ret;
1681
1682         ctx = talloc_get_type(private_data, struct ltdb_context);
1683
1684         if (ctx->request_terminated) {
1685                 goto done;
1686         }
1687
1688         switch (ctx->req->operation) {
1689         case LDB_SEARCH:
1690                 ret = ltdb_search(ctx);
1691                 break;
1692         case LDB_ADD:
1693                 ret = ltdb_add(ctx);
1694                 break;
1695         case LDB_MODIFY:
1696                 ret = ltdb_modify(ctx);
1697                 break;
1698         case LDB_DELETE:
1699                 ret = ltdb_delete(ctx);
1700                 break;
1701         case LDB_RENAME:
1702                 ret = ltdb_rename(ctx);
1703                 break;
1704         case LDB_EXTENDED:
1705                 ltdb_handle_extended(ctx);
1706                 goto done;
1707         default:
1708                 /* no other op supported */
1709                 ret = LDB_ERR_PROTOCOL_ERROR;
1710         }
1711
1712         if (!ctx->request_terminated) {
1713                 /* request is done now */
1714                 ltdb_request_done(ctx, ret);
1715         }
1716
1717 done:
1718         if (ctx->spy) {
1719                 /* neutralize the spy */
1720                 ctx->spy->ctx = NULL;
1721                 ctx->spy = NULL;
1722         }
1723         talloc_free(ctx);
1724 }
1725
1726 static int ltdb_request_destructor(void *ptr)
1727 {
1728         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1729
1730         if (spy->ctx != NULL) {
1731                 spy->ctx->spy = NULL;
1732                 spy->ctx->request_terminated = true;
1733                 spy->ctx = NULL;
1734         }
1735
1736         return 0;
1737 }
1738
1739 static int ltdb_handle_request(struct ldb_module *module,
1740                                struct ldb_request *req)
1741 {
1742         struct ldb_control *control_permissive;
1743         struct ldb_context *ldb;
1744         struct tevent_context *ev;
1745         struct ltdb_context *ac;
1746         struct tevent_timer *te;
1747         struct timeval tv;
1748         unsigned int i;
1749
1750         ldb = ldb_module_get_ctx(module);
1751
1752         control_permissive = ldb_request_get_control(req,
1753                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1754
1755         for (i = 0; req->controls && req->controls[i]; i++) {
1756                 if (req->controls[i]->critical &&
1757                     req->controls[i] != control_permissive) {
1758                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1759                                                req->controls[i]->oid);
1760                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1761                 }
1762         }
1763
1764         if (req->starttime == 0 || req->timeout == 0) {
1765                 ldb_set_errstring(ldb, "Invalid timeout settings");
1766                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1767         }
1768
1769         ev = ldb_handle_get_event_context(req->handle);
1770
1771         ac = talloc_zero(ldb, struct ltdb_context);
1772         if (ac == NULL) {
1773                 ldb_oom(ldb);
1774                 return LDB_ERR_OPERATIONS_ERROR;
1775         }
1776
1777         ac->module = module;
1778         ac->req = req;
1779
1780         tv.tv_sec = 0;
1781         tv.tv_usec = 0;
1782         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1783         if (NULL == te) {
1784                 talloc_free(ac);
1785                 return LDB_ERR_OPERATIONS_ERROR;
1786         }
1787
1788         if (req->timeout > 0) {
1789                 tv.tv_sec = req->starttime + req->timeout;
1790                 tv.tv_usec = 0;
1791                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1792                                                      ltdb_timeout, ac);
1793                 if (NULL == ac->timeout_event) {
1794                         talloc_free(ac);
1795                         return LDB_ERR_OPERATIONS_ERROR;
1796                 }
1797         }
1798
1799         /* set a spy so that we do not try to use the request context
1800          * if it is freed before ltdb_callback fires */
1801         ac->spy = talloc(req, struct ltdb_req_spy);
1802         if (NULL == ac->spy) {
1803                 talloc_free(ac);
1804                 return LDB_ERR_OPERATIONS_ERROR;
1805         }
1806         ac->spy->ctx = ac;
1807
1808         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1809
1810         return LDB_SUCCESS;
1811 }
1812
1813 static int ltdb_init_rootdse(struct ldb_module *module)
1814 {
1815         /* ignore errors on this - we expect it for non-sam databases */
1816         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1817
1818         /* there can be no module beyond the backend, just return */
1819         return LDB_SUCCESS;
1820 }
1821
1822 static const struct ldb_module_ops ltdb_ops = {
1823         .name              = "tdb",
1824         .init_context      = ltdb_init_rootdse,
1825         .search            = ltdb_handle_request,
1826         .add               = ltdb_handle_request,
1827         .modify            = ltdb_handle_request,
1828         .del               = ltdb_handle_request,
1829         .rename            = ltdb_handle_request,
1830         .extended          = ltdb_handle_request,
1831         .start_transaction = ltdb_start_trans,
1832         .end_transaction   = ltdb_end_trans,
1833         .prepare_commit    = ltdb_prepare_commit,
1834         .del_transaction   = ltdb_del_trans,
1835         .read_lock         = ltdb_lock_read,
1836         .read_unlock       = ltdb_unlock_read,
1837 };
1838
1839 /*
1840   connect to the database
1841 */
1842 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1843                         unsigned int flags, const char *options[],
1844                         struct ldb_module **_module)
1845 {
1846         struct ldb_module *module;
1847         const char *path;
1848         int tdb_flags, open_flags;
1849         struct ltdb_private *ltdb;
1850
1851         /*
1852          * We hold locks, so we must use a private event context
1853          * on each returned handle
1854          */
1855
1856         ldb_set_require_private_event_context(ldb);
1857
1858         /* parse the url */
1859         if (strchr(url, ':')) {
1860                 if (strncmp(url, "tdb://", 6) != 0) {
1861                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1862                                   "Invalid tdb URL '%s'", url);
1863                         return LDB_ERR_OPERATIONS_ERROR;
1864                 }
1865                 path = url+6;
1866         } else {
1867                 path = url;
1868         }
1869
1870         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1871
1872         /* check for the 'nosync' option */
1873         if (flags & LDB_FLG_NOSYNC) {
1874                 tdb_flags |= TDB_NOSYNC;
1875         }
1876
1877         /* and nommap option */
1878         if (flags & LDB_FLG_NOMMAP) {
1879                 tdb_flags |= TDB_NOMMAP;
1880         }
1881
1882         ltdb = talloc_zero(ldb, struct ltdb_private);
1883         if (!ltdb) {
1884                 ldb_oom(ldb);
1885                 return LDB_ERR_OPERATIONS_ERROR;
1886         }
1887
1888         if (flags & LDB_FLG_RDONLY) {
1889                 /*
1890                  * This is weird, but because we can only have one tdb
1891                  * in this process, and the other one could be
1892                  * read-write, we can't use the tdb readonly.  Plus a
1893                  * read only tdb prohibits the all-record lock.
1894                  */
1895                 open_flags = O_RDWR;
1896
1897                 ltdb->read_only = true;
1898
1899         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1900                 /*
1901                  * This is used by ldbsearch to prevent creation of the database
1902                  * if the name is wrong
1903                  */
1904                 open_flags = O_RDWR;
1905         } else {
1906                 /*
1907                  * This is the normal case
1908                  */
1909                 open_flags = O_CREAT | O_RDWR;
1910         }
1911
1912         /* note that we use quite a large default hash size */
1913         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1914                                    tdb_flags, open_flags,
1915                                    ldb_get_create_perms(ldb), ldb);
1916         if (!ltdb->tdb) {
1917                 ldb_asprintf_errstring(ldb,
1918                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1919                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1920                           "Unable to open tdb '%s': %s", path, strerror(errno));
1921                 talloc_free(ltdb);
1922                 if (errno == EACCES || errno == EPERM) {
1923                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1924                 }
1925                 return LDB_ERR_OPERATIONS_ERROR;
1926         }
1927
1928         if (getenv("LDB_WARN_UNINDEXED")) {
1929                 ltdb->warn_unindexed = true;
1930         }
1931
1932         if (getenv("LDB_WARN_REINDEX")) {
1933                 ltdb->warn_reindex = true;
1934         }
1935
1936         ltdb->sequence_number = 0;
1937
1938         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1939         if (!module) {
1940                 ldb_oom(ldb);
1941                 talloc_free(ltdb);
1942                 return LDB_ERR_OPERATIONS_ERROR;
1943         }
1944         ldb_module_set_private(module, ltdb);
1945         talloc_steal(module, ltdb);
1946
1947         if (ltdb_cache_load(module) != 0) {
1948                 ldb_asprintf_errstring(ldb,
1949                                        "Unable to load ltdb cache records of tdb '%s'", path);
1950                 talloc_free(module);
1951                 return LDB_ERR_OPERATIONS_ERROR;
1952         }
1953
1954         *_module = module;
1955         return LDB_SUCCESS;
1956 }
1957
1958 int ldb_tdb_init(const char *version)
1959 {
1960         LDB_MODULE_CHECK_VERSION(version);
1961         return ldb_register_backend("tdb", ltdb_connect, false);
1962 }