ldb: allow a timeout of -1 result in no timeout timer at all.
[vlendec/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 return 0;
123         }
124         ltdb->read_lock_count--;
125         return 0;
126 }
127
128
129 /*
130   form a TDB_DATA for a record key
131   caller frees
132
133   note that the key for a record can depend on whether the
134   dn refers to a case sensitive index record or not
135 */
136 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
137 {
138         struct ldb_context *ldb = ldb_module_get_ctx(module);
139         TDB_DATA key;
140         char *key_str = NULL;
141         const char *dn_folded = NULL;
142
143         /*
144           most DNs are case insensitive. The exception is index DNs for
145           case sensitive attributes
146
147           there are 3 cases dealt with in this code:
148
149           1) if the dn doesn't start with @ then uppercase the attribute
150              names and the attributes values of case insensitive attributes
151           2) if the dn starts with @ then leave it alone -
152              the indexing code handles the rest
153         */
154
155         dn_folded = ldb_dn_get_casefold(dn);
156         if (!dn_folded) {
157                 goto failed;
158         }
159
160         key_str = talloc_strdup(ldb, "DN=");
161         if (!key_str) {
162                 goto failed;
163         }
164
165         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
166         if (!key_str) {
167                 goto failed;
168         }
169
170         key.dptr = (uint8_t *)key_str;
171         key.dsize = strlen(key_str) + 1;
172
173         return key;
174
175 failed:
176         errno = ENOMEM;
177         key.dptr = NULL;
178         key.dsize = 0;
179         return key;
180 }
181
182 /*
183   check special dn's have valid attributes
184   currently only @ATTRIBUTES is checked
185 */
186 static int ltdb_check_special_dn(struct ldb_module *module,
187                                  const struct ldb_message *msg)
188 {
189         struct ldb_context *ldb = ldb_module_get_ctx(module);
190         unsigned int i, j;
191
192         if (! ldb_dn_is_special(msg->dn) ||
193             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
194                 return LDB_SUCCESS;
195         }
196
197         /* we have @ATTRIBUTES, let's check attributes are fine */
198         /* should we check that we deny multivalued attributes ? */
199         for (i = 0; i < msg->num_elements; i++) {
200                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
201
202                 for (j = 0; j < msg->elements[i].num_values; j++) {
203                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
204                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
205                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
206                         }
207                 }
208         }
209
210         return LDB_SUCCESS;
211 }
212
213
214 /*
215   we've made a modification to a dn - possibly reindex and
216   update sequence number
217 */
218 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
219 {
220         int ret = LDB_SUCCESS;
221         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
222
223         /* only allow modifies inside a transaction, otherwise the
224          * ldb is unsafe */
225         if (ltdb->in_transaction == 0) {
226                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         if (ldb_dn_is_special(dn) &&
231             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
232              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
233         {
234                 if (ltdb->warn_reindex) {
235                         ldb_debug(ldb_module_get_ctx(module),
236                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
237                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
238                 }
239                 ret = ltdb_reindex(module);
240         }
241
242         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
243         if (ret == LDB_SUCCESS &&
244             !(ldb_dn_is_special(dn) &&
245               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
246                 ret = ltdb_increase_sequence_number(module);
247         }
248
249         /* If the modify was to @OPTIONS, reload the cache */
250         if (ret == LDB_SUCCESS &&
251             ldb_dn_is_special(dn) &&
252             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
253                 ret = ltdb_cache_reload(module);
254         }
255
256         return ret;
257 }
258
259 /*
260   store a record into the db
261 */
262 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
263 {
264         void *data = ldb_module_get_private(module);
265         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
266         TDB_DATA tdb_key, tdb_data;
267         struct ldb_val ldb_data;
268         int ret = LDB_SUCCESS;
269
270         tdb_key = ltdb_key(module, msg->dn);
271         if (tdb_key.dptr == NULL) {
272                 return LDB_ERR_OTHER;
273         }
274
275         ret = ldb_pack_data(ldb_module_get_ctx(module),
276                             msg, &ldb_data);
277         if (ret == -1) {
278                 talloc_free(tdb_key.dptr);
279                 return LDB_ERR_OTHER;
280         }
281
282         tdb_data.dptr = ldb_data.data;
283         tdb_data.dsize = ldb_data.length;
284
285         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
286         if (ret != 0) {
287                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
288                 goto done;
289         }
290
291 done:
292         talloc_free(tdb_key.dptr);
293         talloc_free(ldb_data.data);
294
295         return ret;
296 }
297
298
299 /*
300   check if a attribute is a single valued, for a given element
301  */
302 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
303                                   struct ldb_message_element *el)
304 {
305         if (!a) return false;
306         if (el != NULL) {
307                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
308                         /* override from a ldb module, for example
309                            used for the description field, which is
310                            marked multi-valued in the schema but which
311                            should not actually accept multiple
312                            values */
313                         return true;
314                 }
315                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
316                         /* override from a ldb module, for example used for
317                            deleted linked attribute entries */
318                         return false;
319                 }
320         }
321         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
322                 return true;
323         }
324         return false;
325 }
326
327 static int ltdb_add_internal(struct ldb_module *module,
328                              const struct ldb_message *msg,
329                              bool check_single_value)
330 {
331         struct ldb_context *ldb = ldb_module_get_ctx(module);
332         int ret = LDB_SUCCESS;
333         unsigned int i, j;
334
335         for (i=0;i<msg->num_elements;i++) {
336                 struct ldb_message_element *el = &msg->elements[i];
337                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
338
339                 if (el->num_values == 0) {
340                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
341                                                el->name, ldb_dn_get_linearized(msg->dn));
342                         return LDB_ERR_CONSTRAINT_VIOLATION;
343                 }
344                 if (check_single_value &&
345                                 el->num_values > 1 &&
346                                 ldb_tdb_single_valued(a, el)) {
347                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
348                                                el->name, ldb_dn_get_linearized(msg->dn));
349                         return LDB_ERR_CONSTRAINT_VIOLATION;
350                 }
351
352                 /* Do not check "@ATTRIBUTES" for duplicated values */
353                 if (ldb_dn_is_special(msg->dn) &&
354                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
355                         continue;
356                 }
357
358                 /* TODO: This is O(n^2) - replace with more efficient check */
359                 for (j=0; j<el->num_values; j++) {
360                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
361                                 ldb_asprintf_errstring(ldb,
362                                                        "attribute '%s': value #%u on '%s' provided more than once",
363                                                        el->name, j, ldb_dn_get_linearized(msg->dn));
364                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
365                         }
366                 }
367         }
368
369         ret = ltdb_store(module, msg, TDB_INSERT);
370         if (ret != LDB_SUCCESS) {
371                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
372                         ldb_asprintf_errstring(ldb,
373                                                "Entry %s already exists",
374                                                ldb_dn_get_linearized(msg->dn));
375                 }
376                 return ret;
377         }
378
379         ret = ltdb_index_add_new(module, msg);
380         if (ret != LDB_SUCCESS) {
381                 return ret;
382         }
383
384         ret = ltdb_modified(module, msg->dn);
385
386         return ret;
387 }
388
389 /*
390   add a record to the database
391 */
392 static int ltdb_add(struct ltdb_context *ctx)
393 {
394         struct ldb_module *module = ctx->module;
395         struct ldb_request *req = ctx->req;
396         int ret = LDB_SUCCESS;
397
398         ret = ltdb_check_special_dn(module, req->op.add.message);
399         if (ret != LDB_SUCCESS) {
400                 return ret;
401         }
402
403         ldb_request_set_state(req, LDB_ASYNC_PENDING);
404
405         if (ltdb_cache_load(module) != 0) {
406                 return LDB_ERR_OPERATIONS_ERROR;
407         }
408
409         ret = ltdb_add_internal(module, req->op.add.message, true);
410
411         return ret;
412 }
413
414 /*
415   delete a record from the database, not updating indexes (used for deleting
416   index records)
417 */
418 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
419 {
420         void *data = ldb_module_get_private(module);
421         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
422         TDB_DATA tdb_key;
423         int ret;
424
425         tdb_key = ltdb_key(module, dn);
426         if (!tdb_key.dptr) {
427                 return LDB_ERR_OTHER;
428         }
429
430         ret = tdb_delete(ltdb->tdb, tdb_key);
431         talloc_free(tdb_key.dptr);
432
433         if (ret != 0) {
434                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
435         }
436
437         return ret;
438 }
439
440 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
441 {
442         struct ldb_message *msg;
443         int ret = LDB_SUCCESS;
444
445         msg = ldb_msg_new(module);
446         if (msg == NULL) {
447                 return LDB_ERR_OPERATIONS_ERROR;
448         }
449
450         /* in case any attribute of the message was indexed, we need
451            to fetch the old record */
452         ret = ltdb_search_dn1(module, dn, msg);
453         if (ret != LDB_SUCCESS) {
454                 /* not finding the old record is an error */
455                 goto done;
456         }
457
458         ret = ltdb_delete_noindex(module, dn);
459         if (ret != LDB_SUCCESS) {
460                 goto done;
461         }
462
463         /* remove any indexed attributes */
464         ret = ltdb_index_delete(module, msg);
465         if (ret != LDB_SUCCESS) {
466                 goto done;
467         }
468
469         ret = ltdb_modified(module, dn);
470         if (ret != LDB_SUCCESS) {
471                 goto done;
472         }
473
474 done:
475         talloc_free(msg);
476         return ret;
477 }
478
479 /*
480   delete a record from the database
481 */
482 static int ltdb_delete(struct ltdb_context *ctx)
483 {
484         struct ldb_module *module = ctx->module;
485         struct ldb_request *req = ctx->req;
486         int ret = LDB_SUCCESS;
487
488         ldb_request_set_state(req, LDB_ASYNC_PENDING);
489
490         if (ltdb_cache_load(module) != 0) {
491                 return LDB_ERR_OPERATIONS_ERROR;
492         }
493
494         ret = ltdb_delete_internal(module, req->op.del.dn);
495
496         return ret;
497 }
498
499 /*
500   find an element by attribute name. At the moment this does a linear search,
501   it should be re-coded to use a binary search once all places that modify
502   records guarantee sorted order
503
504   return the index of the first matching element if found, otherwise -1
505 */
506 static int find_element(const struct ldb_message *msg, const char *name)
507 {
508         unsigned int i;
509         for (i=0;i<msg->num_elements;i++) {
510                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
511                         return i;
512                 }
513         }
514         return -1;
515 }
516
517
518 /*
519   add an element to an existing record. Assumes a elements array that we
520   can call re-alloc on, and assumed that we can re-use the data pointers from
521   the passed in additional values. Use with care!
522
523   returns 0 on success, -1 on failure (and sets errno)
524 */
525 static int ltdb_msg_add_element(struct ldb_context *ldb,
526                                 struct ldb_message *msg,
527                                 struct ldb_message_element *el)
528 {
529         struct ldb_message_element *e2;
530         unsigned int i;
531
532         if (el->num_values == 0) {
533                 /* nothing to do here - we don't add empty elements */
534                 return 0;
535         }
536
537         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
538                               msg->num_elements+1);
539         if (!e2) {
540                 errno = ENOMEM;
541                 return -1;
542         }
543
544         msg->elements = e2;
545
546         e2 = &msg->elements[msg->num_elements];
547
548         e2->name = el->name;
549         e2->flags = el->flags;
550         e2->values = talloc_array(msg->elements,
551                                   struct ldb_val, el->num_values);
552         if (!e2->values) {
553                 errno = ENOMEM;
554                 return -1;
555         }
556         for (i=0;i<el->num_values;i++) {
557                 e2->values[i] = el->values[i];
558         }
559         e2->num_values = el->num_values;
560
561         ++msg->num_elements;
562
563         return 0;
564 }
565
566 /*
567   delete all elements having a specified attribute name
568 */
569 static int msg_delete_attribute(struct ldb_module *module,
570                                 struct ldb_context *ldb,
571                                 struct ldb_message *msg, const char *name)
572 {
573         unsigned int i;
574         int ret;
575         struct ldb_message_element *el;
576
577         el = ldb_msg_find_element(msg, name);
578         if (el == NULL) {
579                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
580         }
581         i = el - msg->elements;
582
583         ret = ltdb_index_del_element(module, msg->dn, el);
584         if (ret != LDB_SUCCESS) {
585                 return ret;
586         }
587
588         talloc_free(el->values);
589         if (msg->num_elements > (i+1)) {
590                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
591         }
592         msg->num_elements--;
593         msg->elements = talloc_realloc(msg, msg->elements,
594                                        struct ldb_message_element,
595                                        msg->num_elements);
596         return LDB_SUCCESS;
597 }
598
599 /*
600   delete all elements matching an attribute name/value
601
602   return LDB Error on failure
603 */
604 static int msg_delete_element(struct ldb_module *module,
605                               struct ldb_message *msg,
606                               const char *name,
607                               const struct ldb_val *val)
608 {
609         struct ldb_context *ldb = ldb_module_get_ctx(module);
610         unsigned int i;
611         int found, ret;
612         struct ldb_message_element *el;
613         const struct ldb_schema_attribute *a;
614
615         found = find_element(msg, name);
616         if (found == -1) {
617                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
618         }
619
620         i = (unsigned int) found;
621         el = &(msg->elements[i]);
622
623         a = ldb_schema_attribute_by_name(ldb, el->name);
624
625         for (i=0;i<el->num_values;i++) {
626                 bool matched;
627                 if (a->syntax->operator_fn) {
628                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
629                                                      &el->values[i], val, &matched);
630                         if (ret != LDB_SUCCESS) return ret;
631                 } else {
632                         matched = (a->syntax->comparison_fn(ldb, ldb,
633                                                             &el->values[i], val) == 0);
634                 }
635                 if (matched) {
636                         if (el->num_values == 1) {
637                                 return msg_delete_attribute(module, ldb, msg, name);
638                         }
639
640                         ret = ltdb_index_del_value(module, msg->dn, el, i);
641                         if (ret != LDB_SUCCESS) {
642                                 return ret;
643                         }
644
645                         if (i<el->num_values-1) {
646                                 memmove(&el->values[i], &el->values[i+1],
647                                         sizeof(el->values[i])*
648                                                 (el->num_values-(i+1)));
649                         }
650                         el->num_values--;
651
652                         /* per definition we find in a canonicalised message an
653                            attribute value only once. So we are finished here */
654                         return LDB_SUCCESS;
655                 }
656         }
657
658         /* Not found */
659         return LDB_ERR_NO_SUCH_ATTRIBUTE;
660 }
661
662
663 /*
664   modify a record - internal interface
665
666   yuck - this is O(n^2). Luckily n is usually small so we probably
667   get away with it, but if we ever have really large attribute lists
668   then we'll need to look at this again
669
670   'req' is optional, and is used to specify controls if supplied
671 */
672 int ltdb_modify_internal(struct ldb_module *module,
673                          const struct ldb_message *msg,
674                          struct ldb_request *req)
675 {
676         struct ldb_context *ldb = ldb_module_get_ctx(module);
677         void *data = ldb_module_get_private(module);
678         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
679         TDB_DATA tdb_key, tdb_data;
680         struct ldb_val ldb_data;
681         struct ldb_message *msg2;
682         unsigned int i, j, k;
683         int ret = LDB_SUCCESS, idx;
684         struct ldb_control *control_permissive = NULL;
685
686         if (req) {
687                 control_permissive = ldb_request_get_control(req,
688                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
689         }
690
691         tdb_key = ltdb_key(module, msg->dn);
692         if (!tdb_key.dptr) {
693                 return LDB_ERR_OTHER;
694         }
695
696         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
697         if (!tdb_data.dptr) {
698                 talloc_free(tdb_key.dptr);
699                 return ltdb_err_map(tdb_error(ltdb->tdb));
700         }
701
702         msg2 = ldb_msg_new(tdb_key.dptr);
703         if (msg2 == NULL) {
704                 free(tdb_data.dptr);
705                 ret = LDB_ERR_OTHER;
706                 goto done;
707         }
708
709         ldb_data.data = tdb_data.dptr;
710         ldb_data.length = tdb_data.dsize;
711
712         ret = ldb_unpack_data(ldb_module_get_ctx(module), &ldb_data, msg2);
713         free(tdb_data.dptr);
714         if (ret == -1) {
715                 ret = LDB_ERR_OTHER;
716                 goto done;
717         }
718
719         if (!msg2->dn) {
720                 msg2->dn = msg->dn;
721         }
722
723         for (i=0; i<msg->num_elements; i++) {
724                 struct ldb_message_element *el = &msg->elements[i], *el2;
725                 struct ldb_val *vals;
726                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
727                 const char *dn;
728
729                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
730                 case LDB_FLAG_MOD_ADD:
731
732                         if (el->num_values == 0) {
733                                 ldb_asprintf_errstring(ldb,
734                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
735                                                        el->name, ldb_dn_get_linearized(msg2->dn));
736                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
737                                 goto done;
738                         }
739
740                         /* make a copy of the array so that a permissive
741                          * control can remove duplicates without changing the
742                          * original values, but do not copy data as we do not
743                          * need to keep it around once the operation is
744                          * finished */
745                         if (control_permissive) {
746                                 el = talloc(msg2, struct ldb_message_element);
747                                 if (!el) {
748                                         ret = LDB_ERR_OTHER;
749                                         goto done;
750                                 }
751                                 *el = msg->elements[i];
752                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
753                                 if (el->values == NULL) {
754                                         ret = LDB_ERR_OTHER;
755                                         goto done;
756                                 }
757                                 for (j = 0; j < el->num_values; j++) {
758                                         el->values[j] = msg->elements[i].values[j];
759                                 }
760                         }
761
762                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
763                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
764                                                        el->name, ldb_dn_get_linearized(msg2->dn));
765                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
766                                 goto done;
767                         }
768
769                         /* Checks if element already exists */
770                         idx = find_element(msg2, el->name);
771                         if (idx == -1) {
772                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
773                                         ret = LDB_ERR_OTHER;
774                                         goto done;
775                                 }
776                                 ret = ltdb_index_add_element(module, msg2->dn,
777                                                              el);
778                                 if (ret != LDB_SUCCESS) {
779                                         goto done;
780                                 }
781                         } else {
782                                 j = (unsigned int) idx;
783                                 el2 = &(msg2->elements[j]);
784
785                                 /* We cannot add another value on a existing one
786                                    if the attribute is single-valued */
787                                 if (ldb_tdb_single_valued(a, el)) {
788                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
789                                                                el->name, ldb_dn_get_linearized(msg2->dn));
790                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
791                                         goto done;
792                                 }
793
794                                 /* Check that values don't exist yet on multi-
795                                    valued attributes or aren't provided twice */
796                                 /* TODO: This is O(n^2) - replace with more efficient check */
797                                 for (j = 0; j < el->num_values; j++) {
798                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
799                                                 if (control_permissive) {
800                                                         /* remove this one as if it was never added */
801                                                         el->num_values--;
802                                                         for (k = j; k < el->num_values; k++) {
803                                                                 el->values[k] = el->values[k + 1];
804                                                         }
805                                                         j--; /* rewind */
806
807                                                         continue;
808                                                 }
809
810                                                 ldb_asprintf_errstring(ldb,
811                                                                        "attribute '%s': value #%u on '%s' already exists",
812                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
813                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
814                                                 goto done;
815                                         }
816                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
817                                                 ldb_asprintf_errstring(ldb,
818                                                                        "attribute '%s': value #%u on '%s' provided more than once",
819                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
820                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
821                                                 goto done;
822                                         }
823                                 }
824
825                                 /* Now combine existing and new values to a new
826                                    attribute record */
827                                 vals = talloc_realloc(msg2->elements,
828                                                       el2->values, struct ldb_val,
829                                                       el2->num_values + el->num_values);
830                                 if (vals == NULL) {
831                                         ldb_oom(ldb);
832                                         ret = LDB_ERR_OTHER;
833                                         goto done;
834                                 }
835
836                                 for (j=0; j<el->num_values; j++) {
837                                         vals[el2->num_values + j] =
838                                                 ldb_val_dup(vals, &el->values[j]);
839                                 }
840
841                                 el2->values = vals;
842                                 el2->num_values += el->num_values;
843
844                                 ret = ltdb_index_add_element(module, msg2->dn, el);
845                                 if (ret != LDB_SUCCESS) {
846                                         goto done;
847                                 }
848                         }
849
850                         break;
851
852                 case LDB_FLAG_MOD_REPLACE:
853
854                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
855                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
856                                                        el->name, ldb_dn_get_linearized(msg2->dn));
857                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
858                                 goto done;
859                         }
860
861                         /* TODO: This is O(n^2) - replace with more efficient check */
862                         for (j=0; j<el->num_values; j++) {
863                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
864                                         ldb_asprintf_errstring(ldb,
865                                                                "attribute '%s': value #%u on '%s' provided more than once",
866                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
867                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
868                                         goto done;
869                                 }
870                         }
871
872                         /* Checks if element already exists */
873                         idx = find_element(msg2, el->name);
874                         if (idx != -1) {
875                                 j = (unsigned int) idx;
876                                 el2 = &(msg2->elements[j]);
877
878                                 /* we consider two elements to be
879                                  * equal only if the order
880                                  * matches. This allows dbcheck to
881                                  * fix the ordering on attributes
882                                  * where order matters, such as
883                                  * objectClass
884                                  */
885                                 if (ldb_msg_element_equal_ordered(el, el2)) {
886                                         continue;
887                                 }
888
889                                 /* Delete the attribute if it exists in the DB */
890                                 if (msg_delete_attribute(module, ldb, msg2,
891                                                          el->name) != 0) {
892                                         ret = LDB_ERR_OTHER;
893                                         goto done;
894                                 }
895                         }
896
897                         /* Recreate it with the new values */
898                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
899                                 ret = LDB_ERR_OTHER;
900                                 goto done;
901                         }
902
903                         ret = ltdb_index_add_element(module, msg2->dn, el);
904                         if (ret != LDB_SUCCESS) {
905                                 goto done;
906                         }
907
908                         break;
909
910                 case LDB_FLAG_MOD_DELETE:
911                         dn = ldb_dn_get_linearized(msg2->dn);
912                         if (dn == NULL) {
913                                 ret = LDB_ERR_OTHER;
914                                 goto done;
915                         }
916
917                         if (msg->elements[i].num_values == 0) {
918                                 /* Delete the whole attribute */
919                                 ret = msg_delete_attribute(module, ldb, msg2,
920                                                            msg->elements[i].name);
921                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
922                                     control_permissive) {
923                                         ret = LDB_SUCCESS;
924                                 } else {
925                                         ldb_asprintf_errstring(ldb,
926                                                                "attribute '%s': no such attribute for delete on '%s'",
927                                                                msg->elements[i].name, dn);
928                                 }
929                                 if (ret != LDB_SUCCESS) {
930                                         goto done;
931                                 }
932                         } else {
933                                 /* Delete specified values from an attribute */
934                                 for (j=0; j < msg->elements[i].num_values; j++) {
935                                         ret = msg_delete_element(module,
936                                                                  msg2,
937                                                                  msg->elements[i].name,
938                                                                  &msg->elements[i].values[j]);
939                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
940                                             control_permissive) {
941                                                 ret = LDB_SUCCESS;
942                                         } else {
943                                                 ldb_asprintf_errstring(ldb,
944                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
945                                                                        msg->elements[i].name, dn);
946                                         }
947                                         if (ret != LDB_SUCCESS) {
948                                                 goto done;
949                                         }
950                                 }
951                         }
952                         break;
953                 default:
954                         ldb_asprintf_errstring(ldb,
955                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
956                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
957                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
958                         ret = LDB_ERR_PROTOCOL_ERROR;
959                         goto done;
960                 }
961         }
962
963         ret = ltdb_store(module, msg2, TDB_MODIFY);
964         if (ret != LDB_SUCCESS) {
965                 goto done;
966         }
967
968         ret = ltdb_modified(module, msg2->dn);
969         if (ret != LDB_SUCCESS) {
970                 goto done;
971         }
972
973 done:
974         talloc_free(tdb_key.dptr);
975         return ret;
976 }
977
978 /*
979   modify a record
980 */
981 static int ltdb_modify(struct ltdb_context *ctx)
982 {
983         struct ldb_module *module = ctx->module;
984         struct ldb_request *req = ctx->req;
985         int ret = LDB_SUCCESS;
986
987         ret = ltdb_check_special_dn(module, req->op.mod.message);
988         if (ret != LDB_SUCCESS) {
989                 return ret;
990         }
991
992         ldb_request_set_state(req, LDB_ASYNC_PENDING);
993
994         if (ltdb_cache_load(module) != 0) {
995                 return LDB_ERR_OPERATIONS_ERROR;
996         }
997
998         ret = ltdb_modify_internal(module, req->op.mod.message, req);
999
1000         return ret;
1001 }
1002
1003 /*
1004   rename a record
1005 */
1006 static int ltdb_rename(struct ltdb_context *ctx)
1007 {
1008         struct ldb_module *module = ctx->module;
1009         void *data = ldb_module_get_private(module);
1010         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1011         struct ldb_request *req = ctx->req;
1012         struct ldb_message *msg;
1013         int ret = LDB_SUCCESS;
1014         TDB_DATA tdb_key, tdb_key_old;
1015
1016         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1017
1018         if (ltdb_cache_load(ctx->module) != 0) {
1019                 return LDB_ERR_OPERATIONS_ERROR;
1020         }
1021
1022         msg = ldb_msg_new(ctx);
1023         if (msg == NULL) {
1024                 return LDB_ERR_OPERATIONS_ERROR;
1025         }
1026
1027         /* we need to fetch the old record to re-add under the new name */
1028         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
1029         if (ret != LDB_SUCCESS) {
1030                 /* not finding the old record is an error */
1031                 return ret;
1032         }
1033
1034         /* We need to, before changing the DB, check if the new DN
1035          * exists, so we can return this error to the caller with an
1036          * unmodified DB */
1037         tdb_key = ltdb_key(module, req->op.rename.newdn);
1038         if (!tdb_key.dptr) {
1039                 talloc_free(msg);
1040                 return LDB_ERR_OPERATIONS_ERROR;
1041         }
1042
1043         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1044         if (!tdb_key_old.dptr) {
1045                 talloc_free(msg);
1046                 talloc_free(tdb_key.dptr);
1047                 return LDB_ERR_OPERATIONS_ERROR;
1048         }
1049
1050         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1051         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1052                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1053                         talloc_free(tdb_key_old.dptr);
1054                         talloc_free(tdb_key.dptr);
1055                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1056                                                "Entry %s already exists",
1057                                                ldb_dn_get_linearized(msg->dn));
1058                         /* finding the new record already in the DB is an error */
1059                         talloc_free(msg);
1060                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1061                 }
1062         }
1063         talloc_free(tdb_key_old.dptr);
1064         talloc_free(tdb_key.dptr);
1065
1066         /* Always delete first then add, to avoid conflicts with
1067          * unique indexes. We rely on the transaction to make this
1068          * atomic
1069          */
1070         ret = ltdb_delete_internal(module, msg->dn);
1071         if (ret != LDB_SUCCESS) {
1072                 talloc_free(msg);
1073                 return ret;
1074         }
1075
1076         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1077         if (msg->dn == NULL) {
1078                 talloc_free(msg);
1079                 return LDB_ERR_OPERATIONS_ERROR;
1080         }
1081
1082         /* We don't check single value as we can have more than 1 with
1083          * deleted attributes. We could go through all elements but that's
1084          * maybe not the most efficient way
1085          */
1086         ret = ltdb_add_internal(module, msg, false);
1087
1088         talloc_free(msg);
1089
1090         return ret;
1091 }
1092
1093 static int ltdb_start_trans(struct ldb_module *module)
1094 {
1095         void *data = ldb_module_get_private(module);
1096         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1097
1098         if (tdb_transaction_start(ltdb->tdb) != 0) {
1099                 return ltdb_err_map(tdb_error(ltdb->tdb));
1100         }
1101
1102         ltdb->in_transaction++;
1103
1104         ltdb_index_transaction_start(module);
1105
1106         return LDB_SUCCESS;
1107 }
1108
1109 static int ltdb_prepare_commit(struct ldb_module *module)
1110 {
1111         void *data = ldb_module_get_private(module);
1112         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1113
1114         if (ltdb->in_transaction != 1) {
1115                 return LDB_SUCCESS;
1116         }
1117
1118         if (ltdb_index_transaction_commit(module) != 0) {
1119                 tdb_transaction_cancel(ltdb->tdb);
1120                 ltdb->in_transaction--;
1121                 return ltdb_err_map(tdb_error(ltdb->tdb));
1122         }
1123
1124         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1125                 ltdb->in_transaction--;
1126                 return ltdb_err_map(tdb_error(ltdb->tdb));
1127         }
1128
1129         ltdb->prepared_commit = true;
1130
1131         return LDB_SUCCESS;
1132 }
1133
1134 static int ltdb_end_trans(struct ldb_module *module)
1135 {
1136         void *data = ldb_module_get_private(module);
1137         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1138
1139         if (!ltdb->prepared_commit) {
1140                 int ret = ltdb_prepare_commit(module);
1141                 if (ret != LDB_SUCCESS) {
1142                         return ret;
1143                 }
1144         }
1145
1146         ltdb->in_transaction--;
1147         ltdb->prepared_commit = false;
1148
1149         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1150                 return ltdb_err_map(tdb_error(ltdb->tdb));
1151         }
1152
1153         return LDB_SUCCESS;
1154 }
1155
1156 static int ltdb_del_trans(struct ldb_module *module)
1157 {
1158         void *data = ldb_module_get_private(module);
1159         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1160
1161         ltdb->in_transaction--;
1162
1163         if (ltdb_index_transaction_cancel(module) != 0) {
1164                 tdb_transaction_cancel(ltdb->tdb);
1165                 return ltdb_err_map(tdb_error(ltdb->tdb));
1166         }
1167
1168         tdb_transaction_cancel(ltdb->tdb);
1169         return LDB_SUCCESS;
1170 }
1171
1172 /*
1173   return sequenceNumber from @BASEINFO
1174 */
1175 static int ltdb_sequence_number(struct ltdb_context *ctx,
1176                                 struct ldb_extended **ext)
1177 {
1178         struct ldb_context *ldb;
1179         struct ldb_module *module = ctx->module;
1180         struct ldb_request *req = ctx->req;
1181         TALLOC_CTX *tmp_ctx = NULL;
1182         struct ldb_seqnum_request *seq;
1183         struct ldb_seqnum_result *res;
1184         struct ldb_message *msg = NULL;
1185         struct ldb_dn *dn;
1186         const char *date;
1187         int ret = LDB_SUCCESS;
1188
1189         ldb = ldb_module_get_ctx(module);
1190
1191         seq = talloc_get_type(req->op.extended.data,
1192                                 struct ldb_seqnum_request);
1193         if (seq == NULL) {
1194                 return LDB_ERR_OPERATIONS_ERROR;
1195         }
1196
1197         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1198
1199         if (ltdb_lock_read(module) != 0) {
1200                 return LDB_ERR_OPERATIONS_ERROR;
1201         }
1202
1203         res = talloc_zero(req, struct ldb_seqnum_result);
1204         if (res == NULL) {
1205                 ret = LDB_ERR_OPERATIONS_ERROR;
1206                 goto done;
1207         }
1208
1209         tmp_ctx = talloc_new(req);
1210         if (tmp_ctx == NULL) {
1211                 ret = LDB_ERR_OPERATIONS_ERROR;
1212                 goto done;
1213         }
1214
1215         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1216         if (dn == NULL) {
1217                 ret = LDB_ERR_OPERATIONS_ERROR;
1218                 goto done;
1219         }
1220
1221         msg = ldb_msg_new(tmp_ctx);
1222         if (msg == NULL) {
1223                 ret = LDB_ERR_OPERATIONS_ERROR;
1224                 goto done;
1225         }
1226
1227         ret = ltdb_search_dn1(module, dn, msg);
1228         if (ret != LDB_SUCCESS) {
1229                 goto done;
1230         }
1231
1232         switch (seq->type) {
1233         case LDB_SEQ_HIGHEST_SEQ:
1234                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1235                 break;
1236         case LDB_SEQ_NEXT:
1237                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1238                 res->seq_num++;
1239                 break;
1240         case LDB_SEQ_HIGHEST_TIMESTAMP:
1241                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1242                 if (date) {
1243                         res->seq_num = ldb_string_to_time(date);
1244                 } else {
1245                         res->seq_num = 0;
1246                         /* zero is as good as anything when we don't know */
1247                 }
1248                 break;
1249         }
1250
1251         *ext = talloc_zero(req, struct ldb_extended);
1252         if (*ext == NULL) {
1253                 ret = LDB_ERR_OPERATIONS_ERROR;
1254                 goto done;
1255         }
1256         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1257         (*ext)->data = talloc_steal(*ext, res);
1258
1259 done:
1260         talloc_free(tmp_ctx);
1261         ltdb_unlock_read(module);
1262         return ret;
1263 }
1264
1265 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1266 {
1267         struct ldb_context *ldb;
1268         struct ldb_request *req;
1269         struct ldb_reply *ares;
1270
1271         ldb = ldb_module_get_ctx(ctx->module);
1272         req = ctx->req;
1273
1274         /* if we already returned an error just return */
1275         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1276                 return;
1277         }
1278
1279         ares = talloc_zero(req, struct ldb_reply);
1280         if (!ares) {
1281                 ldb_oom(ldb);
1282                 req->callback(req, NULL);
1283                 return;
1284         }
1285         ares->type = LDB_REPLY_DONE;
1286         ares->error = error;
1287
1288         req->callback(req, ares);
1289 }
1290
1291 static void ltdb_timeout(struct tevent_context *ev,
1292                           struct tevent_timer *te,
1293                           struct timeval t,
1294                           void *private_data)
1295 {
1296         struct ltdb_context *ctx;
1297         ctx = talloc_get_type(private_data, struct ltdb_context);
1298
1299         if (!ctx->request_terminated) {
1300                 /* request is done now */
1301                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1302         }
1303
1304         if (ctx->spy) {
1305                 /* neutralize the spy */
1306                 ctx->spy->ctx = NULL;
1307                 ctx->spy = NULL;
1308         }
1309         talloc_free(ctx);
1310 }
1311
1312 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1313                                         struct ldb_extended *ext,
1314                                         int error)
1315 {
1316         struct ldb_context *ldb;
1317         struct ldb_request *req;
1318         struct ldb_reply *ares;
1319
1320         ldb = ldb_module_get_ctx(ctx->module);
1321         req = ctx->req;
1322
1323         /* if we already returned an error just return */
1324         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1325                 return;
1326         }
1327
1328         ares = talloc_zero(req, struct ldb_reply);
1329         if (!ares) {
1330                 ldb_oom(ldb);
1331                 req->callback(req, NULL);
1332                 return;
1333         }
1334         ares->type = LDB_REPLY_DONE;
1335         ares->response = ext;
1336         ares->error = error;
1337
1338         req->callback(req, ares);
1339 }
1340
1341 static void ltdb_handle_extended(struct ltdb_context *ctx)
1342 {
1343         struct ldb_extended *ext = NULL;
1344         int ret;
1345
1346         if (strcmp(ctx->req->op.extended.oid,
1347                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1348                 /* get sequence number */
1349                 ret = ltdb_sequence_number(ctx, &ext);
1350         } else {
1351                 /* not recognized */
1352                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1353         }
1354
1355         ltdb_request_extended_done(ctx, ext, ret);
1356 }
1357
1358 static void ltdb_callback(struct tevent_context *ev,
1359                           struct tevent_timer *te,
1360                           struct timeval t,
1361                           void *private_data)
1362 {
1363         struct ltdb_context *ctx;
1364         int ret;
1365
1366         ctx = talloc_get_type(private_data, struct ltdb_context);
1367
1368         if (ctx->request_terminated) {
1369                 goto done;
1370         }
1371
1372         switch (ctx->req->operation) {
1373         case LDB_SEARCH:
1374                 ret = ltdb_search(ctx);
1375                 break;
1376         case LDB_ADD:
1377                 ret = ltdb_add(ctx);
1378                 break;
1379         case LDB_MODIFY:
1380                 ret = ltdb_modify(ctx);
1381                 break;
1382         case LDB_DELETE:
1383                 ret = ltdb_delete(ctx);
1384                 break;
1385         case LDB_RENAME:
1386                 ret = ltdb_rename(ctx);
1387                 break;
1388         case LDB_EXTENDED:
1389                 ltdb_handle_extended(ctx);
1390                 goto done;
1391         default:
1392                 /* no other op supported */
1393                 ret = LDB_ERR_PROTOCOL_ERROR;
1394         }
1395
1396         if (!ctx->request_terminated) {
1397                 /* request is done now */
1398                 ltdb_request_done(ctx, ret);
1399         }
1400
1401 done:
1402         if (ctx->spy) {
1403                 /* neutralize the spy */
1404                 ctx->spy->ctx = NULL;
1405                 ctx->spy = NULL;
1406         }
1407         talloc_free(ctx);
1408 }
1409
1410 static int ltdb_request_destructor(void *ptr)
1411 {
1412         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1413
1414         if (spy->ctx != NULL) {
1415                 spy->ctx->spy = NULL;
1416                 spy->ctx->request_terminated = true;
1417                 spy->ctx = NULL;
1418         }
1419
1420         return 0;
1421 }
1422
1423 static int ltdb_handle_request(struct ldb_module *module,
1424                                struct ldb_request *req)
1425 {
1426         struct ldb_control *control_permissive;
1427         struct ldb_context *ldb;
1428         struct tevent_context *ev;
1429         struct ltdb_context *ac;
1430         struct tevent_timer *te;
1431         struct timeval tv;
1432         unsigned int i;
1433
1434         ldb = ldb_module_get_ctx(module);
1435
1436         control_permissive = ldb_request_get_control(req,
1437                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1438
1439         for (i = 0; req->controls && req->controls[i]; i++) {
1440                 if (req->controls[i]->critical &&
1441                     req->controls[i] != control_permissive) {
1442                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1443                                                req->controls[i]->oid);
1444                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1445                 }
1446         }
1447
1448         if (req->starttime == 0 || req->timeout == 0) {
1449                 ldb_set_errstring(ldb, "Invalid timeout settings");
1450                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1451         }
1452
1453         ev = ldb_get_event_context(ldb);
1454
1455         ac = talloc_zero(ldb, struct ltdb_context);
1456         if (ac == NULL) {
1457                 ldb_oom(ldb);
1458                 return LDB_ERR_OPERATIONS_ERROR;
1459         }
1460
1461         ac->module = module;
1462         ac->req = req;
1463
1464         tv.tv_sec = 0;
1465         tv.tv_usec = 0;
1466         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1467         if (NULL == te) {
1468                 talloc_free(ac);
1469                 return LDB_ERR_OPERATIONS_ERROR;
1470         }
1471
1472         if (req->timeout > 0) {
1473                 tv.tv_sec = req->starttime + req->timeout;
1474                 tv.tv_usec = 0;
1475                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1476                                                      ltdb_timeout, ac);
1477                 if (NULL == ac->timeout_event) {
1478                         talloc_free(ac);
1479                         return LDB_ERR_OPERATIONS_ERROR;
1480                 }
1481         }
1482
1483         /* set a spy so that we do not try to use the request context
1484          * if it is freed before ltdb_callback fires */
1485         ac->spy = talloc(req, struct ltdb_req_spy);
1486         if (NULL == ac->spy) {
1487                 talloc_free(ac);
1488                 return LDB_ERR_OPERATIONS_ERROR;
1489         }
1490         ac->spy->ctx = ac;
1491
1492         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1493
1494         return LDB_SUCCESS;
1495 }
1496
1497 static int ltdb_init_rootdse(struct ldb_module *module)
1498 {
1499         /* ignore errors on this - we expect it for non-sam databases */
1500         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1501
1502         /* there can be no module beyond the backend, just return */
1503         return LDB_SUCCESS;
1504 }
1505
1506 static const struct ldb_module_ops ltdb_ops = {
1507         .name              = "tdb",
1508         .init_context      = ltdb_init_rootdse,
1509         .search            = ltdb_handle_request,
1510         .add               = ltdb_handle_request,
1511         .modify            = ltdb_handle_request,
1512         .del               = ltdb_handle_request,
1513         .rename            = ltdb_handle_request,
1514         .extended          = ltdb_handle_request,
1515         .start_transaction = ltdb_start_trans,
1516         .end_transaction   = ltdb_end_trans,
1517         .prepare_commit    = ltdb_prepare_commit,
1518         .del_transaction   = ltdb_del_trans,
1519 };
1520
1521 /*
1522   connect to the database
1523 */
1524 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1525                         unsigned int flags, const char *options[],
1526                         struct ldb_module **_module)
1527 {
1528         struct ldb_module *module;
1529         const char *path;
1530         int tdb_flags, open_flags;
1531         struct ltdb_private *ltdb;
1532
1533         /* parse the url */
1534         if (strchr(url, ':')) {
1535                 if (strncmp(url, "tdb://", 6) != 0) {
1536                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1537                                   "Invalid tdb URL '%s'", url);
1538                         return LDB_ERR_OPERATIONS_ERROR;
1539                 }
1540                 path = url+6;
1541         } else {
1542                 path = url;
1543         }
1544
1545         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1546
1547         /* check for the 'nosync' option */
1548         if (flags & LDB_FLG_NOSYNC) {
1549                 tdb_flags |= TDB_NOSYNC;
1550         }
1551
1552         /* and nommap option */
1553         if (flags & LDB_FLG_NOMMAP) {
1554                 tdb_flags |= TDB_NOMMAP;
1555         }
1556
1557         if (flags & LDB_FLG_RDONLY) {
1558                 open_flags = O_RDONLY;
1559         } else {
1560                 open_flags = O_CREAT | O_RDWR;
1561         }
1562
1563         ltdb = talloc_zero(ldb, struct ltdb_private);
1564         if (!ltdb) {
1565                 ldb_oom(ldb);
1566                 return LDB_ERR_OPERATIONS_ERROR;
1567         }
1568
1569         /* note that we use quite a large default hash size */
1570         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1571                                    tdb_flags, open_flags,
1572                                    ldb_get_create_perms(ldb), ldb);
1573         if (!ltdb->tdb) {
1574                 ldb_asprintf_errstring(ldb,
1575                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1576                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1577                           "Unable to open tdb '%s': %s", path, strerror(errno));
1578                 talloc_free(ltdb);
1579                 if (errno == EACCES || errno == EPERM) {
1580                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1581                 }
1582                 return LDB_ERR_OPERATIONS_ERROR;
1583         }
1584
1585         if (getenv("LDB_WARN_UNINDEXED")) {
1586                 ltdb->warn_unindexed = true;
1587         }
1588
1589         if (getenv("LDB_WARN_REINDEX")) {
1590                 ltdb->warn_reindex = true;
1591         }
1592
1593         ltdb->sequence_number = 0;
1594
1595         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1596         if (!module) {
1597                 ldb_oom(ldb);
1598                 talloc_free(ltdb);
1599                 return LDB_ERR_OPERATIONS_ERROR;
1600         }
1601         ldb_module_set_private(module, ltdb);
1602         talloc_steal(module, ltdb);
1603
1604         if (ltdb_cache_load(module) != 0) {
1605                 ldb_asprintf_errstring(ldb,
1606                                        "Unable to load ltdb cache records of tdb '%s'", path);
1607                 talloc_free(module);
1608                 return LDB_ERR_OPERATIONS_ERROR;
1609         }
1610
1611         *_module = module;
1612         return LDB_SUCCESS;
1613 }
1614
1615 int ldb_tdb_init(const char *version)
1616 {
1617         LDB_MODULE_CHECK_VERSION(version);
1618         return ldb_register_backend("tdb", ltdb_connect, false);
1619 }