ldb: make the 'spy' code more paranoid
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include <lib/tdb_compat/tdb_compat.h>
54
55 /*
56   prevent memory errors on callbacks
57 */
58 struct ltdb_req_spy {
59         struct ltdb_context *ctx;
60 };
61
62 /*
63   map a tdb error code to a ldb error code
64 */
65 int ltdb_err_map(enum TDB_ERROR tdb_code)
66 {
67         switch (tdb_code) {
68         case TDB_SUCCESS:
69                 return LDB_SUCCESS;
70         case TDB_ERR_CORRUPT:
71         case TDB_ERR_OOM:
72         case TDB_ERR_EINVAL:
73                 return LDB_ERR_OPERATIONS_ERROR;
74         case TDB_ERR_IO:
75                 return LDB_ERR_PROTOCOL_ERROR;
76         case TDB_ERR_LOCK:
77 #ifndef BUILD_TDB2
78         case TDB_ERR_NOLOCK:
79 #endif
80                 return LDB_ERR_BUSY;
81 #ifndef BUILD_TDB2
82         case TDB_ERR_LOCK_TIMEOUT:
83 #endif
84                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
85         case TDB_ERR_EXISTS:
86                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
87         case TDB_ERR_NOEXIST:
88                 return LDB_ERR_NO_SUCH_OBJECT;
89         case TDB_ERR_RDONLY:
90                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
91         default:
92                 break;
93         }
94         return LDB_ERR_OTHER;
95 }
96
97 /*
98   lock the database for read - use by ltdb_search and ltdb_sequence_number
99 */
100 int ltdb_lock_read(struct ldb_module *module)
101 {
102         void *data = ldb_module_get_private(module);
103         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
104         int ret = 0;
105
106         if (ltdb->in_transaction == 0 &&
107             ltdb->read_lock_count == 0) {
108                 ret = tdb_lockall_read(ltdb->tdb);
109         }
110         if (ret == 0) {
111                 ltdb->read_lock_count++;
112         }
113         return ret;
114 }
115
116 /*
117   unlock the database after a ltdb_lock_read()
118 */
119 int ltdb_unlock_read(struct ldb_module *module)
120 {
121         void *data = ldb_module_get_private(module);
122         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
123         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
124                 tdb_unlockall_read(ltdb->tdb);
125                 return 0;
126         }
127         ltdb->read_lock_count--;
128         return 0;
129 }
130
131
132 /*
133   form a TDB_DATA for a record key
134   caller frees
135
136   note that the key for a record can depend on whether the
137   dn refers to a case sensitive index record or not
138 */
139 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
140 {
141         struct ldb_context *ldb = ldb_module_get_ctx(module);
142         TDB_DATA key;
143         char *key_str = NULL;
144         const char *dn_folded = NULL;
145
146         /*
147           most DNs are case insensitive. The exception is index DNs for
148           case sensitive attributes
149
150           there are 3 cases dealt with in this code:
151
152           1) if the dn doesn't start with @ then uppercase the attribute
153              names and the attributes values of case insensitive attributes
154           2) if the dn starts with @ then leave it alone -
155              the indexing code handles the rest
156         */
157
158         dn_folded = ldb_dn_get_casefold(dn);
159         if (!dn_folded) {
160                 goto failed;
161         }
162
163         key_str = talloc_strdup(ldb, "DN=");
164         if (!key_str) {
165                 goto failed;
166         }
167
168         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
169         if (!key_str) {
170                 goto failed;
171         }
172
173         key.dptr = (uint8_t *)key_str;
174         key.dsize = strlen(key_str) + 1;
175
176         return key;
177
178 failed:
179         errno = ENOMEM;
180         key.dptr = NULL;
181         key.dsize = 0;
182         return key;
183 }
184
185 /*
186   check special dn's have valid attributes
187   currently only @ATTRIBUTES is checked
188 */
189 static int ltdb_check_special_dn(struct ldb_module *module,
190                                  const struct ldb_message *msg)
191 {
192         struct ldb_context *ldb = ldb_module_get_ctx(module);
193         unsigned int i, j;
194
195         if (! ldb_dn_is_special(msg->dn) ||
196             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
197                 return LDB_SUCCESS;
198         }
199
200         /* we have @ATTRIBUTES, let's check attributes are fine */
201         /* should we check that we deny multivalued attributes ? */
202         for (i = 0; i < msg->num_elements; i++) {
203                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
204
205                 for (j = 0; j < msg->elements[i].num_values; j++) {
206                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
207                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
208                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
209                         }
210                 }
211         }
212
213         return LDB_SUCCESS;
214 }
215
216
217 /*
218   we've made a modification to a dn - possibly reindex and
219   update sequence number
220 */
221 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
222 {
223         int ret = LDB_SUCCESS;
224         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
225
226         /* only allow modifies inside a transaction, otherwise the
227          * ldb is unsafe */
228         if (ltdb->in_transaction == 0) {
229                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
230                 return LDB_ERR_OPERATIONS_ERROR;
231         }
232
233         if (ldb_dn_is_special(dn) &&
234             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
235              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
236                 ret = ltdb_reindex(module);
237         }
238
239         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
240         if (ret == LDB_SUCCESS &&
241             !(ldb_dn_is_special(dn) &&
242               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
243                 ret = ltdb_increase_sequence_number(module);
244         }
245
246         /* If the modify was to @OPTIONS, reload the cache */
247         if (ret == LDB_SUCCESS &&
248             ldb_dn_is_special(dn) &&
249             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
250                 ret = ltdb_cache_reload(module);
251         }
252
253         return ret;
254 }
255
256 /*
257   store a record into the db
258 */
259 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
260 {
261         void *data = ldb_module_get_private(module);
262         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
263         TDB_DATA tdb_key, tdb_data;
264         int ret = LDB_SUCCESS;
265
266         tdb_key = ltdb_key(module, msg->dn);
267         if (tdb_key.dptr == NULL) {
268                 return LDB_ERR_OTHER;
269         }
270
271         ret = ltdb_pack_data(module, msg, &tdb_data);
272         if (ret == -1) {
273                 talloc_free(tdb_key.dptr);
274                 return LDB_ERR_OTHER;
275         }
276
277         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
278         if (ret != 0) {
279                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
280                 goto done;
281         }
282
283 done:
284         talloc_free(tdb_key.dptr);
285         talloc_free(tdb_data.dptr);
286
287         return ret;
288 }
289
290
291 /*
292   check if a attribute is a single valued, for a given element
293  */
294 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
295                                   struct ldb_message_element *el)
296 {
297         if (!a) return false;
298         if (el != NULL) {
299                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
300                         /* override from a ldb module, for example
301                            used for the description field, which is
302                            marked multi-valued in the schema but which
303                            should not actually accept multiple
304                            values */
305                         return true;
306                 }
307                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
308                         /* override from a ldb module, for example used for
309                            deleted linked attribute entries */
310                         return false;
311                 }
312         }
313         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
314                 return true;
315         }
316         return false;
317 }
318
319 static int ltdb_add_internal(struct ldb_module *module,
320                              const struct ldb_message *msg,
321                              bool check_single_value)
322 {
323         struct ldb_context *ldb = ldb_module_get_ctx(module);
324         int ret = LDB_SUCCESS;
325         unsigned int i;
326
327         for (i=0;i<msg->num_elements;i++) {
328                 struct ldb_message_element *el = &msg->elements[i];
329                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
330
331                 if (el->num_values == 0) {
332                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
333                                                el->name, ldb_dn_get_linearized(msg->dn));
334                         return LDB_ERR_CONSTRAINT_VIOLATION;
335                 }
336                 if (check_single_value &&
337                                 el->num_values > 1 &&
338                                 ldb_tdb_single_valued(a, el)) {
339                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
340                                                el->name, ldb_dn_get_linearized(msg->dn));
341                         return LDB_ERR_CONSTRAINT_VIOLATION;
342                 }
343         }
344
345         ret = ltdb_store(module, msg, TDB_INSERT);
346         if (ret != LDB_SUCCESS) {
347                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
348                         ldb_asprintf_errstring(ldb,
349                                                "Entry %s already exists",
350                                                ldb_dn_get_linearized(msg->dn));
351                 }
352                 return ret;
353         }
354
355         ret = ltdb_index_add_new(module, msg);
356         if (ret != LDB_SUCCESS) {
357                 return ret;
358         }
359
360         ret = ltdb_modified(module, msg->dn);
361
362         return ret;
363 }
364
365 /*
366   add a record to the database
367 */
368 static int ltdb_add(struct ltdb_context *ctx)
369 {
370         struct ldb_module *module = ctx->module;
371         struct ldb_request *req = ctx->req;
372         int ret = LDB_SUCCESS;
373
374         ret = ltdb_check_special_dn(module, req->op.add.message);
375         if (ret != LDB_SUCCESS) {
376                 return ret;
377         }
378
379         ldb_request_set_state(req, LDB_ASYNC_PENDING);
380
381         if (ltdb_cache_load(module) != 0) {
382                 return LDB_ERR_OPERATIONS_ERROR;
383         }
384
385         ret = ltdb_add_internal(module, req->op.add.message, true);
386
387         return ret;
388 }
389
390 /*
391   delete a record from the database, not updating indexes (used for deleting
392   index records)
393 */
394 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
395 {
396         void *data = ldb_module_get_private(module);
397         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
398         TDB_DATA tdb_key;
399         int ret;
400
401         tdb_key = ltdb_key(module, dn);
402         if (!tdb_key.dptr) {
403                 return LDB_ERR_OTHER;
404         }
405
406         ret = tdb_delete(ltdb->tdb, tdb_key);
407         talloc_free(tdb_key.dptr);
408
409         if (ret != 0) {
410                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
411         }
412
413         return ret;
414 }
415
416 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
417 {
418         struct ldb_message *msg;
419         int ret = LDB_SUCCESS;
420
421         msg = ldb_msg_new(module);
422         if (msg == NULL) {
423                 return LDB_ERR_OPERATIONS_ERROR;
424         }
425
426         /* in case any attribute of the message was indexed, we need
427            to fetch the old record */
428         ret = ltdb_search_dn1(module, dn, msg);
429         if (ret != LDB_SUCCESS) {
430                 /* not finding the old record is an error */
431                 goto done;
432         }
433
434         ret = ltdb_delete_noindex(module, dn);
435         if (ret != LDB_SUCCESS) {
436                 goto done;
437         }
438
439         /* remove any indexed attributes */
440         ret = ltdb_index_delete(module, msg);
441         if (ret != LDB_SUCCESS) {
442                 goto done;
443         }
444
445         ret = ltdb_modified(module, dn);
446         if (ret != LDB_SUCCESS) {
447                 goto done;
448         }
449
450 done:
451         talloc_free(msg);
452         return ret;
453 }
454
455 /*
456   delete a record from the database
457 */
458 static int ltdb_delete(struct ltdb_context *ctx)
459 {
460         struct ldb_module *module = ctx->module;
461         struct ldb_request *req = ctx->req;
462         int ret = LDB_SUCCESS;
463
464         ldb_request_set_state(req, LDB_ASYNC_PENDING);
465
466         if (ltdb_cache_load(module) != 0) {
467                 return LDB_ERR_OPERATIONS_ERROR;
468         }
469
470         ret = ltdb_delete_internal(module, req->op.del.dn);
471
472         return ret;
473 }
474
475 /*
476   find an element by attribute name. At the moment this does a linear search,
477   it should be re-coded to use a binary search once all places that modify
478   records guarantee sorted order
479
480   return the index of the first matching element if found, otherwise -1
481 */
482 static int find_element(const struct ldb_message *msg, const char *name)
483 {
484         unsigned int i;
485         for (i=0;i<msg->num_elements;i++) {
486                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
487                         return i;
488                 }
489         }
490         return -1;
491 }
492
493
494 /*
495   add an element to an existing record. Assumes a elements array that we
496   can call re-alloc on, and assumed that we can re-use the data pointers from
497   the passed in additional values. Use with care!
498
499   returns 0 on success, -1 on failure (and sets errno)
500 */
501 static int ltdb_msg_add_element(struct ldb_context *ldb,
502                                 struct ldb_message *msg,
503                                 struct ldb_message_element *el)
504 {
505         struct ldb_message_element *e2;
506         unsigned int i;
507
508         if (el->num_values == 0) {
509                 /* nothing to do here - we don't add empty elements */
510                 return 0;
511         }
512
513         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
514                               msg->num_elements+1);
515         if (!e2) {
516                 errno = ENOMEM;
517                 return -1;
518         }
519
520         msg->elements = e2;
521
522         e2 = &msg->elements[msg->num_elements];
523
524         e2->name = el->name;
525         e2->flags = el->flags;
526         e2->values = talloc_array(msg->elements,
527                                   struct ldb_val, el->num_values);
528         if (!e2->values) {
529                 errno = ENOMEM;
530                 return -1;
531         }
532         for (i=0;i<el->num_values;i++) {
533                 e2->values[i] = el->values[i];
534         }
535         e2->num_values = el->num_values;
536
537         ++msg->num_elements;
538
539         return 0;
540 }
541
542 /*
543   delete all elements having a specified attribute name
544 */
545 static int msg_delete_attribute(struct ldb_module *module,
546                                 struct ldb_context *ldb,
547                                 struct ldb_message *msg, const char *name)
548 {
549         unsigned int i;
550         int ret;
551         struct ldb_message_element *el;
552
553         el = ldb_msg_find_element(msg, name);
554         if (el == NULL) {
555                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
556         }
557         i = el - msg->elements;
558
559         ret = ltdb_index_del_element(module, msg->dn, el);
560         if (ret != LDB_SUCCESS) {
561                 return ret;
562         }
563
564         talloc_free(el->values);
565         if (msg->num_elements > (i+1)) {
566                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
567         }
568         msg->num_elements--;
569         msg->elements = talloc_realloc(msg, msg->elements,
570                                        struct ldb_message_element,
571                                        msg->num_elements);
572         return LDB_SUCCESS;
573 }
574
575 /*
576   delete all elements matching an attribute name/value
577
578   return LDB Error on failure
579 */
580 static int msg_delete_element(struct ldb_module *module,
581                               struct ldb_message *msg,
582                               const char *name,
583                               const struct ldb_val *val)
584 {
585         struct ldb_context *ldb = ldb_module_get_ctx(module);
586         unsigned int i;
587         int found, ret;
588         struct ldb_message_element *el;
589         const struct ldb_schema_attribute *a;
590
591         found = find_element(msg, name);
592         if (found == -1) {
593                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
594         }
595
596         i = (unsigned int) found;
597         el = &(msg->elements[i]);
598
599         a = ldb_schema_attribute_by_name(ldb, el->name);
600
601         for (i=0;i<el->num_values;i++) {
602                 bool matched;
603                 if (a->syntax->operator_fn) {
604                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
605                                                      &el->values[i], val, &matched);
606                         if (ret != LDB_SUCCESS) return ret;
607                 } else {
608                         matched = (a->syntax->comparison_fn(ldb, ldb,
609                                                             &el->values[i], val) == 0);
610                 }
611                 if (matched) {
612                         if (el->num_values == 1) {
613                                 return msg_delete_attribute(module, ldb, msg, name);
614                         }
615
616                         ret = ltdb_index_del_value(module, msg->dn, el, i);
617                         if (ret != LDB_SUCCESS) {
618                                 return ret;
619                         }
620
621                         if (i<el->num_values-1) {
622                                 memmove(&el->values[i], &el->values[i+1],
623                                         sizeof(el->values[i])*
624                                                 (el->num_values-(i+1)));
625                         }
626                         el->num_values--;
627
628                         /* per definition we find in a canonicalised message an
629                            attribute value only once. So we are finished here */
630                         return LDB_SUCCESS;
631                 }
632         }
633
634         /* Not found */
635         return LDB_ERR_NO_SUCH_ATTRIBUTE;
636 }
637
638
639 /*
640   modify a record - internal interface
641
642   yuck - this is O(n^2). Luckily n is usually small so we probably
643   get away with it, but if we ever have really large attribute lists
644   then we'll need to look at this again
645
646   'req' is optional, and is used to specify controls if supplied
647 */
648 int ltdb_modify_internal(struct ldb_module *module,
649                          const struct ldb_message *msg,
650                          struct ldb_request *req)
651 {
652         struct ldb_context *ldb = ldb_module_get_ctx(module);
653         void *data = ldb_module_get_private(module);
654         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
655         TDB_DATA tdb_key, tdb_data;
656         struct ldb_message *msg2;
657         unsigned int i, j, k;
658         int ret = LDB_SUCCESS, idx;
659         struct ldb_control *control_permissive = NULL;
660
661         if (req) {
662                 control_permissive = ldb_request_get_control(req,
663                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
664         }
665
666         tdb_key = ltdb_key(module, msg->dn);
667         if (!tdb_key.dptr) {
668                 return LDB_ERR_OTHER;
669         }
670
671         tdb_data = tdb_fetch_compat(ltdb->tdb, tdb_key);
672         if (!tdb_data.dptr) {
673                 talloc_free(tdb_key.dptr);
674                 return ltdb_err_map(tdb_error(ltdb->tdb));
675         }
676
677         msg2 = ldb_msg_new(tdb_key.dptr);
678         if (msg2 == NULL) {
679                 free(tdb_data.dptr);
680                 ret = LDB_ERR_OTHER;
681                 goto done;
682         }
683
684         ret = ltdb_unpack_data(module, &tdb_data, msg2);
685         free(tdb_data.dptr);
686         if (ret == -1) {
687                 ret = LDB_ERR_OTHER;
688                 goto done;
689         }
690
691         if (!msg2->dn) {
692                 msg2->dn = msg->dn;
693         }
694
695         for (i=0; i<msg->num_elements; i++) {
696                 struct ldb_message_element *el = &msg->elements[i], *el2;
697                 struct ldb_val *vals;
698                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
699                 const char *dn;
700
701                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
702                 case LDB_FLAG_MOD_ADD:
703
704                         if (el->num_values == 0) {
705                                 ldb_asprintf_errstring(ldb,
706                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
707                                                        el->name, ldb_dn_get_linearized(msg2->dn));
708                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
709                                 goto done;
710                         }
711
712                         /* make a copy of the array so that a permissive
713                          * control can remove duplicates without changing the
714                          * original values, but do not copy data as we do not
715                          * need to keep it around once the operation is
716                          * finished */
717                         if (control_permissive) {
718                                 el = talloc(msg2, struct ldb_message_element);
719                                 if (!el) {
720                                         ret = LDB_ERR_OTHER;
721                                         goto done;
722                                 }
723                                 *el = msg->elements[i];
724                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
725                                 if (el->values == NULL) {
726                                         ret = LDB_ERR_OTHER;
727                                         goto done;
728                                 }
729                                 for (j = 0; j < el->num_values; j++) {
730                                         el->values[j] = msg->elements[i].values[j];
731                                 }
732                         }
733
734                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
735                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
736                                                        el->name, ldb_dn_get_linearized(msg2->dn));
737                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
738                                 goto done;
739                         }
740
741                         /* Checks if element already exists */
742                         idx = find_element(msg2, el->name);
743                         if (idx == -1) {
744                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
745                                         ret = LDB_ERR_OTHER;
746                                         goto done;
747                                 }
748                                 ret = ltdb_index_add_element(module, msg2->dn,
749                                                              el);
750                                 if (ret != LDB_SUCCESS) {
751                                         goto done;
752                                 }
753                         } else {
754                                 j = (unsigned int) idx;
755                                 el2 = &(msg2->elements[j]);
756
757                                 /* We cannot add another value on a existing one
758                                    if the attribute is single-valued */
759                                 if (ldb_tdb_single_valued(a, el)) {
760                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
761                                                                el->name, ldb_dn_get_linearized(msg2->dn));
762                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
763                                         goto done;
764                                 }
765
766                                 /* Check that values don't exist yet on multi-
767                                    valued attributes or aren't provided twice */
768                                 for (j = 0; j < el->num_values; j++) {
769                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
770                                                 if (control_permissive) {
771                                                         /* remove this one as if it was never added */
772                                                         el->num_values--;
773                                                         for (k = j; k < el->num_values; k++) {
774                                                                 el->values[k] = el->values[k + 1];
775                                                         }
776                                                         j--; /* rewind */
777
778                                                         continue;
779                                                 }
780
781                                                 ldb_asprintf_errstring(ldb,
782                                                                        "attribute '%s': value #%u on '%s' already exists",
783                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
784                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
785                                                 goto done;
786                                         }
787                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
788                                                 ldb_asprintf_errstring(ldb,
789                                                                        "attribute '%s': value #%u on '%s' provided more than once",
790                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
791                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
792                                                 goto done;
793                                         }
794                                 }
795
796                                 /* Now combine existing and new values to a new
797                                    attribute record */
798                                 vals = talloc_realloc(msg2->elements,
799                                                       el2->values, struct ldb_val,
800                                                       el2->num_values + el->num_values);
801                                 if (vals == NULL) {
802                                         ldb_oom(ldb);
803                                         ret = LDB_ERR_OTHER;
804                                         goto done;
805                                 }
806
807                                 for (j=0; j<el->num_values; j++) {
808                                         vals[el2->num_values + j] =
809                                                 ldb_val_dup(vals, &el->values[j]);
810                                 }
811
812                                 el2->values = vals;
813                                 el2->num_values += el->num_values;
814
815                                 ret = ltdb_index_add_element(module, msg2->dn, el);
816                                 if (ret != LDB_SUCCESS) {
817                                         goto done;
818                                 }
819                         }
820
821                         break;
822
823                 case LDB_FLAG_MOD_REPLACE:
824
825                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
826                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
827                                                        el->name, ldb_dn_get_linearized(msg2->dn));
828                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
829                                 goto done;
830                         }
831
832                         /* TODO: This is O(n^2) - replace with more efficient check */
833                         for (j=0; j<el->num_values; j++) {
834                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
835                                         ldb_asprintf_errstring(ldb,
836                                                                "attribute '%s': value #%u on '%s' provided more than once",
837                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
838                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
839                                         goto done;
840                                 }
841                         }
842
843                         /* Checks if element already exists */
844                         idx = find_element(msg2, el->name);
845                         if (idx != -1) {
846                                 j = (unsigned int) idx;
847                                 el2 = &(msg2->elements[j]);
848                                 if (ldb_msg_element_compare(el, el2) == 0) {
849                                         /* we are replacing with the same values */
850                                         continue;
851                                 }
852                         
853                                 /* Delete the attribute if it exists in the DB */
854                                 if (msg_delete_attribute(module, ldb, msg2,
855                                                          el->name) != 0) {
856                                         ret = LDB_ERR_OTHER;
857                                         goto done;
858                                 }
859                         }
860
861                         /* Recreate it with the new values */
862                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
863                                 ret = LDB_ERR_OTHER;
864                                 goto done;
865                         }
866
867                         ret = ltdb_index_add_element(module, msg2->dn, el);
868                         if (ret != LDB_SUCCESS) {
869                                 goto done;
870                         }
871
872                         break;
873
874                 case LDB_FLAG_MOD_DELETE:
875                         dn = ldb_dn_get_linearized(msg2->dn);
876                         if (dn == NULL) {
877                                 ret = LDB_ERR_OTHER;
878                                 goto done;
879                         }
880
881                         if (msg->elements[i].num_values == 0) {
882                                 /* Delete the whole attribute */
883                                 ret = msg_delete_attribute(module, ldb, msg2,
884                                                            msg->elements[i].name);
885                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
886                                     control_permissive) {
887                                         ret = LDB_SUCCESS;
888                                 } else {
889                                         ldb_asprintf_errstring(ldb,
890                                                                "attribute '%s': no such attribute for delete on '%s'",
891                                                                msg->elements[i].name, dn);
892                                 }
893                                 if (ret != LDB_SUCCESS) {
894                                         goto done;
895                                 }
896                         } else {
897                                 /* Delete specified values from an attribute */
898                                 for (j=0; j < msg->elements[i].num_values; j++) {
899                                         ret = msg_delete_element(module,
900                                                                  msg2,
901                                                                  msg->elements[i].name,
902                                                                  &msg->elements[i].values[j]);
903                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
904                                             control_permissive) {
905                                                 ret = LDB_SUCCESS;
906                                         } else {
907                                                 ldb_asprintf_errstring(ldb,
908                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
909                                                                        msg->elements[i].name, dn);
910                                         }
911                                         if (ret != LDB_SUCCESS) {
912                                                 goto done;
913                                         }
914                                 }
915                         }
916                         break;
917                 default:
918                         ldb_asprintf_errstring(ldb,
919                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
920                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
921                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
922                         ret = LDB_ERR_PROTOCOL_ERROR;
923                         goto done;
924                 }
925         }
926
927         ret = ltdb_store(module, msg2, TDB_MODIFY);
928         if (ret != LDB_SUCCESS) {
929                 goto done;
930         }
931
932         ret = ltdb_modified(module, msg2->dn);
933         if (ret != LDB_SUCCESS) {
934                 goto done;
935         }
936
937 done:
938         talloc_free(tdb_key.dptr);
939         return ret;
940 }
941
942 /*
943   modify a record
944 */
945 static int ltdb_modify(struct ltdb_context *ctx)
946 {
947         struct ldb_module *module = ctx->module;
948         struct ldb_request *req = ctx->req;
949         int ret = LDB_SUCCESS;
950
951         ret = ltdb_check_special_dn(module, req->op.mod.message);
952         if (ret != LDB_SUCCESS) {
953                 return ret;
954         }
955
956         ldb_request_set_state(req, LDB_ASYNC_PENDING);
957
958         if (ltdb_cache_load(module) != 0) {
959                 return LDB_ERR_OPERATIONS_ERROR;
960         }
961
962         ret = ltdb_modify_internal(module, req->op.mod.message, req);
963
964         return ret;
965 }
966
967 /*
968   rename a record
969 */
970 static int ltdb_rename(struct ltdb_context *ctx)
971 {
972         struct ldb_module *module = ctx->module;
973         struct ldb_request *req = ctx->req;
974         struct ldb_message *msg;
975         int ret = LDB_SUCCESS;
976
977         ldb_request_set_state(req, LDB_ASYNC_PENDING);
978
979         if (ltdb_cache_load(ctx->module) != 0) {
980                 return LDB_ERR_OPERATIONS_ERROR;
981         }
982
983         msg = ldb_msg_new(ctx);
984         if (msg == NULL) {
985                 return LDB_ERR_OPERATIONS_ERROR;
986         }
987
988         /* in case any attribute of the message was indexed, we need
989            to fetch the old record */
990         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
991         if (ret != LDB_SUCCESS) {
992                 /* not finding the old record is an error */
993                 return ret;
994         }
995
996         /* Always delete first then add, to avoid conflicts with
997          * unique indexes. We rely on the transaction to make this
998          * atomic
999          */
1000         ret = ltdb_delete_internal(module, msg->dn);
1001         if (ret != LDB_SUCCESS) {
1002                 return ret;
1003         }
1004
1005         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1006         if (msg->dn == NULL) {
1007                 return LDB_ERR_OPERATIONS_ERROR;
1008         }
1009
1010         /* We don't check single value as we can have more than 1 with
1011          * deleted attributes. We could go through all elements but that's
1012          * maybe not the most efficient way
1013          */
1014         ret = ltdb_add_internal(module, msg, false);
1015
1016         return ret;
1017 }
1018
1019 static int ltdb_start_trans(struct ldb_module *module)
1020 {
1021         void *data = ldb_module_get_private(module);
1022         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1023
1024         if (tdb_transaction_start(ltdb->tdb) != 0) {
1025                 return ltdb_err_map(tdb_error(ltdb->tdb));
1026         }
1027
1028         ltdb->in_transaction++;
1029
1030         ltdb_index_transaction_start(module);
1031
1032         return LDB_SUCCESS;
1033 }
1034
1035 static int ltdb_prepare_commit(struct ldb_module *module)
1036 {
1037         void *data = ldb_module_get_private(module);
1038         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1039
1040         if (ltdb->in_transaction != 1) {
1041                 return LDB_SUCCESS;
1042         }
1043
1044         if (ltdb_index_transaction_commit(module) != 0) {
1045                 tdb_transaction_cancel(ltdb->tdb);
1046                 ltdb->in_transaction--;
1047                 return ltdb_err_map(tdb_error(ltdb->tdb));
1048         }
1049
1050         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1051                 ltdb->in_transaction--;
1052                 return ltdb_err_map(tdb_error(ltdb->tdb));
1053         }
1054
1055         ltdb->prepared_commit = true;
1056
1057         return LDB_SUCCESS;
1058 }
1059
1060 static int ltdb_end_trans(struct ldb_module *module)
1061 {
1062         void *data = ldb_module_get_private(module);
1063         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1064
1065         if (!ltdb->prepared_commit) {
1066                 int ret = ltdb_prepare_commit(module);
1067                 if (ret != LDB_SUCCESS) {
1068                         return ret;
1069                 }
1070         }
1071
1072         ltdb->in_transaction--;
1073         ltdb->prepared_commit = false;
1074
1075         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1076                 return ltdb_err_map(tdb_error(ltdb->tdb));
1077         }
1078
1079         return LDB_SUCCESS;
1080 }
1081
1082 static int ltdb_del_trans(struct ldb_module *module)
1083 {
1084         void *data = ldb_module_get_private(module);
1085         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1086
1087         ltdb->in_transaction--;
1088
1089         if (ltdb_index_transaction_cancel(module) != 0) {
1090                 tdb_transaction_cancel(ltdb->tdb);
1091                 return ltdb_err_map(tdb_error(ltdb->tdb));
1092         }
1093
1094         tdb_transaction_cancel(ltdb->tdb);
1095         return LDB_SUCCESS;
1096 }
1097
1098 /*
1099   return sequenceNumber from @BASEINFO
1100 */
1101 static int ltdb_sequence_number(struct ltdb_context *ctx,
1102                                 struct ldb_extended **ext)
1103 {
1104         struct ldb_context *ldb;
1105         struct ldb_module *module = ctx->module;
1106         struct ldb_request *req = ctx->req;
1107         TALLOC_CTX *tmp_ctx = NULL;
1108         struct ldb_seqnum_request *seq;
1109         struct ldb_seqnum_result *res;
1110         struct ldb_message *msg = NULL;
1111         struct ldb_dn *dn;
1112         const char *date;
1113         int ret = LDB_SUCCESS;
1114
1115         ldb = ldb_module_get_ctx(module);
1116
1117         seq = talloc_get_type(req->op.extended.data,
1118                                 struct ldb_seqnum_request);
1119         if (seq == NULL) {
1120                 return LDB_ERR_OPERATIONS_ERROR;
1121         }
1122
1123         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1124
1125         if (ltdb_lock_read(module) != 0) {
1126                 return LDB_ERR_OPERATIONS_ERROR;
1127         }
1128
1129         res = talloc_zero(req, struct ldb_seqnum_result);
1130         if (res == NULL) {
1131                 ret = LDB_ERR_OPERATIONS_ERROR;
1132                 goto done;
1133         }
1134
1135         tmp_ctx = talloc_new(req);
1136         if (tmp_ctx == NULL) {
1137                 ret = LDB_ERR_OPERATIONS_ERROR;
1138                 goto done;
1139         }
1140
1141         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1142         if (dn == NULL) {
1143                 ret = LDB_ERR_OPERATIONS_ERROR;
1144                 goto done;
1145         }
1146
1147         msg = ldb_msg_new(tmp_ctx);
1148         if (msg == NULL) {
1149                 ret = LDB_ERR_OPERATIONS_ERROR;
1150                 goto done;
1151         }
1152
1153         ret = ltdb_search_dn1(module, dn, msg);
1154         if (ret != LDB_SUCCESS) {
1155                 goto done;
1156         }
1157
1158         switch (seq->type) {
1159         case LDB_SEQ_HIGHEST_SEQ:
1160                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1161                 break;
1162         case LDB_SEQ_NEXT:
1163                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1164                 res->seq_num++;
1165                 break;
1166         case LDB_SEQ_HIGHEST_TIMESTAMP:
1167                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1168                 if (date) {
1169                         res->seq_num = ldb_string_to_time(date);
1170                 } else {
1171                         res->seq_num = 0;
1172                         /* zero is as good as anything when we don't know */
1173                 }
1174                 break;
1175         }
1176
1177         *ext = talloc_zero(req, struct ldb_extended);
1178         if (*ext == NULL) {
1179                 ret = LDB_ERR_OPERATIONS_ERROR;
1180                 goto done;
1181         }
1182         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1183         (*ext)->data = talloc_steal(*ext, res);
1184
1185 done:
1186         talloc_free(tmp_ctx);
1187         ltdb_unlock_read(module);
1188         return ret;
1189 }
1190
1191 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1192 {
1193         struct ldb_context *ldb;
1194         struct ldb_request *req;
1195         struct ldb_reply *ares;
1196
1197         ldb = ldb_module_get_ctx(ctx->module);
1198         req = ctx->req;
1199
1200         /* if we already returned an error just return */
1201         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1202                 return;
1203         }
1204
1205         ares = talloc_zero(req, struct ldb_reply);
1206         if (!ares) {
1207                 ldb_oom(ldb);
1208                 req->callback(req, NULL);
1209                 return;
1210         }
1211         ares->type = LDB_REPLY_DONE;
1212         ares->error = error;
1213
1214         req->callback(req, ares);
1215 }
1216
1217 static void ltdb_timeout(struct tevent_context *ev,
1218                           struct tevent_timer *te,
1219                           struct timeval t,
1220                           void *private_data)
1221 {
1222         struct ltdb_context *ctx;
1223         ctx = talloc_get_type(private_data, struct ltdb_context);
1224
1225         if (!ctx->request_terminated) {
1226                 /* request is done now */
1227                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1228         }
1229
1230         if (ctx->spy) {
1231                 /* neutralize the spy */
1232                 ctx->spy->ctx = NULL;
1233                 ctx->spy = NULL;
1234         }
1235         talloc_free(ctx);
1236 }
1237
1238 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1239                                         struct ldb_extended *ext,
1240                                         int error)
1241 {
1242         struct ldb_context *ldb;
1243         struct ldb_request *req;
1244         struct ldb_reply *ares;
1245
1246         ldb = ldb_module_get_ctx(ctx->module);
1247         req = ctx->req;
1248
1249         /* if we already returned an error just return */
1250         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1251                 return;
1252         }
1253
1254         ares = talloc_zero(req, struct ldb_reply);
1255         if (!ares) {
1256                 ldb_oom(ldb);
1257                 req->callback(req, NULL);
1258                 return;
1259         }
1260         ares->type = LDB_REPLY_DONE;
1261         ares->response = ext;
1262         ares->error = error;
1263
1264         req->callback(req, ares);
1265 }
1266
1267 static void ltdb_handle_extended(struct ltdb_context *ctx)
1268 {
1269         struct ldb_extended *ext = NULL;
1270         int ret;
1271
1272         if (strcmp(ctx->req->op.extended.oid,
1273                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1274                 /* get sequence number */
1275                 ret = ltdb_sequence_number(ctx, &ext);
1276         } else {
1277                 /* not recognized */
1278                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1279         }
1280
1281         ltdb_request_extended_done(ctx, ext, ret);
1282 }
1283
1284 static void ltdb_callback(struct tevent_context *ev,
1285                           struct tevent_timer *te,
1286                           struct timeval t,
1287                           void *private_data)
1288 {
1289         struct ltdb_context *ctx;
1290         int ret;
1291
1292         ctx = talloc_get_type(private_data, struct ltdb_context);
1293
1294         if (ctx->request_terminated) {
1295                 goto done;
1296         }
1297
1298         switch (ctx->req->operation) {
1299         case LDB_SEARCH:
1300                 ret = ltdb_search(ctx);
1301                 break;
1302         case LDB_ADD:
1303                 ret = ltdb_add(ctx);
1304                 break;
1305         case LDB_MODIFY:
1306                 ret = ltdb_modify(ctx);
1307                 break;
1308         case LDB_DELETE:
1309                 ret = ltdb_delete(ctx);
1310                 break;
1311         case LDB_RENAME:
1312                 ret = ltdb_rename(ctx);
1313                 break;
1314         case LDB_EXTENDED:
1315                 ltdb_handle_extended(ctx);
1316                 goto done;
1317         default:
1318                 /* no other op supported */
1319                 ret = LDB_ERR_PROTOCOL_ERROR;
1320         }
1321
1322         if (!ctx->request_terminated) {
1323                 /* request is done now */
1324                 ltdb_request_done(ctx, ret);
1325         }
1326
1327 done:
1328         if (ctx->spy) {
1329                 /* neutralize the spy */
1330                 ctx->spy->ctx = NULL;
1331                 ctx->spy = NULL;
1332         }
1333         talloc_free(ctx);
1334 }
1335
1336 static int ltdb_request_destructor(void *ptr)
1337 {
1338         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1339
1340         if (spy->ctx != NULL) {
1341                 spy->ctx->spy = NULL;
1342                 spy->ctx->request_terminated = true;
1343                 spy->ctx = NULL;
1344         }
1345
1346         return 0;
1347 }
1348
1349 static int ltdb_handle_request(struct ldb_module *module,
1350                                struct ldb_request *req)
1351 {
1352         struct ldb_control *control_permissive;
1353         struct ldb_context *ldb;
1354         struct tevent_context *ev;
1355         struct ltdb_context *ac;
1356         struct tevent_timer *te;
1357         struct timeval tv;
1358         unsigned int i;
1359
1360         ldb = ldb_module_get_ctx(module);
1361
1362         control_permissive = ldb_request_get_control(req,
1363                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1364
1365         for (i = 0; req->controls && req->controls[i]; i++) {
1366                 if (req->controls[i]->critical &&
1367                     req->controls[i] != control_permissive) {
1368                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1369                                                req->controls[i]->oid);
1370                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1371                 }
1372         }
1373
1374         if (req->starttime == 0 || req->timeout == 0) {
1375                 ldb_set_errstring(ldb, "Invalid timeout settings");
1376                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1377         }
1378
1379         ev = ldb_get_event_context(ldb);
1380
1381         ac = talloc_zero(ldb, struct ltdb_context);
1382         if (ac == NULL) {
1383                 ldb_oom(ldb);
1384                 return LDB_ERR_OPERATIONS_ERROR;
1385         }
1386
1387         ac->module = module;
1388         ac->req = req;
1389
1390         tv.tv_sec = 0;
1391         tv.tv_usec = 0;
1392         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1393         if (NULL == te) {
1394                 talloc_free(ac);
1395                 return LDB_ERR_OPERATIONS_ERROR;
1396         }
1397
1398         tv.tv_sec = req->starttime + req->timeout;
1399         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1400         if (NULL == ac->timeout_event) {
1401                 talloc_free(ac);
1402                 return LDB_ERR_OPERATIONS_ERROR;
1403         }
1404
1405         /* set a spy so that we do not try to use the request context
1406          * if it is freed before ltdb_callback fires */
1407         ac->spy = talloc(req, struct ltdb_req_spy);
1408         if (NULL == ac->spy) {
1409                 talloc_free(ac);
1410                 return LDB_ERR_OPERATIONS_ERROR;
1411         }
1412         ac->spy->ctx = ac;
1413
1414         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1415
1416         return LDB_SUCCESS;
1417 }
1418
1419 static int ltdb_init_rootdse(struct ldb_module *module)
1420 {
1421         struct ldb_context *ldb;
1422         int ret;
1423
1424         ldb = ldb_module_get_ctx(module);
1425
1426         ret = ldb_mod_register_control(module,
1427                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1428         /* ignore errors on this - we expect it for non-sam databases */
1429
1430         /* there can be no module beyond the backend, just return */
1431         return LDB_SUCCESS;
1432 }
1433
1434 static const struct ldb_module_ops ltdb_ops = {
1435         .name              = "tdb",
1436         .init_context      = ltdb_init_rootdse,
1437         .search            = ltdb_handle_request,
1438         .add               = ltdb_handle_request,
1439         .modify            = ltdb_handle_request,
1440         .del               = ltdb_handle_request,
1441         .rename            = ltdb_handle_request,
1442         .extended          = ltdb_handle_request,
1443         .start_transaction = ltdb_start_trans,
1444         .end_transaction   = ltdb_end_trans,
1445         .prepare_commit    = ltdb_prepare_commit,
1446         .del_transaction   = ltdb_del_trans,
1447 };
1448
1449 /*
1450   connect to the database
1451 */
1452 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1453                         unsigned int flags, const char *options[],
1454                         struct ldb_module **_module)
1455 {
1456         struct ldb_module *module;
1457         const char *path;
1458         int tdb_flags, open_flags;
1459         struct ltdb_private *ltdb;
1460
1461         /* parse the url */
1462         if (strchr(url, ':')) {
1463                 if (strncmp(url, "tdb://", 6) != 0) {
1464                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1465                                   "Invalid tdb URL '%s'", url);
1466                         return LDB_ERR_OPERATIONS_ERROR;
1467                 }
1468                 path = url+6;
1469         } else {
1470                 path = url;
1471         }
1472
1473         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1474
1475         /* check for the 'nosync' option */
1476         if (flags & LDB_FLG_NOSYNC) {
1477                 tdb_flags |= TDB_NOSYNC;
1478         }
1479
1480         /* and nommap option */
1481         if (flags & LDB_FLG_NOMMAP) {
1482                 tdb_flags |= TDB_NOMMAP;
1483         }
1484
1485         if (flags & LDB_FLG_RDONLY) {
1486                 open_flags = O_RDONLY;
1487         } else {
1488                 open_flags = O_CREAT | O_RDWR;
1489         }
1490
1491         ltdb = talloc_zero(ldb, struct ltdb_private);
1492         if (!ltdb) {
1493                 ldb_oom(ldb);
1494                 return LDB_ERR_OPERATIONS_ERROR;
1495         }
1496
1497         /* note that we use quite a large default hash size */
1498         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1499                                    tdb_flags, open_flags,
1500                                    ldb_get_create_perms(ldb), ldb);
1501         if (!ltdb->tdb) {
1502                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1503                           "Unable to open tdb '%s'", path);
1504                 talloc_free(ltdb);
1505                 return LDB_ERR_OPERATIONS_ERROR;
1506         }
1507
1508         if (getenv("LDB_WARN_UNINDEXED")) {
1509                 ltdb->warn_unindexed = true;
1510         }
1511
1512         ltdb->sequence_number = 0;
1513
1514         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1515         if (!module) {
1516                 talloc_free(ltdb);
1517                 return LDB_ERR_OPERATIONS_ERROR;
1518         }
1519         ldb_module_set_private(module, ltdb);
1520         talloc_steal(module, ltdb);
1521
1522         if (ltdb_cache_load(module) != 0) {
1523                 talloc_free(module);
1524                 return LDB_ERR_OPERATIONS_ERROR;
1525         }
1526
1527         *_module = module;
1528         return LDB_SUCCESS;
1529 }
1530
1531 int ldb_tdb_init(const char *version)
1532 {
1533         LDB_MODULE_CHECK_VERSION(version);
1534         return ldb_register_backend("tdb", ltdb_connect, false);
1535 }