ldb: relatively efficient functions for finding duplicate values
[bbaumbach/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 return 0;
123         }
124         ltdb->read_lock_count--;
125         return 0;
126 }
127
128
129 /*
130   form a TDB_DATA for a record key
131   caller frees
132
133   note that the key for a record can depend on whether the
134   dn refers to a case sensitive index record or not
135 */
136 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
137 {
138         struct ldb_context *ldb = ldb_module_get_ctx(module);
139         TDB_DATA key;
140         char *key_str = NULL;
141         const char *dn_folded = NULL;
142
143         /*
144           most DNs are case insensitive. The exception is index DNs for
145           case sensitive attributes
146
147           there are 3 cases dealt with in this code:
148
149           1) if the dn doesn't start with @ then uppercase the attribute
150              names and the attributes values of case insensitive attributes
151           2) if the dn starts with @ then leave it alone -
152              the indexing code handles the rest
153         */
154
155         dn_folded = ldb_dn_get_casefold(dn);
156         if (!dn_folded) {
157                 goto failed;
158         }
159
160         key_str = talloc_strdup(ldb, "DN=");
161         if (!key_str) {
162                 goto failed;
163         }
164
165         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
166         if (!key_str) {
167                 goto failed;
168         }
169
170         key.dptr = (uint8_t *)key_str;
171         key.dsize = strlen(key_str) + 1;
172
173         return key;
174
175 failed:
176         errno = ENOMEM;
177         key.dptr = NULL;
178         key.dsize = 0;
179         return key;
180 }
181
182 /*
183   check special dn's have valid attributes
184   currently only @ATTRIBUTES is checked
185 */
186 static int ltdb_check_special_dn(struct ldb_module *module,
187                                  const struct ldb_message *msg)
188 {
189         struct ldb_context *ldb = ldb_module_get_ctx(module);
190         unsigned int i, j;
191
192         if (! ldb_dn_is_special(msg->dn) ||
193             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
194                 return LDB_SUCCESS;
195         }
196
197         /* we have @ATTRIBUTES, let's check attributes are fine */
198         /* should we check that we deny multivalued attributes ? */
199         for (i = 0; i < msg->num_elements; i++) {
200                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
201
202                 for (j = 0; j < msg->elements[i].num_values; j++) {
203                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
204                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
205                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
206                         }
207                 }
208         }
209
210         return LDB_SUCCESS;
211 }
212
213
214 /*
215   we've made a modification to a dn - possibly reindex and
216   update sequence number
217 */
218 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
219 {
220         int ret = LDB_SUCCESS;
221         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
222
223         /* only allow modifies inside a transaction, otherwise the
224          * ldb is unsafe */
225         if (ltdb->in_transaction == 0) {
226                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         if (ldb_dn_is_special(dn) &&
231             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
232              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
233         {
234                 if (ltdb->warn_reindex) {
235                         ldb_debug(ldb_module_get_ctx(module),
236                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
237                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
238                 }
239                 ret = ltdb_reindex(module);
240         }
241
242         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
243         if (ret == LDB_SUCCESS &&
244             !(ldb_dn_is_special(dn) &&
245               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
246                 ret = ltdb_increase_sequence_number(module);
247         }
248
249         /* If the modify was to @OPTIONS, reload the cache */
250         if (ret == LDB_SUCCESS &&
251             ldb_dn_is_special(dn) &&
252             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
253                 ret = ltdb_cache_reload(module);
254         }
255
256         return ret;
257 }
258
259 /*
260   store a record into the db
261 */
262 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
263 {
264         void *data = ldb_module_get_private(module);
265         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
266         TDB_DATA tdb_key, tdb_data;
267         struct ldb_val ldb_data;
268         int ret = LDB_SUCCESS;
269
270         tdb_key = ltdb_key(module, msg->dn);
271         if (tdb_key.dptr == NULL) {
272                 return LDB_ERR_OTHER;
273         }
274
275         ret = ldb_pack_data(ldb_module_get_ctx(module),
276                             msg, &ldb_data);
277         if (ret == -1) {
278                 talloc_free(tdb_key.dptr);
279                 return LDB_ERR_OTHER;
280         }
281
282         tdb_data.dptr = ldb_data.data;
283         tdb_data.dsize = ldb_data.length;
284
285         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
286         if (ret != 0) {
287                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
288                 goto done;
289         }
290
291 done:
292         talloc_free(tdb_key.dptr);
293         talloc_free(ldb_data.data);
294
295         return ret;
296 }
297
298
299 /*
300   check if a attribute is a single valued, for a given element
301  */
302 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
303                                   struct ldb_message_element *el)
304 {
305         if (!a) return false;
306         if (el != NULL) {
307                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
308                         /* override from a ldb module, for example
309                            used for the description field, which is
310                            marked multi-valued in the schema but which
311                            should not actually accept multiple
312                            values */
313                         return true;
314                 }
315                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
316                         /* override from a ldb module, for example used for
317                            deleted linked attribute entries */
318                         return false;
319                 }
320         }
321         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
322                 return true;
323         }
324         return false;
325 }
326
327 static int ltdb_add_internal(struct ldb_module *module,
328                              const struct ldb_message *msg,
329                              bool check_single_value)
330 {
331         struct ldb_context *ldb = ldb_module_get_ctx(module);
332         int ret = LDB_SUCCESS;
333         unsigned int i;
334
335         for (i=0;i<msg->num_elements;i++) {
336                 struct ldb_message_element *el = &msg->elements[i];
337                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
338
339                 if (el->num_values == 0) {
340                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
341                                                el->name, ldb_dn_get_linearized(msg->dn));
342                         return LDB_ERR_CONSTRAINT_VIOLATION;
343                 }
344                 if (check_single_value &&
345                                 el->num_values > 1 &&
346                                 ldb_tdb_single_valued(a, el)) {
347                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
348                                                el->name, ldb_dn_get_linearized(msg->dn));
349                         return LDB_ERR_CONSTRAINT_VIOLATION;
350                 }
351
352                 /* Do not check "@ATTRIBUTES" for duplicated values */
353                 if (ldb_dn_is_special(msg->dn) &&
354                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
355                         continue;
356                 }
357
358                 if (check_single_value) {
359                         struct ldb_val *duplicate = NULL;
360
361                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
362                                                          el, &duplicate, 0);
363                         if (ret != LDB_SUCCESS) {
364                                 return ret;
365                         }
366                         if (duplicate != NULL) {
367                                 ldb_asprintf_errstring(
368                                         ldb,
369                                         "attribute '%s': value '%.*s' on '%s' "
370                                         "provided more than once in ADD object",
371                                         el->name,
372                                         (int)duplicate->length,
373                                         duplicate->data,
374                                         ldb_dn_get_linearized(msg->dn));
375                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
376                         }
377                 }
378         }
379
380         ret = ltdb_store(module, msg, TDB_INSERT);
381         if (ret != LDB_SUCCESS) {
382                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
383                         ldb_asprintf_errstring(ldb,
384                                                "Entry %s already exists",
385                                                ldb_dn_get_linearized(msg->dn));
386                 }
387                 return ret;
388         }
389
390         ret = ltdb_index_add_new(module, msg);
391         if (ret != LDB_SUCCESS) {
392                 return ret;
393         }
394
395         ret = ltdb_modified(module, msg->dn);
396
397         return ret;
398 }
399
400 /*
401   add a record to the database
402 */
403 static int ltdb_add(struct ltdb_context *ctx)
404 {
405         struct ldb_module *module = ctx->module;
406         struct ldb_request *req = ctx->req;
407         int ret = LDB_SUCCESS;
408
409         ret = ltdb_check_special_dn(module, req->op.add.message);
410         if (ret != LDB_SUCCESS) {
411                 return ret;
412         }
413
414         ldb_request_set_state(req, LDB_ASYNC_PENDING);
415
416         if (ltdb_cache_load(module) != 0) {
417                 return LDB_ERR_OPERATIONS_ERROR;
418         }
419
420         ret = ltdb_add_internal(module, req->op.add.message, true);
421
422         return ret;
423 }
424
425 /*
426   delete a record from the database, not updating indexes (used for deleting
427   index records)
428 */
429 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
430 {
431         void *data = ldb_module_get_private(module);
432         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
433         TDB_DATA tdb_key;
434         int ret;
435
436         tdb_key = ltdb_key(module, dn);
437         if (!tdb_key.dptr) {
438                 return LDB_ERR_OTHER;
439         }
440
441         ret = tdb_delete(ltdb->tdb, tdb_key);
442         talloc_free(tdb_key.dptr);
443
444         if (ret != 0) {
445                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
446         }
447
448         return ret;
449 }
450
451 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
452 {
453         struct ldb_message *msg;
454         int ret = LDB_SUCCESS;
455
456         msg = ldb_msg_new(module);
457         if (msg == NULL) {
458                 return LDB_ERR_OPERATIONS_ERROR;
459         }
460
461         /* in case any attribute of the message was indexed, we need
462            to fetch the old record */
463         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
464         if (ret != LDB_SUCCESS) {
465                 /* not finding the old record is an error */
466                 goto done;
467         }
468
469         ret = ltdb_delete_noindex(module, dn);
470         if (ret != LDB_SUCCESS) {
471                 goto done;
472         }
473
474         /* remove any indexed attributes */
475         ret = ltdb_index_delete(module, msg);
476         if (ret != LDB_SUCCESS) {
477                 goto done;
478         }
479
480         ret = ltdb_modified(module, dn);
481         if (ret != LDB_SUCCESS) {
482                 goto done;
483         }
484
485 done:
486         talloc_free(msg);
487         return ret;
488 }
489
490 /*
491   delete a record from the database
492 */
493 static int ltdb_delete(struct ltdb_context *ctx)
494 {
495         struct ldb_module *module = ctx->module;
496         struct ldb_request *req = ctx->req;
497         int ret = LDB_SUCCESS;
498
499         ldb_request_set_state(req, LDB_ASYNC_PENDING);
500
501         if (ltdb_cache_load(module) != 0) {
502                 return LDB_ERR_OPERATIONS_ERROR;
503         }
504
505         ret = ltdb_delete_internal(module, req->op.del.dn);
506
507         return ret;
508 }
509
510 /*
511   find an element by attribute name. At the moment this does a linear search,
512   it should be re-coded to use a binary search once all places that modify
513   records guarantee sorted order
514
515   return the index of the first matching element if found, otherwise -1
516 */
517 static int find_element(const struct ldb_message *msg, const char *name)
518 {
519         unsigned int i;
520         for (i=0;i<msg->num_elements;i++) {
521                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
522                         return i;
523                 }
524         }
525         return -1;
526 }
527
528
529 /*
530   add an element to an existing record. Assumes a elements array that we
531   can call re-alloc on, and assumed that we can re-use the data pointers from
532   the passed in additional values. Use with care!
533
534   returns 0 on success, -1 on failure (and sets errno)
535 */
536 static int ltdb_msg_add_element(struct ldb_message *msg,
537                                 struct ldb_message_element *el)
538 {
539         struct ldb_message_element *e2;
540         unsigned int i;
541
542         if (el->num_values == 0) {
543                 /* nothing to do here - we don't add empty elements */
544                 return 0;
545         }
546
547         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
548                               msg->num_elements+1);
549         if (!e2) {
550                 errno = ENOMEM;
551                 return -1;
552         }
553
554         msg->elements = e2;
555
556         e2 = &msg->elements[msg->num_elements];
557
558         e2->name = el->name;
559         e2->flags = el->flags;
560         e2->values = talloc_array(msg->elements,
561                                   struct ldb_val, el->num_values);
562         if (!e2->values) {
563                 errno = ENOMEM;
564                 return -1;
565         }
566         for (i=0;i<el->num_values;i++) {
567                 e2->values[i] = el->values[i];
568         }
569         e2->num_values = el->num_values;
570
571         ++msg->num_elements;
572
573         return 0;
574 }
575
576 /*
577   delete all elements having a specified attribute name
578 */
579 static int msg_delete_attribute(struct ldb_module *module,
580                                 struct ldb_message *msg, const char *name)
581 {
582         unsigned int i;
583         int ret;
584         struct ldb_message_element *el;
585
586         el = ldb_msg_find_element(msg, name);
587         if (el == NULL) {
588                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
589         }
590         i = el - msg->elements;
591
592         ret = ltdb_index_del_element(module, msg->dn, el);
593         if (ret != LDB_SUCCESS) {
594                 return ret;
595         }
596
597         talloc_free(el->values);
598         if (msg->num_elements > (i+1)) {
599                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
600         }
601         msg->num_elements--;
602         msg->elements = talloc_realloc(msg, msg->elements,
603                                        struct ldb_message_element,
604                                        msg->num_elements);
605         return LDB_SUCCESS;
606 }
607
608 /*
609   delete all elements matching an attribute name/value
610
611   return LDB Error on failure
612 */
613 static int msg_delete_element(struct ldb_module *module,
614                               struct ldb_message *msg,
615                               const char *name,
616                               const struct ldb_val *val)
617 {
618         struct ldb_context *ldb = ldb_module_get_ctx(module);
619         unsigned int i;
620         int found, ret;
621         struct ldb_message_element *el;
622         const struct ldb_schema_attribute *a;
623
624         found = find_element(msg, name);
625         if (found == -1) {
626                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
627         }
628
629         i = (unsigned int) found;
630         el = &(msg->elements[i]);
631
632         a = ldb_schema_attribute_by_name(ldb, el->name);
633
634         for (i=0;i<el->num_values;i++) {
635                 bool matched;
636                 if (a->syntax->operator_fn) {
637                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
638                                                      &el->values[i], val, &matched);
639                         if (ret != LDB_SUCCESS) return ret;
640                 } else {
641                         matched = (a->syntax->comparison_fn(ldb, ldb,
642                                                             &el->values[i], val) == 0);
643                 }
644                 if (matched) {
645                         if (el->num_values == 1) {
646                                 return msg_delete_attribute(module, msg, name);
647                         }
648
649                         ret = ltdb_index_del_value(module, msg->dn, el, i);
650                         if (ret != LDB_SUCCESS) {
651                                 return ret;
652                         }
653
654                         if (i<el->num_values-1) {
655                                 memmove(&el->values[i], &el->values[i+1],
656                                         sizeof(el->values[i])*
657                                                 (el->num_values-(i+1)));
658                         }
659                         el->num_values--;
660
661                         /* per definition we find in a canonicalised message an
662                            attribute value only once. So we are finished here */
663                         return LDB_SUCCESS;
664                 }
665         }
666
667         /* Not found */
668         return LDB_ERR_NO_SUCH_ATTRIBUTE;
669 }
670
671
672 /*
673   modify a record - internal interface
674
675   yuck - this is O(n^2). Luckily n is usually small so we probably
676   get away with it, but if we ever have really large attribute lists
677   then we'll need to look at this again
678
679   'req' is optional, and is used to specify controls if supplied
680 */
681 int ltdb_modify_internal(struct ldb_module *module,
682                          const struct ldb_message *msg,
683                          struct ldb_request *req)
684 {
685         struct ldb_context *ldb = ldb_module_get_ctx(module);
686         void *data = ldb_module_get_private(module);
687         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
688         TDB_DATA tdb_key, tdb_data;
689         struct ldb_val ldb_data;
690         struct ldb_message *msg2;
691         unsigned int i, j;
692         int ret = LDB_SUCCESS, idx;
693         struct ldb_control *control_permissive = NULL;
694
695         if (req) {
696                 control_permissive = ldb_request_get_control(req,
697                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
698         }
699
700         tdb_key = ltdb_key(module, msg->dn);
701         if (!tdb_key.dptr) {
702                 return LDB_ERR_OTHER;
703         }
704
705         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
706         if (!tdb_data.dptr) {
707                 talloc_free(tdb_key.dptr);
708                 return ltdb_err_map(tdb_error(ltdb->tdb));
709         }
710
711         msg2 = ldb_msg_new(tdb_key.dptr);
712         if (msg2 == NULL) {
713                 free(tdb_data.dptr);
714                 ret = LDB_ERR_OTHER;
715                 goto done;
716         }
717
718         ldb_data.data = tdb_data.dptr;
719         ldb_data.length = tdb_data.dsize;
720
721         ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
722         free(tdb_data.dptr);
723         if (ret == -1) {
724                 ret = LDB_ERR_OTHER;
725                 goto done;
726         }
727
728         if (!msg2->dn) {
729                 msg2->dn = msg->dn;
730         }
731
732         for (i=0; i<msg->num_elements; i++) {
733                 struct ldb_message_element *el = &msg->elements[i], *el2;
734                 struct ldb_val *vals;
735                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
736                 const char *dn;
737                 uint32_t options = 0;
738                 if (control_permissive != NULL) {
739                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
740                 }
741
742                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
743                 case LDB_FLAG_MOD_ADD:
744
745                         if (el->num_values == 0) {
746                                 ldb_asprintf_errstring(ldb,
747                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
748                                                        el->name, ldb_dn_get_linearized(msg2->dn));
749                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
750                                 goto done;
751                         }
752
753                         /* make a copy of the array so that a permissive
754                          * control can remove duplicates without changing the
755                          * original values, but do not copy data as we do not
756                          * need to keep it around once the operation is
757                          * finished */
758                         if (control_permissive) {
759                                 el = talloc(msg2, struct ldb_message_element);
760                                 if (!el) {
761                                         ret = LDB_ERR_OTHER;
762                                         goto done;
763                                 }
764                                 *el = msg->elements[i];
765                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
766                                 if (el->values == NULL) {
767                                         ret = LDB_ERR_OTHER;
768                                         goto done;
769                                 }
770                                 for (j = 0; j < el->num_values; j++) {
771                                         el->values[j] = msg->elements[i].values[j];
772                                 }
773                         }
774
775                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
776                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
777                                                        el->name, ldb_dn_get_linearized(msg2->dn));
778                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
779                                 goto done;
780                         }
781
782                         /* Checks if element already exists */
783                         idx = find_element(msg2, el->name);
784                         if (idx == -1) {
785                                 if (ltdb_msg_add_element(msg2, el) != 0) {
786                                         ret = LDB_ERR_OTHER;
787                                         goto done;
788                                 }
789                                 ret = ltdb_index_add_element(module, msg2->dn,
790                                                              el);
791                                 if (ret != LDB_SUCCESS) {
792                                         goto done;
793                                 }
794                         } else {
795                                 j = (unsigned int) idx;
796                                 el2 = &(msg2->elements[j]);
797
798                                 /* We cannot add another value on a existing one
799                                    if the attribute is single-valued */
800                                 if (ldb_tdb_single_valued(a, el)) {
801                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
802                                                                el->name, ldb_dn_get_linearized(msg2->dn));
803                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
804                                         goto done;
805                                 }
806
807                                 /* Check that values don't exist yet on multi-
808                                    valued attributes or aren't provided twice */
809                                 if (!(el->flags &
810                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
811                                         struct ldb_val *duplicate = NULL;
812                                         ret = ldb_msg_find_common_values(ldb,
813                                                                          msg2,
814                                                                          el,
815                                                                          el2,
816                                                                          options);
817
818                                         if (ret ==
819                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
820                                                 ldb_asprintf_errstring(ldb,
821                                                         "attribute '%s': value "
822                                                         "#%u on '%s' already "
823                                                         "exists", el->name, j,
824                                                         ldb_dn_get_linearized(msg2->dn));
825                                                 goto done;
826                                         } else if (ret != LDB_SUCCESS) {
827                                                 goto done;
828                                         }
829
830                                         ret = ldb_msg_find_duplicate_val(
831                                                 ldb, msg2, el, &duplicate, 0);
832                                         if (ret != LDB_SUCCESS) {
833                                                 goto done;
834                                         }
835                                         if (duplicate != NULL) {
836                                                 ldb_asprintf_errstring(
837                                                         ldb,
838                                                         "attribute '%s': value "
839                                                         "'%.*s' on '%s' "
840                                                         "provided more than "
841                                                         "once in ADD",
842                                                         el->name,
843                                                         (int)duplicate->length,
844                                                         duplicate->data,
845                                                         ldb_dn_get_linearized(msg->dn));
846                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
847                                                 goto done;
848                                         }
849                                 }
850
851                                 /* Now combine existing and new values to a new
852                                    attribute record */
853                                 vals = talloc_realloc(msg2->elements,
854                                                       el2->values, struct ldb_val,
855                                                       el2->num_values + el->num_values);
856                                 if (vals == NULL) {
857                                         ldb_oom(ldb);
858                                         ret = LDB_ERR_OTHER;
859                                         goto done;
860                                 }
861
862                                 for (j=0; j<el->num_values; j++) {
863                                         vals[el2->num_values + j] =
864                                                 ldb_val_dup(vals, &el->values[j]);
865                                 }
866
867                                 el2->values = vals;
868                                 el2->num_values += el->num_values;
869
870                                 ret = ltdb_index_add_element(module, msg2->dn, el);
871                                 if (ret != LDB_SUCCESS) {
872                                         goto done;
873                                 }
874                         }
875
876                         break;
877
878                 case LDB_FLAG_MOD_REPLACE:
879
880                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
881                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
882                                                        el->name, ldb_dn_get_linearized(msg2->dn));
883                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
884                                 goto done;
885                         }
886
887                         /*
888                          * We don't need to check this if we have been
889                          * pre-screened by the repl_meta_data module
890                          * in Samba, or someone else who can claim to
891                          * know what they are doing. 
892                          */
893                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
894                                 struct ldb_val *duplicate = NULL;
895
896                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
897                                                                  &duplicate, 0);
898                                 if (ret != LDB_SUCCESS) {
899                                         goto done;
900                                 }
901                                 if (duplicate != NULL) {
902                                         ldb_asprintf_errstring(
903                                                 ldb,
904                                                 "attribute '%s': value '%.*s' "
905                                                 "on '%s' provided more than "
906                                                 "once in REPLACE",
907                                                 el->name,
908                                                 (int)duplicate->length,
909                                                 duplicate->data,
910                                                 ldb_dn_get_linearized(msg2->dn));
911                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
912                                         goto done;
913                                 }
914                         }
915
916                         /* Checks if element already exists */
917                         idx = find_element(msg2, el->name);
918                         if (idx != -1) {
919                                 j = (unsigned int) idx;
920                                 el2 = &(msg2->elements[j]);
921
922                                 /* we consider two elements to be
923                                  * equal only if the order
924                                  * matches. This allows dbcheck to
925                                  * fix the ordering on attributes
926                                  * where order matters, such as
927                                  * objectClass
928                                  */
929                                 if (ldb_msg_element_equal_ordered(el, el2)) {
930                                         continue;
931                                 }
932
933                                 /* Delete the attribute if it exists in the DB */
934                                 if (msg_delete_attribute(module, msg2,
935                                                          el->name) != 0) {
936                                         ret = LDB_ERR_OTHER;
937                                         goto done;
938                                 }
939                         }
940
941                         /* Recreate it with the new values */
942                         if (ltdb_msg_add_element(msg2, el) != 0) {
943                                 ret = LDB_ERR_OTHER;
944                                 goto done;
945                         }
946
947                         ret = ltdb_index_add_element(module, msg2->dn, el);
948                         if (ret != LDB_SUCCESS) {
949                                 goto done;
950                         }
951
952                         break;
953
954                 case LDB_FLAG_MOD_DELETE:
955                         dn = ldb_dn_get_linearized(msg2->dn);
956                         if (dn == NULL) {
957                                 ret = LDB_ERR_OTHER;
958                                 goto done;
959                         }
960
961                         if (msg->elements[i].num_values == 0) {
962                                 /* Delete the whole attribute */
963                                 ret = msg_delete_attribute(module, msg2,
964                                                            msg->elements[i].name);
965                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
966                                     control_permissive) {
967                                         ret = LDB_SUCCESS;
968                                 } else {
969                                         ldb_asprintf_errstring(ldb,
970                                                                "attribute '%s': no such attribute for delete on '%s'",
971                                                                msg->elements[i].name, dn);
972                                 }
973                                 if (ret != LDB_SUCCESS) {
974                                         goto done;
975                                 }
976                         } else {
977                                 /* Delete specified values from an attribute */
978                                 for (j=0; j < msg->elements[i].num_values; j++) {
979                                         ret = msg_delete_element(module,
980                                                                  msg2,
981                                                                  msg->elements[i].name,
982                                                                  &msg->elements[i].values[j]);
983                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
984                                             control_permissive) {
985                                                 ret = LDB_SUCCESS;
986                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
987                                                 ldb_asprintf_errstring(ldb,
988                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
989                                                                        msg->elements[i].name, dn);
990                                         }
991                                         if (ret != LDB_SUCCESS) {
992                                                 goto done;
993                                         }
994                                 }
995                         }
996                         break;
997                 default:
998                         ldb_asprintf_errstring(ldb,
999                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1000                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1001                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1002                         ret = LDB_ERR_PROTOCOL_ERROR;
1003                         goto done;
1004                 }
1005         }
1006
1007         ret = ltdb_store(module, msg2, TDB_MODIFY);
1008         if (ret != LDB_SUCCESS) {
1009                 goto done;
1010         }
1011
1012         ret = ltdb_modified(module, msg2->dn);
1013         if (ret != LDB_SUCCESS) {
1014                 goto done;
1015         }
1016
1017 done:
1018         talloc_free(tdb_key.dptr);
1019         return ret;
1020 }
1021
1022 /*
1023   modify a record
1024 */
1025 static int ltdb_modify(struct ltdb_context *ctx)
1026 {
1027         struct ldb_module *module = ctx->module;
1028         struct ldb_request *req = ctx->req;
1029         int ret = LDB_SUCCESS;
1030
1031         ret = ltdb_check_special_dn(module, req->op.mod.message);
1032         if (ret != LDB_SUCCESS) {
1033                 return ret;
1034         }
1035
1036         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1037
1038         if (ltdb_cache_load(module) != 0) {
1039                 return LDB_ERR_OPERATIONS_ERROR;
1040         }
1041
1042         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1043
1044         return ret;
1045 }
1046
1047 /*
1048   rename a record
1049 */
1050 static int ltdb_rename(struct ltdb_context *ctx)
1051 {
1052         struct ldb_module *module = ctx->module;
1053         void *data = ldb_module_get_private(module);
1054         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1055         struct ldb_request *req = ctx->req;
1056         struct ldb_message *msg;
1057         int ret = LDB_SUCCESS;
1058         TDB_DATA tdb_key, tdb_key_old;
1059
1060         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1061
1062         if (ltdb_cache_load(ctx->module) != 0) {
1063                 return LDB_ERR_OPERATIONS_ERROR;
1064         }
1065
1066         msg = ldb_msg_new(ctx);
1067         if (msg == NULL) {
1068                 return LDB_ERR_OPERATIONS_ERROR;
1069         }
1070
1071         /* we need to fetch the old record to re-add under the new name */
1072         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1073                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1074         if (ret != LDB_SUCCESS) {
1075                 /* not finding the old record is an error */
1076                 return ret;
1077         }
1078
1079         /* We need to, before changing the DB, check if the new DN
1080          * exists, so we can return this error to the caller with an
1081          * unmodified DB */
1082         tdb_key = ltdb_key(module, req->op.rename.newdn);
1083         if (!tdb_key.dptr) {
1084                 talloc_free(msg);
1085                 return LDB_ERR_OPERATIONS_ERROR;
1086         }
1087
1088         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1089         if (!tdb_key_old.dptr) {
1090                 talloc_free(msg);
1091                 talloc_free(tdb_key.dptr);
1092                 return LDB_ERR_OPERATIONS_ERROR;
1093         }
1094
1095         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1096         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1097                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1098                         talloc_free(tdb_key_old.dptr);
1099                         talloc_free(tdb_key.dptr);
1100                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1101                                                "Entry %s already exists",
1102                                                ldb_dn_get_linearized(req->op.rename.newdn));
1103                         /* finding the new record already in the DB is an error */
1104                         talloc_free(msg);
1105                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1106                 }
1107         }
1108         talloc_free(tdb_key_old.dptr);
1109         talloc_free(tdb_key.dptr);
1110
1111         /* Always delete first then add, to avoid conflicts with
1112          * unique indexes. We rely on the transaction to make this
1113          * atomic
1114          */
1115         ret = ltdb_delete_internal(module, msg->dn);
1116         if (ret != LDB_SUCCESS) {
1117                 talloc_free(msg);
1118                 return ret;
1119         }
1120
1121         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1122         if (msg->dn == NULL) {
1123                 talloc_free(msg);
1124                 return LDB_ERR_OPERATIONS_ERROR;
1125         }
1126
1127         /* We don't check single value as we can have more than 1 with
1128          * deleted attributes. We could go through all elements but that's
1129          * maybe not the most efficient way
1130          */
1131         ret = ltdb_add_internal(module, msg, false);
1132
1133         talloc_free(msg);
1134
1135         return ret;
1136 }
1137
1138 static int ltdb_start_trans(struct ldb_module *module)
1139 {
1140         void *data = ldb_module_get_private(module);
1141         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1142
1143         if (tdb_transaction_start(ltdb->tdb) != 0) {
1144                 return ltdb_err_map(tdb_error(ltdb->tdb));
1145         }
1146
1147         ltdb->in_transaction++;
1148
1149         ltdb_index_transaction_start(module);
1150
1151         return LDB_SUCCESS;
1152 }
1153
1154 static int ltdb_prepare_commit(struct ldb_module *module)
1155 {
1156         int ret;
1157         void *data = ldb_module_get_private(module);
1158         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1159
1160         if (ltdb->in_transaction != 1) {
1161                 return LDB_SUCCESS;
1162         }
1163
1164         ret = ltdb_index_transaction_commit(module);
1165         if (ret != LDB_SUCCESS) {
1166                 tdb_transaction_cancel(ltdb->tdb);
1167                 ltdb->in_transaction--;
1168                 return ret;
1169         }
1170
1171         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1172                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1173                 ltdb->in_transaction--;
1174                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1175                                        "Failure during tdb_transaction_prepare_commit(): %s -> %s",
1176                                        tdb_errorstr(ltdb->tdb),
1177                                        ldb_strerror(ret));
1178                 return ret;
1179         }
1180
1181         ltdb->prepared_commit = true;
1182
1183         return LDB_SUCCESS;
1184 }
1185
1186 static int ltdb_end_trans(struct ldb_module *module)
1187 {
1188         int ret;
1189         void *data = ldb_module_get_private(module);
1190         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1191
1192         if (!ltdb->prepared_commit) {
1193                 ret = ltdb_prepare_commit(module);
1194                 if (ret != LDB_SUCCESS) {
1195                         return ret;
1196                 }
1197         }
1198
1199         ltdb->in_transaction--;
1200         ltdb->prepared_commit = false;
1201
1202         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1203                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1204                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1205                                        "Failure during tdb_transaction_commit(): %s -> %s",
1206                                        tdb_errorstr(ltdb->tdb),
1207                                        ldb_strerror(ret));
1208                 return ret;
1209         }
1210
1211         return LDB_SUCCESS;
1212 }
1213
1214 static int ltdb_del_trans(struct ldb_module *module)
1215 {
1216         void *data = ldb_module_get_private(module);
1217         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1218
1219         ltdb->in_transaction--;
1220
1221         if (ltdb_index_transaction_cancel(module) != 0) {
1222                 tdb_transaction_cancel(ltdb->tdb);
1223                 return ltdb_err_map(tdb_error(ltdb->tdb));
1224         }
1225
1226         tdb_transaction_cancel(ltdb->tdb);
1227         return LDB_SUCCESS;
1228 }
1229
1230 /*
1231   return sequenceNumber from @BASEINFO
1232 */
1233 static int ltdb_sequence_number(struct ltdb_context *ctx,
1234                                 struct ldb_extended **ext)
1235 {
1236         struct ldb_context *ldb;
1237         struct ldb_module *module = ctx->module;
1238         struct ldb_request *req = ctx->req;
1239         TALLOC_CTX *tmp_ctx = NULL;
1240         struct ldb_seqnum_request *seq;
1241         struct ldb_seqnum_result *res;
1242         struct ldb_message *msg = NULL;
1243         struct ldb_dn *dn;
1244         const char *date;
1245         int ret = LDB_SUCCESS;
1246
1247         ldb = ldb_module_get_ctx(module);
1248
1249         seq = talloc_get_type(req->op.extended.data,
1250                                 struct ldb_seqnum_request);
1251         if (seq == NULL) {
1252                 return LDB_ERR_OPERATIONS_ERROR;
1253         }
1254
1255         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1256
1257         if (ltdb_lock_read(module) != 0) {
1258                 return LDB_ERR_OPERATIONS_ERROR;
1259         }
1260
1261         res = talloc_zero(req, struct ldb_seqnum_result);
1262         if (res == NULL) {
1263                 ret = LDB_ERR_OPERATIONS_ERROR;
1264                 goto done;
1265         }
1266
1267         tmp_ctx = talloc_new(req);
1268         if (tmp_ctx == NULL) {
1269                 ret = LDB_ERR_OPERATIONS_ERROR;
1270                 goto done;
1271         }
1272
1273         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1274         if (dn == NULL) {
1275                 ret = LDB_ERR_OPERATIONS_ERROR;
1276                 goto done;
1277         }
1278
1279         msg = ldb_msg_new(tmp_ctx);
1280         if (msg == NULL) {
1281                 ret = LDB_ERR_OPERATIONS_ERROR;
1282                 goto done;
1283         }
1284
1285         ret = ltdb_search_dn1(module, dn, msg, 0);
1286         if (ret != LDB_SUCCESS) {
1287                 goto done;
1288         }
1289
1290         switch (seq->type) {
1291         case LDB_SEQ_HIGHEST_SEQ:
1292                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1293                 break;
1294         case LDB_SEQ_NEXT:
1295                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1296                 res->seq_num++;
1297                 break;
1298         case LDB_SEQ_HIGHEST_TIMESTAMP:
1299                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1300                 if (date) {
1301                         res->seq_num = ldb_string_to_time(date);
1302                 } else {
1303                         res->seq_num = 0;
1304                         /* zero is as good as anything when we don't know */
1305                 }
1306                 break;
1307         }
1308
1309         *ext = talloc_zero(req, struct ldb_extended);
1310         if (*ext == NULL) {
1311                 ret = LDB_ERR_OPERATIONS_ERROR;
1312                 goto done;
1313         }
1314         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1315         (*ext)->data = talloc_steal(*ext, res);
1316
1317 done:
1318         talloc_free(tmp_ctx);
1319         ltdb_unlock_read(module);
1320         return ret;
1321 }
1322
1323 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1324 {
1325         struct ldb_context *ldb;
1326         struct ldb_request *req;
1327         struct ldb_reply *ares;
1328
1329         ldb = ldb_module_get_ctx(ctx->module);
1330         req = ctx->req;
1331
1332         /* if we already returned an error just return */
1333         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1334                 return;
1335         }
1336
1337         ares = talloc_zero(req, struct ldb_reply);
1338         if (!ares) {
1339                 ldb_oom(ldb);
1340                 req->callback(req, NULL);
1341                 return;
1342         }
1343         ares->type = LDB_REPLY_DONE;
1344         ares->error = error;
1345
1346         req->callback(req, ares);
1347 }
1348
1349 static void ltdb_timeout(struct tevent_context *ev,
1350                           struct tevent_timer *te,
1351                           struct timeval t,
1352                           void *private_data)
1353 {
1354         struct ltdb_context *ctx;
1355         ctx = talloc_get_type(private_data, struct ltdb_context);
1356
1357         if (!ctx->request_terminated) {
1358                 /* request is done now */
1359                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1360         }
1361
1362         if (ctx->spy) {
1363                 /* neutralize the spy */
1364                 ctx->spy->ctx = NULL;
1365                 ctx->spy = NULL;
1366         }
1367         talloc_free(ctx);
1368 }
1369
1370 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1371                                         struct ldb_extended *ext,
1372                                         int error)
1373 {
1374         struct ldb_context *ldb;
1375         struct ldb_request *req;
1376         struct ldb_reply *ares;
1377
1378         ldb = ldb_module_get_ctx(ctx->module);
1379         req = ctx->req;
1380
1381         /* if we already returned an error just return */
1382         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1383                 return;
1384         }
1385
1386         ares = talloc_zero(req, struct ldb_reply);
1387         if (!ares) {
1388                 ldb_oom(ldb);
1389                 req->callback(req, NULL);
1390                 return;
1391         }
1392         ares->type = LDB_REPLY_DONE;
1393         ares->response = ext;
1394         ares->error = error;
1395
1396         req->callback(req, ares);
1397 }
1398
1399 static void ltdb_handle_extended(struct ltdb_context *ctx)
1400 {
1401         struct ldb_extended *ext = NULL;
1402         int ret;
1403
1404         if (strcmp(ctx->req->op.extended.oid,
1405                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1406                 /* get sequence number */
1407                 ret = ltdb_sequence_number(ctx, &ext);
1408         } else {
1409                 /* not recognized */
1410                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1411         }
1412
1413         ltdb_request_extended_done(ctx, ext, ret);
1414 }
1415
1416 static void ltdb_callback(struct tevent_context *ev,
1417                           struct tevent_timer *te,
1418                           struct timeval t,
1419                           void *private_data)
1420 {
1421         struct ltdb_context *ctx;
1422         int ret;
1423
1424         ctx = talloc_get_type(private_data, struct ltdb_context);
1425
1426         if (ctx->request_terminated) {
1427                 goto done;
1428         }
1429
1430         switch (ctx->req->operation) {
1431         case LDB_SEARCH:
1432                 ret = ltdb_search(ctx);
1433                 break;
1434         case LDB_ADD:
1435                 ret = ltdb_add(ctx);
1436                 break;
1437         case LDB_MODIFY:
1438                 ret = ltdb_modify(ctx);
1439                 break;
1440         case LDB_DELETE:
1441                 ret = ltdb_delete(ctx);
1442                 break;
1443         case LDB_RENAME:
1444                 ret = ltdb_rename(ctx);
1445                 break;
1446         case LDB_EXTENDED:
1447                 ltdb_handle_extended(ctx);
1448                 goto done;
1449         default:
1450                 /* no other op supported */
1451                 ret = LDB_ERR_PROTOCOL_ERROR;
1452         }
1453
1454         if (!ctx->request_terminated) {
1455                 /* request is done now */
1456                 ltdb_request_done(ctx, ret);
1457         }
1458
1459 done:
1460         if (ctx->spy) {
1461                 /* neutralize the spy */
1462                 ctx->spy->ctx = NULL;
1463                 ctx->spy = NULL;
1464         }
1465         talloc_free(ctx);
1466 }
1467
1468 static int ltdb_request_destructor(void *ptr)
1469 {
1470         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1471
1472         if (spy->ctx != NULL) {
1473                 spy->ctx->spy = NULL;
1474                 spy->ctx->request_terminated = true;
1475                 spy->ctx = NULL;
1476         }
1477
1478         return 0;
1479 }
1480
1481 static int ltdb_handle_request(struct ldb_module *module,
1482                                struct ldb_request *req)
1483 {
1484         struct ldb_control *control_permissive;
1485         struct ldb_context *ldb;
1486         struct tevent_context *ev;
1487         struct ltdb_context *ac;
1488         struct tevent_timer *te;
1489         struct timeval tv;
1490         unsigned int i;
1491
1492         ldb = ldb_module_get_ctx(module);
1493
1494         control_permissive = ldb_request_get_control(req,
1495                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1496
1497         for (i = 0; req->controls && req->controls[i]; i++) {
1498                 if (req->controls[i]->critical &&
1499                     req->controls[i] != control_permissive) {
1500                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1501                                                req->controls[i]->oid);
1502                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1503                 }
1504         }
1505
1506         if (req->starttime == 0 || req->timeout == 0) {
1507                 ldb_set_errstring(ldb, "Invalid timeout settings");
1508                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1509         }
1510
1511         ev = ldb_handle_get_event_context(req->handle);
1512
1513         ac = talloc_zero(ldb, struct ltdb_context);
1514         if (ac == NULL) {
1515                 ldb_oom(ldb);
1516                 return LDB_ERR_OPERATIONS_ERROR;
1517         }
1518
1519         ac->module = module;
1520         ac->req = req;
1521
1522         tv.tv_sec = 0;
1523         tv.tv_usec = 0;
1524         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1525         if (NULL == te) {
1526                 talloc_free(ac);
1527                 return LDB_ERR_OPERATIONS_ERROR;
1528         }
1529
1530         if (req->timeout > 0) {
1531                 tv.tv_sec = req->starttime + req->timeout;
1532                 tv.tv_usec = 0;
1533                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1534                                                      ltdb_timeout, ac);
1535                 if (NULL == ac->timeout_event) {
1536                         talloc_free(ac);
1537                         return LDB_ERR_OPERATIONS_ERROR;
1538                 }
1539         }
1540
1541         /* set a spy so that we do not try to use the request context
1542          * if it is freed before ltdb_callback fires */
1543         ac->spy = talloc(req, struct ltdb_req_spy);
1544         if (NULL == ac->spy) {
1545                 talloc_free(ac);
1546                 return LDB_ERR_OPERATIONS_ERROR;
1547         }
1548         ac->spy->ctx = ac;
1549
1550         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1551
1552         return LDB_SUCCESS;
1553 }
1554
1555 static int ltdb_init_rootdse(struct ldb_module *module)
1556 {
1557         /* ignore errors on this - we expect it for non-sam databases */
1558         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1559
1560         /* there can be no module beyond the backend, just return */
1561         return LDB_SUCCESS;
1562 }
1563
1564 static const struct ldb_module_ops ltdb_ops = {
1565         .name              = "tdb",
1566         .init_context      = ltdb_init_rootdse,
1567         .search            = ltdb_handle_request,
1568         .add               = ltdb_handle_request,
1569         .modify            = ltdb_handle_request,
1570         .del               = ltdb_handle_request,
1571         .rename            = ltdb_handle_request,
1572         .extended          = ltdb_handle_request,
1573         .start_transaction = ltdb_start_trans,
1574         .end_transaction   = ltdb_end_trans,
1575         .prepare_commit    = ltdb_prepare_commit,
1576         .del_transaction   = ltdb_del_trans,
1577 };
1578
1579 /*
1580   connect to the database
1581 */
1582 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1583                         unsigned int flags, const char *options[],
1584                         struct ldb_module **_module)
1585 {
1586         struct ldb_module *module;
1587         const char *path;
1588         int tdb_flags, open_flags;
1589         struct ltdb_private *ltdb;
1590
1591         /*
1592          * We hold locks, so we must use a private event context
1593          * on each returned handle
1594          */
1595
1596         ldb_set_require_private_event_context(ldb);
1597
1598         /* parse the url */
1599         if (strchr(url, ':')) {
1600                 if (strncmp(url, "tdb://", 6) != 0) {
1601                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1602                                   "Invalid tdb URL '%s'", url);
1603                         return LDB_ERR_OPERATIONS_ERROR;
1604                 }
1605                 path = url+6;
1606         } else {
1607                 path = url;
1608         }
1609
1610         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1611
1612         /* check for the 'nosync' option */
1613         if (flags & LDB_FLG_NOSYNC) {
1614                 tdb_flags |= TDB_NOSYNC;
1615         }
1616
1617         /* and nommap option */
1618         if (flags & LDB_FLG_NOMMAP) {
1619                 tdb_flags |= TDB_NOMMAP;
1620         }
1621
1622         if (flags & LDB_FLG_RDONLY) {
1623                 open_flags = O_RDONLY;
1624         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1625                 open_flags = O_RDWR;
1626         } else {
1627                 open_flags = O_CREAT | O_RDWR;
1628         }
1629
1630         ltdb = talloc_zero(ldb, struct ltdb_private);
1631         if (!ltdb) {
1632                 ldb_oom(ldb);
1633                 return LDB_ERR_OPERATIONS_ERROR;
1634         }
1635
1636         /* note that we use quite a large default hash size */
1637         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1638                                    tdb_flags, open_flags,
1639                                    ldb_get_create_perms(ldb), ldb);
1640         if (!ltdb->tdb) {
1641                 ldb_asprintf_errstring(ldb,
1642                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1643                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1644                           "Unable to open tdb '%s': %s", path, strerror(errno));
1645                 talloc_free(ltdb);
1646                 if (errno == EACCES || errno == EPERM) {
1647                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1648                 }
1649                 return LDB_ERR_OPERATIONS_ERROR;
1650         }
1651
1652         if (getenv("LDB_WARN_UNINDEXED")) {
1653                 ltdb->warn_unindexed = true;
1654         }
1655
1656         if (getenv("LDB_WARN_REINDEX")) {
1657                 ltdb->warn_reindex = true;
1658         }
1659
1660         ltdb->sequence_number = 0;
1661
1662         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1663         if (!module) {
1664                 ldb_oom(ldb);
1665                 talloc_free(ltdb);
1666                 return LDB_ERR_OPERATIONS_ERROR;
1667         }
1668         ldb_module_set_private(module, ltdb);
1669         talloc_steal(module, ltdb);
1670
1671         if (ltdb_cache_load(module) != 0) {
1672                 ldb_asprintf_errstring(ldb,
1673                                        "Unable to load ltdb cache records of tdb '%s'", path);
1674                 talloc_free(module);
1675                 return LDB_ERR_OPERATIONS_ERROR;
1676         }
1677
1678         *_module = module;
1679         return LDB_SUCCESS;
1680 }
1681
1682 int ldb_tdb_init(const char *version)
1683 {
1684         LDB_MODULE_CHECK_VERSION(version);
1685         return ldb_register_backend("tdb", ltdb_connect, false);
1686 }