TDB2: make SAMBA use tdb1 again for the moment.
[vlendec/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include <lib/tdb_compat/tdb_compat.h>
54
55 /*
56   prevent memory errors on callbacks
57 */
58 struct ltdb_req_spy {
59         struct ltdb_context *ctx;
60 };
61
62 /*
63   map a tdb error code to a ldb error code
64 */
65 int ltdb_err_map(enum TDB_ERROR tdb_code)
66 {
67         switch (tdb_code) {
68         case TDB_SUCCESS:
69                 return LDB_SUCCESS;
70         case TDB_ERR_CORRUPT:
71         case TDB_ERR_OOM:
72         case TDB_ERR_EINVAL:
73                 return LDB_ERR_OPERATIONS_ERROR;
74         case TDB_ERR_IO:
75                 return LDB_ERR_PROTOCOL_ERROR;
76         case TDB_ERR_LOCK:
77         case TDB_ERR_NOLOCK:
78                 return LDB_ERR_BUSY;
79         case TDB_ERR_LOCK_TIMEOUT:
80                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
81         case TDB_ERR_EXISTS:
82                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
83         case TDB_ERR_NOEXIST:
84                 return LDB_ERR_NO_SUCH_OBJECT;
85         case TDB_ERR_RDONLY:
86                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
87         default:
88                 break;
89         }
90         return LDB_ERR_OTHER;
91 }
92
93 /*
94   lock the database for read - use by ltdb_search and ltdb_sequence_number
95 */
96 int ltdb_lock_read(struct ldb_module *module)
97 {
98         void *data = ldb_module_get_private(module);
99         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
100         int ret = 0;
101
102         if (ltdb->in_transaction == 0 &&
103             ltdb->read_lock_count == 0) {
104                 ret = tdb_lockall_read(ltdb->tdb);
105         }
106         if (ret == 0) {
107                 ltdb->read_lock_count++;
108         }
109         return ret;
110 }
111
112 /*
113   unlock the database after a ltdb_lock_read()
114 */
115 int ltdb_unlock_read(struct ldb_module *module)
116 {
117         void *data = ldb_module_get_private(module);
118         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
119         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
120                 tdb_unlockall_read(ltdb->tdb);
121                 return 0;
122         }
123         ltdb->read_lock_count--;
124         return 0;
125 }
126
127
128 /*
129   form a TDB_DATA for a record key
130   caller frees
131
132   note that the key for a record can depend on whether the
133   dn refers to a case sensitive index record or not
134 */
135 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
136 {
137         struct ldb_context *ldb = ldb_module_get_ctx(module);
138         TDB_DATA key;
139         char *key_str = NULL;
140         const char *dn_folded = NULL;
141
142         /*
143           most DNs are case insensitive. The exception is index DNs for
144           case sensitive attributes
145
146           there are 3 cases dealt with in this code:
147
148           1) if the dn doesn't start with @ then uppercase the attribute
149              names and the attributes values of case insensitive attributes
150           2) if the dn starts with @ then leave it alone -
151              the indexing code handles the rest
152         */
153
154         dn_folded = ldb_dn_get_casefold(dn);
155         if (!dn_folded) {
156                 goto failed;
157         }
158
159         key_str = talloc_strdup(ldb, "DN=");
160         if (!key_str) {
161                 goto failed;
162         }
163
164         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
165         if (!key_str) {
166                 goto failed;
167         }
168
169         key.dptr = (uint8_t *)key_str;
170         key.dsize = strlen(key_str) + 1;
171
172         return key;
173
174 failed:
175         errno = ENOMEM;
176         key.dptr = NULL;
177         key.dsize = 0;
178         return key;
179 }
180
181 /*
182   check special dn's have valid attributes
183   currently only @ATTRIBUTES is checked
184 */
185 static int ltdb_check_special_dn(struct ldb_module *module,
186                                  const struct ldb_message *msg)
187 {
188         struct ldb_context *ldb = ldb_module_get_ctx(module);
189         unsigned int i, j;
190
191         if (! ldb_dn_is_special(msg->dn) ||
192             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
193                 return LDB_SUCCESS;
194         }
195
196         /* we have @ATTRIBUTES, let's check attributes are fine */
197         /* should we check that we deny multivalued attributes ? */
198         for (i = 0; i < msg->num_elements; i++) {
199                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
200
201                 for (j = 0; j < msg->elements[i].num_values; j++) {
202                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
203                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
204                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
205                         }
206                 }
207         }
208
209         return LDB_SUCCESS;
210 }
211
212
213 /*
214   we've made a modification to a dn - possibly reindex and
215   update sequence number
216 */
217 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
218 {
219         int ret = LDB_SUCCESS;
220         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
221
222         /* only allow modifies inside a transaction, otherwise the
223          * ldb is unsafe */
224         if (ltdb->in_transaction == 0) {
225                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
226                 return LDB_ERR_OPERATIONS_ERROR;
227         }
228
229         if (ldb_dn_is_special(dn) &&
230             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
231              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
232                 ret = ltdb_reindex(module);
233         }
234
235         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
236         if (ret == LDB_SUCCESS &&
237             !(ldb_dn_is_special(dn) &&
238               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
239                 ret = ltdb_increase_sequence_number(module);
240         }
241
242         /* If the modify was to @OPTIONS, reload the cache */
243         if (ret == LDB_SUCCESS &&
244             ldb_dn_is_special(dn) &&
245             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
246                 ret = ltdb_cache_reload(module);
247         }
248
249         return ret;
250 }
251
252 /*
253   store a record into the db
254 */
255 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
256 {
257         void *data = ldb_module_get_private(module);
258         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
259         TDB_DATA tdb_key, tdb_data;
260         int ret = LDB_SUCCESS;
261
262         tdb_key = ltdb_key(module, msg->dn);
263         if (tdb_key.dptr == NULL) {
264                 return LDB_ERR_OTHER;
265         }
266
267         ret = ltdb_pack_data(module, msg, &tdb_data);
268         if (ret == -1) {
269                 talloc_free(tdb_key.dptr);
270                 return LDB_ERR_OTHER;
271         }
272
273         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
274         if (ret != 0) {
275                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
276                 goto done;
277         }
278
279 done:
280         talloc_free(tdb_key.dptr);
281         talloc_free(tdb_data.dptr);
282
283         return ret;
284 }
285
286
287 /*
288   check if a attribute is a single valued, for a given element
289  */
290 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
291                                   struct ldb_message_element *el)
292 {
293         if (!a) return false;
294         if (el != NULL) {
295                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
296                         /* override from a ldb module, for example
297                            used for the description field, which is
298                            marked multi-valued in the schema but which
299                            should not actually accept multiple
300                            values */
301                         return true;
302                 }
303                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
304                         /* override from a ldb module, for example used for
305                            deleted linked attribute entries */
306                         return false;
307                 }
308         }
309         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
310                 return true;
311         }
312         return false;
313 }
314
315 static int ltdb_add_internal(struct ldb_module *module,
316                              const struct ldb_message *msg,
317                              bool check_single_value)
318 {
319         struct ldb_context *ldb = ldb_module_get_ctx(module);
320         int ret = LDB_SUCCESS;
321         unsigned int i;
322
323         for (i=0;i<msg->num_elements;i++) {
324                 struct ldb_message_element *el = &msg->elements[i];
325                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
326
327                 if (el->num_values == 0) {
328                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
329                                                el->name, ldb_dn_get_linearized(msg->dn));
330                         return LDB_ERR_CONSTRAINT_VIOLATION;
331                 }
332                 if (check_single_value &&
333                                 el->num_values > 1 &&
334                                 ldb_tdb_single_valued(a, el)) {
335                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
336                                                el->name, ldb_dn_get_linearized(msg->dn));
337                         return LDB_ERR_CONSTRAINT_VIOLATION;
338                 }
339         }
340
341         ret = ltdb_store(module, msg, TDB_INSERT);
342         if (ret != LDB_SUCCESS) {
343                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
344                         ldb_asprintf_errstring(ldb,
345                                                "Entry %s already exists",
346                                                ldb_dn_get_linearized(msg->dn));
347                 }
348                 return ret;
349         }
350
351         ret = ltdb_index_add_new(module, msg);
352         if (ret != LDB_SUCCESS) {
353                 return ret;
354         }
355
356         ret = ltdb_modified(module, msg->dn);
357
358         return ret;
359 }
360
361 /*
362   add a record to the database
363 */
364 static int ltdb_add(struct ltdb_context *ctx)
365 {
366         struct ldb_module *module = ctx->module;
367         struct ldb_request *req = ctx->req;
368         int ret = LDB_SUCCESS;
369
370         ret = ltdb_check_special_dn(module, req->op.add.message);
371         if (ret != LDB_SUCCESS) {
372                 return ret;
373         }
374
375         ldb_request_set_state(req, LDB_ASYNC_PENDING);
376
377         if (ltdb_cache_load(module) != 0) {
378                 return LDB_ERR_OPERATIONS_ERROR;
379         }
380
381         ret = ltdb_add_internal(module, req->op.add.message, true);
382
383         return ret;
384 }
385
386 /*
387   delete a record from the database, not updating indexes (used for deleting
388   index records)
389 */
390 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
391 {
392         void *data = ldb_module_get_private(module);
393         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
394         TDB_DATA tdb_key;
395         int ret;
396
397         tdb_key = ltdb_key(module, dn);
398         if (!tdb_key.dptr) {
399                 return LDB_ERR_OTHER;
400         }
401
402         ret = tdb_delete(ltdb->tdb, tdb_key);
403         talloc_free(tdb_key.dptr);
404
405         if (ret != 0) {
406                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
407         }
408
409         return ret;
410 }
411
412 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
413 {
414         struct ldb_message *msg;
415         int ret = LDB_SUCCESS;
416
417         msg = ldb_msg_new(module);
418         if (msg == NULL) {
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421
422         /* in case any attribute of the message was indexed, we need
423            to fetch the old record */
424         ret = ltdb_search_dn1(module, dn, msg);
425         if (ret != LDB_SUCCESS) {
426                 /* not finding the old record is an error */
427                 goto done;
428         }
429
430         ret = ltdb_delete_noindex(module, dn);
431         if (ret != LDB_SUCCESS) {
432                 goto done;
433         }
434
435         /* remove any indexed attributes */
436         ret = ltdb_index_delete(module, msg);
437         if (ret != LDB_SUCCESS) {
438                 goto done;
439         }
440
441         ret = ltdb_modified(module, dn);
442         if (ret != LDB_SUCCESS) {
443                 goto done;
444         }
445
446 done:
447         talloc_free(msg);
448         return ret;
449 }
450
451 /*
452   delete a record from the database
453 */
454 static int ltdb_delete(struct ltdb_context *ctx)
455 {
456         struct ldb_module *module = ctx->module;
457         struct ldb_request *req = ctx->req;
458         int ret = LDB_SUCCESS;
459
460         ldb_request_set_state(req, LDB_ASYNC_PENDING);
461
462         if (ltdb_cache_load(module) != 0) {
463                 return LDB_ERR_OPERATIONS_ERROR;
464         }
465
466         ret = ltdb_delete_internal(module, req->op.del.dn);
467
468         return ret;
469 }
470
471 /*
472   find an element by attribute name. At the moment this does a linear search,
473   it should be re-coded to use a binary search once all places that modify
474   records guarantee sorted order
475
476   return the index of the first matching element if found, otherwise -1
477 */
478 static int find_element(const struct ldb_message *msg, const char *name)
479 {
480         unsigned int i;
481         for (i=0;i<msg->num_elements;i++) {
482                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
483                         return i;
484                 }
485         }
486         return -1;
487 }
488
489
490 /*
491   add an element to an existing record. Assumes a elements array that we
492   can call re-alloc on, and assumed that we can re-use the data pointers from
493   the passed in additional values. Use with care!
494
495   returns 0 on success, -1 on failure (and sets errno)
496 */
497 static int ltdb_msg_add_element(struct ldb_context *ldb,
498                                 struct ldb_message *msg,
499                                 struct ldb_message_element *el)
500 {
501         struct ldb_message_element *e2;
502         unsigned int i;
503
504         if (el->num_values == 0) {
505                 /* nothing to do here - we don't add empty elements */
506                 return 0;
507         }
508
509         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
510                               msg->num_elements+1);
511         if (!e2) {
512                 errno = ENOMEM;
513                 return -1;
514         }
515
516         msg->elements = e2;
517
518         e2 = &msg->elements[msg->num_elements];
519
520         e2->name = el->name;
521         e2->flags = el->flags;
522         e2->values = talloc_array(msg->elements,
523                                   struct ldb_val, el->num_values);
524         if (!e2->values) {
525                 errno = ENOMEM;
526                 return -1;
527         }
528         for (i=0;i<el->num_values;i++) {
529                 e2->values[i] = el->values[i];
530         }
531         e2->num_values = el->num_values;
532
533         ++msg->num_elements;
534
535         return 0;
536 }
537
538 /*
539   delete all elements having a specified attribute name
540 */
541 static int msg_delete_attribute(struct ldb_module *module,
542                                 struct ldb_context *ldb,
543                                 struct ldb_message *msg, const char *name)
544 {
545         unsigned int i;
546         int ret;
547         struct ldb_message_element *el;
548
549         el = ldb_msg_find_element(msg, name);
550         if (el == NULL) {
551                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
552         }
553         i = el - msg->elements;
554
555         ret = ltdb_index_del_element(module, msg->dn, el);
556         if (ret != LDB_SUCCESS) {
557                 return ret;
558         }
559
560         talloc_free(el->values);
561         if (msg->num_elements > (i+1)) {
562                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
563         }
564         msg->num_elements--;
565         msg->elements = talloc_realloc(msg, msg->elements,
566                                        struct ldb_message_element,
567                                        msg->num_elements);
568         return LDB_SUCCESS;
569 }
570
571 /*
572   delete all elements matching an attribute name/value
573
574   return LDB Error on failure
575 */
576 static int msg_delete_element(struct ldb_module *module,
577                               struct ldb_message *msg,
578                               const char *name,
579                               const struct ldb_val *val)
580 {
581         struct ldb_context *ldb = ldb_module_get_ctx(module);
582         unsigned int i;
583         int found, ret;
584         struct ldb_message_element *el;
585         const struct ldb_schema_attribute *a;
586
587         found = find_element(msg, name);
588         if (found == -1) {
589                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
590         }
591
592         i = (unsigned int) found;
593         el = &(msg->elements[i]);
594
595         a = ldb_schema_attribute_by_name(ldb, el->name);
596
597         for (i=0;i<el->num_values;i++) {
598                 bool matched;
599                 if (a->syntax->operator_fn) {
600                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
601                                                      &el->values[i], val, &matched);
602                         if (ret != LDB_SUCCESS) return ret;
603                 } else {
604                         matched = (a->syntax->comparison_fn(ldb, ldb,
605                                                             &el->values[i], val) == 0);
606                 }
607                 if (matched) {
608                         if (el->num_values == 1) {
609                                 return msg_delete_attribute(module, ldb, msg, name);
610                         }
611
612                         ret = ltdb_index_del_value(module, msg->dn, el, i);
613                         if (ret != LDB_SUCCESS) {
614                                 return ret;
615                         }
616
617                         if (i<el->num_values-1) {
618                                 memmove(&el->values[i], &el->values[i+1],
619                                         sizeof(el->values[i])*
620                                                 (el->num_values-(i+1)));
621                         }
622                         el->num_values--;
623
624                         /* per definition we find in a canonicalised message an
625                            attribute value only once. So we are finished here */
626                         return LDB_SUCCESS;
627                 }
628         }
629
630         /* Not found */
631         return LDB_ERR_NO_SUCH_ATTRIBUTE;
632 }
633
634
635 /*
636   modify a record - internal interface
637
638   yuck - this is O(n^2). Luckily n is usually small so we probably
639   get away with it, but if we ever have really large attribute lists
640   then we'll need to look at this again
641
642   'req' is optional, and is used to specify controls if supplied
643 */
644 int ltdb_modify_internal(struct ldb_module *module,
645                          const struct ldb_message *msg,
646                          struct ldb_request *req)
647 {
648         struct ldb_context *ldb = ldb_module_get_ctx(module);
649         void *data = ldb_module_get_private(module);
650         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
651         TDB_DATA tdb_key, tdb_data;
652         struct ldb_message *msg2;
653         unsigned int i, j, k;
654         int ret = LDB_SUCCESS, idx;
655         struct ldb_control *control_permissive = NULL;
656
657         if (req) {
658                 control_permissive = ldb_request_get_control(req,
659                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
660         }
661
662         tdb_key = ltdb_key(module, msg->dn);
663         if (!tdb_key.dptr) {
664                 return LDB_ERR_OTHER;
665         }
666
667         tdb_data = tdb_fetch_compat(ltdb->tdb, tdb_key);
668         if (!tdb_data.dptr) {
669                 talloc_free(tdb_key.dptr);
670                 return ltdb_err_map(tdb_error(ltdb->tdb));
671         }
672
673         msg2 = ldb_msg_new(tdb_key.dptr);
674         if (msg2 == NULL) {
675                 free(tdb_data.dptr);
676                 ret = LDB_ERR_OTHER;
677                 goto done;
678         }
679
680         ret = ltdb_unpack_data(module, &tdb_data, msg2);
681         free(tdb_data.dptr);
682         if (ret == -1) {
683                 ret = LDB_ERR_OTHER;
684                 goto done;
685         }
686
687         if (!msg2->dn) {
688                 msg2->dn = msg->dn;
689         }
690
691         for (i=0; i<msg->num_elements; i++) {
692                 struct ldb_message_element *el = &msg->elements[i], *el2;
693                 struct ldb_val *vals;
694                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
695                 const char *dn;
696
697                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
698                 case LDB_FLAG_MOD_ADD:
699
700                         if (el->num_values == 0) {
701                                 ldb_asprintf_errstring(ldb,
702                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
703                                                        el->name, ldb_dn_get_linearized(msg2->dn));
704                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
705                                 goto done;
706                         }
707
708                         /* make a copy of the array so that a permissive
709                          * control can remove duplicates without changing the
710                          * original values, but do not copy data as we do not
711                          * need to keep it around once the operation is
712                          * finished */
713                         if (control_permissive) {
714                                 el = talloc(msg2, struct ldb_message_element);
715                                 if (!el) {
716                                         ret = LDB_ERR_OTHER;
717                                         goto done;
718                                 }
719                                 *el = msg->elements[i];
720                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
721                                 if (el->values == NULL) {
722                                         ret = LDB_ERR_OTHER;
723                                         goto done;
724                                 }
725                                 for (j = 0; j < el->num_values; j++) {
726                                         el->values[j] = msg->elements[i].values[j];
727                                 }
728                         }
729
730                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
731                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
732                                                        el->name, ldb_dn_get_linearized(msg2->dn));
733                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
734                                 goto done;
735                         }
736
737                         /* Checks if element already exists */
738                         idx = find_element(msg2, el->name);
739                         if (idx == -1) {
740                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
741                                         ret = LDB_ERR_OTHER;
742                                         goto done;
743                                 }
744                                 ret = ltdb_index_add_element(module, msg2->dn,
745                                                              el);
746                                 if (ret != LDB_SUCCESS) {
747                                         goto done;
748                                 }
749                         } else {
750                                 j = (unsigned int) idx;
751                                 el2 = &(msg2->elements[j]);
752
753                                 /* We cannot add another value on a existing one
754                                    if the attribute is single-valued */
755                                 if (ldb_tdb_single_valued(a, el)) {
756                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
757                                                                el->name, ldb_dn_get_linearized(msg2->dn));
758                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
759                                         goto done;
760                                 }
761
762                                 /* Check that values don't exist yet on multi-
763                                    valued attributes or aren't provided twice */
764                                 for (j = 0; j < el->num_values; j++) {
765                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
766                                                 if (control_permissive) {
767                                                         /* remove this one as if it was never added */
768                                                         el->num_values--;
769                                                         for (k = j; k < el->num_values; k++) {
770                                                                 el->values[k] = el->values[k + 1];
771                                                         }
772                                                         j--; /* rewind */
773
774                                                         continue;
775                                                 }
776
777                                                 ldb_asprintf_errstring(ldb,
778                                                                        "attribute '%s': value #%u on '%s' already exists",
779                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
780                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
781                                                 goto done;
782                                         }
783                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
784                                                 ldb_asprintf_errstring(ldb,
785                                                                        "attribute '%s': value #%u on '%s' provided more than once",
786                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
787                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
788                                                 goto done;
789                                         }
790                                 }
791
792                                 /* Now combine existing and new values to a new
793                                    attribute record */
794                                 vals = talloc_realloc(msg2->elements,
795                                                       el2->values, struct ldb_val,
796                                                       el2->num_values + el->num_values);
797                                 if (vals == NULL) {
798                                         ldb_oom(ldb);
799                                         ret = LDB_ERR_OTHER;
800                                         goto done;
801                                 }
802
803                                 for (j=0; j<el->num_values; j++) {
804                                         vals[el2->num_values + j] =
805                                                 ldb_val_dup(vals, &el->values[j]);
806                                 }
807
808                                 el2->values = vals;
809                                 el2->num_values += el->num_values;
810
811                                 ret = ltdb_index_add_element(module, msg2->dn, el);
812                                 if (ret != LDB_SUCCESS) {
813                                         goto done;
814                                 }
815                         }
816
817                         break;
818
819                 case LDB_FLAG_MOD_REPLACE:
820
821                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
822                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
823                                                        el->name, ldb_dn_get_linearized(msg2->dn));
824                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
825                                 goto done;
826                         }
827
828                         /* TODO: This is O(n^2) - replace with more efficient check */
829                         for (j=0; j<el->num_values; j++) {
830                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
831                                         ldb_asprintf_errstring(ldb,
832                                                                "attribute '%s': value #%u on '%s' provided more than once",
833                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
834                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
835                                         goto done;
836                                 }
837                         }
838
839                         /* Checks if element already exists */
840                         idx = find_element(msg2, el->name);
841                         if (idx != -1) {
842                                 j = (unsigned int) idx;
843                                 el2 = &(msg2->elements[j]);
844
845                                 /* we consider two elements to be
846                                  * equal only if the order
847                                  * matches. This allows dbcheck to
848                                  * fix the ordering on attributes
849                                  * where order matters, such as
850                                  * objectClass
851                                  */
852                                 if (ldb_msg_element_equal_ordered(el, el2)) {
853                                         continue;
854                                 }
855
856                                 /* Delete the attribute if it exists in the DB */
857                                 if (msg_delete_attribute(module, ldb, msg2,
858                                                          el->name) != 0) {
859                                         ret = LDB_ERR_OTHER;
860                                         goto done;
861                                 }
862                         }
863
864                         /* Recreate it with the new values */
865                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
866                                 ret = LDB_ERR_OTHER;
867                                 goto done;
868                         }
869
870                         ret = ltdb_index_add_element(module, msg2->dn, el);
871                         if (ret != LDB_SUCCESS) {
872                                 goto done;
873                         }
874
875                         break;
876
877                 case LDB_FLAG_MOD_DELETE:
878                         dn = ldb_dn_get_linearized(msg2->dn);
879                         if (dn == NULL) {
880                                 ret = LDB_ERR_OTHER;
881                                 goto done;
882                         }
883
884                         if (msg->elements[i].num_values == 0) {
885                                 /* Delete the whole attribute */
886                                 ret = msg_delete_attribute(module, ldb, msg2,
887                                                            msg->elements[i].name);
888                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
889                                     control_permissive) {
890                                         ret = LDB_SUCCESS;
891                                 } else {
892                                         ldb_asprintf_errstring(ldb,
893                                                                "attribute '%s': no such attribute for delete on '%s'",
894                                                                msg->elements[i].name, dn);
895                                 }
896                                 if (ret != LDB_SUCCESS) {
897                                         goto done;
898                                 }
899                         } else {
900                                 /* Delete specified values from an attribute */
901                                 for (j=0; j < msg->elements[i].num_values; j++) {
902                                         ret = msg_delete_element(module,
903                                                                  msg2,
904                                                                  msg->elements[i].name,
905                                                                  &msg->elements[i].values[j]);
906                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
907                                             control_permissive) {
908                                                 ret = LDB_SUCCESS;
909                                         } else {
910                                                 ldb_asprintf_errstring(ldb,
911                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
912                                                                        msg->elements[i].name, dn);
913                                         }
914                                         if (ret != LDB_SUCCESS) {
915                                                 goto done;
916                                         }
917                                 }
918                         }
919                         break;
920                 default:
921                         ldb_asprintf_errstring(ldb,
922                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
923                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
924                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
925                         ret = LDB_ERR_PROTOCOL_ERROR;
926                         goto done;
927                 }
928         }
929
930         ret = ltdb_store(module, msg2, TDB_MODIFY);
931         if (ret != LDB_SUCCESS) {
932                 goto done;
933         }
934
935         ret = ltdb_modified(module, msg2->dn);
936         if (ret != LDB_SUCCESS) {
937                 goto done;
938         }
939
940 done:
941         talloc_free(tdb_key.dptr);
942         return ret;
943 }
944
945 /*
946   modify a record
947 */
948 static int ltdb_modify(struct ltdb_context *ctx)
949 {
950         struct ldb_module *module = ctx->module;
951         struct ldb_request *req = ctx->req;
952         int ret = LDB_SUCCESS;
953
954         ret = ltdb_check_special_dn(module, req->op.mod.message);
955         if (ret != LDB_SUCCESS) {
956                 return ret;
957         }
958
959         ldb_request_set_state(req, LDB_ASYNC_PENDING);
960
961         if (ltdb_cache_load(module) != 0) {
962                 return LDB_ERR_OPERATIONS_ERROR;
963         }
964
965         ret = ltdb_modify_internal(module, req->op.mod.message, req);
966
967         return ret;
968 }
969
970 /*
971   rename a record
972 */
973 static int ltdb_rename(struct ltdb_context *ctx)
974 {
975         struct ldb_module *module = ctx->module;
976         struct ldb_request *req = ctx->req;
977         struct ldb_message *msg;
978         int ret = LDB_SUCCESS;
979
980         ldb_request_set_state(req, LDB_ASYNC_PENDING);
981
982         if (ltdb_cache_load(ctx->module) != 0) {
983                 return LDB_ERR_OPERATIONS_ERROR;
984         }
985
986         msg = ldb_msg_new(ctx);
987         if (msg == NULL) {
988                 return LDB_ERR_OPERATIONS_ERROR;
989         }
990
991         /* in case any attribute of the message was indexed, we need
992            to fetch the old record */
993         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
994         if (ret != LDB_SUCCESS) {
995                 /* not finding the old record is an error */
996                 return ret;
997         }
998
999         /* Always delete first then add, to avoid conflicts with
1000          * unique indexes. We rely on the transaction to make this
1001          * atomic
1002          */
1003         ret = ltdb_delete_internal(module, msg->dn);
1004         if (ret != LDB_SUCCESS) {
1005                 return ret;
1006         }
1007
1008         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1009         if (msg->dn == NULL) {
1010                 return LDB_ERR_OPERATIONS_ERROR;
1011         }
1012
1013         /* We don't check single value as we can have more than 1 with
1014          * deleted attributes. We could go through all elements but that's
1015          * maybe not the most efficient way
1016          */
1017         ret = ltdb_add_internal(module, msg, false);
1018
1019         return ret;
1020 }
1021
1022 static int ltdb_start_trans(struct ldb_module *module)
1023 {
1024         void *data = ldb_module_get_private(module);
1025         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1026
1027         if (tdb_transaction_start(ltdb->tdb) != 0) {
1028                 return ltdb_err_map(tdb_error(ltdb->tdb));
1029         }
1030
1031         ltdb->in_transaction++;
1032
1033         ltdb_index_transaction_start(module);
1034
1035         return LDB_SUCCESS;
1036 }
1037
1038 static int ltdb_prepare_commit(struct ldb_module *module)
1039 {
1040         void *data = ldb_module_get_private(module);
1041         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1042
1043         if (ltdb->in_transaction != 1) {
1044                 return LDB_SUCCESS;
1045         }
1046
1047         if (ltdb_index_transaction_commit(module) != 0) {
1048                 tdb_transaction_cancel(ltdb->tdb);
1049                 ltdb->in_transaction--;
1050                 return ltdb_err_map(tdb_error(ltdb->tdb));
1051         }
1052
1053         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1054                 ltdb->in_transaction--;
1055                 return ltdb_err_map(tdb_error(ltdb->tdb));
1056         }
1057
1058         ltdb->prepared_commit = true;
1059
1060         return LDB_SUCCESS;
1061 }
1062
1063 static int ltdb_end_trans(struct ldb_module *module)
1064 {
1065         void *data = ldb_module_get_private(module);
1066         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1067
1068         if (!ltdb->prepared_commit) {
1069                 int ret = ltdb_prepare_commit(module);
1070                 if (ret != LDB_SUCCESS) {
1071                         return ret;
1072                 }
1073         }
1074
1075         ltdb->in_transaction--;
1076         ltdb->prepared_commit = false;
1077
1078         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1079                 return ltdb_err_map(tdb_error(ltdb->tdb));
1080         }
1081
1082         return LDB_SUCCESS;
1083 }
1084
1085 static int ltdb_del_trans(struct ldb_module *module)
1086 {
1087         void *data = ldb_module_get_private(module);
1088         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1089
1090         ltdb->in_transaction--;
1091
1092         if (ltdb_index_transaction_cancel(module) != 0) {
1093                 tdb_transaction_cancel(ltdb->tdb);
1094                 return ltdb_err_map(tdb_error(ltdb->tdb));
1095         }
1096
1097         tdb_transaction_cancel(ltdb->tdb);
1098         return LDB_SUCCESS;
1099 }
1100
1101 /*
1102   return sequenceNumber from @BASEINFO
1103 */
1104 static int ltdb_sequence_number(struct ltdb_context *ctx,
1105                                 struct ldb_extended **ext)
1106 {
1107         struct ldb_context *ldb;
1108         struct ldb_module *module = ctx->module;
1109         struct ldb_request *req = ctx->req;
1110         TALLOC_CTX *tmp_ctx = NULL;
1111         struct ldb_seqnum_request *seq;
1112         struct ldb_seqnum_result *res;
1113         struct ldb_message *msg = NULL;
1114         struct ldb_dn *dn;
1115         const char *date;
1116         int ret = LDB_SUCCESS;
1117
1118         ldb = ldb_module_get_ctx(module);
1119
1120         seq = talloc_get_type(req->op.extended.data,
1121                                 struct ldb_seqnum_request);
1122         if (seq == NULL) {
1123                 return LDB_ERR_OPERATIONS_ERROR;
1124         }
1125
1126         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1127
1128         if (ltdb_lock_read(module) != 0) {
1129                 return LDB_ERR_OPERATIONS_ERROR;
1130         }
1131
1132         res = talloc_zero(req, struct ldb_seqnum_result);
1133         if (res == NULL) {
1134                 ret = LDB_ERR_OPERATIONS_ERROR;
1135                 goto done;
1136         }
1137
1138         tmp_ctx = talloc_new(req);
1139         if (tmp_ctx == NULL) {
1140                 ret = LDB_ERR_OPERATIONS_ERROR;
1141                 goto done;
1142         }
1143
1144         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1145         if (dn == NULL) {
1146                 ret = LDB_ERR_OPERATIONS_ERROR;
1147                 goto done;
1148         }
1149
1150         msg = ldb_msg_new(tmp_ctx);
1151         if (msg == NULL) {
1152                 ret = LDB_ERR_OPERATIONS_ERROR;
1153                 goto done;
1154         }
1155
1156         ret = ltdb_search_dn1(module, dn, msg);
1157         if (ret != LDB_SUCCESS) {
1158                 goto done;
1159         }
1160
1161         switch (seq->type) {
1162         case LDB_SEQ_HIGHEST_SEQ:
1163                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1164                 break;
1165         case LDB_SEQ_NEXT:
1166                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1167                 res->seq_num++;
1168                 break;
1169         case LDB_SEQ_HIGHEST_TIMESTAMP:
1170                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1171                 if (date) {
1172                         res->seq_num = ldb_string_to_time(date);
1173                 } else {
1174                         res->seq_num = 0;
1175                         /* zero is as good as anything when we don't know */
1176                 }
1177                 break;
1178         }
1179
1180         *ext = talloc_zero(req, struct ldb_extended);
1181         if (*ext == NULL) {
1182                 ret = LDB_ERR_OPERATIONS_ERROR;
1183                 goto done;
1184         }
1185         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1186         (*ext)->data = talloc_steal(*ext, res);
1187
1188 done:
1189         talloc_free(tmp_ctx);
1190         ltdb_unlock_read(module);
1191         return ret;
1192 }
1193
1194 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1195 {
1196         struct ldb_context *ldb;
1197         struct ldb_request *req;
1198         struct ldb_reply *ares;
1199
1200         ldb = ldb_module_get_ctx(ctx->module);
1201         req = ctx->req;
1202
1203         /* if we already returned an error just return */
1204         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1205                 return;
1206         }
1207
1208         ares = talloc_zero(req, struct ldb_reply);
1209         if (!ares) {
1210                 ldb_oom(ldb);
1211                 req->callback(req, NULL);
1212                 return;
1213         }
1214         ares->type = LDB_REPLY_DONE;
1215         ares->error = error;
1216
1217         req->callback(req, ares);
1218 }
1219
1220 static void ltdb_timeout(struct tevent_context *ev,
1221                           struct tevent_timer *te,
1222                           struct timeval t,
1223                           void *private_data)
1224 {
1225         struct ltdb_context *ctx;
1226         ctx = talloc_get_type(private_data, struct ltdb_context);
1227
1228         if (!ctx->request_terminated) {
1229                 /* request is done now */
1230                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1231         }
1232
1233         if (ctx->spy) {
1234                 /* neutralize the spy */
1235                 ctx->spy->ctx = NULL;
1236                 ctx->spy = NULL;
1237         }
1238         talloc_free(ctx);
1239 }
1240
1241 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1242                                         struct ldb_extended *ext,
1243                                         int error)
1244 {
1245         struct ldb_context *ldb;
1246         struct ldb_request *req;
1247         struct ldb_reply *ares;
1248
1249         ldb = ldb_module_get_ctx(ctx->module);
1250         req = ctx->req;
1251
1252         /* if we already returned an error just return */
1253         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1254                 return;
1255         }
1256
1257         ares = talloc_zero(req, struct ldb_reply);
1258         if (!ares) {
1259                 ldb_oom(ldb);
1260                 req->callback(req, NULL);
1261                 return;
1262         }
1263         ares->type = LDB_REPLY_DONE;
1264         ares->response = ext;
1265         ares->error = error;
1266
1267         req->callback(req, ares);
1268 }
1269
1270 static void ltdb_handle_extended(struct ltdb_context *ctx)
1271 {
1272         struct ldb_extended *ext = NULL;
1273         int ret;
1274
1275         if (strcmp(ctx->req->op.extended.oid,
1276                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1277                 /* get sequence number */
1278                 ret = ltdb_sequence_number(ctx, &ext);
1279         } else {
1280                 /* not recognized */
1281                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1282         }
1283
1284         ltdb_request_extended_done(ctx, ext, ret);
1285 }
1286
1287 static void ltdb_callback(struct tevent_context *ev,
1288                           struct tevent_timer *te,
1289                           struct timeval t,
1290                           void *private_data)
1291 {
1292         struct ltdb_context *ctx;
1293         int ret;
1294
1295         ctx = talloc_get_type(private_data, struct ltdb_context);
1296
1297         if (ctx->request_terminated) {
1298                 goto done;
1299         }
1300
1301         switch (ctx->req->operation) {
1302         case LDB_SEARCH:
1303                 ret = ltdb_search(ctx);
1304                 break;
1305         case LDB_ADD:
1306                 ret = ltdb_add(ctx);
1307                 break;
1308         case LDB_MODIFY:
1309                 ret = ltdb_modify(ctx);
1310                 break;
1311         case LDB_DELETE:
1312                 ret = ltdb_delete(ctx);
1313                 break;
1314         case LDB_RENAME:
1315                 ret = ltdb_rename(ctx);
1316                 break;
1317         case LDB_EXTENDED:
1318                 ltdb_handle_extended(ctx);
1319                 goto done;
1320         default:
1321                 /* no other op supported */
1322                 ret = LDB_ERR_PROTOCOL_ERROR;
1323         }
1324
1325         if (!ctx->request_terminated) {
1326                 /* request is done now */
1327                 ltdb_request_done(ctx, ret);
1328         }
1329
1330 done:
1331         if (ctx->spy) {
1332                 /* neutralize the spy */
1333                 ctx->spy->ctx = NULL;
1334                 ctx->spy = NULL;
1335         }
1336         talloc_free(ctx);
1337 }
1338
1339 static int ltdb_request_destructor(void *ptr)
1340 {
1341         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1342
1343         if (spy->ctx != NULL) {
1344                 spy->ctx->spy = NULL;
1345                 spy->ctx->request_terminated = true;
1346                 spy->ctx = NULL;
1347         }
1348
1349         return 0;
1350 }
1351
1352 static int ltdb_handle_request(struct ldb_module *module,
1353                                struct ldb_request *req)
1354 {
1355         struct ldb_control *control_permissive;
1356         struct ldb_context *ldb;
1357         struct tevent_context *ev;
1358         struct ltdb_context *ac;
1359         struct tevent_timer *te;
1360         struct timeval tv;
1361         unsigned int i;
1362
1363         ldb = ldb_module_get_ctx(module);
1364
1365         control_permissive = ldb_request_get_control(req,
1366                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1367
1368         for (i = 0; req->controls && req->controls[i]; i++) {
1369                 if (req->controls[i]->critical &&
1370                     req->controls[i] != control_permissive) {
1371                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1372                                                req->controls[i]->oid);
1373                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1374                 }
1375         }
1376
1377         if (req->starttime == 0 || req->timeout == 0) {
1378                 ldb_set_errstring(ldb, "Invalid timeout settings");
1379                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1380         }
1381
1382         ev = ldb_get_event_context(ldb);
1383
1384         ac = talloc_zero(ldb, struct ltdb_context);
1385         if (ac == NULL) {
1386                 ldb_oom(ldb);
1387                 return LDB_ERR_OPERATIONS_ERROR;
1388         }
1389
1390         ac->module = module;
1391         ac->req = req;
1392
1393         tv.tv_sec = 0;
1394         tv.tv_usec = 0;
1395         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1396         if (NULL == te) {
1397                 talloc_free(ac);
1398                 return LDB_ERR_OPERATIONS_ERROR;
1399         }
1400
1401         tv.tv_sec = req->starttime + req->timeout;
1402         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1403         if (NULL == ac->timeout_event) {
1404                 talloc_free(ac);
1405                 return LDB_ERR_OPERATIONS_ERROR;
1406         }
1407
1408         /* set a spy so that we do not try to use the request context
1409          * if it is freed before ltdb_callback fires */
1410         ac->spy = talloc(req, struct ltdb_req_spy);
1411         if (NULL == ac->spy) {
1412                 talloc_free(ac);
1413                 return LDB_ERR_OPERATIONS_ERROR;
1414         }
1415         ac->spy->ctx = ac;
1416
1417         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1418
1419         return LDB_SUCCESS;
1420 }
1421
1422 static int ltdb_init_rootdse(struct ldb_module *module)
1423 {
1424         /* ignore errors on this - we expect it for non-sam databases */
1425         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1426
1427         /* there can be no module beyond the backend, just return */
1428         return LDB_SUCCESS;
1429 }
1430
1431 static const struct ldb_module_ops ltdb_ops = {
1432         .name              = "tdb",
1433         .init_context      = ltdb_init_rootdse,
1434         .search            = ltdb_handle_request,
1435         .add               = ltdb_handle_request,
1436         .modify            = ltdb_handle_request,
1437         .del               = ltdb_handle_request,
1438         .rename            = ltdb_handle_request,
1439         .extended          = ltdb_handle_request,
1440         .start_transaction = ltdb_start_trans,
1441         .end_transaction   = ltdb_end_trans,
1442         .prepare_commit    = ltdb_prepare_commit,
1443         .del_transaction   = ltdb_del_trans,
1444 };
1445
1446 /*
1447   connect to the database
1448 */
1449 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1450                         unsigned int flags, const char *options[],
1451                         struct ldb_module **_module)
1452 {
1453         struct ldb_module *module;
1454         const char *path;
1455         int tdb_flags, open_flags;
1456         struct ltdb_private *ltdb;
1457
1458         /* parse the url */
1459         if (strchr(url, ':')) {
1460                 if (strncmp(url, "tdb://", 6) != 0) {
1461                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1462                                   "Invalid tdb URL '%s'", url);
1463                         return LDB_ERR_OPERATIONS_ERROR;
1464                 }
1465                 path = url+6;
1466         } else {
1467                 path = url;
1468         }
1469
1470         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1471
1472         /* check for the 'nosync' option */
1473         if (flags & LDB_FLG_NOSYNC) {
1474                 tdb_flags |= TDB_NOSYNC;
1475         }
1476
1477         /* and nommap option */
1478         if (flags & LDB_FLG_NOMMAP) {
1479                 tdb_flags |= TDB_NOMMAP;
1480         }
1481
1482         if (flags & LDB_FLG_RDONLY) {
1483                 open_flags = O_RDONLY;
1484         } else {
1485                 open_flags = O_CREAT | O_RDWR;
1486         }
1487
1488         ltdb = talloc_zero(ldb, struct ltdb_private);
1489         if (!ltdb) {
1490                 ldb_oom(ldb);
1491                 return LDB_ERR_OPERATIONS_ERROR;
1492         }
1493
1494         /* note that we use quite a large default hash size */
1495         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1496                                    tdb_flags, open_flags,
1497                                    ldb_get_create_perms(ldb), ldb);
1498         if (!ltdb->tdb) {
1499                 ldb_asprintf_errstring(ldb,
1500                                        "Unable to open tdb '%s'", path);
1501                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1502                           "Unable to open tdb '%s'", path);
1503                 talloc_free(ltdb);
1504                 return LDB_ERR_OPERATIONS_ERROR;
1505         }
1506
1507         if (getenv("LDB_WARN_UNINDEXED")) {
1508                 ltdb->warn_unindexed = true;
1509         }
1510
1511         ltdb->sequence_number = 0;
1512
1513         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1514         if (!module) {
1515                 ldb_oom(ldb);
1516                 talloc_free(ltdb);
1517                 return LDB_ERR_OPERATIONS_ERROR;
1518         }
1519         ldb_module_set_private(module, ltdb);
1520         talloc_steal(module, ltdb);
1521
1522         if (ltdb_cache_load(module) != 0) {
1523                 ldb_asprintf_errstring(ldb,
1524                                        "Unable to load ltdb cache records of tdb '%s'", path);
1525                 talloc_free(module);
1526                 return LDB_ERR_OPERATIONS_ERROR;
1527         }
1528
1529         *_module = module;
1530         return LDB_SUCCESS;
1531 }
1532
1533 int ldb_tdb_init(const char *version)
1534 {
1535         LDB_MODULE_CHECK_VERSION(version);
1536         return ldb_register_backend("tdb", ltdb_connect, false);
1537 }