s4:ldb Fix check made conditional by mistake
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         int ret = 0;
92
93         if (ltdb->in_transaction == 0 &&
94             ltdb->read_lock_count == 0) {
95                 ret = tdb_lockall_read(ltdb->tdb);
96         }
97         if (ret == 0) {
98                 ltdb->read_lock_count++;
99         }
100         return ret;
101 }
102
103 /*
104   unlock the database after a ltdb_lock_read()
105 */
106 int ltdb_unlock_read(struct ldb_module *module)
107 {
108         void *data = ldb_module_get_private(module);
109         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
110         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
111                 return tdb_unlockall_read(ltdb->tdb);
112         }
113         ltdb->read_lock_count--;
114         return 0;
115 }
116
117
118 /*
119   form a TDB_DATA for a record key
120   caller frees
121
122   note that the key for a record can depend on whether the
123   dn refers to a case sensitive index record or not
124 */
125 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
126 {
127         struct ldb_context *ldb = ldb_module_get_ctx(module);
128         TDB_DATA key;
129         char *key_str = NULL;
130         const char *dn_folded = NULL;
131
132         /*
133           most DNs are case insensitive. The exception is index DNs for
134           case sensitive attributes
135
136           there are 3 cases dealt with in this code:
137
138           1) if the dn doesn't start with @ then uppercase the attribute
139              names and the attributes values of case insensitive attributes
140           2) if the dn starts with @ then leave it alone -
141              the indexing code handles the rest
142         */
143
144         dn_folded = ldb_dn_get_casefold(dn);
145         if (!dn_folded) {
146                 goto failed;
147         }
148
149         key_str = talloc_strdup(ldb, "DN=");
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
155         if (!key_str) {
156                 goto failed;
157         }
158
159         key.dptr = (uint8_t *)key_str;
160         key.dsize = strlen(key_str) + 1;
161
162         return key;
163
164 failed:
165         errno = ENOMEM;
166         key.dptr = NULL;
167         key.dsize = 0;
168         return key;
169 }
170
171 /*
172   check special dn's have valid attributes
173   currently only @ATTRIBUTES is checked
174 */
175 static int ltdb_check_special_dn(struct ldb_module *module,
176                           const struct ldb_message *msg)
177 {
178         struct ldb_context *ldb = ldb_module_get_ctx(module);
179         int i, j;
180
181         if (! ldb_dn_is_special(msg->dn) ||
182             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
183                 return LDB_SUCCESS;
184         }
185
186         /* we have @ATTRIBUTES, let's check attributes are fine */
187         /* should we check that we deny multivalued attributes ? */
188         for (i = 0; i < msg->num_elements; i++) {
189                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
190
191                 for (j = 0; j < msg->elements[i].num_values; j++) {
192                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
193                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
194                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
195                         }
196                 }
197         }
198
199         return LDB_SUCCESS;
200 }
201
202
203 /*
204   we've made a modification to a dn - possibly reindex and
205   update sequence number
206 */
207 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
208 {
209         int ret = LDB_SUCCESS;
210         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
211
212         /* only allow modifies inside a transaction, otherwise the
213          * ldb is unsafe */
214         if (ltdb->in_transaction == 0) {
215                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
216                 return LDB_ERR_OPERATIONS_ERROR;
217         }
218
219         if (ldb_dn_is_special(dn) &&
220             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
221              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
222                 ret = ltdb_reindex(module);
223         }
224
225         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
226         if (ret == LDB_SUCCESS &&
227             !(ldb_dn_is_special(dn) &&
228               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
229                 ret = ltdb_increase_sequence_number(module);
230         }
231
232         /* If the modify was to @OPTIONS, reload the cache */
233         if (ret == LDB_SUCCESS &&
234             ldb_dn_is_special(dn) &&
235             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
236                 ret = ltdb_cache_reload(module);
237         }
238
239         return ret;
240 }
241
242 /*
243   store a record into the db
244 */
245 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
246 {
247         void *data = ldb_module_get_private(module);
248         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
249         TDB_DATA tdb_key, tdb_data;
250         int ret = LDB_SUCCESS;
251
252         tdb_key = ltdb_key(module, msg->dn);
253         if (tdb_key.dptr == NULL) {
254                 return LDB_ERR_OTHER;
255         }
256
257         ret = ltdb_pack_data(module, msg, &tdb_data);
258         if (ret == -1) {
259                 talloc_free(tdb_key.dptr);
260                 return LDB_ERR_OTHER;
261         }
262
263         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
264         if (ret == -1) {
265                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
266                 goto done;
267         }
268
269 done:
270         talloc_free(tdb_key.dptr);
271         talloc_free(tdb_data.dptr);
272
273         return ret;
274 }
275
276
277 static int ltdb_add_internal(struct ldb_module *module,
278                              const struct ldb_message *msg)
279 {
280         struct ldb_context *ldb = ldb_module_get_ctx(module);
281         int ret = LDB_SUCCESS, i;
282
283         ret = ltdb_check_special_dn(module, msg);
284         if (ret != LDB_SUCCESS) {
285                 return ret;
286         }
287
288         if (ltdb_cache_load(module) != 0) {
289                 return LDB_ERR_OPERATIONS_ERROR;
290         }
291
292         for (i=0;i<msg->num_elements;i++) {
293                 struct ldb_message_element *el = &msg->elements[i];
294                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
295
296                 if (el->num_values == 0) {
297                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
298                                                el->name, ldb_dn_get_linearized(msg->dn));
299                         return LDB_ERR_CONSTRAINT_VIOLATION;
300                 }
301                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
302                         if (el->num_values > 1) {
303                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
304                                                        el->name, ldb_dn_get_linearized(msg->dn));
305                                 return LDB_ERR_CONSTRAINT_VIOLATION;
306                         }
307                 }
308         }
309
310         ret = ltdb_store(module, msg, TDB_INSERT);
311         if (ret != LDB_SUCCESS) {
312                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
313                         ldb_asprintf_errstring(ldb,
314                                                "Entry %s already exists",
315                                                ldb_dn_get_linearized(msg->dn));
316                 }
317                 return ret;
318         }
319
320         ret = ltdb_index_add_new(module, msg);
321         if (ret != LDB_SUCCESS) {
322                 return ret;
323         }
324
325         ret = ltdb_modified(module, msg->dn);
326
327         return ret;
328 }
329
330 /*
331   add a record to the database
332 */
333 static int ltdb_add(struct ltdb_context *ctx)
334 {
335         struct ldb_module *module = ctx->module;
336         struct ldb_request *req = ctx->req;
337         int ret = LDB_SUCCESS;
338
339         ldb_request_set_state(req, LDB_ASYNC_PENDING);
340
341         if (ltdb_cache_load(module) != 0) {
342                 return LDB_ERR_OPERATIONS_ERROR;
343         }
344
345         ret = ltdb_add_internal(module, req->op.add.message);
346
347         return ret;
348 }
349
350 /*
351   delete a record from the database, not updating indexes (used for deleting
352   index records)
353 */
354 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
355 {
356         void *data = ldb_module_get_private(module);
357         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
358         TDB_DATA tdb_key;
359         int ret;
360
361         tdb_key = ltdb_key(module, dn);
362         if (!tdb_key.dptr) {
363                 return LDB_ERR_OTHER;
364         }
365
366         ret = tdb_delete(ltdb->tdb, tdb_key);
367         talloc_free(tdb_key.dptr);
368
369         if (ret != 0) {
370                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
371         }
372
373         return ret;
374 }
375
376 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
377 {
378         struct ldb_message *msg;
379         int ret = LDB_SUCCESS;
380
381         msg = talloc(module, struct ldb_message);
382         if (msg == NULL) {
383                 return LDB_ERR_OPERATIONS_ERROR;
384         }
385
386         /* in case any attribute of the message was indexed, we need
387            to fetch the old record */
388         ret = ltdb_search_dn1(module, dn, msg);
389         if (ret != LDB_SUCCESS) {
390                 /* not finding the old record is an error */
391                 goto done;
392         }
393
394         ret = ltdb_delete_noindex(module, dn);
395         if (ret != LDB_SUCCESS) {
396                 goto done;
397         }
398
399         /* remove any indexed attributes */
400         ret = ltdb_index_delete(module, msg);
401         if (ret != LDB_SUCCESS) {
402                 goto done;
403         }
404
405         ret = ltdb_modified(module, dn);
406         if (ret != LDB_SUCCESS) {
407                 goto done;
408         }
409
410 done:
411         talloc_free(msg);
412         return ret;
413 }
414
415 /*
416   delete a record from the database
417 */
418 static int ltdb_delete(struct ltdb_context *ctx)
419 {
420         struct ldb_module *module = ctx->module;
421         struct ldb_request *req = ctx->req;
422         int ret = LDB_SUCCESS;
423
424         ldb_request_set_state(req, LDB_ASYNC_PENDING);
425
426         if (ltdb_cache_load(module) != 0) {
427                 return LDB_ERR_OPERATIONS_ERROR;
428         }
429
430         ret = ltdb_delete_internal(module, req->op.del.dn);
431
432         return ret;
433 }
434
435 /*
436   find an element by attribute name. At the moment this does a linear search,
437   it should be re-coded to use a binary search once all places that modify
438   records guarantee sorted order
439
440   return the index of the first matching element if found, otherwise -1
441 */
442 static int find_element(const struct ldb_message *msg, const char *name)
443 {
444         unsigned int i;
445         for (i=0;i<msg->num_elements;i++) {
446                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
447                         return i;
448                 }
449         }
450         return -1;
451 }
452
453
454 /*
455   add an element to an existing record. Assumes a elements array that we
456   can call re-alloc on, and assumed that we can re-use the data pointers from
457   the passed in additional values. Use with care!
458
459   returns 0 on success, -1 on failure (and sets errno)
460 */
461 static int ltdb_msg_add_element(struct ldb_context *ldb,
462                                 struct ldb_message *msg,
463                                 struct ldb_message_element *el)
464 {
465         struct ldb_message_element *e2;
466         unsigned int i;
467
468         if (el->num_values == 0) {
469                 /* nothing to do here - we don't add empty elements */
470                 return 0;
471         }
472
473         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
474                               msg->num_elements+1);
475         if (!e2) {
476                 errno = ENOMEM;
477                 return -1;
478         }
479
480         msg->elements = e2;
481
482         e2 = &msg->elements[msg->num_elements];
483
484         e2->name = el->name;
485         e2->flags = el->flags;
486         e2->values = talloc_array(msg->elements,
487                                   struct ldb_val, el->num_values);
488         if (!e2->values) {
489                 errno = ENOMEM;
490                 return -1;
491         }
492         for (i=0;i<el->num_values;i++) {
493                 e2->values[i] = el->values[i];
494         }
495         e2->num_values = el->num_values;
496
497         ++msg->num_elements;
498
499         return 0;
500 }
501
502 /*
503   delete all elements having a specified attribute name
504 */
505 static int msg_delete_attribute(struct ldb_module *module,
506                                 struct ldb_context *ldb,
507                                 struct ldb_message *msg, const char *name)
508 {
509         unsigned int i;
510         int ret;
511         struct ldb_message_element *el;
512
513         el = ldb_msg_find_element(msg, name);
514         if (el == NULL) {
515                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
516         }
517         i = el - msg->elements;
518
519         ret = ltdb_index_del_element(module, msg->dn, el);
520         if (ret != LDB_SUCCESS) {
521                 return ret;
522         }
523
524         talloc_free(el->values);
525         if (msg->num_elements > (i+1)) {
526                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
527         }
528         msg->num_elements--;
529         msg->elements = talloc_realloc(msg, msg->elements,
530                                        struct ldb_message_element,
531                                        msg->num_elements);
532         return LDB_SUCCESS;
533 }
534
535 /*
536   delete all elements matching an attribute name/value
537
538   return LDB Error on failure
539 */
540 static int msg_delete_element(struct ldb_module *module,
541                               struct ldb_message *msg,
542                               const char *name,
543                               const struct ldb_val *val)
544 {
545         struct ldb_context *ldb = ldb_module_get_ctx(module);
546         unsigned int i;
547         int found, ret;
548         struct ldb_message_element *el;
549         const struct ldb_schema_attribute *a;
550
551         found = find_element(msg, name);
552         if (found == -1) {
553                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
554         }
555
556         el = &msg->elements[found];
557
558         a = ldb_schema_attribute_by_name(ldb, el->name);
559
560         for (i=0;i<el->num_values;i++) {
561                 if (a->syntax->comparison_fn(ldb, ldb,
562                                              &el->values[i], val) == 0) {
563                         if (el->num_values == 1) {
564                                 return msg_delete_attribute(module, ldb, msg, name);
565                         }
566
567                         ret = ltdb_index_del_value(module, msg->dn, el, i);
568                         if (ret != LDB_SUCCESS) {
569                                 return ret;
570                         }
571
572                         if (i<el->num_values-1) {
573                                 memmove(&el->values[i], &el->values[i+1],
574                                         sizeof(el->values[i])*
575                                                 (el->num_values-(i+1)));
576                         }
577                         el->num_values--;
578
579                         /* per definition we find in a canonicalised message an
580                            attribute value only once. So we are finished here */
581                         return LDB_SUCCESS;
582                 }
583         }
584
585         /* Not found */
586         return LDB_ERR_NO_SUCH_ATTRIBUTE;
587 }
588
589
590 /*
591   modify a record - internal interface
592
593   yuck - this is O(n^2). Luckily n is usually small so we probably
594   get away with it, but if we ever have really large attribute lists
595   then we'll need to look at this again
596
597   'req' is optional, and is used to specify controls if supplied
598 */
599 int ltdb_modify_internal(struct ldb_module *module,
600                          const struct ldb_message *msg,
601                          struct ldb_request *req)
602 {
603         struct ldb_context *ldb = ldb_module_get_ctx(module);
604         void *data = ldb_module_get_private(module);
605         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
606         TDB_DATA tdb_key, tdb_data;
607         struct ldb_message *msg2;
608         unsigned i, j, k;
609         int ret = LDB_SUCCESS, idx;
610         struct ldb_control *control_permissive = NULL;
611
612         if (req) {
613                 control_permissive = ldb_request_get_control(req,
614                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
615         }
616
617         tdb_key = ltdb_key(module, msg->dn);
618         if (!tdb_key.dptr) {
619                 return LDB_ERR_OTHER;
620         }
621
622         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
623         if (!tdb_data.dptr) {
624                 talloc_free(tdb_key.dptr);
625                 return ltdb_err_map(tdb_error(ltdb->tdb));
626         }
627
628         msg2 = talloc(tdb_key.dptr, struct ldb_message);
629         if (msg2 == NULL) {
630                 free(tdb_data.dptr);
631                 ret = LDB_ERR_OTHER;
632                 goto done;
633         }
634
635         ret = ltdb_unpack_data(module, &tdb_data, msg2);
636         free(tdb_data.dptr);
637         if (ret == -1) {
638                 ret = LDB_ERR_OTHER;
639                 goto done;
640         }
641
642         if (!msg2->dn) {
643                 msg2->dn = msg->dn;
644         }
645
646         for (i=0; i<msg->num_elements; i++) {
647                 struct ldb_message_element *el = &msg->elements[i], *el2;
648                 struct ldb_val *vals;
649                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
650                 const char *dn;
651
652                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
653                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
654                                                ldb_dn_get_linearized(msg2->dn));
655                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
656                         goto done;
657                 }
658
659                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
660                 case LDB_FLAG_MOD_ADD:
661
662                         if (el->num_values == 0) {
663                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
664                                                        el->name, ldb_dn_get_linearized(msg2->dn));
665                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
666                                 goto done;
667                         }
668
669                         /* make a copy of the array so that a permissive
670                          * control can remove duplicates without changing the
671                          * original values, but do not copy data as we do not
672                          * need to keep it around once the operation is
673                          * finished */
674                         if (control_permissive) {
675                                 el = talloc(msg2, struct ldb_message_element);
676                                 if (!el) {
677                                         ret = LDB_ERR_OTHER;
678                                         goto done;
679                                 }
680                                 el->name = msg->elements[i].name;
681                                 el->num_values = msg->elements[i].num_values;
682                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
683                                 if (el->values == NULL) {
684                                         ret = LDB_ERR_OTHER;
685                                         goto done;
686                                 }
687                                 for (j = 0; j < el->num_values; j++) {
688                                         el->values[j] = msg->elements[i].values[j];
689                                 }
690                         }
691
692                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
693                                 if (el->num_values > 1) {
694                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
695                                                                el->name, ldb_dn_get_linearized(msg2->dn));
696                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
697                                         goto done;
698                                 }
699                         }
700
701                         /* Checks if element already exists */
702                         idx = find_element(msg2, el->name);
703                         if (idx == -1) {
704                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
705                                         ret = LDB_ERR_OTHER;
706                                         goto done;
707                                 }
708                                 ret = ltdb_index_add_element(module, msg2->dn, el);
709                                 if (ret != LDB_SUCCESS) {
710                                         goto done;
711                                 }
712                         } else {
713                                 /* We cannot add another value on a existing one
714                                    if the attribute is single-valued */
715                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
716                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
717                                                                el->name, ldb_dn_get_linearized(msg2->dn));
718                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
719                                         goto done;
720                                 }
721
722                                 el2 = &(msg2->elements[idx]);
723
724                                 /* Check that values don't exist yet on multi-
725                                    valued attributes or aren't provided twice */
726                                 for (j = 0; j < el->num_values; j++) {
727                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
728                                                 if (control_permissive) {
729                                                         /* remove this one as if it was never added */
730                                                         el->num_values--;
731                                                         for (k = j; k < el->num_values; k++) {
732                                                                 el->values[k] = el->values[k + 1];
733                                                         }
734                                                         j--; /* rewind */
735
736                                                         continue;
737                                                 }
738
739                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
740                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
741                                                 goto done;
742                                         }
743                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
744                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
745                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
746                                                 goto done;
747                                         }
748                                 }
749
750                                 /* Now combine existing and new values to a new
751                                    attribute record */
752                                 vals = talloc_realloc(msg2->elements,
753                                                       el2->values, struct ldb_val,
754                                                       el2->num_values + el->num_values);
755                                 if (vals == NULL) {
756                                         ldb_oom(ldb);
757                                         ret = LDB_ERR_OTHER;
758                                         goto done;
759                                 }
760
761                                 for (j=0; j<el->num_values; j++) {
762                                         vals[el2->num_values + j] =
763                                                 ldb_val_dup(vals, &el->values[j]);
764                                 }
765
766                                 el2->values = vals;
767                                 el2->num_values += el->num_values;
768
769                                 ret = ltdb_index_add_element(module, msg2->dn, el);
770                                 if (ret != LDB_SUCCESS) {
771                                         goto done;
772                                 }
773                         }
774
775                         break;
776
777                 case LDB_FLAG_MOD_REPLACE:
778
779                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
780                                 /* the RELAX control overrides this
781                                    check for replace. This is needed as
782                                    DRS replication can produce multiple
783                                    values here for a single valued
784                                    attribute when the values are deleted
785                                    links
786                                 */
787                                 if (el->num_values > 1 &&
788                                     (!req || !ldb_request_get_control(req, LDB_CONTROL_RELAX_OID))) {
789                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
790                                                                el->name, ldb_dn_get_linearized(msg2->dn));
791                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
792                                         goto done;
793                                 }
794                         }
795
796                         /* TODO: This is O(n^2) - replace with more efficient check */
797                         for (j=0; j<el->num_values; j++) {
798                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
799                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
800                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
801                                         goto done;
802                                 }
803                         }
804
805                         idx = find_element(msg2, el->name);
806                         if (idx != -1) {
807                                 el2 = &(msg2->elements[idx]);
808                                 if (ldb_msg_element_compare(el, el2) == 0) {
809                                         /* we are replacing with the same values */
810                                         continue;
811                                 }
812                         
813                                 /* Delete the attribute if it exists in the DB */
814                                 if (msg_delete_attribute(module, ldb, msg2, el->name) != 0) {
815                                         ret = LDB_ERR_OTHER;
816                                         goto done;
817                                 }
818                         }
819
820                         /* Recreate it with the new values */
821                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
822                                 ret = LDB_ERR_OTHER;
823                                 goto done;
824                         }
825
826                         ret = ltdb_index_add_element(module, msg2->dn, el);
827                         if (ret != LDB_SUCCESS) {
828                                 goto done;
829                         }
830
831                         break;
832
833                 case LDB_FLAG_MOD_DELETE:
834                         dn = ldb_dn_get_linearized(msg2->dn);
835                         if (dn == NULL) {
836                                 ret = LDB_ERR_OTHER;
837                                 goto done;
838                         }
839
840                         if (msg->elements[i].num_values == 0) {
841                                 /* Delete the whole attribute */
842                                 ret = msg_delete_attribute(module, ldb, msg2,
843                                                            msg->elements[i].name);
844                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
845                                     control_permissive) {
846                                         ret = LDB_SUCCESS;
847                                 } else {
848                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
849                                                                msg->elements[i].name, dn);
850                                 }
851                                 if (ret != LDB_SUCCESS) {
852                                         goto done;
853                                 }
854                         } else {
855                                 /* Delete specified values from an attribute */
856                                 for (j=0; j < msg->elements[i].num_values; j++) {
857                                         ret = msg_delete_element(module,
858                                                                  msg2,
859                                                                  msg->elements[i].name,
860                                                                  &msg->elements[i].values[j]);
861                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
862                                             control_permissive) {
863                                                 ret = LDB_SUCCESS;
864                                         } else {
865                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
866                                                                        msg->elements[i].name, dn);
867                                         }
868                                         if (ret != LDB_SUCCESS) {
869                                                 goto done;
870                                         }
871                                 }
872                         }
873                         break;
874                 default:
875                         ldb_asprintf_errstring(ldb,
876                                 "Invalid ldb_modify flags on %s: 0x%x",
877                                 msg->elements[i].name,
878                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
879                         ret = LDB_ERR_PROTOCOL_ERROR;
880                         goto done;
881                 }
882         }
883
884         ret = ltdb_store(module, msg2, TDB_MODIFY);
885         if (ret != LDB_SUCCESS) {
886                 goto done;
887         }
888
889         ret = ltdb_modified(module, msg2->dn);
890         if (ret != LDB_SUCCESS) {
891                 goto done;
892         }
893
894 done:
895         talloc_free(tdb_key.dptr);
896         return ret;
897 }
898
899 /*
900   modify a record
901 */
902 static int ltdb_modify(struct ltdb_context *ctx)
903 {
904         struct ldb_module *module = ctx->module;
905         struct ldb_request *req = ctx->req;
906         int ret = LDB_SUCCESS;
907
908         ret = ltdb_check_special_dn(module, req->op.mod.message);
909         if (ret != LDB_SUCCESS) {
910                 return ret;
911         }
912
913         ldb_request_set_state(req, LDB_ASYNC_PENDING);
914
915         if (ltdb_cache_load(module) != 0) {
916                 return LDB_ERR_OPERATIONS_ERROR;
917         }
918
919         ret = ltdb_modify_internal(module, req->op.mod.message, req);
920
921         return ret;
922 }
923
924 /*
925   rename a record
926 */
927 static int ltdb_rename(struct ltdb_context *ctx)
928 {
929         struct ldb_module *module = ctx->module;
930         struct ldb_request *req = ctx->req;
931         struct ldb_message *msg;
932         int ret = LDB_SUCCESS;
933
934         ldb_request_set_state(req, LDB_ASYNC_PENDING);
935
936         if (ltdb_cache_load(ctx->module) != 0) {
937                 return LDB_ERR_OPERATIONS_ERROR;
938         }
939
940         msg = talloc(ctx, struct ldb_message);
941         if (msg == NULL) {
942                 return LDB_ERR_OPERATIONS_ERROR;
943         }
944
945         /* in case any attribute of the message was indexed, we need
946            to fetch the old record */
947         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
948         if (ret != LDB_SUCCESS) {
949                 /* not finding the old record is an error */
950                 return ret;
951         }
952
953         /* Always delete first then add, to avoid conflicts with
954          * unique indexes. We rely on the transaction to make this
955          * atomic
956          */
957         ret = ltdb_delete_internal(module, msg->dn);
958         if (ret != LDB_SUCCESS) {
959                 return ret;
960         }
961
962         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
963         if (msg->dn == NULL) {
964                 return LDB_ERR_OPERATIONS_ERROR;
965         }
966
967         ret = ltdb_add_internal(module, msg);
968
969         return ret;
970 }
971
972 static int ltdb_start_trans(struct ldb_module *module)
973 {
974         void *data = ldb_module_get_private(module);
975         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
976
977         if (tdb_transaction_start(ltdb->tdb) != 0) {
978                 return ltdb_err_map(tdb_error(ltdb->tdb));
979         }
980
981         ltdb->in_transaction++;
982
983         ltdb_index_transaction_start(module);
984
985         return LDB_SUCCESS;
986 }
987
988 static int ltdb_prepare_commit(struct ldb_module *module)
989 {
990         void *data = ldb_module_get_private(module);
991         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
992
993         if (ltdb->in_transaction != 1) {
994                 return LDB_SUCCESS;
995         }
996
997         if (ltdb_index_transaction_commit(module) != 0) {
998                 tdb_transaction_cancel(ltdb->tdb);
999                 ltdb->in_transaction--;
1000                 return ltdb_err_map(tdb_error(ltdb->tdb));
1001         }
1002
1003         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1004                 ltdb->in_transaction--;
1005                 return ltdb_err_map(tdb_error(ltdb->tdb));
1006         }
1007
1008         ltdb->prepared_commit = true;
1009
1010         return LDB_SUCCESS;
1011 }
1012
1013 static int ltdb_end_trans(struct ldb_module *module)
1014 {
1015         void *data = ldb_module_get_private(module);
1016         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1017
1018         if (!ltdb->prepared_commit) {
1019                 int ret = ltdb_prepare_commit(module);
1020                 if (ret != LDB_SUCCESS) {
1021                         return ret;
1022                 }
1023         }
1024
1025         ltdb->in_transaction--;
1026         ltdb->prepared_commit = false;
1027
1028         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1029                 return ltdb_err_map(tdb_error(ltdb->tdb));
1030         }
1031
1032         return LDB_SUCCESS;
1033 }
1034
1035 static int ltdb_del_trans(struct ldb_module *module)
1036 {
1037         void *data = ldb_module_get_private(module);
1038         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1039
1040         ltdb->in_transaction--;
1041
1042         if (ltdb_index_transaction_cancel(module) != 0) {
1043                 tdb_transaction_cancel(ltdb->tdb);
1044                 return ltdb_err_map(tdb_error(ltdb->tdb));
1045         }
1046
1047         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
1048                 return ltdb_err_map(tdb_error(ltdb->tdb));
1049         }
1050
1051         return LDB_SUCCESS;
1052 }
1053
1054 /*
1055   return sequenceNumber from @BASEINFO
1056 */
1057 static int ltdb_sequence_number(struct ltdb_context *ctx,
1058                                 struct ldb_extended **ext)
1059 {
1060         struct ldb_context *ldb;
1061         struct ldb_module *module = ctx->module;
1062         struct ldb_request *req = ctx->req;
1063         TALLOC_CTX *tmp_ctx;
1064         struct ldb_seqnum_request *seq;
1065         struct ldb_seqnum_result *res;
1066         struct ldb_message *msg = NULL;
1067         struct ldb_dn *dn;
1068         const char *date;
1069         int ret = LDB_SUCCESS;
1070
1071         ldb = ldb_module_get_ctx(module);
1072
1073         seq = talloc_get_type(req->op.extended.data,
1074                                 struct ldb_seqnum_request);
1075         if (seq == NULL) {
1076                 return LDB_ERR_OPERATIONS_ERROR;
1077         }
1078
1079         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1080
1081         if (ltdb_lock_read(module) != 0) {
1082                 return LDB_ERR_OPERATIONS_ERROR;
1083         }
1084
1085         res = talloc_zero(req, struct ldb_seqnum_result);
1086         if (res == NULL) {
1087                 ret = LDB_ERR_OPERATIONS_ERROR;
1088                 goto done;
1089         }
1090         tmp_ctx = talloc_new(req);
1091         if (tmp_ctx == NULL) {
1092                 ret = LDB_ERR_OPERATIONS_ERROR;
1093                 goto done;
1094         }
1095
1096         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1097
1098         msg = talloc(tmp_ctx, struct ldb_message);
1099         if (msg == NULL) {
1100                 ret = LDB_ERR_OPERATIONS_ERROR;
1101                 goto done;
1102         }
1103
1104         ret = ltdb_search_dn1(module, dn, msg);
1105         if (ret != LDB_SUCCESS) {
1106                 goto done;
1107         }
1108
1109         switch (seq->type) {
1110         case LDB_SEQ_HIGHEST_SEQ:
1111                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1112                 break;
1113         case LDB_SEQ_NEXT:
1114                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1115                 res->seq_num++;
1116                 break;
1117         case LDB_SEQ_HIGHEST_TIMESTAMP:
1118                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1119                 if (date) {
1120                         res->seq_num = ldb_string_to_time(date);
1121                 } else {
1122                         res->seq_num = 0;
1123                         /* zero is as good as anything when we don't know */
1124                 }
1125                 break;
1126         }
1127
1128         *ext = talloc_zero(req, struct ldb_extended);
1129         if (*ext == NULL) {
1130                 ret = LDB_ERR_OPERATIONS_ERROR;
1131                 goto done;
1132         }
1133         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1134         (*ext)->data = talloc_steal(*ext, res);
1135
1136 done:
1137         talloc_free(tmp_ctx);
1138         ltdb_unlock_read(module);
1139         return ret;
1140 }
1141
1142 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1143 {
1144         struct ldb_context *ldb;
1145         struct ldb_request *req;
1146         struct ldb_reply *ares;
1147
1148         ldb = ldb_module_get_ctx(ctx->module);
1149         req = ctx->req;
1150
1151         /* if we already returned an error just return */
1152         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1153                 return;
1154         }
1155
1156         ares = talloc_zero(req, struct ldb_reply);
1157         if (!ares) {
1158                 ldb_oom(ldb);
1159                 req->callback(req, NULL);
1160                 return;
1161         }
1162         ares->type = LDB_REPLY_DONE;
1163         ares->error = error;
1164
1165         req->callback(req, ares);
1166 }
1167
1168 static void ltdb_timeout(struct tevent_context *ev,
1169                           struct tevent_timer *te,
1170                           struct timeval t,
1171                           void *private_data)
1172 {
1173         struct ltdb_context *ctx;
1174         ctx = talloc_get_type(private_data, struct ltdb_context);
1175
1176         if (!ctx->request_terminated) {
1177                 /* request is done now */
1178                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1179         }
1180
1181         if (!ctx->request_terminated) {
1182                 /* neutralize the spy */
1183                 ctx->spy->ctx = NULL;
1184         }
1185         talloc_free(ctx);
1186 }
1187
1188 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1189                                         struct ldb_extended *ext,
1190                                         int error)
1191 {
1192         struct ldb_context *ldb;
1193         struct ldb_request *req;
1194         struct ldb_reply *ares;
1195
1196         ldb = ldb_module_get_ctx(ctx->module);
1197         req = ctx->req;
1198
1199         /* if we already returned an error just return */
1200         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1201                 return;
1202         }
1203
1204         ares = talloc_zero(req, struct ldb_reply);
1205         if (!ares) {
1206                 ldb_oom(ldb);
1207                 req->callback(req, NULL);
1208                 return;
1209         }
1210         ares->type = LDB_REPLY_DONE;
1211         ares->response = ext;
1212         ares->error = error;
1213
1214         req->callback(req, ares);
1215 }
1216
1217 static void ltdb_handle_extended(struct ltdb_context *ctx)
1218 {
1219         struct ldb_extended *ext = NULL;
1220         int ret;
1221
1222         if (strcmp(ctx->req->op.extended.oid,
1223                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1224                 /* get sequence number */
1225                 ret = ltdb_sequence_number(ctx, &ext);
1226         } else {
1227                 /* not recognized */
1228                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1229         }
1230
1231         ltdb_request_extended_done(ctx, ext, ret);
1232 }
1233
1234 static void ltdb_callback(struct tevent_context *ev,
1235                           struct tevent_timer *te,
1236                           struct timeval t,
1237                           void *private_data)
1238 {
1239         struct ltdb_context *ctx;
1240         int ret;
1241
1242         ctx = talloc_get_type(private_data, struct ltdb_context);
1243
1244         if (ctx->request_terminated) {
1245                 goto done;
1246         }
1247
1248         switch (ctx->req->operation) {
1249         case LDB_SEARCH:
1250                 ret = ltdb_search(ctx);
1251                 break;
1252         case LDB_ADD:
1253                 ret = ltdb_add(ctx);
1254                 break;
1255         case LDB_MODIFY:
1256                 ret = ltdb_modify(ctx);
1257                 break;
1258         case LDB_DELETE:
1259                 ret = ltdb_delete(ctx);
1260                 break;
1261         case LDB_RENAME:
1262                 ret = ltdb_rename(ctx);
1263                 break;
1264         case LDB_EXTENDED:
1265                 ltdb_handle_extended(ctx);
1266                 goto done;
1267         default:
1268                 /* no other op supported */
1269                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1270         }
1271
1272         if (!ctx->request_terminated) {
1273                 /* request is done now */
1274                 ltdb_request_done(ctx, ret);
1275         }
1276
1277 done:
1278         if (!ctx->request_terminated) {
1279                 /* neutralize the spy */
1280                 ctx->spy->ctx = NULL;
1281         }
1282         talloc_free(ctx);
1283 }
1284
1285 static int ltdb_request_destructor(void *ptr)
1286 {
1287         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1288
1289         if (spy->ctx != NULL) {
1290                 spy->ctx->request_terminated = true;
1291         }
1292
1293         return 0;
1294 }
1295
1296 static int ltdb_handle_request(struct ldb_module *module,
1297                                struct ldb_request *req)
1298 {
1299         struct ldb_control *control_permissive;
1300         struct ldb_context *ldb;
1301         struct tevent_context *ev;
1302         struct ltdb_context *ac;
1303         struct tevent_timer *te;
1304         struct timeval tv;
1305         int i;
1306
1307         ldb = ldb_module_get_ctx(module);
1308
1309         control_permissive = ldb_request_get_control(req,
1310                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1311
1312         for (i = 0; req->controls && req->controls[i]; i++) {
1313                 if (req->controls[i]->critical &&
1314                     req->controls[i] != control_permissive) {
1315                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1316                                                req->controls[i]->oid);
1317                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1318                 }
1319         }
1320
1321         if (req->starttime == 0 || req->timeout == 0) {
1322                 ldb_set_errstring(ldb, "Invalid timeout settings");
1323                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1324         }
1325
1326         ev = ldb_get_event_context(ldb);
1327
1328         ac = talloc_zero(ldb, struct ltdb_context);
1329         if (ac == NULL) {
1330                 ldb_oom(ldb);
1331                 return LDB_ERR_OPERATIONS_ERROR;
1332         }
1333
1334         ac->module = module;
1335         ac->req = req;
1336
1337         tv.tv_sec = 0;
1338         tv.tv_usec = 0;
1339         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1340         if (NULL == te) {
1341                 talloc_free(ac);
1342                 return LDB_ERR_OPERATIONS_ERROR;
1343         }
1344
1345         tv.tv_sec = req->starttime + req->timeout;
1346         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1347         if (NULL == ac->timeout_event) {
1348                 talloc_free(ac);
1349                 return LDB_ERR_OPERATIONS_ERROR;
1350         }
1351
1352         /* set a spy so that we do not try to use the request context
1353          * if it is freed before ltdb_callback fires */
1354         ac->spy = talloc(req, struct ltdb_req_spy);
1355         if (NULL == ac->spy) {
1356                 talloc_free(ac);
1357                 return LDB_ERR_OPERATIONS_ERROR;
1358         }
1359         ac->spy->ctx = ac;
1360
1361         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1362
1363         return LDB_SUCCESS;
1364 }
1365
1366 static int ltdb_init_rootdse(struct ldb_module *module)
1367 {
1368         struct ldb_context *ldb;
1369         int ret;
1370
1371         ldb = ldb_module_get_ctx(module);
1372
1373         ret = ldb_mod_register_control(module,
1374                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1375         if (ret != LDB_SUCCESS) {
1376                 ldb_debug(ldb, LDB_DEBUG_WARNING, "ldb_tdb: "
1377                           "Unable to register control with rootdse!");
1378         }
1379
1380         /* there can be no module beyond the backend, just return */
1381         return LDB_SUCCESS;
1382 }
1383
1384 static const struct ldb_module_ops ltdb_ops = {
1385         .name              = "tdb",
1386         .init_context      = ltdb_init_rootdse,
1387         .search            = ltdb_handle_request,
1388         .add               = ltdb_handle_request,
1389         .modify            = ltdb_handle_request,
1390         .del               = ltdb_handle_request,
1391         .rename            = ltdb_handle_request,
1392         .extended          = ltdb_handle_request,
1393         .start_transaction = ltdb_start_trans,
1394         .end_transaction   = ltdb_end_trans,
1395         .prepare_commit    = ltdb_prepare_commit,
1396         .del_transaction   = ltdb_del_trans,
1397 };
1398
1399 /*
1400   connect to the database
1401 */
1402 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1403                         unsigned int flags, const char *options[],
1404                         struct ldb_module **_module)
1405 {
1406         struct ldb_module *module;
1407         const char *path;
1408         int tdb_flags, open_flags;
1409         struct ltdb_private *ltdb;
1410
1411         /* parse the url */
1412         if (strchr(url, ':')) {
1413                 if (strncmp(url, "tdb://", 6) != 0) {
1414                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1415                                   "Invalid tdb URL '%s'", url);
1416                         return LDB_ERR_OPERATIONS_ERROR;
1417                 }
1418                 path = url+6;
1419         } else {
1420                 path = url;
1421         }
1422
1423         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1424
1425         /* check for the 'nosync' option */
1426         if (flags & LDB_FLG_NOSYNC) {
1427                 tdb_flags |= TDB_NOSYNC;
1428         }
1429
1430         /* and nommap option */
1431         if (flags & LDB_FLG_NOMMAP) {
1432                 tdb_flags |= TDB_NOMMAP;
1433         }
1434
1435         if (flags & LDB_FLG_RDONLY) {
1436                 open_flags = O_RDONLY;
1437         } else {
1438                 open_flags = O_CREAT | O_RDWR;
1439         }
1440
1441         ltdb = talloc_zero(ldb, struct ltdb_private);
1442         if (!ltdb) {
1443                 ldb_oom(ldb);
1444                 return LDB_ERR_OPERATIONS_ERROR;
1445         }
1446
1447         /* note that we use quite a large default hash size */
1448         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1449                                    tdb_flags, open_flags,
1450                                    ldb_get_create_perms(ldb), ldb);
1451         if (!ltdb->tdb) {
1452                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1453                           "Unable to open tdb '%s'", path);
1454                 talloc_free(ltdb);
1455                 return LDB_ERR_OPERATIONS_ERROR;
1456         }
1457
1458         ltdb->sequence_number = 0;
1459
1460         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1461         if (!module) {
1462                 talloc_free(ltdb);
1463                 return LDB_ERR_OPERATIONS_ERROR;
1464         }
1465         ldb_module_set_private(module, ltdb);
1466         talloc_steal(module, ltdb);
1467
1468         if (ltdb_cache_load(module) != 0) {
1469                 talloc_free(module);
1470                 talloc_free(ltdb);
1471                 return LDB_ERR_OPERATIONS_ERROR;
1472         }
1473
1474         *_module = module;
1475         return LDB_SUCCESS;
1476 }
1477
1478 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1479         .name = "tdb",
1480         .connect_fn = ltdb_connect,
1481 };