ldb-tdb: Introduce a flag on ltdb_add_internal to indicate whether unique value test...
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include <lib/tdb_compat/tdb_compat.h>
54
55
56 /*
57   map a tdb error code to a ldb error code
58 */
59 int ltdb_err_map(enum TDB_ERROR tdb_code)
60 {
61         switch (tdb_code) {
62         case TDB_SUCCESS:
63                 return LDB_SUCCESS;
64         case TDB_ERR_CORRUPT:
65         case TDB_ERR_OOM:
66         case TDB_ERR_EINVAL:
67                 return LDB_ERR_OPERATIONS_ERROR;
68         case TDB_ERR_IO:
69                 return LDB_ERR_PROTOCOL_ERROR;
70         case TDB_ERR_LOCK:
71 #ifndef BUILD_TDB2
72         case TDB_ERR_NOLOCK:
73 #endif
74                 return LDB_ERR_BUSY;
75 #ifndef BUILD_TDB2
76         case TDB_ERR_LOCK_TIMEOUT:
77 #endif
78                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
79         case TDB_ERR_EXISTS:
80                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
81         case TDB_ERR_NOEXIST:
82                 return LDB_ERR_NO_SUCH_OBJECT;
83         case TDB_ERR_RDONLY:
84                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
85         default:
86                 break;
87         }
88         return LDB_ERR_OTHER;
89 }
90
91 /*
92   lock the database for read - use by ltdb_search and ltdb_sequence_number
93 */
94 int ltdb_lock_read(struct ldb_module *module)
95 {
96         void *data = ldb_module_get_private(module);
97         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
98         int ret = 0;
99
100         if (ltdb->in_transaction == 0 &&
101             ltdb->read_lock_count == 0) {
102                 ret = tdb_lockall_read(ltdb->tdb);
103         }
104         if (ret == 0) {
105                 ltdb->read_lock_count++;
106         }
107         return ret;
108 }
109
110 /*
111   unlock the database after a ltdb_lock_read()
112 */
113 int ltdb_unlock_read(struct ldb_module *module)
114 {
115         void *data = ldb_module_get_private(module);
116         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
117         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
118                 tdb_unlockall_read(ltdb->tdb);
119                 return 0;
120         }
121         ltdb->read_lock_count--;
122         return 0;
123 }
124
125
126 /*
127   form a TDB_DATA for a record key
128   caller frees
129
130   note that the key for a record can depend on whether the
131   dn refers to a case sensitive index record or not
132 */
133 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
134 {
135         struct ldb_context *ldb = ldb_module_get_ctx(module);
136         TDB_DATA key;
137         char *key_str = NULL;
138         const char *dn_folded = NULL;
139
140         /*
141           most DNs are case insensitive. The exception is index DNs for
142           case sensitive attributes
143
144           there are 3 cases dealt with in this code:
145
146           1) if the dn doesn't start with @ then uppercase the attribute
147              names and the attributes values of case insensitive attributes
148           2) if the dn starts with @ then leave it alone -
149              the indexing code handles the rest
150         */
151
152         dn_folded = ldb_dn_get_casefold(dn);
153         if (!dn_folded) {
154                 goto failed;
155         }
156
157         key_str = talloc_strdup(ldb, "DN=");
158         if (!key_str) {
159                 goto failed;
160         }
161
162         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
163         if (!key_str) {
164                 goto failed;
165         }
166
167         key.dptr = (uint8_t *)key_str;
168         key.dsize = strlen(key_str) + 1;
169
170         return key;
171
172 failed:
173         errno = ENOMEM;
174         key.dptr = NULL;
175         key.dsize = 0;
176         return key;
177 }
178
179 /*
180   check special dn's have valid attributes
181   currently only @ATTRIBUTES is checked
182 */
183 static int ltdb_check_special_dn(struct ldb_module *module,
184                                  const struct ldb_message *msg)
185 {
186         struct ldb_context *ldb = ldb_module_get_ctx(module);
187         unsigned int i, j;
188
189         if (! ldb_dn_is_special(msg->dn) ||
190             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
191                 return LDB_SUCCESS;
192         }
193
194         /* we have @ATTRIBUTES, let's check attributes are fine */
195         /* should we check that we deny multivalued attributes ? */
196         for (i = 0; i < msg->num_elements; i++) {
197                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
198
199                 for (j = 0; j < msg->elements[i].num_values; j++) {
200                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
201                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
202                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
203                         }
204                 }
205         }
206
207         return LDB_SUCCESS;
208 }
209
210
211 /*
212   we've made a modification to a dn - possibly reindex and
213   update sequence number
214 */
215 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
216 {
217         int ret = LDB_SUCCESS;
218         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
219
220         /* only allow modifies inside a transaction, otherwise the
221          * ldb is unsafe */
222         if (ltdb->in_transaction == 0) {
223                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
224                 return LDB_ERR_OPERATIONS_ERROR;
225         }
226
227         if (ldb_dn_is_special(dn) &&
228             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
229              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
230                 ret = ltdb_reindex(module);
231         }
232
233         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
234         if (ret == LDB_SUCCESS &&
235             !(ldb_dn_is_special(dn) &&
236               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
237                 ret = ltdb_increase_sequence_number(module);
238         }
239
240         /* If the modify was to @OPTIONS, reload the cache */
241         if (ret == LDB_SUCCESS &&
242             ldb_dn_is_special(dn) &&
243             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
244                 ret = ltdb_cache_reload(module);
245         }
246
247         return ret;
248 }
249
250 /*
251   store a record into the db
252 */
253 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
254 {
255         void *data = ldb_module_get_private(module);
256         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
257         TDB_DATA tdb_key, tdb_data;
258         int ret = LDB_SUCCESS;
259
260         tdb_key = ltdb_key(module, msg->dn);
261         if (tdb_key.dptr == NULL) {
262                 return LDB_ERR_OTHER;
263         }
264
265         ret = ltdb_pack_data(module, msg, &tdb_data);
266         if (ret == -1) {
267                 talloc_free(tdb_key.dptr);
268                 return LDB_ERR_OTHER;
269         }
270
271         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
272         if (ret != 0) {
273                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
274                 goto done;
275         }
276
277 done:
278         talloc_free(tdb_key.dptr);
279         talloc_free(tdb_data.dptr);
280
281         return ret;
282 }
283
284
285 /*
286   check if a attribute is a single valued, for a given element
287  */
288 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
289                                   struct ldb_message_element *el)
290 {
291         if (!a) return false;
292         if (el != NULL) {
293                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
294                         /* override from a ldb module, for example
295                            used for the description field, which is
296                            marked multi-valued in the schema but which
297                            should not actually accept multiple
298                            values */
299                         return true;
300                 }
301                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
302                         /* override from a ldb module, for example used for
303                            deleted linked attribute entries */
304                         return false;
305                 }
306         }
307         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
308                 return true;
309         }
310         return false;
311 }
312
313 static int ltdb_add_internal(struct ldb_module *module,
314                              const struct ldb_message *msg,
315                              bool check_single_value)
316 {
317         struct ldb_context *ldb = ldb_module_get_ctx(module);
318         int ret = LDB_SUCCESS;
319         unsigned int i;
320
321         for (i=0;i<msg->num_elements;i++) {
322                 struct ldb_message_element *el = &msg->elements[i];
323                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
324
325                 if (el->num_values == 0) {
326                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
327                                                el->name, ldb_dn_get_linearized(msg->dn));
328                         return LDB_ERR_CONSTRAINT_VIOLATION;
329                 }
330                 if (check_single_value &&
331                                 el->num_values > 1 &&
332                                 ldb_tdb_single_valued(a, el)) {
333                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
334                                                el->name, ldb_dn_get_linearized(msg->dn));
335                         return LDB_ERR_CONSTRAINT_VIOLATION;
336                 }
337         }
338
339         ret = ltdb_store(module, msg, TDB_INSERT);
340         if (ret != LDB_SUCCESS) {
341                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
342                         ldb_asprintf_errstring(ldb,
343                                                "Entry %s already exists",
344                                                ldb_dn_get_linearized(msg->dn));
345                 }
346                 return ret;
347         }
348
349         ret = ltdb_index_add_new(module, msg);
350         if (ret != LDB_SUCCESS) {
351                 return ret;
352         }
353
354         ret = ltdb_modified(module, msg->dn);
355
356         return ret;
357 }
358
359 /*
360   add a record to the database
361 */
362 static int ltdb_add(struct ltdb_context *ctx)
363 {
364         struct ldb_module *module = ctx->module;
365         struct ldb_request *req = ctx->req;
366         int ret = LDB_SUCCESS;
367
368         ret = ltdb_check_special_dn(module, req->op.add.message);
369         if (ret != LDB_SUCCESS) {
370                 return ret;
371         }
372
373         ldb_request_set_state(req, LDB_ASYNC_PENDING);
374
375         if (ltdb_cache_load(module) != 0) {
376                 return LDB_ERR_OPERATIONS_ERROR;
377         }
378
379         ret = ltdb_add_internal(module, req->op.add.message, true);
380
381         return ret;
382 }
383
384 /*
385   delete a record from the database, not updating indexes (used for deleting
386   index records)
387 */
388 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
389 {
390         void *data = ldb_module_get_private(module);
391         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
392         TDB_DATA tdb_key;
393         int ret;
394
395         tdb_key = ltdb_key(module, dn);
396         if (!tdb_key.dptr) {
397                 return LDB_ERR_OTHER;
398         }
399
400         ret = tdb_delete(ltdb->tdb, tdb_key);
401         talloc_free(tdb_key.dptr);
402
403         if (ret != 0) {
404                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
405         }
406
407         return ret;
408 }
409
410 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
411 {
412         struct ldb_message *msg;
413         int ret = LDB_SUCCESS;
414
415         msg = ldb_msg_new(module);
416         if (msg == NULL) {
417                 return LDB_ERR_OPERATIONS_ERROR;
418         }
419
420         /* in case any attribute of the message was indexed, we need
421            to fetch the old record */
422         ret = ltdb_search_dn1(module, dn, msg);
423         if (ret != LDB_SUCCESS) {
424                 /* not finding the old record is an error */
425                 goto done;
426         }
427
428         ret = ltdb_delete_noindex(module, dn);
429         if (ret != LDB_SUCCESS) {
430                 goto done;
431         }
432
433         /* remove any indexed attributes */
434         ret = ltdb_index_delete(module, msg);
435         if (ret != LDB_SUCCESS) {
436                 goto done;
437         }
438
439         ret = ltdb_modified(module, dn);
440         if (ret != LDB_SUCCESS) {
441                 goto done;
442         }
443
444 done:
445         talloc_free(msg);
446         return ret;
447 }
448
449 /*
450   delete a record from the database
451 */
452 static int ltdb_delete(struct ltdb_context *ctx)
453 {
454         struct ldb_module *module = ctx->module;
455         struct ldb_request *req = ctx->req;
456         int ret = LDB_SUCCESS;
457
458         ldb_request_set_state(req, LDB_ASYNC_PENDING);
459
460         if (ltdb_cache_load(module) != 0) {
461                 return LDB_ERR_OPERATIONS_ERROR;
462         }
463
464         ret = ltdb_delete_internal(module, req->op.del.dn);
465
466         return ret;
467 }
468
469 /*
470   find an element by attribute name. At the moment this does a linear search,
471   it should be re-coded to use a binary search once all places that modify
472   records guarantee sorted order
473
474   return the index of the first matching element if found, otherwise -1
475 */
476 static int find_element(const struct ldb_message *msg, const char *name)
477 {
478         unsigned int i;
479         for (i=0;i<msg->num_elements;i++) {
480                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
481                         return i;
482                 }
483         }
484         return -1;
485 }
486
487
488 /*
489   add an element to an existing record. Assumes a elements array that we
490   can call re-alloc on, and assumed that we can re-use the data pointers from
491   the passed in additional values. Use with care!
492
493   returns 0 on success, -1 on failure (and sets errno)
494 */
495 static int ltdb_msg_add_element(struct ldb_context *ldb,
496                                 struct ldb_message *msg,
497                                 struct ldb_message_element *el)
498 {
499         struct ldb_message_element *e2;
500         unsigned int i;
501
502         if (el->num_values == 0) {
503                 /* nothing to do here - we don't add empty elements */
504                 return 0;
505         }
506
507         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
508                               msg->num_elements+1);
509         if (!e2) {
510                 errno = ENOMEM;
511                 return -1;
512         }
513
514         msg->elements = e2;
515
516         e2 = &msg->elements[msg->num_elements];
517
518         e2->name = el->name;
519         e2->flags = el->flags;
520         e2->values = talloc_array(msg->elements,
521                                   struct ldb_val, el->num_values);
522         if (!e2->values) {
523                 errno = ENOMEM;
524                 return -1;
525         }
526         for (i=0;i<el->num_values;i++) {
527                 e2->values[i] = el->values[i];
528         }
529         e2->num_values = el->num_values;
530
531         ++msg->num_elements;
532
533         return 0;
534 }
535
536 /*
537   delete all elements having a specified attribute name
538 */
539 static int msg_delete_attribute(struct ldb_module *module,
540                                 struct ldb_context *ldb,
541                                 struct ldb_message *msg, const char *name)
542 {
543         unsigned int i;
544         int ret;
545         struct ldb_message_element *el;
546
547         el = ldb_msg_find_element(msg, name);
548         if (el == NULL) {
549                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
550         }
551         i = el - msg->elements;
552
553         ret = ltdb_index_del_element(module, msg->dn, el);
554         if (ret != LDB_SUCCESS) {
555                 return ret;
556         }
557
558         talloc_free(el->values);
559         if (msg->num_elements > (i+1)) {
560                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
561         }
562         msg->num_elements--;
563         msg->elements = talloc_realloc(msg, msg->elements,
564                                        struct ldb_message_element,
565                                        msg->num_elements);
566         return LDB_SUCCESS;
567 }
568
569 /*
570   delete all elements matching an attribute name/value
571
572   return LDB Error on failure
573 */
574 static int msg_delete_element(struct ldb_module *module,
575                               struct ldb_message *msg,
576                               const char *name,
577                               const struct ldb_val *val)
578 {
579         struct ldb_context *ldb = ldb_module_get_ctx(module);
580         unsigned int i;
581         int found, ret;
582         struct ldb_message_element *el;
583         const struct ldb_schema_attribute *a;
584
585         found = find_element(msg, name);
586         if (found == -1) {
587                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
588         }
589
590         i = (unsigned int) found;
591         el = &(msg->elements[i]);
592
593         a = ldb_schema_attribute_by_name(ldb, el->name);
594
595         for (i=0;i<el->num_values;i++) {
596                 bool matched;
597                 if (a->syntax->operator_fn) {
598                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
599                                                      &el->values[i], val, &matched);
600                         if (ret != LDB_SUCCESS) return ret;
601                 } else {
602                         matched = (a->syntax->comparison_fn(ldb, ldb,
603                                                             &el->values[i], val) == 0);
604                 }
605                 if (matched) {
606                         if (el->num_values == 1) {
607                                 return msg_delete_attribute(module, ldb, msg, name);
608                         }
609
610                         ret = ltdb_index_del_value(module, msg->dn, el, i);
611                         if (ret != LDB_SUCCESS) {
612                                 return ret;
613                         }
614
615                         if (i<el->num_values-1) {
616                                 memmove(&el->values[i], &el->values[i+1],
617                                         sizeof(el->values[i])*
618                                                 (el->num_values-(i+1)));
619                         }
620                         el->num_values--;
621
622                         /* per definition we find in a canonicalised message an
623                            attribute value only once. So we are finished here */
624                         return LDB_SUCCESS;
625                 }
626         }
627
628         /* Not found */
629         return LDB_ERR_NO_SUCH_ATTRIBUTE;
630 }
631
632
633 /*
634   modify a record - internal interface
635
636   yuck - this is O(n^2). Luckily n is usually small so we probably
637   get away with it, but if we ever have really large attribute lists
638   then we'll need to look at this again
639
640   'req' is optional, and is used to specify controls if supplied
641 */
642 int ltdb_modify_internal(struct ldb_module *module,
643                          const struct ldb_message *msg,
644                          struct ldb_request *req)
645 {
646         struct ldb_context *ldb = ldb_module_get_ctx(module);
647         void *data = ldb_module_get_private(module);
648         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
649         TDB_DATA tdb_key, tdb_data;
650         struct ldb_message *msg2;
651         unsigned int i, j, k;
652         int ret = LDB_SUCCESS, idx;
653         struct ldb_control *control_permissive = NULL;
654
655         if (req) {
656                 control_permissive = ldb_request_get_control(req,
657                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
658         }
659
660         tdb_key = ltdb_key(module, msg->dn);
661         if (!tdb_key.dptr) {
662                 return LDB_ERR_OTHER;
663         }
664
665         tdb_data = tdb_fetch_compat(ltdb->tdb, tdb_key);
666         if (!tdb_data.dptr) {
667                 talloc_free(tdb_key.dptr);
668                 return ltdb_err_map(tdb_error(ltdb->tdb));
669         }
670
671         msg2 = ldb_msg_new(tdb_key.dptr);
672         if (msg2 == NULL) {
673                 free(tdb_data.dptr);
674                 ret = LDB_ERR_OTHER;
675                 goto done;
676         }
677
678         ret = ltdb_unpack_data(module, &tdb_data, msg2);
679         free(tdb_data.dptr);
680         if (ret == -1) {
681                 ret = LDB_ERR_OTHER;
682                 goto done;
683         }
684
685         if (!msg2->dn) {
686                 msg2->dn = msg->dn;
687         }
688
689         for (i=0; i<msg->num_elements; i++) {
690                 struct ldb_message_element *el = &msg->elements[i], *el2;
691                 struct ldb_val *vals;
692                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
693                 const char *dn;
694
695                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
696                 case LDB_FLAG_MOD_ADD:
697
698                         if (el->num_values == 0) {
699                                 ldb_asprintf_errstring(ldb,
700                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
701                                                        el->name, ldb_dn_get_linearized(msg2->dn));
702                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
703                                 goto done;
704                         }
705
706                         /* make a copy of the array so that a permissive
707                          * control can remove duplicates without changing the
708                          * original values, but do not copy data as we do not
709                          * need to keep it around once the operation is
710                          * finished */
711                         if (control_permissive) {
712                                 el = talloc(msg2, struct ldb_message_element);
713                                 if (!el) {
714                                         ret = LDB_ERR_OTHER;
715                                         goto done;
716                                 }
717                                 *el = msg->elements[i];
718                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
719                                 if (el->values == NULL) {
720                                         ret = LDB_ERR_OTHER;
721                                         goto done;
722                                 }
723                                 for (j = 0; j < el->num_values; j++) {
724                                         el->values[j] = msg->elements[i].values[j];
725                                 }
726                         }
727
728                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
729                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
730                                                        el->name, ldb_dn_get_linearized(msg2->dn));
731                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
732                                 goto done;
733                         }
734
735                         /* Checks if element already exists */
736                         idx = find_element(msg2, el->name);
737                         if (idx == -1) {
738                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
739                                         ret = LDB_ERR_OTHER;
740                                         goto done;
741                                 }
742                                 ret = ltdb_index_add_element(module, msg2->dn,
743                                                              el);
744                                 if (ret != LDB_SUCCESS) {
745                                         goto done;
746                                 }
747                         } else {
748                                 j = (unsigned int) idx;
749                                 el2 = &(msg2->elements[j]);
750
751                                 /* We cannot add another value on a existing one
752                                    if the attribute is single-valued */
753                                 if (ldb_tdb_single_valued(a, el)) {
754                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
755                                                                el->name, ldb_dn_get_linearized(msg2->dn));
756                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
757                                         goto done;
758                                 }
759
760                                 /* Check that values don't exist yet on multi-
761                                    valued attributes or aren't provided twice */
762                                 for (j = 0; j < el->num_values; j++) {
763                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
764                                                 if (control_permissive) {
765                                                         /* remove this one as if it was never added */
766                                                         el->num_values--;
767                                                         for (k = j; k < el->num_values; k++) {
768                                                                 el->values[k] = el->values[k + 1];
769                                                         }
770                                                         j--; /* rewind */
771
772                                                         continue;
773                                                 }
774
775                                                 ldb_asprintf_errstring(ldb,
776                                                                        "attribute '%s': value #%u on '%s' already exists",
777                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
778                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
779                                                 goto done;
780                                         }
781                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
782                                                 ldb_asprintf_errstring(ldb,
783                                                                        "attribute '%s': value #%u on '%s' provided more than once",
784                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
785                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
786                                                 goto done;
787                                         }
788                                 }
789
790                                 /* Now combine existing and new values to a new
791                                    attribute record */
792                                 vals = talloc_realloc(msg2->elements,
793                                                       el2->values, struct ldb_val,
794                                                       el2->num_values + el->num_values);
795                                 if (vals == NULL) {
796                                         ldb_oom(ldb);
797                                         ret = LDB_ERR_OTHER;
798                                         goto done;
799                                 }
800
801                                 for (j=0; j<el->num_values; j++) {
802                                         vals[el2->num_values + j] =
803                                                 ldb_val_dup(vals, &el->values[j]);
804                                 }
805
806                                 el2->values = vals;
807                                 el2->num_values += el->num_values;
808
809                                 ret = ltdb_index_add_element(module, msg2->dn, el);
810                                 if (ret != LDB_SUCCESS) {
811                                         goto done;
812                                 }
813                         }
814
815                         break;
816
817                 case LDB_FLAG_MOD_REPLACE:
818
819                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
820                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
821                                                        el->name, ldb_dn_get_linearized(msg2->dn));
822                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
823                                 goto done;
824                         }
825
826                         /* TODO: This is O(n^2) - replace with more efficient check */
827                         for (j=0; j<el->num_values; j++) {
828                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
829                                         ldb_asprintf_errstring(ldb,
830                                                                "attribute '%s': value #%u on '%s' provided more than once",
831                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
832                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
833                                         goto done;
834                                 }
835                         }
836
837                         /* Checks if element already exists */
838                         idx = find_element(msg2, el->name);
839                         if (idx != -1) {
840                                 j = (unsigned int) idx;
841                                 el2 = &(msg2->elements[j]);
842                                 if (ldb_msg_element_compare(el, el2) == 0) {
843                                         /* we are replacing with the same values */
844                                         continue;
845                                 }
846                         
847                                 /* Delete the attribute if it exists in the DB */
848                                 if (msg_delete_attribute(module, ldb, msg2,
849                                                          el->name) != 0) {
850                                         ret = LDB_ERR_OTHER;
851                                         goto done;
852                                 }
853                         }
854
855                         /* Recreate it with the new values */
856                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
857                                 ret = LDB_ERR_OTHER;
858                                 goto done;
859                         }
860
861                         ret = ltdb_index_add_element(module, msg2->dn, el);
862                         if (ret != LDB_SUCCESS) {
863                                 goto done;
864                         }
865
866                         break;
867
868                 case LDB_FLAG_MOD_DELETE:
869                         dn = ldb_dn_get_linearized(msg2->dn);
870                         if (dn == NULL) {
871                                 ret = LDB_ERR_OTHER;
872                                 goto done;
873                         }
874
875                         if (msg->elements[i].num_values == 0) {
876                                 /* Delete the whole attribute */
877                                 ret = msg_delete_attribute(module, ldb, msg2,
878                                                            msg->elements[i].name);
879                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
880                                     control_permissive) {
881                                         ret = LDB_SUCCESS;
882                                 } else {
883                                         ldb_asprintf_errstring(ldb,
884                                                                "attribute '%s': no such attribute for delete on '%s'",
885                                                                msg->elements[i].name, dn);
886                                 }
887                                 if (ret != LDB_SUCCESS) {
888                                         goto done;
889                                 }
890                         } else {
891                                 /* Delete specified values from an attribute */
892                                 for (j=0; j < msg->elements[i].num_values; j++) {
893                                         ret = msg_delete_element(module,
894                                                                  msg2,
895                                                                  msg->elements[i].name,
896                                                                  &msg->elements[i].values[j]);
897                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
898                                             control_permissive) {
899                                                 ret = LDB_SUCCESS;
900                                         } else {
901                                                 ldb_asprintf_errstring(ldb,
902                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
903                                                                        msg->elements[i].name, dn);
904                                         }
905                                         if (ret != LDB_SUCCESS) {
906                                                 goto done;
907                                         }
908                                 }
909                         }
910                         break;
911                 default:
912                         ldb_asprintf_errstring(ldb,
913                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
914                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
915                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
916                         ret = LDB_ERR_PROTOCOL_ERROR;
917                         goto done;
918                 }
919         }
920
921         ret = ltdb_store(module, msg2, TDB_MODIFY);
922         if (ret != LDB_SUCCESS) {
923                 goto done;
924         }
925
926         ret = ltdb_modified(module, msg2->dn);
927         if (ret != LDB_SUCCESS) {
928                 goto done;
929         }
930
931 done:
932         talloc_free(tdb_key.dptr);
933         return ret;
934 }
935
936 /*
937   modify a record
938 */
939 static int ltdb_modify(struct ltdb_context *ctx)
940 {
941         struct ldb_module *module = ctx->module;
942         struct ldb_request *req = ctx->req;
943         int ret = LDB_SUCCESS;
944
945         ret = ltdb_check_special_dn(module, req->op.mod.message);
946         if (ret != LDB_SUCCESS) {
947                 return ret;
948         }
949
950         ldb_request_set_state(req, LDB_ASYNC_PENDING);
951
952         if (ltdb_cache_load(module) != 0) {
953                 return LDB_ERR_OPERATIONS_ERROR;
954         }
955
956         ret = ltdb_modify_internal(module, req->op.mod.message, req);
957
958         return ret;
959 }
960
961 /*
962   rename a record
963 */
964 static int ltdb_rename(struct ltdb_context *ctx)
965 {
966         struct ldb_module *module = ctx->module;
967         struct ldb_request *req = ctx->req;
968         struct ldb_message *msg;
969         int ret = LDB_SUCCESS;
970
971         ldb_request_set_state(req, LDB_ASYNC_PENDING);
972
973         if (ltdb_cache_load(ctx->module) != 0) {
974                 return LDB_ERR_OPERATIONS_ERROR;
975         }
976
977         msg = ldb_msg_new(ctx);
978         if (msg == NULL) {
979                 return LDB_ERR_OPERATIONS_ERROR;
980         }
981
982         /* in case any attribute of the message was indexed, we need
983            to fetch the old record */
984         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
985         if (ret != LDB_SUCCESS) {
986                 /* not finding the old record is an error */
987                 return ret;
988         }
989
990         /* Always delete first then add, to avoid conflicts with
991          * unique indexes. We rely on the transaction to make this
992          * atomic
993          */
994         ret = ltdb_delete_internal(module, msg->dn);
995         if (ret != LDB_SUCCESS) {
996                 return ret;
997         }
998
999         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1000         if (msg->dn == NULL) {
1001                 return LDB_ERR_OPERATIONS_ERROR;
1002         }
1003
1004         /* We don't check single value as we can have more than 1 with
1005          * deleted attributes. We could go through all elements but that's
1006          * maybe not the most efficient way
1007          */
1008         ret = ltdb_add_internal(module, msg, false);
1009
1010         return ret;
1011 }
1012
1013 static int ltdb_start_trans(struct ldb_module *module)
1014 {
1015         void *data = ldb_module_get_private(module);
1016         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1017
1018         if (tdb_transaction_start(ltdb->tdb) != 0) {
1019                 return ltdb_err_map(tdb_error(ltdb->tdb));
1020         }
1021
1022         ltdb->in_transaction++;
1023
1024         ltdb_index_transaction_start(module);
1025
1026         return LDB_SUCCESS;
1027 }
1028
1029 static int ltdb_prepare_commit(struct ldb_module *module)
1030 {
1031         void *data = ldb_module_get_private(module);
1032         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1033
1034         if (ltdb->in_transaction != 1) {
1035                 return LDB_SUCCESS;
1036         }
1037
1038         if (ltdb_index_transaction_commit(module) != 0) {
1039                 tdb_transaction_cancel(ltdb->tdb);
1040                 ltdb->in_transaction--;
1041                 return ltdb_err_map(tdb_error(ltdb->tdb));
1042         }
1043
1044         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1045                 ltdb->in_transaction--;
1046                 return ltdb_err_map(tdb_error(ltdb->tdb));
1047         }
1048
1049         ltdb->prepared_commit = true;
1050
1051         return LDB_SUCCESS;
1052 }
1053
1054 static int ltdb_end_trans(struct ldb_module *module)
1055 {
1056         void *data = ldb_module_get_private(module);
1057         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1058
1059         if (!ltdb->prepared_commit) {
1060                 int ret = ltdb_prepare_commit(module);
1061                 if (ret != LDB_SUCCESS) {
1062                         return ret;
1063                 }
1064         }
1065
1066         ltdb->in_transaction--;
1067         ltdb->prepared_commit = false;
1068
1069         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1070                 return ltdb_err_map(tdb_error(ltdb->tdb));
1071         }
1072
1073         return LDB_SUCCESS;
1074 }
1075
1076 static int ltdb_del_trans(struct ldb_module *module)
1077 {
1078         void *data = ldb_module_get_private(module);
1079         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1080
1081         ltdb->in_transaction--;
1082
1083         if (ltdb_index_transaction_cancel(module) != 0) {
1084                 tdb_transaction_cancel(ltdb->tdb);
1085                 return ltdb_err_map(tdb_error(ltdb->tdb));
1086         }
1087
1088         tdb_transaction_cancel(ltdb->tdb);
1089         return LDB_SUCCESS;
1090 }
1091
1092 /*
1093   return sequenceNumber from @BASEINFO
1094 */
1095 static int ltdb_sequence_number(struct ltdb_context *ctx,
1096                                 struct ldb_extended **ext)
1097 {
1098         struct ldb_context *ldb;
1099         struct ldb_module *module = ctx->module;
1100         struct ldb_request *req = ctx->req;
1101         TALLOC_CTX *tmp_ctx = NULL;
1102         struct ldb_seqnum_request *seq;
1103         struct ldb_seqnum_result *res;
1104         struct ldb_message *msg = NULL;
1105         struct ldb_dn *dn;
1106         const char *date;
1107         int ret = LDB_SUCCESS;
1108
1109         ldb = ldb_module_get_ctx(module);
1110
1111         seq = talloc_get_type(req->op.extended.data,
1112                                 struct ldb_seqnum_request);
1113         if (seq == NULL) {
1114                 return LDB_ERR_OPERATIONS_ERROR;
1115         }
1116
1117         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1118
1119         if (ltdb_lock_read(module) != 0) {
1120                 return LDB_ERR_OPERATIONS_ERROR;
1121         }
1122
1123         res = talloc_zero(req, struct ldb_seqnum_result);
1124         if (res == NULL) {
1125                 ret = LDB_ERR_OPERATIONS_ERROR;
1126                 goto done;
1127         }
1128
1129         tmp_ctx = talloc_new(req);
1130         if (tmp_ctx == NULL) {
1131                 ret = LDB_ERR_OPERATIONS_ERROR;
1132                 goto done;
1133         }
1134
1135         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1136         if (dn == NULL) {
1137                 ret = LDB_ERR_OPERATIONS_ERROR;
1138                 goto done;
1139         }
1140
1141         msg = ldb_msg_new(tmp_ctx);
1142         if (msg == NULL) {
1143                 ret = LDB_ERR_OPERATIONS_ERROR;
1144                 goto done;
1145         }
1146
1147         ret = ltdb_search_dn1(module, dn, msg);
1148         if (ret != LDB_SUCCESS) {
1149                 goto done;
1150         }
1151
1152         switch (seq->type) {
1153         case LDB_SEQ_HIGHEST_SEQ:
1154                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1155                 break;
1156         case LDB_SEQ_NEXT:
1157                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1158                 res->seq_num++;
1159                 break;
1160         case LDB_SEQ_HIGHEST_TIMESTAMP:
1161                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1162                 if (date) {
1163                         res->seq_num = ldb_string_to_time(date);
1164                 } else {
1165                         res->seq_num = 0;
1166                         /* zero is as good as anything when we don't know */
1167                 }
1168                 break;
1169         }
1170
1171         *ext = talloc_zero(req, struct ldb_extended);
1172         if (*ext == NULL) {
1173                 ret = LDB_ERR_OPERATIONS_ERROR;
1174                 goto done;
1175         }
1176         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1177         (*ext)->data = talloc_steal(*ext, res);
1178
1179 done:
1180         talloc_free(tmp_ctx);
1181         ltdb_unlock_read(module);
1182         return ret;
1183 }
1184
1185 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1186 {
1187         struct ldb_context *ldb;
1188         struct ldb_request *req;
1189         struct ldb_reply *ares;
1190
1191         ldb = ldb_module_get_ctx(ctx->module);
1192         req = ctx->req;
1193
1194         /* if we already returned an error just return */
1195         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1196                 return;
1197         }
1198
1199         ares = talloc_zero(req, struct ldb_reply);
1200         if (!ares) {
1201                 ldb_oom(ldb);
1202                 req->callback(req, NULL);
1203                 return;
1204         }
1205         ares->type = LDB_REPLY_DONE;
1206         ares->error = error;
1207
1208         req->callback(req, ares);
1209 }
1210
1211 static void ltdb_timeout(struct tevent_context *ev,
1212                           struct tevent_timer *te,
1213                           struct timeval t,
1214                           void *private_data)
1215 {
1216         struct ltdb_context *ctx;
1217         ctx = talloc_get_type(private_data, struct ltdb_context);
1218
1219         if (!ctx->request_terminated) {
1220                 /* request is done now */
1221                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1222         }
1223
1224         if (!ctx->request_terminated) {
1225                 /* neutralize the spy */
1226                 ctx->spy->ctx = NULL;
1227         }
1228         talloc_free(ctx);
1229 }
1230
1231 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1232                                         struct ldb_extended *ext,
1233                                         int error)
1234 {
1235         struct ldb_context *ldb;
1236         struct ldb_request *req;
1237         struct ldb_reply *ares;
1238
1239         ldb = ldb_module_get_ctx(ctx->module);
1240         req = ctx->req;
1241
1242         /* if we already returned an error just return */
1243         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1244                 return;
1245         }
1246
1247         ares = talloc_zero(req, struct ldb_reply);
1248         if (!ares) {
1249                 ldb_oom(ldb);
1250                 req->callback(req, NULL);
1251                 return;
1252         }
1253         ares->type = LDB_REPLY_DONE;
1254         ares->response = ext;
1255         ares->error = error;
1256
1257         req->callback(req, ares);
1258 }
1259
1260 static void ltdb_handle_extended(struct ltdb_context *ctx)
1261 {
1262         struct ldb_extended *ext = NULL;
1263         int ret;
1264
1265         if (strcmp(ctx->req->op.extended.oid,
1266                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1267                 /* get sequence number */
1268                 ret = ltdb_sequence_number(ctx, &ext);
1269         } else {
1270                 /* not recognized */
1271                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1272         }
1273
1274         ltdb_request_extended_done(ctx, ext, ret);
1275 }
1276
1277 static void ltdb_callback(struct tevent_context *ev,
1278                           struct tevent_timer *te,
1279                           struct timeval t,
1280                           void *private_data)
1281 {
1282         struct ltdb_context *ctx;
1283         int ret;
1284
1285         ctx = talloc_get_type(private_data, struct ltdb_context);
1286
1287         if (ctx->request_terminated) {
1288                 goto done;
1289         }
1290
1291         switch (ctx->req->operation) {
1292         case LDB_SEARCH:
1293                 ret = ltdb_search(ctx);
1294                 break;
1295         case LDB_ADD:
1296                 ret = ltdb_add(ctx);
1297                 break;
1298         case LDB_MODIFY:
1299                 ret = ltdb_modify(ctx);
1300                 break;
1301         case LDB_DELETE:
1302                 ret = ltdb_delete(ctx);
1303                 break;
1304         case LDB_RENAME:
1305                 ret = ltdb_rename(ctx);
1306                 break;
1307         case LDB_EXTENDED:
1308                 ltdb_handle_extended(ctx);
1309                 goto done;
1310         default:
1311                 /* no other op supported */
1312                 ret = LDB_ERR_PROTOCOL_ERROR;
1313         }
1314
1315         if (!ctx->request_terminated) {
1316                 /* request is done now */
1317                 ltdb_request_done(ctx, ret);
1318         }
1319
1320 done:
1321         if (!ctx->request_terminated) {
1322                 /* neutralize the spy */
1323                 ctx->spy->ctx = NULL;
1324         }
1325         talloc_free(ctx);
1326 }
1327
1328 static int ltdb_request_destructor(void *ptr)
1329 {
1330         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1331
1332         if (spy->ctx != NULL) {
1333                 spy->ctx->request_terminated = true;
1334         }
1335
1336         return 0;
1337 }
1338
1339 static int ltdb_handle_request(struct ldb_module *module,
1340                                struct ldb_request *req)
1341 {
1342         struct ldb_control *control_permissive;
1343         struct ldb_context *ldb;
1344         struct tevent_context *ev;
1345         struct ltdb_context *ac;
1346         struct tevent_timer *te;
1347         struct timeval tv;
1348         unsigned int i;
1349
1350         ldb = ldb_module_get_ctx(module);
1351
1352         control_permissive = ldb_request_get_control(req,
1353                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1354
1355         for (i = 0; req->controls && req->controls[i]; i++) {
1356                 if (req->controls[i]->critical &&
1357                     req->controls[i] != control_permissive) {
1358                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1359                                                req->controls[i]->oid);
1360                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1361                 }
1362         }
1363
1364         if (req->starttime == 0 || req->timeout == 0) {
1365                 ldb_set_errstring(ldb, "Invalid timeout settings");
1366                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1367         }
1368
1369         ev = ldb_get_event_context(ldb);
1370
1371         ac = talloc_zero(ldb, struct ltdb_context);
1372         if (ac == NULL) {
1373                 ldb_oom(ldb);
1374                 return LDB_ERR_OPERATIONS_ERROR;
1375         }
1376
1377         ac->module = module;
1378         ac->req = req;
1379
1380         tv.tv_sec = 0;
1381         tv.tv_usec = 0;
1382         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1383         if (NULL == te) {
1384                 talloc_free(ac);
1385                 return LDB_ERR_OPERATIONS_ERROR;
1386         }
1387
1388         tv.tv_sec = req->starttime + req->timeout;
1389         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1390         if (NULL == ac->timeout_event) {
1391                 talloc_free(ac);
1392                 return LDB_ERR_OPERATIONS_ERROR;
1393         }
1394
1395         /* set a spy so that we do not try to use the request context
1396          * if it is freed before ltdb_callback fires */
1397         ac->spy = talloc(req, struct ltdb_req_spy);
1398         if (NULL == ac->spy) {
1399                 talloc_free(ac);
1400                 return LDB_ERR_OPERATIONS_ERROR;
1401         }
1402         ac->spy->ctx = ac;
1403
1404         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1405
1406         return LDB_SUCCESS;
1407 }
1408
1409 static int ltdb_init_rootdse(struct ldb_module *module)
1410 {
1411         struct ldb_context *ldb;
1412         int ret;
1413
1414         ldb = ldb_module_get_ctx(module);
1415
1416         ret = ldb_mod_register_control(module,
1417                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1418         /* ignore errors on this - we expect it for non-sam databases */
1419
1420         /* there can be no module beyond the backend, just return */
1421         return LDB_SUCCESS;
1422 }
1423
1424 static const struct ldb_module_ops ltdb_ops = {
1425         .name              = "tdb",
1426         .init_context      = ltdb_init_rootdse,
1427         .search            = ltdb_handle_request,
1428         .add               = ltdb_handle_request,
1429         .modify            = ltdb_handle_request,
1430         .del               = ltdb_handle_request,
1431         .rename            = ltdb_handle_request,
1432         .extended          = ltdb_handle_request,
1433         .start_transaction = ltdb_start_trans,
1434         .end_transaction   = ltdb_end_trans,
1435         .prepare_commit    = ltdb_prepare_commit,
1436         .del_transaction   = ltdb_del_trans,
1437 };
1438
1439 /*
1440   connect to the database
1441 */
1442 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1443                         unsigned int flags, const char *options[],
1444                         struct ldb_module **_module)
1445 {
1446         struct ldb_module *module;
1447         const char *path;
1448         int tdb_flags, open_flags;
1449         struct ltdb_private *ltdb;
1450
1451         /* parse the url */
1452         if (strchr(url, ':')) {
1453                 if (strncmp(url, "tdb://", 6) != 0) {
1454                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1455                                   "Invalid tdb URL '%s'", url);
1456                         return LDB_ERR_OPERATIONS_ERROR;
1457                 }
1458                 path = url+6;
1459         } else {
1460                 path = url;
1461         }
1462
1463         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1464
1465         /* check for the 'nosync' option */
1466         if (flags & LDB_FLG_NOSYNC) {
1467                 tdb_flags |= TDB_NOSYNC;
1468         }
1469
1470         /* and nommap option */
1471         if (flags & LDB_FLG_NOMMAP) {
1472                 tdb_flags |= TDB_NOMMAP;
1473         }
1474
1475         if (flags & LDB_FLG_RDONLY) {
1476                 open_flags = O_RDONLY;
1477         } else {
1478                 open_flags = O_CREAT | O_RDWR;
1479         }
1480
1481         ltdb = talloc_zero(ldb, struct ltdb_private);
1482         if (!ltdb) {
1483                 ldb_oom(ldb);
1484                 return LDB_ERR_OPERATIONS_ERROR;
1485         }
1486
1487         /* note that we use quite a large default hash size */
1488         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1489                                    tdb_flags, open_flags,
1490                                    ldb_get_create_perms(ldb), ldb);
1491         if (!ltdb->tdb) {
1492                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1493                           "Unable to open tdb '%s'", path);
1494                 talloc_free(ltdb);
1495                 return LDB_ERR_OPERATIONS_ERROR;
1496         }
1497
1498         if (getenv("LDB_WARN_UNINDEXED")) {
1499                 ltdb->warn_unindexed = true;
1500         }
1501
1502         ltdb->sequence_number = 0;
1503
1504         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1505         if (!module) {
1506                 talloc_free(ltdb);
1507                 return LDB_ERR_OPERATIONS_ERROR;
1508         }
1509         ldb_module_set_private(module, ltdb);
1510         talloc_steal(module, ltdb);
1511
1512         if (ltdb_cache_load(module) != 0) {
1513                 talloc_free(module);
1514                 talloc_free(ltdb);
1515                 return LDB_ERR_OPERATIONS_ERROR;
1516         }
1517
1518         *_module = module;
1519         return LDB_SUCCESS;
1520 }
1521
1522 int ldb_tdb_init(const char *version)
1523 {
1524         LDB_MODULE_CHECK_VERSION(version);
1525         return ldb_register_backend("tdb", ltdb_connect, false);
1526 }