ad15a5e70a549807caabe2be466ed7827977e25b
[sfrench/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 ltdb->read_lock_count--;
123                 return 0;
124         }
125         ltdb->read_lock_count--;
126         return 0;
127 }
128
129
130 /*
131   form a TDB_DATA for a record key
132   caller frees
133
134   note that the key for a record can depend on whether the
135   dn refers to a case sensitive index record or not
136 */
137 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
138 {
139         struct ldb_context *ldb = ldb_module_get_ctx(module);
140         TDB_DATA key;
141         char *key_str = NULL;
142         const char *dn_folded = NULL;
143
144         /*
145           most DNs are case insensitive. The exception is index DNs for
146           case sensitive attributes
147
148           there are 3 cases dealt with in this code:
149
150           1) if the dn doesn't start with @ then uppercase the attribute
151              names and the attributes values of case insensitive attributes
152           2) if the dn starts with @ then leave it alone -
153              the indexing code handles the rest
154         */
155
156         dn_folded = ldb_dn_get_casefold(dn);
157         if (!dn_folded) {
158                 goto failed;
159         }
160
161         key_str = talloc_strdup(ldb, "DN=");
162         if (!key_str) {
163                 goto failed;
164         }
165
166         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
167         if (!key_str) {
168                 goto failed;
169         }
170
171         key.dptr = (uint8_t *)key_str;
172         key.dsize = strlen(key_str) + 1;
173
174         return key;
175
176 failed:
177         errno = ENOMEM;
178         key.dptr = NULL;
179         key.dsize = 0;
180         return key;
181 }
182
183 /*
184   check special dn's have valid attributes
185   currently only @ATTRIBUTES is checked
186 */
187 static int ltdb_check_special_dn(struct ldb_module *module,
188                                  const struct ldb_message *msg)
189 {
190         struct ldb_context *ldb = ldb_module_get_ctx(module);
191         unsigned int i, j;
192
193         if (! ldb_dn_is_special(msg->dn) ||
194             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
195                 return LDB_SUCCESS;
196         }
197
198         /* we have @ATTRIBUTES, let's check attributes are fine */
199         /* should we check that we deny multivalued attributes ? */
200         for (i = 0; i < msg->num_elements; i++) {
201                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
202
203                 for (j = 0; j < msg->elements[i].num_values; j++) {
204                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
205                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
206                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
207                         }
208                 }
209         }
210
211         return LDB_SUCCESS;
212 }
213
214
215 /*
216   we've made a modification to a dn - possibly reindex and
217   update sequence number
218 */
219 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
220 {
221         int ret = LDB_SUCCESS;
222         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
223
224         /* only allow modifies inside a transaction, otherwise the
225          * ldb is unsafe */
226         if (ltdb->in_transaction == 0) {
227                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
228                 return LDB_ERR_OPERATIONS_ERROR;
229         }
230
231         if (ldb_dn_is_special(dn) &&
232             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
233              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
234         {
235                 if (ltdb->warn_reindex) {
236                         ldb_debug(ldb_module_get_ctx(module),
237                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
238                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
239                 }
240                 ret = ltdb_reindex(module);
241         }
242
243         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
244         if (ret == LDB_SUCCESS &&
245             !(ldb_dn_is_special(dn) &&
246               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
247                 ret = ltdb_increase_sequence_number(module);
248         }
249
250         /* If the modify was to @OPTIONS, reload the cache */
251         if (ret == LDB_SUCCESS &&
252             ldb_dn_is_special(dn) &&
253             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
254                 ret = ltdb_cache_reload(module);
255         }
256
257         return ret;
258 }
259
260 /*
261   store a record into the db
262 */
263 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
264 {
265         void *data = ldb_module_get_private(module);
266         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
267         TDB_DATA tdb_key, tdb_data;
268         struct ldb_val ldb_data;
269         int ret = LDB_SUCCESS;
270
271         tdb_key = ltdb_key(module, msg->dn);
272         if (tdb_key.dptr == NULL) {
273                 return LDB_ERR_OTHER;
274         }
275
276         ret = ldb_pack_data(ldb_module_get_ctx(module),
277                             msg, &ldb_data);
278         if (ret == -1) {
279                 talloc_free(tdb_key.dptr);
280                 return LDB_ERR_OTHER;
281         }
282
283         tdb_data.dptr = ldb_data.data;
284         tdb_data.dsize = ldb_data.length;
285
286         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
287         if (ret != 0) {
288                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
289                 goto done;
290         }
291
292 done:
293         talloc_free(tdb_key.dptr);
294         talloc_free(ldb_data.data);
295
296         return ret;
297 }
298
299
300 /*
301   check if a attribute is a single valued, for a given element
302  */
303 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
304                                   struct ldb_message_element *el)
305 {
306         if (!a) return false;
307         if (el != NULL) {
308                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
309                         /* override from a ldb module, for example
310                            used for the description field, which is
311                            marked multi-valued in the schema but which
312                            should not actually accept multiple
313                            values */
314                         return true;
315                 }
316                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
317                         /* override from a ldb module, for example used for
318                            deleted linked attribute entries */
319                         return false;
320                 }
321         }
322         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
323                 return true;
324         }
325         return false;
326 }
327
328 static int ltdb_add_internal(struct ldb_module *module,
329                              const struct ldb_message *msg,
330                              bool check_single_value)
331 {
332         struct ldb_context *ldb = ldb_module_get_ctx(module);
333         int ret = LDB_SUCCESS;
334         unsigned int i;
335
336         for (i=0;i<msg->num_elements;i++) {
337                 struct ldb_message_element *el = &msg->elements[i];
338                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
339
340                 if (el->num_values == 0) {
341                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
342                                                el->name, ldb_dn_get_linearized(msg->dn));
343                         return LDB_ERR_CONSTRAINT_VIOLATION;
344                 }
345                 if (check_single_value &&
346                                 el->num_values > 1 &&
347                                 ldb_tdb_single_valued(a, el)) {
348                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
349                                                el->name, ldb_dn_get_linearized(msg->dn));
350                         return LDB_ERR_CONSTRAINT_VIOLATION;
351                 }
352
353                 /* Do not check "@ATTRIBUTES" for duplicated values */
354                 if (ldb_dn_is_special(msg->dn) &&
355                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
356                         continue;
357                 }
358
359                 if (check_single_value &&
360                     !(el->flags &
361                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
362                         struct ldb_val *duplicate = NULL;
363
364                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
365                                                          el, &duplicate, 0);
366                         if (ret != LDB_SUCCESS) {
367                                 return ret;
368                         }
369                         if (duplicate != NULL) {
370                                 ldb_asprintf_errstring(
371                                         ldb,
372                                         "attribute '%s': value '%.*s' on '%s' "
373                                         "provided more than once in ADD object",
374                                         el->name,
375                                         (int)duplicate->length,
376                                         duplicate->data,
377                                         ldb_dn_get_linearized(msg->dn));
378                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
379                         }
380                 }
381         }
382
383         ret = ltdb_store(module, msg, TDB_INSERT);
384         if (ret != LDB_SUCCESS) {
385                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
386                         ldb_asprintf_errstring(ldb,
387                                                "Entry %s already exists",
388                                                ldb_dn_get_linearized(msg->dn));
389                 }
390                 return ret;
391         }
392
393         ret = ltdb_index_add_new(module, msg);
394         if (ret != LDB_SUCCESS) {
395                 return ret;
396         }
397
398         ret = ltdb_modified(module, msg->dn);
399
400         return ret;
401 }
402
403 /*
404   add a record to the database
405 */
406 static int ltdb_add(struct ltdb_context *ctx)
407 {
408         struct ldb_module *module = ctx->module;
409         struct ldb_request *req = ctx->req;
410         int ret = LDB_SUCCESS;
411
412         ret = ltdb_check_special_dn(module, req->op.add.message);
413         if (ret != LDB_SUCCESS) {
414                 return ret;
415         }
416
417         ldb_request_set_state(req, LDB_ASYNC_PENDING);
418
419         if (ltdb_cache_load(module) != 0) {
420                 return LDB_ERR_OPERATIONS_ERROR;
421         }
422
423         ret = ltdb_add_internal(module, req->op.add.message, true);
424
425         return ret;
426 }
427
428 /*
429   delete a record from the database, not updating indexes (used for deleting
430   index records)
431 */
432 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
433 {
434         void *data = ldb_module_get_private(module);
435         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
436         TDB_DATA tdb_key;
437         int ret;
438
439         tdb_key = ltdb_key(module, dn);
440         if (!tdb_key.dptr) {
441                 return LDB_ERR_OTHER;
442         }
443
444         ret = tdb_delete(ltdb->tdb, tdb_key);
445         talloc_free(tdb_key.dptr);
446
447         if (ret != 0) {
448                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
449         }
450
451         return ret;
452 }
453
454 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
455 {
456         struct ldb_message *msg;
457         int ret = LDB_SUCCESS;
458
459         msg = ldb_msg_new(module);
460         if (msg == NULL) {
461                 return LDB_ERR_OPERATIONS_ERROR;
462         }
463
464         /* in case any attribute of the message was indexed, we need
465            to fetch the old record */
466         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
467         if (ret != LDB_SUCCESS) {
468                 /* not finding the old record is an error */
469                 goto done;
470         }
471
472         ret = ltdb_delete_noindex(module, dn);
473         if (ret != LDB_SUCCESS) {
474                 goto done;
475         }
476
477         /* remove any indexed attributes */
478         ret = ltdb_index_delete(module, msg);
479         if (ret != LDB_SUCCESS) {
480                 goto done;
481         }
482
483         ret = ltdb_modified(module, dn);
484         if (ret != LDB_SUCCESS) {
485                 goto done;
486         }
487
488 done:
489         talloc_free(msg);
490         return ret;
491 }
492
493 /*
494   delete a record from the database
495 */
496 static int ltdb_delete(struct ltdb_context *ctx)
497 {
498         struct ldb_module *module = ctx->module;
499         struct ldb_request *req = ctx->req;
500         int ret = LDB_SUCCESS;
501
502         ldb_request_set_state(req, LDB_ASYNC_PENDING);
503
504         if (ltdb_cache_load(module) != 0) {
505                 return LDB_ERR_OPERATIONS_ERROR;
506         }
507
508         ret = ltdb_delete_internal(module, req->op.del.dn);
509
510         return ret;
511 }
512
513 /*
514   find an element by attribute name. At the moment this does a linear search,
515   it should be re-coded to use a binary search once all places that modify
516   records guarantee sorted order
517
518   return the index of the first matching element if found, otherwise -1
519 */
520 static int find_element(const struct ldb_message *msg, const char *name)
521 {
522         unsigned int i;
523         for (i=0;i<msg->num_elements;i++) {
524                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
525                         return i;
526                 }
527         }
528         return -1;
529 }
530
531
532 /*
533   add an element to an existing record. Assumes a elements array that we
534   can call re-alloc on, and assumed that we can re-use the data pointers from
535   the passed in additional values. Use with care!
536
537   returns 0 on success, -1 on failure (and sets errno)
538 */
539 static int ltdb_msg_add_element(struct ldb_message *msg,
540                                 struct ldb_message_element *el)
541 {
542         struct ldb_message_element *e2;
543         unsigned int i;
544
545         if (el->num_values == 0) {
546                 /* nothing to do here - we don't add empty elements */
547                 return 0;
548         }
549
550         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
551                               msg->num_elements+1);
552         if (!e2) {
553                 errno = ENOMEM;
554                 return -1;
555         }
556
557         msg->elements = e2;
558
559         e2 = &msg->elements[msg->num_elements];
560
561         e2->name = el->name;
562         e2->flags = el->flags;
563         e2->values = talloc_array(msg->elements,
564                                   struct ldb_val, el->num_values);
565         if (!e2->values) {
566                 errno = ENOMEM;
567                 return -1;
568         }
569         for (i=0;i<el->num_values;i++) {
570                 e2->values[i] = el->values[i];
571         }
572         e2->num_values = el->num_values;
573
574         ++msg->num_elements;
575
576         return 0;
577 }
578
579 /*
580   delete all elements having a specified attribute name
581 */
582 static int msg_delete_attribute(struct ldb_module *module,
583                                 struct ldb_message *msg, const char *name)
584 {
585         unsigned int i;
586         int ret;
587         struct ldb_message_element *el;
588
589         el = ldb_msg_find_element(msg, name);
590         if (el == NULL) {
591                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
592         }
593         i = el - msg->elements;
594
595         ret = ltdb_index_del_element(module, msg->dn, el);
596         if (ret != LDB_SUCCESS) {
597                 return ret;
598         }
599
600         talloc_free(el->values);
601         if (msg->num_elements > (i+1)) {
602                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
603         }
604         msg->num_elements--;
605         msg->elements = talloc_realloc(msg, msg->elements,
606                                        struct ldb_message_element,
607                                        msg->num_elements);
608         return LDB_SUCCESS;
609 }
610
611 /*
612   delete all elements matching an attribute name/value
613
614   return LDB Error on failure
615 */
616 static int msg_delete_element(struct ldb_module *module,
617                               struct ldb_message *msg,
618                               const char *name,
619                               const struct ldb_val *val)
620 {
621         struct ldb_context *ldb = ldb_module_get_ctx(module);
622         unsigned int i;
623         int found, ret;
624         struct ldb_message_element *el;
625         const struct ldb_schema_attribute *a;
626
627         found = find_element(msg, name);
628         if (found == -1) {
629                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
630         }
631
632         i = (unsigned int) found;
633         el = &(msg->elements[i]);
634
635         a = ldb_schema_attribute_by_name(ldb, el->name);
636
637         for (i=0;i<el->num_values;i++) {
638                 bool matched;
639                 if (a->syntax->operator_fn) {
640                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
641                                                      &el->values[i], val, &matched);
642                         if (ret != LDB_SUCCESS) return ret;
643                 } else {
644                         matched = (a->syntax->comparison_fn(ldb, ldb,
645                                                             &el->values[i], val) == 0);
646                 }
647                 if (matched) {
648                         if (el->num_values == 1) {
649                                 return msg_delete_attribute(module, msg, name);
650                         }
651
652                         ret = ltdb_index_del_value(module, msg->dn, el, i);
653                         if (ret != LDB_SUCCESS) {
654                                 return ret;
655                         }
656
657                         if (i<el->num_values-1) {
658                                 memmove(&el->values[i], &el->values[i+1],
659                                         sizeof(el->values[i])*
660                                                 (el->num_values-(i+1)));
661                         }
662                         el->num_values--;
663
664                         /* per definition we find in a canonicalised message an
665                            attribute value only once. So we are finished here */
666                         return LDB_SUCCESS;
667                 }
668         }
669
670         /* Not found */
671         return LDB_ERR_NO_SUCH_ATTRIBUTE;
672 }
673
674
675 /*
676   modify a record - internal interface
677
678   yuck - this is O(n^2). Luckily n is usually small so we probably
679   get away with it, but if we ever have really large attribute lists
680   then we'll need to look at this again
681
682   'req' is optional, and is used to specify controls if supplied
683 */
684 int ltdb_modify_internal(struct ldb_module *module,
685                          const struct ldb_message *msg,
686                          struct ldb_request *req)
687 {
688         struct ldb_context *ldb = ldb_module_get_ctx(module);
689         void *data = ldb_module_get_private(module);
690         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
691         TDB_DATA tdb_key, tdb_data;
692         struct ldb_val ldb_data;
693         struct ldb_message *msg2;
694         unsigned int i, j;
695         int ret = LDB_SUCCESS, idx;
696         struct ldb_control *control_permissive = NULL;
697
698         if (req) {
699                 control_permissive = ldb_request_get_control(req,
700                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
701         }
702
703         tdb_key = ltdb_key(module, msg->dn);
704         if (!tdb_key.dptr) {
705                 return LDB_ERR_OTHER;
706         }
707
708         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
709         if (!tdb_data.dptr) {
710                 talloc_free(tdb_key.dptr);
711                 return ltdb_err_map(tdb_error(ltdb->tdb));
712         }
713
714         msg2 = ldb_msg_new(tdb_key.dptr);
715         if (msg2 == NULL) {
716                 free(tdb_data.dptr);
717                 ret = LDB_ERR_OTHER;
718                 goto done;
719         }
720
721         ldb_data.data = tdb_data.dptr;
722         ldb_data.length = tdb_data.dsize;
723
724         ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
725         free(tdb_data.dptr);
726         if (ret == -1) {
727                 ret = LDB_ERR_OTHER;
728                 goto done;
729         }
730
731         if (!msg2->dn) {
732                 msg2->dn = msg->dn;
733         }
734
735         for (i=0; i<msg->num_elements; i++) {
736                 struct ldb_message_element *el = &msg->elements[i], *el2;
737                 struct ldb_val *vals;
738                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
739                 const char *dn;
740                 uint32_t options = 0;
741                 if (control_permissive != NULL) {
742                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
743                 }
744
745                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
746                 case LDB_FLAG_MOD_ADD:
747
748                         if (el->num_values == 0) {
749                                 ldb_asprintf_errstring(ldb,
750                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
751                                                        el->name, ldb_dn_get_linearized(msg2->dn));
752                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
753                                 goto done;
754                         }
755
756                         /* make a copy of the array so that a permissive
757                          * control can remove duplicates without changing the
758                          * original values, but do not copy data as we do not
759                          * need to keep it around once the operation is
760                          * finished */
761                         if (control_permissive) {
762                                 el = talloc(msg2, struct ldb_message_element);
763                                 if (!el) {
764                                         ret = LDB_ERR_OTHER;
765                                         goto done;
766                                 }
767                                 *el = msg->elements[i];
768                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
769                                 if (el->values == NULL) {
770                                         ret = LDB_ERR_OTHER;
771                                         goto done;
772                                 }
773                                 for (j = 0; j < el->num_values; j++) {
774                                         el->values[j] = msg->elements[i].values[j];
775                                 }
776                         }
777
778                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
779                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
780                                                        el->name, ldb_dn_get_linearized(msg2->dn));
781                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
782                                 goto done;
783                         }
784
785                         /* Checks if element already exists */
786                         idx = find_element(msg2, el->name);
787                         if (idx == -1) {
788                                 if (ltdb_msg_add_element(msg2, el) != 0) {
789                                         ret = LDB_ERR_OTHER;
790                                         goto done;
791                                 }
792                                 ret = ltdb_index_add_element(module, msg2->dn,
793                                                              el);
794                                 if (ret != LDB_SUCCESS) {
795                                         goto done;
796                                 }
797                         } else {
798                                 j = (unsigned int) idx;
799                                 el2 = &(msg2->elements[j]);
800
801                                 /* We cannot add another value on a existing one
802                                    if the attribute is single-valued */
803                                 if (ldb_tdb_single_valued(a, el)) {
804                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
805                                                                el->name, ldb_dn_get_linearized(msg2->dn));
806                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
807                                         goto done;
808                                 }
809
810                                 /* Check that values don't exist yet on multi-
811                                    valued attributes or aren't provided twice */
812                                 if (!(el->flags &
813                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
814                                         struct ldb_val *duplicate = NULL;
815                                         ret = ldb_msg_find_common_values(ldb,
816                                                                          msg2,
817                                                                          el,
818                                                                          el2,
819                                                                          options);
820
821                                         if (ret ==
822                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
823                                                 ldb_asprintf_errstring(ldb,
824                                                         "attribute '%s': value "
825                                                         "#%u on '%s' already "
826                                                         "exists", el->name, j,
827                                                         ldb_dn_get_linearized(msg2->dn));
828                                                 goto done;
829                                         } else if (ret != LDB_SUCCESS) {
830                                                 goto done;
831                                         }
832
833                                         ret = ldb_msg_find_duplicate_val(
834                                                 ldb, msg2, el, &duplicate, 0);
835                                         if (ret != LDB_SUCCESS) {
836                                                 goto done;
837                                         }
838                                         if (duplicate != NULL) {
839                                                 ldb_asprintf_errstring(
840                                                         ldb,
841                                                         "attribute '%s': value "
842                                                         "'%.*s' on '%s' "
843                                                         "provided more than "
844                                                         "once in ADD",
845                                                         el->name,
846                                                         (int)duplicate->length,
847                                                         duplicate->data,
848                                                         ldb_dn_get_linearized(msg->dn));
849                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
850                                                 goto done;
851                                         }
852                                 }
853
854                                 /* Now combine existing and new values to a new
855                                    attribute record */
856                                 vals = talloc_realloc(msg2->elements,
857                                                       el2->values, struct ldb_val,
858                                                       el2->num_values + el->num_values);
859                                 if (vals == NULL) {
860                                         ldb_oom(ldb);
861                                         ret = LDB_ERR_OTHER;
862                                         goto done;
863                                 }
864
865                                 for (j=0; j<el->num_values; j++) {
866                                         vals[el2->num_values + j] =
867                                                 ldb_val_dup(vals, &el->values[j]);
868                                 }
869
870                                 el2->values = vals;
871                                 el2->num_values += el->num_values;
872
873                                 ret = ltdb_index_add_element(module, msg2->dn, el);
874                                 if (ret != LDB_SUCCESS) {
875                                         goto done;
876                                 }
877                         }
878
879                         break;
880
881                 case LDB_FLAG_MOD_REPLACE:
882
883                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
884                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
885                                                        el->name, ldb_dn_get_linearized(msg2->dn));
886                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
887                                 goto done;
888                         }
889
890                         /*
891                          * We don't need to check this if we have been
892                          * pre-screened by the repl_meta_data module
893                          * in Samba, or someone else who can claim to
894                          * know what they are doing. 
895                          */
896                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
897                                 struct ldb_val *duplicate = NULL;
898
899                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
900                                                                  &duplicate, 0);
901                                 if (ret != LDB_SUCCESS) {
902                                         goto done;
903                                 }
904                                 if (duplicate != NULL) {
905                                         ldb_asprintf_errstring(
906                                                 ldb,
907                                                 "attribute '%s': value '%.*s' "
908                                                 "on '%s' provided more than "
909                                                 "once in REPLACE",
910                                                 el->name,
911                                                 (int)duplicate->length,
912                                                 duplicate->data,
913                                                 ldb_dn_get_linearized(msg2->dn));
914                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
915                                         goto done;
916                                 }
917                         }
918
919                         /* Checks if element already exists */
920                         idx = find_element(msg2, el->name);
921                         if (idx != -1) {
922                                 j = (unsigned int) idx;
923                                 el2 = &(msg2->elements[j]);
924
925                                 /* we consider two elements to be
926                                  * equal only if the order
927                                  * matches. This allows dbcheck to
928                                  * fix the ordering on attributes
929                                  * where order matters, such as
930                                  * objectClass
931                                  */
932                                 if (ldb_msg_element_equal_ordered(el, el2)) {
933                                         continue;
934                                 }
935
936                                 /* Delete the attribute if it exists in the DB */
937                                 if (msg_delete_attribute(module, msg2,
938                                                          el->name) != 0) {
939                                         ret = LDB_ERR_OTHER;
940                                         goto done;
941                                 }
942                         }
943
944                         /* Recreate it with the new values */
945                         if (ltdb_msg_add_element(msg2, el) != 0) {
946                                 ret = LDB_ERR_OTHER;
947                                 goto done;
948                         }
949
950                         ret = ltdb_index_add_element(module, msg2->dn, el);
951                         if (ret != LDB_SUCCESS) {
952                                 goto done;
953                         }
954
955                         break;
956
957                 case LDB_FLAG_MOD_DELETE:
958                         dn = ldb_dn_get_linearized(msg2->dn);
959                         if (dn == NULL) {
960                                 ret = LDB_ERR_OTHER;
961                                 goto done;
962                         }
963
964                         if (msg->elements[i].num_values == 0) {
965                                 /* Delete the whole attribute */
966                                 ret = msg_delete_attribute(module, msg2,
967                                                            msg->elements[i].name);
968                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
969                                     control_permissive) {
970                                         ret = LDB_SUCCESS;
971                                 } else {
972                                         ldb_asprintf_errstring(ldb,
973                                                                "attribute '%s': no such attribute for delete on '%s'",
974                                                                msg->elements[i].name, dn);
975                                 }
976                                 if (ret != LDB_SUCCESS) {
977                                         goto done;
978                                 }
979                         } else {
980                                 /* Delete specified values from an attribute */
981                                 for (j=0; j < msg->elements[i].num_values; j++) {
982                                         ret = msg_delete_element(module,
983                                                                  msg2,
984                                                                  msg->elements[i].name,
985                                                                  &msg->elements[i].values[j]);
986                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
987                                             control_permissive) {
988                                                 ret = LDB_SUCCESS;
989                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
990                                                 ldb_asprintf_errstring(ldb,
991                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
992                                                                        msg->elements[i].name, dn);
993                                         }
994                                         if (ret != LDB_SUCCESS) {
995                                                 goto done;
996                                         }
997                                 }
998                         }
999                         break;
1000                 default:
1001                         ldb_asprintf_errstring(ldb,
1002                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1003                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1004                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1005                         ret = LDB_ERR_PROTOCOL_ERROR;
1006                         goto done;
1007                 }
1008         }
1009
1010         ret = ltdb_store(module, msg2, TDB_MODIFY);
1011         if (ret != LDB_SUCCESS) {
1012                 goto done;
1013         }
1014
1015         ret = ltdb_modified(module, msg2->dn);
1016         if (ret != LDB_SUCCESS) {
1017                 goto done;
1018         }
1019
1020 done:
1021         talloc_free(tdb_key.dptr);
1022         return ret;
1023 }
1024
1025 /*
1026   modify a record
1027 */
1028 static int ltdb_modify(struct ltdb_context *ctx)
1029 {
1030         struct ldb_module *module = ctx->module;
1031         struct ldb_request *req = ctx->req;
1032         int ret = LDB_SUCCESS;
1033
1034         ret = ltdb_check_special_dn(module, req->op.mod.message);
1035         if (ret != LDB_SUCCESS) {
1036                 return ret;
1037         }
1038
1039         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1040
1041         if (ltdb_cache_load(module) != 0) {
1042                 return LDB_ERR_OPERATIONS_ERROR;
1043         }
1044
1045         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1046
1047         return ret;
1048 }
1049
1050 /*
1051   rename a record
1052 */
1053 static int ltdb_rename(struct ltdb_context *ctx)
1054 {
1055         struct ldb_module *module = ctx->module;
1056         void *data = ldb_module_get_private(module);
1057         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1058         struct ldb_request *req = ctx->req;
1059         struct ldb_message *msg;
1060         int ret = LDB_SUCCESS;
1061         TDB_DATA tdb_key, tdb_key_old;
1062
1063         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1064
1065         if (ltdb_cache_load(ctx->module) != 0) {
1066                 return LDB_ERR_OPERATIONS_ERROR;
1067         }
1068
1069         msg = ldb_msg_new(ctx);
1070         if (msg == NULL) {
1071                 return LDB_ERR_OPERATIONS_ERROR;
1072         }
1073
1074         /* we need to fetch the old record to re-add under the new name */
1075         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1076                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1077         if (ret != LDB_SUCCESS) {
1078                 /* not finding the old record is an error */
1079                 return ret;
1080         }
1081
1082         /* We need to, before changing the DB, check if the new DN
1083          * exists, so we can return this error to the caller with an
1084          * unmodified DB */
1085         tdb_key = ltdb_key(module, req->op.rename.newdn);
1086         if (!tdb_key.dptr) {
1087                 talloc_free(msg);
1088                 return LDB_ERR_OPERATIONS_ERROR;
1089         }
1090
1091         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1092         if (!tdb_key_old.dptr) {
1093                 talloc_free(msg);
1094                 talloc_free(tdb_key.dptr);
1095                 return LDB_ERR_OPERATIONS_ERROR;
1096         }
1097
1098         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1099         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1100                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1101                         talloc_free(tdb_key_old.dptr);
1102                         talloc_free(tdb_key.dptr);
1103                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1104                                                "Entry %s already exists",
1105                                                ldb_dn_get_linearized(req->op.rename.newdn));
1106                         /* finding the new record already in the DB is an error */
1107                         talloc_free(msg);
1108                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1109                 }
1110         }
1111         talloc_free(tdb_key_old.dptr);
1112         talloc_free(tdb_key.dptr);
1113
1114         /* Always delete first then add, to avoid conflicts with
1115          * unique indexes. We rely on the transaction to make this
1116          * atomic
1117          */
1118         ret = ltdb_delete_internal(module, msg->dn);
1119         if (ret != LDB_SUCCESS) {
1120                 talloc_free(msg);
1121                 return ret;
1122         }
1123
1124         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1125         if (msg->dn == NULL) {
1126                 talloc_free(msg);
1127                 return LDB_ERR_OPERATIONS_ERROR;
1128         }
1129
1130         /* We don't check single value as we can have more than 1 with
1131          * deleted attributes. We could go through all elements but that's
1132          * maybe not the most efficient way
1133          */
1134         ret = ltdb_add_internal(module, msg, false);
1135
1136         talloc_free(msg);
1137
1138         return ret;
1139 }
1140
1141 static int ltdb_start_trans(struct ldb_module *module)
1142 {
1143         void *data = ldb_module_get_private(module);
1144         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1145
1146         if (tdb_transaction_start(ltdb->tdb) != 0) {
1147                 return ltdb_err_map(tdb_error(ltdb->tdb));
1148         }
1149
1150         ltdb->in_transaction++;
1151
1152         ltdb_index_transaction_start(module);
1153
1154         return LDB_SUCCESS;
1155 }
1156
1157 static int ltdb_prepare_commit(struct ldb_module *module)
1158 {
1159         int ret;
1160         void *data = ldb_module_get_private(module);
1161         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1162
1163         if (ltdb->in_transaction != 1) {
1164                 return LDB_SUCCESS;
1165         }
1166
1167         ret = ltdb_index_transaction_commit(module);
1168         if (ret != LDB_SUCCESS) {
1169                 tdb_transaction_cancel(ltdb->tdb);
1170                 ltdb->in_transaction--;
1171                 return ret;
1172         }
1173
1174         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1175                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1176                 ltdb->in_transaction--;
1177                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1178                                        "Failure during tdb_transaction_prepare_commit(): %s -> %s",
1179                                        tdb_errorstr(ltdb->tdb),
1180                                        ldb_strerror(ret));
1181                 return ret;
1182         }
1183
1184         ltdb->prepared_commit = true;
1185
1186         return LDB_SUCCESS;
1187 }
1188
1189 static int ltdb_end_trans(struct ldb_module *module)
1190 {
1191         int ret;
1192         void *data = ldb_module_get_private(module);
1193         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1194
1195         if (!ltdb->prepared_commit) {
1196                 ret = ltdb_prepare_commit(module);
1197                 if (ret != LDB_SUCCESS) {
1198                         return ret;
1199                 }
1200         }
1201
1202         ltdb->in_transaction--;
1203         ltdb->prepared_commit = false;
1204
1205         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1206                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1207                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1208                                        "Failure during tdb_transaction_commit(): %s -> %s",
1209                                        tdb_errorstr(ltdb->tdb),
1210                                        ldb_strerror(ret));
1211                 return ret;
1212         }
1213
1214         return LDB_SUCCESS;
1215 }
1216
1217 static int ltdb_del_trans(struct ldb_module *module)
1218 {
1219         void *data = ldb_module_get_private(module);
1220         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1221
1222         ltdb->in_transaction--;
1223
1224         if (ltdb_index_transaction_cancel(module) != 0) {
1225                 tdb_transaction_cancel(ltdb->tdb);
1226                 return ltdb_err_map(tdb_error(ltdb->tdb));
1227         }
1228
1229         tdb_transaction_cancel(ltdb->tdb);
1230         return LDB_SUCCESS;
1231 }
1232
1233 /*
1234   return sequenceNumber from @BASEINFO
1235 */
1236 static int ltdb_sequence_number(struct ltdb_context *ctx,
1237                                 struct ldb_extended **ext)
1238 {
1239         struct ldb_context *ldb;
1240         struct ldb_module *module = ctx->module;
1241         struct ldb_request *req = ctx->req;
1242         TALLOC_CTX *tmp_ctx = NULL;
1243         struct ldb_seqnum_request *seq;
1244         struct ldb_seqnum_result *res;
1245         struct ldb_message *msg = NULL;
1246         struct ldb_dn *dn;
1247         const char *date;
1248         int ret = LDB_SUCCESS;
1249
1250         ldb = ldb_module_get_ctx(module);
1251
1252         seq = talloc_get_type(req->op.extended.data,
1253                                 struct ldb_seqnum_request);
1254         if (seq == NULL) {
1255                 return LDB_ERR_OPERATIONS_ERROR;
1256         }
1257
1258         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1259
1260         if (ltdb_lock_read(module) != 0) {
1261                 return LDB_ERR_OPERATIONS_ERROR;
1262         }
1263
1264         res = talloc_zero(req, struct ldb_seqnum_result);
1265         if (res == NULL) {
1266                 ret = LDB_ERR_OPERATIONS_ERROR;
1267                 goto done;
1268         }
1269
1270         tmp_ctx = talloc_new(req);
1271         if (tmp_ctx == NULL) {
1272                 ret = LDB_ERR_OPERATIONS_ERROR;
1273                 goto done;
1274         }
1275
1276         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1277         if (dn == NULL) {
1278                 ret = LDB_ERR_OPERATIONS_ERROR;
1279                 goto done;
1280         }
1281
1282         msg = ldb_msg_new(tmp_ctx);
1283         if (msg == NULL) {
1284                 ret = LDB_ERR_OPERATIONS_ERROR;
1285                 goto done;
1286         }
1287
1288         ret = ltdb_search_dn1(module, dn, msg, 0);
1289         if (ret != LDB_SUCCESS) {
1290                 goto done;
1291         }
1292
1293         switch (seq->type) {
1294         case LDB_SEQ_HIGHEST_SEQ:
1295                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1296                 break;
1297         case LDB_SEQ_NEXT:
1298                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1299                 res->seq_num++;
1300                 break;
1301         case LDB_SEQ_HIGHEST_TIMESTAMP:
1302                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1303                 if (date) {
1304                         res->seq_num = ldb_string_to_time(date);
1305                 } else {
1306                         res->seq_num = 0;
1307                         /* zero is as good as anything when we don't know */
1308                 }
1309                 break;
1310         }
1311
1312         *ext = talloc_zero(req, struct ldb_extended);
1313         if (*ext == NULL) {
1314                 ret = LDB_ERR_OPERATIONS_ERROR;
1315                 goto done;
1316         }
1317         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1318         (*ext)->data = talloc_steal(*ext, res);
1319
1320 done:
1321         talloc_free(tmp_ctx);
1322         ltdb_unlock_read(module);
1323         return ret;
1324 }
1325
1326 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1327 {
1328         struct ldb_context *ldb;
1329         struct ldb_request *req;
1330         struct ldb_reply *ares;
1331
1332         ldb = ldb_module_get_ctx(ctx->module);
1333         req = ctx->req;
1334
1335         /* if we already returned an error just return */
1336         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1337                 return;
1338         }
1339
1340         ares = talloc_zero(req, struct ldb_reply);
1341         if (!ares) {
1342                 ldb_oom(ldb);
1343                 req->callback(req, NULL);
1344                 return;
1345         }
1346         ares->type = LDB_REPLY_DONE;
1347         ares->error = error;
1348
1349         req->callback(req, ares);
1350 }
1351
1352 static void ltdb_timeout(struct tevent_context *ev,
1353                           struct tevent_timer *te,
1354                           struct timeval t,
1355                           void *private_data)
1356 {
1357         struct ltdb_context *ctx;
1358         ctx = talloc_get_type(private_data, struct ltdb_context);
1359
1360         if (!ctx->request_terminated) {
1361                 /* request is done now */
1362                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1363         }
1364
1365         if (ctx->spy) {
1366                 /* neutralize the spy */
1367                 ctx->spy->ctx = NULL;
1368                 ctx->spy = NULL;
1369         }
1370         talloc_free(ctx);
1371 }
1372
1373 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1374                                         struct ldb_extended *ext,
1375                                         int error)
1376 {
1377         struct ldb_context *ldb;
1378         struct ldb_request *req;
1379         struct ldb_reply *ares;
1380
1381         ldb = ldb_module_get_ctx(ctx->module);
1382         req = ctx->req;
1383
1384         /* if we already returned an error just return */
1385         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1386                 return;
1387         }
1388
1389         ares = talloc_zero(req, struct ldb_reply);
1390         if (!ares) {
1391                 ldb_oom(ldb);
1392                 req->callback(req, NULL);
1393                 return;
1394         }
1395         ares->type = LDB_REPLY_DONE;
1396         ares->response = ext;
1397         ares->error = error;
1398
1399         req->callback(req, ares);
1400 }
1401
1402 static void ltdb_handle_extended(struct ltdb_context *ctx)
1403 {
1404         struct ldb_extended *ext = NULL;
1405         int ret;
1406
1407         if (strcmp(ctx->req->op.extended.oid,
1408                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1409                 /* get sequence number */
1410                 ret = ltdb_sequence_number(ctx, &ext);
1411         } else {
1412                 /* not recognized */
1413                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1414         }
1415
1416         ltdb_request_extended_done(ctx, ext, ret);
1417 }
1418
1419 static void ltdb_callback(struct tevent_context *ev,
1420                           struct tevent_timer *te,
1421                           struct timeval t,
1422                           void *private_data)
1423 {
1424         struct ltdb_context *ctx;
1425         int ret;
1426
1427         ctx = talloc_get_type(private_data, struct ltdb_context);
1428
1429         if (ctx->request_terminated) {
1430                 goto done;
1431         }
1432
1433         switch (ctx->req->operation) {
1434         case LDB_SEARCH:
1435                 ret = ltdb_search(ctx);
1436                 break;
1437         case LDB_ADD:
1438                 ret = ltdb_add(ctx);
1439                 break;
1440         case LDB_MODIFY:
1441                 ret = ltdb_modify(ctx);
1442                 break;
1443         case LDB_DELETE:
1444                 ret = ltdb_delete(ctx);
1445                 break;
1446         case LDB_RENAME:
1447                 ret = ltdb_rename(ctx);
1448                 break;
1449         case LDB_EXTENDED:
1450                 ltdb_handle_extended(ctx);
1451                 goto done;
1452         default:
1453                 /* no other op supported */
1454                 ret = LDB_ERR_PROTOCOL_ERROR;
1455         }
1456
1457         if (!ctx->request_terminated) {
1458                 /* request is done now */
1459                 ltdb_request_done(ctx, ret);
1460         }
1461
1462 done:
1463         if (ctx->spy) {
1464                 /* neutralize the spy */
1465                 ctx->spy->ctx = NULL;
1466                 ctx->spy = NULL;
1467         }
1468         talloc_free(ctx);
1469 }
1470
1471 static int ltdb_request_destructor(void *ptr)
1472 {
1473         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1474
1475         if (spy->ctx != NULL) {
1476                 spy->ctx->spy = NULL;
1477                 spy->ctx->request_terminated = true;
1478                 spy->ctx = NULL;
1479         }
1480
1481         return 0;
1482 }
1483
1484 static int ltdb_handle_request(struct ldb_module *module,
1485                                struct ldb_request *req)
1486 {
1487         struct ldb_control *control_permissive;
1488         struct ldb_context *ldb;
1489         struct tevent_context *ev;
1490         struct ltdb_context *ac;
1491         struct tevent_timer *te;
1492         struct timeval tv;
1493         unsigned int i;
1494
1495         ldb = ldb_module_get_ctx(module);
1496
1497         control_permissive = ldb_request_get_control(req,
1498                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1499
1500         for (i = 0; req->controls && req->controls[i]; i++) {
1501                 if (req->controls[i]->critical &&
1502                     req->controls[i] != control_permissive) {
1503                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1504                                                req->controls[i]->oid);
1505                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1506                 }
1507         }
1508
1509         if (req->starttime == 0 || req->timeout == 0) {
1510                 ldb_set_errstring(ldb, "Invalid timeout settings");
1511                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1512         }
1513
1514         ev = ldb_handle_get_event_context(req->handle);
1515
1516         ac = talloc_zero(ldb, struct ltdb_context);
1517         if (ac == NULL) {
1518                 ldb_oom(ldb);
1519                 return LDB_ERR_OPERATIONS_ERROR;
1520         }
1521
1522         ac->module = module;
1523         ac->req = req;
1524
1525         tv.tv_sec = 0;
1526         tv.tv_usec = 0;
1527         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1528         if (NULL == te) {
1529                 talloc_free(ac);
1530                 return LDB_ERR_OPERATIONS_ERROR;
1531         }
1532
1533         if (req->timeout > 0) {
1534                 tv.tv_sec = req->starttime + req->timeout;
1535                 tv.tv_usec = 0;
1536                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1537                                                      ltdb_timeout, ac);
1538                 if (NULL == ac->timeout_event) {
1539                         talloc_free(ac);
1540                         return LDB_ERR_OPERATIONS_ERROR;
1541                 }
1542         }
1543
1544         /* set a spy so that we do not try to use the request context
1545          * if it is freed before ltdb_callback fires */
1546         ac->spy = talloc(req, struct ltdb_req_spy);
1547         if (NULL == ac->spy) {
1548                 talloc_free(ac);
1549                 return LDB_ERR_OPERATIONS_ERROR;
1550         }
1551         ac->spy->ctx = ac;
1552
1553         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1554
1555         return LDB_SUCCESS;
1556 }
1557
1558 static int ltdb_init_rootdse(struct ldb_module *module)
1559 {
1560         /* ignore errors on this - we expect it for non-sam databases */
1561         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1562
1563         /* there can be no module beyond the backend, just return */
1564         return LDB_SUCCESS;
1565 }
1566
1567 static const struct ldb_module_ops ltdb_ops = {
1568         .name              = "tdb",
1569         .init_context      = ltdb_init_rootdse,
1570         .search            = ltdb_handle_request,
1571         .add               = ltdb_handle_request,
1572         .modify            = ltdb_handle_request,
1573         .del               = ltdb_handle_request,
1574         .rename            = ltdb_handle_request,
1575         .extended          = ltdb_handle_request,
1576         .start_transaction = ltdb_start_trans,
1577         .end_transaction   = ltdb_end_trans,
1578         .prepare_commit    = ltdb_prepare_commit,
1579         .del_transaction   = ltdb_del_trans,
1580 };
1581
1582 /*
1583   connect to the database
1584 */
1585 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1586                         unsigned int flags, const char *options[],
1587                         struct ldb_module **_module)
1588 {
1589         struct ldb_module *module;
1590         const char *path;
1591         int tdb_flags, open_flags;
1592         struct ltdb_private *ltdb;
1593
1594         /*
1595          * We hold locks, so we must use a private event context
1596          * on each returned handle
1597          */
1598
1599         ldb_set_require_private_event_context(ldb);
1600
1601         /* parse the url */
1602         if (strchr(url, ':')) {
1603                 if (strncmp(url, "tdb://", 6) != 0) {
1604                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1605                                   "Invalid tdb URL '%s'", url);
1606                         return LDB_ERR_OPERATIONS_ERROR;
1607                 }
1608                 path = url+6;
1609         } else {
1610                 path = url;
1611         }
1612
1613         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1614
1615         /* check for the 'nosync' option */
1616         if (flags & LDB_FLG_NOSYNC) {
1617                 tdb_flags |= TDB_NOSYNC;
1618         }
1619
1620         /* and nommap option */
1621         if (flags & LDB_FLG_NOMMAP) {
1622                 tdb_flags |= TDB_NOMMAP;
1623         }
1624
1625         if (flags & LDB_FLG_RDONLY) {
1626                 open_flags = O_RDONLY;
1627         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1628                 open_flags = O_RDWR;
1629         } else {
1630                 open_flags = O_CREAT | O_RDWR;
1631         }
1632
1633         ltdb = talloc_zero(ldb, struct ltdb_private);
1634         if (!ltdb) {
1635                 ldb_oom(ldb);
1636                 return LDB_ERR_OPERATIONS_ERROR;
1637         }
1638
1639         /* note that we use quite a large default hash size */
1640         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1641                                    tdb_flags, open_flags,
1642                                    ldb_get_create_perms(ldb), ldb);
1643         if (!ltdb->tdb) {
1644                 ldb_asprintf_errstring(ldb,
1645                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1646                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1647                           "Unable to open tdb '%s': %s", path, strerror(errno));
1648                 talloc_free(ltdb);
1649                 if (errno == EACCES || errno == EPERM) {
1650                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1651                 }
1652                 return LDB_ERR_OPERATIONS_ERROR;
1653         }
1654
1655         if (getenv("LDB_WARN_UNINDEXED")) {
1656                 ltdb->warn_unindexed = true;
1657         }
1658
1659         if (getenv("LDB_WARN_REINDEX")) {
1660                 ltdb->warn_reindex = true;
1661         }
1662
1663         ltdb->sequence_number = 0;
1664
1665         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1666         if (!module) {
1667                 ldb_oom(ldb);
1668                 talloc_free(ltdb);
1669                 return LDB_ERR_OPERATIONS_ERROR;
1670         }
1671         ldb_module_set_private(module, ltdb);
1672         talloc_steal(module, ltdb);
1673
1674         if (ltdb_cache_load(module) != 0) {
1675                 ldb_asprintf_errstring(ldb,
1676                                        "Unable to load ltdb cache records of tdb '%s'", path);
1677                 talloc_free(module);
1678                 return LDB_ERR_OPERATIONS_ERROR;
1679         }
1680
1681         *_module = module;
1682         return LDB_SUCCESS;
1683 }
1684
1685 int ldb_tdb_init(const char *version)
1686 {
1687         LDB_MODULE_CHECK_VERSION(version);
1688         return ldb_register_backend("tdb", ltdb_connect, false);
1689 }