ldb_tdb: Rework ltdb_modify_internal() to use ltdb_search_dn1() internally
[vlendec/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 ltdb->read_lock_count--;
123                 return 0;
124         }
125         ltdb->read_lock_count--;
126         return 0;
127 }
128
129
130 /*
131   form a TDB_DATA for a record key
132   caller frees
133
134   note that the key for a record can depend on whether the
135   dn refers to a case sensitive index record or not
136 */
137 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
138 {
139         struct ldb_context *ldb = ldb_module_get_ctx(module);
140         TDB_DATA key;
141         char *key_str = NULL;
142         const char *dn_folded = NULL;
143
144         /*
145           most DNs are case insensitive. The exception is index DNs for
146           case sensitive attributes
147
148           there are 3 cases dealt with in this code:
149
150           1) if the dn doesn't start with @ then uppercase the attribute
151              names and the attributes values of case insensitive attributes
152           2) if the dn starts with @ then leave it alone -
153              the indexing code handles the rest
154         */
155
156         dn_folded = ldb_dn_get_casefold(dn);
157         if (!dn_folded) {
158                 goto failed;
159         }
160
161         key_str = talloc_strdup(ldb, "DN=");
162         if (!key_str) {
163                 goto failed;
164         }
165
166         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
167         if (!key_str) {
168                 goto failed;
169         }
170
171         key.dptr = (uint8_t *)key_str;
172         key.dsize = strlen(key_str) + 1;
173
174         return key;
175
176 failed:
177         errno = ENOMEM;
178         key.dptr = NULL;
179         key.dsize = 0;
180         return key;
181 }
182
183 /*
184   check special dn's have valid attributes
185   currently only @ATTRIBUTES is checked
186 */
187 static int ltdb_check_special_dn(struct ldb_module *module,
188                                  const struct ldb_message *msg)
189 {
190         struct ldb_context *ldb = ldb_module_get_ctx(module);
191         unsigned int i, j;
192
193         if (! ldb_dn_is_special(msg->dn) ||
194             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
195                 return LDB_SUCCESS;
196         }
197
198         /* we have @ATTRIBUTES, let's check attributes are fine */
199         /* should we check that we deny multivalued attributes ? */
200         for (i = 0; i < msg->num_elements; i++) {
201                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
202
203                 for (j = 0; j < msg->elements[i].num_values; j++) {
204                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
205                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
206                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
207                         }
208                 }
209         }
210
211         return LDB_SUCCESS;
212 }
213
214
215 /*
216   we've made a modification to a dn - possibly reindex and
217   update sequence number
218 */
219 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
220 {
221         int ret = LDB_SUCCESS;
222         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
223
224         /* only allow modifies inside a transaction, otherwise the
225          * ldb is unsafe */
226         if (ltdb->in_transaction == 0) {
227                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
228                 return LDB_ERR_OPERATIONS_ERROR;
229         }
230
231         if (ldb_dn_is_special(dn) &&
232             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
233              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
234         {
235                 if (ltdb->warn_reindex) {
236                         ldb_debug(ldb_module_get_ctx(module),
237                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
238                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
239                 }
240                 ret = ltdb_reindex(module);
241         }
242
243         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
244         if (ret == LDB_SUCCESS &&
245             !(ldb_dn_is_special(dn) &&
246               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
247                 ret = ltdb_increase_sequence_number(module);
248         }
249
250         /* If the modify was to @OPTIONS, reload the cache */
251         if (ret == LDB_SUCCESS &&
252             ldb_dn_is_special(dn) &&
253             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
254                 ret = ltdb_cache_reload(module);
255         }
256
257         return ret;
258 }
259
260 /*
261   store a record into the db
262 */
263 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
264 {
265         void *data = ldb_module_get_private(module);
266         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
267         TDB_DATA tdb_key, tdb_data;
268         struct ldb_val ldb_data;
269         int ret = LDB_SUCCESS;
270
271         tdb_key = ltdb_key(module, msg->dn);
272         if (tdb_key.dptr == NULL) {
273                 return LDB_ERR_OTHER;
274         }
275
276         ret = ldb_pack_data(ldb_module_get_ctx(module),
277                             msg, &ldb_data);
278         if (ret == -1) {
279                 talloc_free(tdb_key.dptr);
280                 return LDB_ERR_OTHER;
281         }
282
283         tdb_data.dptr = ldb_data.data;
284         tdb_data.dsize = ldb_data.length;
285
286         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
287         if (ret != 0) {
288                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
289                 goto done;
290         }
291
292 done:
293         talloc_free(tdb_key.dptr);
294         talloc_free(ldb_data.data);
295
296         return ret;
297 }
298
299
300 /*
301   check if a attribute is a single valued, for a given element
302  */
303 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
304                                   struct ldb_message_element *el)
305 {
306         if (!a) return false;
307         if (el != NULL) {
308                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
309                         /* override from a ldb module, for example
310                            used for the description field, which is
311                            marked multi-valued in the schema but which
312                            should not actually accept multiple
313                            values */
314                         return true;
315                 }
316                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
317                         /* override from a ldb module, for example used for
318                            deleted linked attribute entries */
319                         return false;
320                 }
321         }
322         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
323                 return true;
324         }
325         return false;
326 }
327
328 static int ltdb_add_internal(struct ldb_module *module,
329                              const struct ldb_message *msg,
330                              bool check_single_value)
331 {
332         struct ldb_context *ldb = ldb_module_get_ctx(module);
333         int ret = LDB_SUCCESS;
334         unsigned int i;
335
336         for (i=0;i<msg->num_elements;i++) {
337                 struct ldb_message_element *el = &msg->elements[i];
338                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
339
340                 if (el->num_values == 0) {
341                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
342                                                el->name, ldb_dn_get_linearized(msg->dn));
343                         return LDB_ERR_CONSTRAINT_VIOLATION;
344                 }
345                 if (check_single_value &&
346                                 el->num_values > 1 &&
347                                 ldb_tdb_single_valued(a, el)) {
348                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
349                                                el->name, ldb_dn_get_linearized(msg->dn));
350                         return LDB_ERR_CONSTRAINT_VIOLATION;
351                 }
352
353                 /* Do not check "@ATTRIBUTES" for duplicated values */
354                 if (ldb_dn_is_special(msg->dn) &&
355                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
356                         continue;
357                 }
358
359                 if (check_single_value &&
360                     !(el->flags &
361                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
362                         struct ldb_val *duplicate = NULL;
363
364                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
365                                                          el, &duplicate, 0);
366                         if (ret != LDB_SUCCESS) {
367                                 return ret;
368                         }
369                         if (duplicate != NULL) {
370                                 ldb_asprintf_errstring(
371                                         ldb,
372                                         "attribute '%s': value '%.*s' on '%s' "
373                                         "provided more than once in ADD object",
374                                         el->name,
375                                         (int)duplicate->length,
376                                         duplicate->data,
377                                         ldb_dn_get_linearized(msg->dn));
378                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
379                         }
380                 }
381         }
382
383         ret = ltdb_store(module, msg, TDB_INSERT);
384         if (ret != LDB_SUCCESS) {
385                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
386                         ldb_asprintf_errstring(ldb,
387                                                "Entry %s already exists",
388                                                ldb_dn_get_linearized(msg->dn));
389                 }
390                 return ret;
391         }
392
393         ret = ltdb_index_add_new(module, msg);
394         if (ret != LDB_SUCCESS) {
395                 return ret;
396         }
397
398         ret = ltdb_modified(module, msg->dn);
399
400         return ret;
401 }
402
403 /*
404   add a record to the database
405 */
406 static int ltdb_add(struct ltdb_context *ctx)
407 {
408         struct ldb_module *module = ctx->module;
409         struct ldb_request *req = ctx->req;
410         int ret = LDB_SUCCESS;
411
412         ret = ltdb_check_special_dn(module, req->op.add.message);
413         if (ret != LDB_SUCCESS) {
414                 return ret;
415         }
416
417         ldb_request_set_state(req, LDB_ASYNC_PENDING);
418
419         if (ltdb_cache_load(module) != 0) {
420                 return LDB_ERR_OPERATIONS_ERROR;
421         }
422
423         ret = ltdb_add_internal(module, req->op.add.message, true);
424
425         return ret;
426 }
427
428 /*
429   delete a record from the database, not updating indexes (used for deleting
430   index records)
431 */
432 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
433 {
434         void *data = ldb_module_get_private(module);
435         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
436         TDB_DATA tdb_key;
437         int ret;
438
439         tdb_key = ltdb_key(module, dn);
440         if (!tdb_key.dptr) {
441                 return LDB_ERR_OTHER;
442         }
443
444         ret = tdb_delete(ltdb->tdb, tdb_key);
445         talloc_free(tdb_key.dptr);
446
447         if (ret != 0) {
448                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
449         }
450
451         return ret;
452 }
453
454 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
455 {
456         struct ldb_message *msg;
457         int ret = LDB_SUCCESS;
458
459         msg = ldb_msg_new(module);
460         if (msg == NULL) {
461                 return LDB_ERR_OPERATIONS_ERROR;
462         }
463
464         /* in case any attribute of the message was indexed, we need
465            to fetch the old record */
466         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
467         if (ret != LDB_SUCCESS) {
468                 /* not finding the old record is an error */
469                 goto done;
470         }
471
472         ret = ltdb_delete_noindex(module, dn);
473         if (ret != LDB_SUCCESS) {
474                 goto done;
475         }
476
477         /* remove any indexed attributes */
478         ret = ltdb_index_delete(module, msg);
479         if (ret != LDB_SUCCESS) {
480                 goto done;
481         }
482
483         ret = ltdb_modified(module, dn);
484         if (ret != LDB_SUCCESS) {
485                 goto done;
486         }
487
488 done:
489         talloc_free(msg);
490         return ret;
491 }
492
493 /*
494   delete a record from the database
495 */
496 static int ltdb_delete(struct ltdb_context *ctx)
497 {
498         struct ldb_module *module = ctx->module;
499         struct ldb_request *req = ctx->req;
500         int ret = LDB_SUCCESS;
501
502         ldb_request_set_state(req, LDB_ASYNC_PENDING);
503
504         if (ltdb_cache_load(module) != 0) {
505                 return LDB_ERR_OPERATIONS_ERROR;
506         }
507
508         ret = ltdb_delete_internal(module, req->op.del.dn);
509
510         return ret;
511 }
512
513 /*
514   find an element by attribute name. At the moment this does a linear search,
515   it should be re-coded to use a binary search once all places that modify
516   records guarantee sorted order
517
518   return the index of the first matching element if found, otherwise -1
519 */
520 static int find_element(const struct ldb_message *msg, const char *name)
521 {
522         unsigned int i;
523         for (i=0;i<msg->num_elements;i++) {
524                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
525                         return i;
526                 }
527         }
528         return -1;
529 }
530
531
532 /*
533   add an element to an existing record. Assumes a elements array that we
534   can call re-alloc on, and assumed that we can re-use the data pointers from
535   the passed in additional values. Use with care!
536
537   returns 0 on success, -1 on failure (and sets errno)
538 */
539 static int ltdb_msg_add_element(struct ldb_message *msg,
540                                 struct ldb_message_element *el)
541 {
542         struct ldb_message_element *e2;
543         unsigned int i;
544
545         if (el->num_values == 0) {
546                 /* nothing to do here - we don't add empty elements */
547                 return 0;
548         }
549
550         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
551                               msg->num_elements+1);
552         if (!e2) {
553                 errno = ENOMEM;
554                 return -1;
555         }
556
557         msg->elements = e2;
558
559         e2 = &msg->elements[msg->num_elements];
560
561         e2->name = el->name;
562         e2->flags = el->flags;
563         e2->values = talloc_array(msg->elements,
564                                   struct ldb_val, el->num_values);
565         if (!e2->values) {
566                 errno = ENOMEM;
567                 return -1;
568         }
569         for (i=0;i<el->num_values;i++) {
570                 e2->values[i] = el->values[i];
571         }
572         e2->num_values = el->num_values;
573
574         ++msg->num_elements;
575
576         return 0;
577 }
578
579 /*
580   delete all elements having a specified attribute name
581 */
582 static int msg_delete_attribute(struct ldb_module *module,
583                                 struct ldb_message *msg, const char *name)
584 {
585         unsigned int i;
586         int ret;
587         struct ldb_message_element *el;
588
589         el = ldb_msg_find_element(msg, name);
590         if (el == NULL) {
591                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
592         }
593         i = el - msg->elements;
594
595         ret = ltdb_index_del_element(module, msg->dn, el);
596         if (ret != LDB_SUCCESS) {
597                 return ret;
598         }
599
600         talloc_free(el->values);
601         if (msg->num_elements > (i+1)) {
602                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
603         }
604         msg->num_elements--;
605         msg->elements = talloc_realloc(msg, msg->elements,
606                                        struct ldb_message_element,
607                                        msg->num_elements);
608         return LDB_SUCCESS;
609 }
610
611 /*
612   delete all elements matching an attribute name/value
613
614   return LDB Error on failure
615 */
616 static int msg_delete_element(struct ldb_module *module,
617                               struct ldb_message *msg,
618                               const char *name,
619                               const struct ldb_val *val)
620 {
621         struct ldb_context *ldb = ldb_module_get_ctx(module);
622         unsigned int i;
623         int found, ret;
624         struct ldb_message_element *el;
625         const struct ldb_schema_attribute *a;
626
627         found = find_element(msg, name);
628         if (found == -1) {
629                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
630         }
631
632         i = (unsigned int) found;
633         el = &(msg->elements[i]);
634
635         a = ldb_schema_attribute_by_name(ldb, el->name);
636
637         for (i=0;i<el->num_values;i++) {
638                 bool matched;
639                 if (a->syntax->operator_fn) {
640                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
641                                                      &el->values[i], val, &matched);
642                         if (ret != LDB_SUCCESS) return ret;
643                 } else {
644                         matched = (a->syntax->comparison_fn(ldb, ldb,
645                                                             &el->values[i], val) == 0);
646                 }
647                 if (matched) {
648                         if (el->num_values == 1) {
649                                 return msg_delete_attribute(module, msg, name);
650                         }
651
652                         ret = ltdb_index_del_value(module, msg->dn, el, i);
653                         if (ret != LDB_SUCCESS) {
654                                 return ret;
655                         }
656
657                         if (i<el->num_values-1) {
658                                 memmove(&el->values[i], &el->values[i+1],
659                                         sizeof(el->values[i])*
660                                                 (el->num_values-(i+1)));
661                         }
662                         el->num_values--;
663
664                         /* per definition we find in a canonicalised message an
665                            attribute value only once. So we are finished here */
666                         return LDB_SUCCESS;
667                 }
668         }
669
670         /* Not found */
671         return LDB_ERR_NO_SUCH_ATTRIBUTE;
672 }
673
674
675 /*
676   modify a record - internal interface
677
678   yuck - this is O(n^2). Luckily n is usually small so we probably
679   get away with it, but if we ever have really large attribute lists
680   then we'll need to look at this again
681
682   'req' is optional, and is used to specify controls if supplied
683 */
684 int ltdb_modify_internal(struct ldb_module *module,
685                          const struct ldb_message *msg,
686                          struct ldb_request *req)
687 {
688         struct ldb_context *ldb = ldb_module_get_ctx(module);
689         struct ldb_message *msg2;
690         unsigned int i, j;
691         int ret = LDB_SUCCESS, idx;
692         struct ldb_control *control_permissive = NULL;
693         TALLOC_CTX *mem_ctx = talloc_new(req);
694
695         if (mem_ctx == NULL) {
696                 return ldb_module_oom(module);
697         }
698         
699         if (req) {
700                 control_permissive = ldb_request_get_control(req,
701                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
702         }
703
704         msg2 = ldb_msg_new(mem_ctx);
705         if (msg2 == NULL) {
706                 ret = LDB_ERR_OTHER;
707                 goto done;
708         }
709
710         ret = ltdb_search_dn1(module, msg->dn,
711                               msg2,
712                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
713         if (ret != LDB_SUCCESS) {
714                 goto done;
715         }
716
717         for (i=0; i<msg->num_elements; i++) {
718                 struct ldb_message_element *el = &msg->elements[i], *el2;
719                 struct ldb_val *vals;
720                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
721                 const char *dn;
722                 uint32_t options = 0;
723                 if (control_permissive != NULL) {
724                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
725                 }
726
727                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
728                 case LDB_FLAG_MOD_ADD:
729
730                         if (el->num_values == 0) {
731                                 ldb_asprintf_errstring(ldb,
732                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
733                                                        el->name, ldb_dn_get_linearized(msg2->dn));
734                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
735                                 goto done;
736                         }
737
738                         /* make a copy of the array so that a permissive
739                          * control can remove duplicates without changing the
740                          * original values, but do not copy data as we do not
741                          * need to keep it around once the operation is
742                          * finished */
743                         if (control_permissive) {
744                                 el = talloc(msg2, struct ldb_message_element);
745                                 if (!el) {
746                                         ret = LDB_ERR_OTHER;
747                                         goto done;
748                                 }
749                                 *el = msg->elements[i];
750                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
751                                 if (el->values == NULL) {
752                                         ret = LDB_ERR_OTHER;
753                                         goto done;
754                                 }
755                                 for (j = 0; j < el->num_values; j++) {
756                                         el->values[j] = msg->elements[i].values[j];
757                                 }
758                         }
759
760                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
761                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
762                                                        el->name, ldb_dn_get_linearized(msg2->dn));
763                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
764                                 goto done;
765                         }
766
767                         /* Checks if element already exists */
768                         idx = find_element(msg2, el->name);
769                         if (idx == -1) {
770                                 if (ltdb_msg_add_element(msg2, el) != 0) {
771                                         ret = LDB_ERR_OTHER;
772                                         goto done;
773                                 }
774                                 ret = ltdb_index_add_element(module, msg2->dn,
775                                                              el);
776                                 if (ret != LDB_SUCCESS) {
777                                         goto done;
778                                 }
779                         } else {
780                                 j = (unsigned int) idx;
781                                 el2 = &(msg2->elements[j]);
782
783                                 /* We cannot add another value on a existing one
784                                    if the attribute is single-valued */
785                                 if (ldb_tdb_single_valued(a, el)) {
786                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
787                                                                el->name, ldb_dn_get_linearized(msg2->dn));
788                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
789                                         goto done;
790                                 }
791
792                                 /* Check that values don't exist yet on multi-
793                                    valued attributes or aren't provided twice */
794                                 if (!(el->flags &
795                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
796                                         struct ldb_val *duplicate = NULL;
797                                         ret = ldb_msg_find_common_values(ldb,
798                                                                          msg2,
799                                                                          el,
800                                                                          el2,
801                                                                          options);
802
803                                         if (ret ==
804                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
805                                                 ldb_asprintf_errstring(ldb,
806                                                         "attribute '%s': value "
807                                                         "#%u on '%s' already "
808                                                         "exists", el->name, j,
809                                                         ldb_dn_get_linearized(msg2->dn));
810                                                 goto done;
811                                         } else if (ret != LDB_SUCCESS) {
812                                                 goto done;
813                                         }
814
815                                         ret = ldb_msg_find_duplicate_val(
816                                                 ldb, msg2, el, &duplicate, 0);
817                                         if (ret != LDB_SUCCESS) {
818                                                 goto done;
819                                         }
820                                         if (duplicate != NULL) {
821                                                 ldb_asprintf_errstring(
822                                                         ldb,
823                                                         "attribute '%s': value "
824                                                         "'%.*s' on '%s' "
825                                                         "provided more than "
826                                                         "once in ADD",
827                                                         el->name,
828                                                         (int)duplicate->length,
829                                                         duplicate->data,
830                                                         ldb_dn_get_linearized(msg->dn));
831                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
832                                                 goto done;
833                                         }
834                                 }
835
836                                 /* Now combine existing and new values to a new
837                                    attribute record */
838                                 vals = talloc_realloc(msg2->elements,
839                                                       el2->values, struct ldb_val,
840                                                       el2->num_values + el->num_values);
841                                 if (vals == NULL) {
842                                         ldb_oom(ldb);
843                                         ret = LDB_ERR_OTHER;
844                                         goto done;
845                                 }
846
847                                 for (j=0; j<el->num_values; j++) {
848                                         vals[el2->num_values + j] =
849                                                 ldb_val_dup(vals, &el->values[j]);
850                                 }
851
852                                 el2->values = vals;
853                                 el2->num_values += el->num_values;
854
855                                 ret = ltdb_index_add_element(module, msg2->dn, el);
856                                 if (ret != LDB_SUCCESS) {
857                                         goto done;
858                                 }
859                         }
860
861                         break;
862
863                 case LDB_FLAG_MOD_REPLACE:
864
865                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
866                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
867                                                        el->name, ldb_dn_get_linearized(msg2->dn));
868                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
869                                 goto done;
870                         }
871
872                         /*
873                          * We don't need to check this if we have been
874                          * pre-screened by the repl_meta_data module
875                          * in Samba, or someone else who can claim to
876                          * know what they are doing. 
877                          */
878                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
879                                 struct ldb_val *duplicate = NULL;
880
881                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
882                                                                  &duplicate, 0);
883                                 if (ret != LDB_SUCCESS) {
884                                         goto done;
885                                 }
886                                 if (duplicate != NULL) {
887                                         ldb_asprintf_errstring(
888                                                 ldb,
889                                                 "attribute '%s': value '%.*s' "
890                                                 "on '%s' provided more than "
891                                                 "once in REPLACE",
892                                                 el->name,
893                                                 (int)duplicate->length,
894                                                 duplicate->data,
895                                                 ldb_dn_get_linearized(msg2->dn));
896                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
897                                         goto done;
898                                 }
899                         }
900
901                         /* Checks if element already exists */
902                         idx = find_element(msg2, el->name);
903                         if (idx != -1) {
904                                 j = (unsigned int) idx;
905                                 el2 = &(msg2->elements[j]);
906
907                                 /* we consider two elements to be
908                                  * equal only if the order
909                                  * matches. This allows dbcheck to
910                                  * fix the ordering on attributes
911                                  * where order matters, such as
912                                  * objectClass
913                                  */
914                                 if (ldb_msg_element_equal_ordered(el, el2)) {
915                                         continue;
916                                 }
917
918                                 /* Delete the attribute if it exists in the DB */
919                                 if (msg_delete_attribute(module, msg2,
920                                                          el->name) != 0) {
921                                         ret = LDB_ERR_OTHER;
922                                         goto done;
923                                 }
924                         }
925
926                         /* Recreate it with the new values */
927                         if (ltdb_msg_add_element(msg2, el) != 0) {
928                                 ret = LDB_ERR_OTHER;
929                                 goto done;
930                         }
931
932                         ret = ltdb_index_add_element(module, msg2->dn, el);
933                         if (ret != LDB_SUCCESS) {
934                                 goto done;
935                         }
936
937                         break;
938
939                 case LDB_FLAG_MOD_DELETE:
940                         dn = ldb_dn_get_linearized(msg2->dn);
941                         if (dn == NULL) {
942                                 ret = LDB_ERR_OTHER;
943                                 goto done;
944                         }
945
946                         if (msg->elements[i].num_values == 0) {
947                                 /* Delete the whole attribute */
948                                 ret = msg_delete_attribute(module, msg2,
949                                                            msg->elements[i].name);
950                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
951                                     control_permissive) {
952                                         ret = LDB_SUCCESS;
953                                 } else {
954                                         ldb_asprintf_errstring(ldb,
955                                                                "attribute '%s': no such attribute for delete on '%s'",
956                                                                msg->elements[i].name, dn);
957                                 }
958                                 if (ret != LDB_SUCCESS) {
959                                         goto done;
960                                 }
961                         } else {
962                                 /* Delete specified values from an attribute */
963                                 for (j=0; j < msg->elements[i].num_values; j++) {
964                                         ret = msg_delete_element(module,
965                                                                  msg2,
966                                                                  msg->elements[i].name,
967                                                                  &msg->elements[i].values[j]);
968                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
969                                             control_permissive) {
970                                                 ret = LDB_SUCCESS;
971                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
972                                                 ldb_asprintf_errstring(ldb,
973                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
974                                                                        msg->elements[i].name, dn);
975                                         }
976                                         if (ret != LDB_SUCCESS) {
977                                                 goto done;
978                                         }
979                                 }
980                         }
981                         break;
982                 default:
983                         ldb_asprintf_errstring(ldb,
984                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
985                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
986                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
987                         ret = LDB_ERR_PROTOCOL_ERROR;
988                         goto done;
989                 }
990         }
991
992         ret = ltdb_store(module, msg2, TDB_MODIFY);
993         if (ret != LDB_SUCCESS) {
994                 goto done;
995         }
996
997         ret = ltdb_modified(module, msg2->dn);
998         if (ret != LDB_SUCCESS) {
999                 goto done;
1000         }
1001
1002 done:
1003         TALLOC_FREE(mem_ctx);
1004         return ret;
1005 }
1006
1007 /*
1008   modify a record
1009 */
1010 static int ltdb_modify(struct ltdb_context *ctx)
1011 {
1012         struct ldb_module *module = ctx->module;
1013         struct ldb_request *req = ctx->req;
1014         int ret = LDB_SUCCESS;
1015
1016         ret = ltdb_check_special_dn(module, req->op.mod.message);
1017         if (ret != LDB_SUCCESS) {
1018                 return ret;
1019         }
1020
1021         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1022
1023         if (ltdb_cache_load(module) != 0) {
1024                 return LDB_ERR_OPERATIONS_ERROR;
1025         }
1026
1027         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1028
1029         return ret;
1030 }
1031
1032 /*
1033   rename a record
1034 */
1035 static int ltdb_rename(struct ltdb_context *ctx)
1036 {
1037         struct ldb_module *module = ctx->module;
1038         void *data = ldb_module_get_private(module);
1039         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1040         struct ldb_request *req = ctx->req;
1041         struct ldb_message *msg;
1042         int ret = LDB_SUCCESS;
1043         TDB_DATA tdb_key, tdb_key_old;
1044
1045         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1046
1047         if (ltdb_cache_load(ctx->module) != 0) {
1048                 return LDB_ERR_OPERATIONS_ERROR;
1049         }
1050
1051         msg = ldb_msg_new(ctx);
1052         if (msg == NULL) {
1053                 return LDB_ERR_OPERATIONS_ERROR;
1054         }
1055
1056         /* we need to fetch the old record to re-add under the new name */
1057         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1058                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1059         if (ret != LDB_SUCCESS) {
1060                 /* not finding the old record is an error */
1061                 return ret;
1062         }
1063
1064         /* We need to, before changing the DB, check if the new DN
1065          * exists, so we can return this error to the caller with an
1066          * unmodified DB */
1067         tdb_key = ltdb_key(module, req->op.rename.newdn);
1068         if (!tdb_key.dptr) {
1069                 talloc_free(msg);
1070                 return LDB_ERR_OPERATIONS_ERROR;
1071         }
1072
1073         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1074         if (!tdb_key_old.dptr) {
1075                 talloc_free(msg);
1076                 talloc_free(tdb_key.dptr);
1077                 return LDB_ERR_OPERATIONS_ERROR;
1078         }
1079
1080         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1081         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1082                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1083                         talloc_free(tdb_key_old.dptr);
1084                         talloc_free(tdb_key.dptr);
1085                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1086                                                "Entry %s already exists",
1087                                                ldb_dn_get_linearized(req->op.rename.newdn));
1088                         /* finding the new record already in the DB is an error */
1089                         talloc_free(msg);
1090                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1091                 }
1092         }
1093         talloc_free(tdb_key_old.dptr);
1094         talloc_free(tdb_key.dptr);
1095
1096         /* Always delete first then add, to avoid conflicts with
1097          * unique indexes. We rely on the transaction to make this
1098          * atomic
1099          */
1100         ret = ltdb_delete_internal(module, msg->dn);
1101         if (ret != LDB_SUCCESS) {
1102                 talloc_free(msg);
1103                 return ret;
1104         }
1105
1106         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1107         if (msg->dn == NULL) {
1108                 talloc_free(msg);
1109                 return LDB_ERR_OPERATIONS_ERROR;
1110         }
1111
1112         /* We don't check single value as we can have more than 1 with
1113          * deleted attributes. We could go through all elements but that's
1114          * maybe not the most efficient way
1115          */
1116         ret = ltdb_add_internal(module, msg, false);
1117
1118         talloc_free(msg);
1119
1120         return ret;
1121 }
1122
1123 static int ltdb_start_trans(struct ldb_module *module)
1124 {
1125         void *data = ldb_module_get_private(module);
1126         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1127
1128         if (tdb_transaction_start(ltdb->tdb) != 0) {
1129                 return ltdb_err_map(tdb_error(ltdb->tdb));
1130         }
1131
1132         ltdb->in_transaction++;
1133
1134         ltdb_index_transaction_start(module);
1135
1136         return LDB_SUCCESS;
1137 }
1138
1139 static int ltdb_prepare_commit(struct ldb_module *module)
1140 {
1141         int ret;
1142         void *data = ldb_module_get_private(module);
1143         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1144
1145         if (ltdb->in_transaction != 1) {
1146                 return LDB_SUCCESS;
1147         }
1148
1149         ret = ltdb_index_transaction_commit(module);
1150         if (ret != LDB_SUCCESS) {
1151                 tdb_transaction_cancel(ltdb->tdb);
1152                 ltdb->in_transaction--;
1153                 return ret;
1154         }
1155
1156         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1157                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1158                 ltdb->in_transaction--;
1159                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1160                                        "Failure during tdb_transaction_prepare_commit(): %s -> %s",
1161                                        tdb_errorstr(ltdb->tdb),
1162                                        ldb_strerror(ret));
1163                 return ret;
1164         }
1165
1166         ltdb->prepared_commit = true;
1167
1168         return LDB_SUCCESS;
1169 }
1170
1171 static int ltdb_end_trans(struct ldb_module *module)
1172 {
1173         int ret;
1174         void *data = ldb_module_get_private(module);
1175         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1176
1177         if (!ltdb->prepared_commit) {
1178                 ret = ltdb_prepare_commit(module);
1179                 if (ret != LDB_SUCCESS) {
1180                         return ret;
1181                 }
1182         }
1183
1184         ltdb->in_transaction--;
1185         ltdb->prepared_commit = false;
1186
1187         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1188                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1189                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1190                                        "Failure during tdb_transaction_commit(): %s -> %s",
1191                                        tdb_errorstr(ltdb->tdb),
1192                                        ldb_strerror(ret));
1193                 return ret;
1194         }
1195
1196         return LDB_SUCCESS;
1197 }
1198
1199 static int ltdb_del_trans(struct ldb_module *module)
1200 {
1201         void *data = ldb_module_get_private(module);
1202         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1203
1204         ltdb->in_transaction--;
1205
1206         if (ltdb_index_transaction_cancel(module) != 0) {
1207                 tdb_transaction_cancel(ltdb->tdb);
1208                 return ltdb_err_map(tdb_error(ltdb->tdb));
1209         }
1210
1211         tdb_transaction_cancel(ltdb->tdb);
1212         return LDB_SUCCESS;
1213 }
1214
1215 /*
1216   return sequenceNumber from @BASEINFO
1217 */
1218 static int ltdb_sequence_number(struct ltdb_context *ctx,
1219                                 struct ldb_extended **ext)
1220 {
1221         struct ldb_context *ldb;
1222         struct ldb_module *module = ctx->module;
1223         struct ldb_request *req = ctx->req;
1224         TALLOC_CTX *tmp_ctx = NULL;
1225         struct ldb_seqnum_request *seq;
1226         struct ldb_seqnum_result *res;
1227         struct ldb_message *msg = NULL;
1228         struct ldb_dn *dn;
1229         const char *date;
1230         int ret = LDB_SUCCESS;
1231
1232         ldb = ldb_module_get_ctx(module);
1233
1234         seq = talloc_get_type(req->op.extended.data,
1235                                 struct ldb_seqnum_request);
1236         if (seq == NULL) {
1237                 return LDB_ERR_OPERATIONS_ERROR;
1238         }
1239
1240         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1241
1242         if (ltdb_lock_read(module) != 0) {
1243                 return LDB_ERR_OPERATIONS_ERROR;
1244         }
1245
1246         res = talloc_zero(req, struct ldb_seqnum_result);
1247         if (res == NULL) {
1248                 ret = LDB_ERR_OPERATIONS_ERROR;
1249                 goto done;
1250         }
1251
1252         tmp_ctx = talloc_new(req);
1253         if (tmp_ctx == NULL) {
1254                 ret = LDB_ERR_OPERATIONS_ERROR;
1255                 goto done;
1256         }
1257
1258         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1259         if (dn == NULL) {
1260                 ret = LDB_ERR_OPERATIONS_ERROR;
1261                 goto done;
1262         }
1263
1264         msg = ldb_msg_new(tmp_ctx);
1265         if (msg == NULL) {
1266                 ret = LDB_ERR_OPERATIONS_ERROR;
1267                 goto done;
1268         }
1269
1270         ret = ltdb_search_dn1(module, dn, msg, 0);
1271         if (ret != LDB_SUCCESS) {
1272                 goto done;
1273         }
1274
1275         switch (seq->type) {
1276         case LDB_SEQ_HIGHEST_SEQ:
1277                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1278                 break;
1279         case LDB_SEQ_NEXT:
1280                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1281                 res->seq_num++;
1282                 break;
1283         case LDB_SEQ_HIGHEST_TIMESTAMP:
1284                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1285                 if (date) {
1286                         res->seq_num = ldb_string_to_time(date);
1287                 } else {
1288                         res->seq_num = 0;
1289                         /* zero is as good as anything when we don't know */
1290                 }
1291                 break;
1292         }
1293
1294         *ext = talloc_zero(req, struct ldb_extended);
1295         if (*ext == NULL) {
1296                 ret = LDB_ERR_OPERATIONS_ERROR;
1297                 goto done;
1298         }
1299         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1300         (*ext)->data = talloc_steal(*ext, res);
1301
1302 done:
1303         talloc_free(tmp_ctx);
1304         ltdb_unlock_read(module);
1305         return ret;
1306 }
1307
1308 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1309 {
1310         struct ldb_context *ldb;
1311         struct ldb_request *req;
1312         struct ldb_reply *ares;
1313
1314         ldb = ldb_module_get_ctx(ctx->module);
1315         req = ctx->req;
1316
1317         /* if we already returned an error just return */
1318         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1319                 return;
1320         }
1321
1322         ares = talloc_zero(req, struct ldb_reply);
1323         if (!ares) {
1324                 ldb_oom(ldb);
1325                 req->callback(req, NULL);
1326                 return;
1327         }
1328         ares->type = LDB_REPLY_DONE;
1329         ares->error = error;
1330
1331         req->callback(req, ares);
1332 }
1333
1334 static void ltdb_timeout(struct tevent_context *ev,
1335                           struct tevent_timer *te,
1336                           struct timeval t,
1337                           void *private_data)
1338 {
1339         struct ltdb_context *ctx;
1340         ctx = talloc_get_type(private_data, struct ltdb_context);
1341
1342         if (!ctx->request_terminated) {
1343                 /* request is done now */
1344                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1345         }
1346
1347         if (ctx->spy) {
1348                 /* neutralize the spy */
1349                 ctx->spy->ctx = NULL;
1350                 ctx->spy = NULL;
1351         }
1352         talloc_free(ctx);
1353 }
1354
1355 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1356                                         struct ldb_extended *ext,
1357                                         int error)
1358 {
1359         struct ldb_context *ldb;
1360         struct ldb_request *req;
1361         struct ldb_reply *ares;
1362
1363         ldb = ldb_module_get_ctx(ctx->module);
1364         req = ctx->req;
1365
1366         /* if we already returned an error just return */
1367         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1368                 return;
1369         }
1370
1371         ares = talloc_zero(req, struct ldb_reply);
1372         if (!ares) {
1373                 ldb_oom(ldb);
1374                 req->callback(req, NULL);
1375                 return;
1376         }
1377         ares->type = LDB_REPLY_DONE;
1378         ares->response = ext;
1379         ares->error = error;
1380
1381         req->callback(req, ares);
1382 }
1383
1384 static void ltdb_handle_extended(struct ltdb_context *ctx)
1385 {
1386         struct ldb_extended *ext = NULL;
1387         int ret;
1388
1389         if (strcmp(ctx->req->op.extended.oid,
1390                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1391                 /* get sequence number */
1392                 ret = ltdb_sequence_number(ctx, &ext);
1393         } else {
1394                 /* not recognized */
1395                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1396         }
1397
1398         ltdb_request_extended_done(ctx, ext, ret);
1399 }
1400
1401 static void ltdb_callback(struct tevent_context *ev,
1402                           struct tevent_timer *te,
1403                           struct timeval t,
1404                           void *private_data)
1405 {
1406         struct ltdb_context *ctx;
1407         int ret;
1408
1409         ctx = talloc_get_type(private_data, struct ltdb_context);
1410
1411         if (ctx->request_terminated) {
1412                 goto done;
1413         }
1414
1415         switch (ctx->req->operation) {
1416         case LDB_SEARCH:
1417                 ret = ltdb_search(ctx);
1418                 break;
1419         case LDB_ADD:
1420                 ret = ltdb_add(ctx);
1421                 break;
1422         case LDB_MODIFY:
1423                 ret = ltdb_modify(ctx);
1424                 break;
1425         case LDB_DELETE:
1426                 ret = ltdb_delete(ctx);
1427                 break;
1428         case LDB_RENAME:
1429                 ret = ltdb_rename(ctx);
1430                 break;
1431         case LDB_EXTENDED:
1432                 ltdb_handle_extended(ctx);
1433                 goto done;
1434         default:
1435                 /* no other op supported */
1436                 ret = LDB_ERR_PROTOCOL_ERROR;
1437         }
1438
1439         if (!ctx->request_terminated) {
1440                 /* request is done now */
1441                 ltdb_request_done(ctx, ret);
1442         }
1443
1444 done:
1445         if (ctx->spy) {
1446                 /* neutralize the spy */
1447                 ctx->spy->ctx = NULL;
1448                 ctx->spy = NULL;
1449         }
1450         talloc_free(ctx);
1451 }
1452
1453 static int ltdb_request_destructor(void *ptr)
1454 {
1455         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1456
1457         if (spy->ctx != NULL) {
1458                 spy->ctx->spy = NULL;
1459                 spy->ctx->request_terminated = true;
1460                 spy->ctx = NULL;
1461         }
1462
1463         return 0;
1464 }
1465
1466 static int ltdb_handle_request(struct ldb_module *module,
1467                                struct ldb_request *req)
1468 {
1469         struct ldb_control *control_permissive;
1470         struct ldb_context *ldb;
1471         struct tevent_context *ev;
1472         struct ltdb_context *ac;
1473         struct tevent_timer *te;
1474         struct timeval tv;
1475         unsigned int i;
1476
1477         ldb = ldb_module_get_ctx(module);
1478
1479         control_permissive = ldb_request_get_control(req,
1480                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1481
1482         for (i = 0; req->controls && req->controls[i]; i++) {
1483                 if (req->controls[i]->critical &&
1484                     req->controls[i] != control_permissive) {
1485                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1486                                                req->controls[i]->oid);
1487                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1488                 }
1489         }
1490
1491         if (req->starttime == 0 || req->timeout == 0) {
1492                 ldb_set_errstring(ldb, "Invalid timeout settings");
1493                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1494         }
1495
1496         ev = ldb_handle_get_event_context(req->handle);
1497
1498         ac = talloc_zero(ldb, struct ltdb_context);
1499         if (ac == NULL) {
1500                 ldb_oom(ldb);
1501                 return LDB_ERR_OPERATIONS_ERROR;
1502         }
1503
1504         ac->module = module;
1505         ac->req = req;
1506
1507         tv.tv_sec = 0;
1508         tv.tv_usec = 0;
1509         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1510         if (NULL == te) {
1511                 talloc_free(ac);
1512                 return LDB_ERR_OPERATIONS_ERROR;
1513         }
1514
1515         if (req->timeout > 0) {
1516                 tv.tv_sec = req->starttime + req->timeout;
1517                 tv.tv_usec = 0;
1518                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1519                                                      ltdb_timeout, ac);
1520                 if (NULL == ac->timeout_event) {
1521                         talloc_free(ac);
1522                         return LDB_ERR_OPERATIONS_ERROR;
1523                 }
1524         }
1525
1526         /* set a spy so that we do not try to use the request context
1527          * if it is freed before ltdb_callback fires */
1528         ac->spy = talloc(req, struct ltdb_req_spy);
1529         if (NULL == ac->spy) {
1530                 talloc_free(ac);
1531                 return LDB_ERR_OPERATIONS_ERROR;
1532         }
1533         ac->spy->ctx = ac;
1534
1535         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1536
1537         return LDB_SUCCESS;
1538 }
1539
1540 static int ltdb_init_rootdse(struct ldb_module *module)
1541 {
1542         /* ignore errors on this - we expect it for non-sam databases */
1543         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1544
1545         /* there can be no module beyond the backend, just return */
1546         return LDB_SUCCESS;
1547 }
1548
1549 static const struct ldb_module_ops ltdb_ops = {
1550         .name              = "tdb",
1551         .init_context      = ltdb_init_rootdse,
1552         .search            = ltdb_handle_request,
1553         .add               = ltdb_handle_request,
1554         .modify            = ltdb_handle_request,
1555         .del               = ltdb_handle_request,
1556         .rename            = ltdb_handle_request,
1557         .extended          = ltdb_handle_request,
1558         .start_transaction = ltdb_start_trans,
1559         .end_transaction   = ltdb_end_trans,
1560         .prepare_commit    = ltdb_prepare_commit,
1561         .del_transaction   = ltdb_del_trans,
1562         .read_lock         = ltdb_lock_read,
1563         .read_unlock       = ltdb_unlock_read,
1564 };
1565
1566 /*
1567   connect to the database
1568 */
1569 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1570                         unsigned int flags, const char *options[],
1571                         struct ldb_module **_module)
1572 {
1573         struct ldb_module *module;
1574         const char *path;
1575         int tdb_flags, open_flags;
1576         struct ltdb_private *ltdb;
1577
1578         /*
1579          * We hold locks, so we must use a private event context
1580          * on each returned handle
1581          */
1582
1583         ldb_set_require_private_event_context(ldb);
1584
1585         /* parse the url */
1586         if (strchr(url, ':')) {
1587                 if (strncmp(url, "tdb://", 6) != 0) {
1588                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1589                                   "Invalid tdb URL '%s'", url);
1590                         return LDB_ERR_OPERATIONS_ERROR;
1591                 }
1592                 path = url+6;
1593         } else {
1594                 path = url;
1595         }
1596
1597         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1598
1599         /* check for the 'nosync' option */
1600         if (flags & LDB_FLG_NOSYNC) {
1601                 tdb_flags |= TDB_NOSYNC;
1602         }
1603
1604         /* and nommap option */
1605         if (flags & LDB_FLG_NOMMAP) {
1606                 tdb_flags |= TDB_NOMMAP;
1607         }
1608
1609         if (flags & LDB_FLG_RDONLY) {
1610                 open_flags = O_RDONLY;
1611         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1612                 open_flags = O_RDWR;
1613         } else {
1614                 open_flags = O_CREAT | O_RDWR;
1615         }
1616
1617         ltdb = talloc_zero(ldb, struct ltdb_private);
1618         if (!ltdb) {
1619                 ldb_oom(ldb);
1620                 return LDB_ERR_OPERATIONS_ERROR;
1621         }
1622
1623         /* note that we use quite a large default hash size */
1624         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1625                                    tdb_flags, open_flags,
1626                                    ldb_get_create_perms(ldb), ldb);
1627         if (!ltdb->tdb) {
1628                 ldb_asprintf_errstring(ldb,
1629                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1630                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1631                           "Unable to open tdb '%s': %s", path, strerror(errno));
1632                 talloc_free(ltdb);
1633                 if (errno == EACCES || errno == EPERM) {
1634                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1635                 }
1636                 return LDB_ERR_OPERATIONS_ERROR;
1637         }
1638
1639         if (getenv("LDB_WARN_UNINDEXED")) {
1640                 ltdb->warn_unindexed = true;
1641         }
1642
1643         if (getenv("LDB_WARN_REINDEX")) {
1644                 ltdb->warn_reindex = true;
1645         }
1646
1647         ltdb->sequence_number = 0;
1648
1649         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1650         if (!module) {
1651                 ldb_oom(ldb);
1652                 talloc_free(ltdb);
1653                 return LDB_ERR_OPERATIONS_ERROR;
1654         }
1655         ldb_module_set_private(module, ltdb);
1656         talloc_steal(module, ltdb);
1657
1658         if (ltdb_cache_load(module) != 0) {
1659                 ldb_asprintf_errstring(ldb,
1660                                        "Unable to load ltdb cache records of tdb '%s'", path);
1661                 talloc_free(module);
1662                 return LDB_ERR_OPERATIONS_ERROR;
1663         }
1664
1665         *_module = module;
1666         return LDB_SUCCESS;
1667 }
1668
1669 int ldb_tdb_init(const char *version)
1670 {
1671         LDB_MODULE_CHECK_VERSION(version);
1672         return ldb_register_backend("tdb", ltdb_connect, false);
1673 }