LDB:TDB backend - change counter variables to "unsigned" where appropriate
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         int ret = 0;
92
93         if (ltdb->in_transaction == 0 &&
94             ltdb->read_lock_count == 0) {
95                 ret = tdb_lockall_read(ltdb->tdb);
96         }
97         if (ret == 0) {
98                 ltdb->read_lock_count++;
99         }
100         return ret;
101 }
102
103 /*
104   unlock the database after a ltdb_lock_read()
105 */
106 int ltdb_unlock_read(struct ldb_module *module)
107 {
108         void *data = ldb_module_get_private(module);
109         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
110         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
111                 return tdb_unlockall_read(ltdb->tdb);
112         }
113         ltdb->read_lock_count--;
114         return 0;
115 }
116
117
118 /*
119   form a TDB_DATA for a record key
120   caller frees
121
122   note that the key for a record can depend on whether the
123   dn refers to a case sensitive index record or not
124 */
125 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
126 {
127         struct ldb_context *ldb = ldb_module_get_ctx(module);
128         TDB_DATA key;
129         char *key_str = NULL;
130         const char *dn_folded = NULL;
131
132         /*
133           most DNs are case insensitive. The exception is index DNs for
134           case sensitive attributes
135
136           there are 3 cases dealt with in this code:
137
138           1) if the dn doesn't start with @ then uppercase the attribute
139              names and the attributes values of case insensitive attributes
140           2) if the dn starts with @ then leave it alone -
141              the indexing code handles the rest
142         */
143
144         dn_folded = ldb_dn_get_casefold(dn);
145         if (!dn_folded) {
146                 goto failed;
147         }
148
149         key_str = talloc_strdup(ldb, "DN=");
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
155         if (!key_str) {
156                 goto failed;
157         }
158
159         key.dptr = (uint8_t *)key_str;
160         key.dsize = strlen(key_str) + 1;
161
162         return key;
163
164 failed:
165         errno = ENOMEM;
166         key.dptr = NULL;
167         key.dsize = 0;
168         return key;
169 }
170
171 /*
172   check special dn's have valid attributes
173   currently only @ATTRIBUTES is checked
174 */
175 static int ltdb_check_special_dn(struct ldb_module *module,
176                           const struct ldb_message *msg)
177 {
178         struct ldb_context *ldb = ldb_module_get_ctx(module);
179         unsigned int i, j;
180
181         if (! ldb_dn_is_special(msg->dn) ||
182             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
183                 return LDB_SUCCESS;
184         }
185
186         /* we have @ATTRIBUTES, let's check attributes are fine */
187         /* should we check that we deny multivalued attributes ? */
188         for (i = 0; i < msg->num_elements; i++) {
189                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
190
191                 for (j = 0; j < msg->elements[i].num_values; j++) {
192                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
193                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
194                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
195                         }
196                 }
197         }
198
199         return LDB_SUCCESS;
200 }
201
202
203 /*
204   we've made a modification to a dn - possibly reindex and
205   update sequence number
206 */
207 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
208 {
209         int ret = LDB_SUCCESS;
210         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
211
212         /* only allow modifies inside a transaction, otherwise the
213          * ldb is unsafe */
214         if (ltdb->in_transaction == 0) {
215                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
216                 return LDB_ERR_OPERATIONS_ERROR;
217         }
218
219         if (ldb_dn_is_special(dn) &&
220             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
221              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
222                 ret = ltdb_reindex(module);
223         }
224
225         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
226         if (ret == LDB_SUCCESS &&
227             !(ldb_dn_is_special(dn) &&
228               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
229                 ret = ltdb_increase_sequence_number(module);
230         }
231
232         /* If the modify was to @OPTIONS, reload the cache */
233         if (ret == LDB_SUCCESS &&
234             ldb_dn_is_special(dn) &&
235             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
236                 ret = ltdb_cache_reload(module);
237         }
238
239         return ret;
240 }
241
242 /*
243   store a record into the db
244 */
245 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
246 {
247         void *data = ldb_module_get_private(module);
248         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
249         TDB_DATA tdb_key, tdb_data;
250         int ret = LDB_SUCCESS;
251
252         tdb_key = ltdb_key(module, msg->dn);
253         if (tdb_key.dptr == NULL) {
254                 return LDB_ERR_OTHER;
255         }
256
257         ret = ltdb_pack_data(module, msg, &tdb_data);
258         if (ret == -1) {
259                 talloc_free(tdb_key.dptr);
260                 return LDB_ERR_OTHER;
261         }
262
263         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
264         if (ret == -1) {
265                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
266                 goto done;
267         }
268
269 done:
270         talloc_free(tdb_key.dptr);
271         talloc_free(tdb_data.dptr);
272
273         return ret;
274 }
275
276
277 static int ltdb_add_internal(struct ldb_module *module,
278                              const struct ldb_message *msg)
279 {
280         struct ldb_context *ldb = ldb_module_get_ctx(module);
281         int ret = LDB_SUCCESS;
282         unsigned int i;
283
284         ret = ltdb_check_special_dn(module, msg);
285         if (ret != LDB_SUCCESS) {
286                 return ret;
287         }
288
289         if (ltdb_cache_load(module) != 0) {
290                 return LDB_ERR_OPERATIONS_ERROR;
291         }
292
293         for (i=0;i<msg->num_elements;i++) {
294                 struct ldb_message_element *el = &msg->elements[i];
295                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
296
297                 if (el->num_values == 0) {
298                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
299                                                el->name, ldb_dn_get_linearized(msg->dn));
300                         return LDB_ERR_CONSTRAINT_VIOLATION;
301                 }
302                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
303                         if (el->num_values > 1) {
304                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
305                                                        el->name, ldb_dn_get_linearized(msg->dn));
306                                 return LDB_ERR_CONSTRAINT_VIOLATION;
307                         }
308                 }
309         }
310
311         ret = ltdb_store(module, msg, TDB_INSERT);
312         if (ret != LDB_SUCCESS) {
313                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
314                         ldb_asprintf_errstring(ldb,
315                                                "Entry %s already exists",
316                                                ldb_dn_get_linearized(msg->dn));
317                 }
318                 return ret;
319         }
320
321         ret = ltdb_index_add_new(module, msg);
322         if (ret != LDB_SUCCESS) {
323                 return ret;
324         }
325
326         ret = ltdb_modified(module, msg->dn);
327
328         return ret;
329 }
330
331 /*
332   add a record to the database
333 */
334 static int ltdb_add(struct ltdb_context *ctx)
335 {
336         struct ldb_module *module = ctx->module;
337         struct ldb_request *req = ctx->req;
338         int ret = LDB_SUCCESS;
339
340         ldb_request_set_state(req, LDB_ASYNC_PENDING);
341
342         if (ltdb_cache_load(module) != 0) {
343                 return LDB_ERR_OPERATIONS_ERROR;
344         }
345
346         ret = ltdb_add_internal(module, req->op.add.message);
347
348         return ret;
349 }
350
351 /*
352   delete a record from the database, not updating indexes (used for deleting
353   index records)
354 */
355 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
356 {
357         void *data = ldb_module_get_private(module);
358         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
359         TDB_DATA tdb_key;
360         int ret;
361
362         tdb_key = ltdb_key(module, dn);
363         if (!tdb_key.dptr) {
364                 return LDB_ERR_OTHER;
365         }
366
367         ret = tdb_delete(ltdb->tdb, tdb_key);
368         talloc_free(tdb_key.dptr);
369
370         if (ret != 0) {
371                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
372         }
373
374         return ret;
375 }
376
377 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
378 {
379         struct ldb_message *msg;
380         int ret = LDB_SUCCESS;
381
382         msg = talloc(module, struct ldb_message);
383         if (msg == NULL) {
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         /* in case any attribute of the message was indexed, we need
388            to fetch the old record */
389         ret = ltdb_search_dn1(module, dn, msg);
390         if (ret != LDB_SUCCESS) {
391                 /* not finding the old record is an error */
392                 goto done;
393         }
394
395         ret = ltdb_delete_noindex(module, dn);
396         if (ret != LDB_SUCCESS) {
397                 goto done;
398         }
399
400         /* remove any indexed attributes */
401         ret = ltdb_index_delete(module, msg);
402         if (ret != LDB_SUCCESS) {
403                 goto done;
404         }
405
406         ret = ltdb_modified(module, dn);
407         if (ret != LDB_SUCCESS) {
408                 goto done;
409         }
410
411 done:
412         talloc_free(msg);
413         return ret;
414 }
415
416 /*
417   delete a record from the database
418 */
419 static int ltdb_delete(struct ltdb_context *ctx)
420 {
421         struct ldb_module *module = ctx->module;
422         struct ldb_request *req = ctx->req;
423         int ret = LDB_SUCCESS;
424
425         ldb_request_set_state(req, LDB_ASYNC_PENDING);
426
427         if (ltdb_cache_load(module) != 0) {
428                 return LDB_ERR_OPERATIONS_ERROR;
429         }
430
431         ret = ltdb_delete_internal(module, req->op.del.dn);
432
433         return ret;
434 }
435
436 /*
437   find an element by attribute name. At the moment this does a linear search,
438   it should be re-coded to use a binary search once all places that modify
439   records guarantee sorted order
440
441   return the index of the first matching element if found, otherwise -1
442 */
443 static int find_element(const struct ldb_message *msg, const char *name)
444 {
445         unsigned int i;
446         for (i=0;i<msg->num_elements;i++) {
447                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
448                         return i;
449                 }
450         }
451         return -1;
452 }
453
454
455 /*
456   add an element to an existing record. Assumes a elements array that we
457   can call re-alloc on, and assumed that we can re-use the data pointers from
458   the passed in additional values. Use with care!
459
460   returns 0 on success, -1 on failure (and sets errno)
461 */
462 static int ltdb_msg_add_element(struct ldb_context *ldb,
463                                 struct ldb_message *msg,
464                                 struct ldb_message_element *el)
465 {
466         struct ldb_message_element *e2;
467         unsigned int i;
468
469         if (el->num_values == 0) {
470                 /* nothing to do here - we don't add empty elements */
471                 return 0;
472         }
473
474         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
475                               msg->num_elements+1);
476         if (!e2) {
477                 errno = ENOMEM;
478                 return -1;
479         }
480
481         msg->elements = e2;
482
483         e2 = &msg->elements[msg->num_elements];
484
485         e2->name = el->name;
486         e2->flags = el->flags;
487         e2->values = talloc_array(msg->elements,
488                                   struct ldb_val, el->num_values);
489         if (!e2->values) {
490                 errno = ENOMEM;
491                 return -1;
492         }
493         for (i=0;i<el->num_values;i++) {
494                 e2->values[i] = el->values[i];
495         }
496         e2->num_values = el->num_values;
497
498         ++msg->num_elements;
499
500         return 0;
501 }
502
503 /*
504   delete all elements having a specified attribute name
505 */
506 static int msg_delete_attribute(struct ldb_module *module,
507                                 struct ldb_context *ldb,
508                                 struct ldb_message *msg, const char *name)
509 {
510         unsigned int i;
511         int ret;
512         struct ldb_message_element *el;
513
514         el = ldb_msg_find_element(msg, name);
515         if (el == NULL) {
516                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
517         }
518         i = el - msg->elements;
519
520         ret = ltdb_index_del_element(module, msg->dn, el);
521         if (ret != LDB_SUCCESS) {
522                 return ret;
523         }
524
525         talloc_free(el->values);
526         if (msg->num_elements > (i+1)) {
527                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
528         }
529         msg->num_elements--;
530         msg->elements = talloc_realloc(msg, msg->elements,
531                                        struct ldb_message_element,
532                                        msg->num_elements);
533         return LDB_SUCCESS;
534 }
535
536 /*
537   delete all elements matching an attribute name/value
538
539   return LDB Error on failure
540 */
541 static int msg_delete_element(struct ldb_module *module,
542                               struct ldb_message *msg,
543                               const char *name,
544                               const struct ldb_val *val)
545 {
546         struct ldb_context *ldb = ldb_module_get_ctx(module);
547         unsigned int i;
548         int found, ret;
549         struct ldb_message_element *el;
550         const struct ldb_schema_attribute *a;
551
552         found = find_element(msg, name);
553         if (found == -1) {
554                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
555         }
556
557         el = &msg->elements[found];
558
559         a = ldb_schema_attribute_by_name(ldb, el->name);
560
561         for (i=0;i<el->num_values;i++) {
562                 if (a->syntax->comparison_fn(ldb, ldb,
563                                              &el->values[i], val) == 0) {
564                         if (el->num_values == 1) {
565                                 return msg_delete_attribute(module, ldb, msg, name);
566                         }
567
568                         ret = ltdb_index_del_value(module, msg->dn, el, i);
569                         if (ret != LDB_SUCCESS) {
570                                 return ret;
571                         }
572
573                         if (i<el->num_values-1) {
574                                 memmove(&el->values[i], &el->values[i+1],
575                                         sizeof(el->values[i])*
576                                                 (el->num_values-(i+1)));
577                         }
578                         el->num_values--;
579
580                         /* per definition we find in a canonicalised message an
581                            attribute value only once. So we are finished here */
582                         return LDB_SUCCESS;
583                 }
584         }
585
586         /* Not found */
587         return LDB_ERR_NO_SUCH_ATTRIBUTE;
588 }
589
590
591 /*
592   modify a record - internal interface
593
594   yuck - this is O(n^2). Luckily n is usually small so we probably
595   get away with it, but if we ever have really large attribute lists
596   then we'll need to look at this again
597
598   'req' is optional, and is used to specify controls if supplied
599 */
600 int ltdb_modify_internal(struct ldb_module *module,
601                          const struct ldb_message *msg,
602                          struct ldb_request *req)
603 {
604         struct ldb_context *ldb = ldb_module_get_ctx(module);
605         void *data = ldb_module_get_private(module);
606         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
607         TDB_DATA tdb_key, tdb_data;
608         struct ldb_message *msg2;
609         unsigned i, j, k;
610         int ret = LDB_SUCCESS, idx;
611         struct ldb_control *control_permissive = NULL;
612
613         if (req) {
614                 control_permissive = ldb_request_get_control(req,
615                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
616         }
617
618         tdb_key = ltdb_key(module, msg->dn);
619         if (!tdb_key.dptr) {
620                 return LDB_ERR_OTHER;
621         }
622
623         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
624         if (!tdb_data.dptr) {
625                 talloc_free(tdb_key.dptr);
626                 return ltdb_err_map(tdb_error(ltdb->tdb));
627         }
628
629         msg2 = talloc(tdb_key.dptr, struct ldb_message);
630         if (msg2 == NULL) {
631                 free(tdb_data.dptr);
632                 ret = LDB_ERR_OTHER;
633                 goto done;
634         }
635
636         ret = ltdb_unpack_data(module, &tdb_data, msg2);
637         free(tdb_data.dptr);
638         if (ret == -1) {
639                 ret = LDB_ERR_OTHER;
640                 goto done;
641         }
642
643         if (!msg2->dn) {
644                 msg2->dn = msg->dn;
645         }
646
647         for (i=0; i<msg->num_elements; i++) {
648                 struct ldb_message_element *el = &msg->elements[i], *el2;
649                 struct ldb_val *vals;
650                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
651                 const char *dn;
652
653                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
654                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
655                                                ldb_dn_get_linearized(msg2->dn));
656                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
657                         goto done;
658                 }
659
660                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
661                 case LDB_FLAG_MOD_ADD:
662
663                         if (el->num_values == 0) {
664                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
665                                                        el->name, ldb_dn_get_linearized(msg2->dn));
666                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
667                                 goto done;
668                         }
669
670                         /* make a copy of the array so that a permissive
671                          * control can remove duplicates without changing the
672                          * original values, but do not copy data as we do not
673                          * need to keep it around once the operation is
674                          * finished */
675                         if (control_permissive) {
676                                 el = talloc(msg2, struct ldb_message_element);
677                                 if (!el) {
678                                         ret = LDB_ERR_OTHER;
679                                         goto done;
680                                 }
681                                 el->name = msg->elements[i].name;
682                                 el->num_values = msg->elements[i].num_values;
683                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
684                                 if (el->values == NULL) {
685                                         ret = LDB_ERR_OTHER;
686                                         goto done;
687                                 }
688                                 for (j = 0; j < el->num_values; j++) {
689                                         el->values[j] = msg->elements[i].values[j];
690                                 }
691                         }
692
693                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
694                                 if (el->num_values > 1) {
695                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
696                                                                el->name, ldb_dn_get_linearized(msg2->dn));
697                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
698                                         goto done;
699                                 }
700                         }
701
702                         /* Checks if element already exists */
703                         idx = find_element(msg2, el->name);
704                         if (idx == -1) {
705                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
706                                         ret = LDB_ERR_OTHER;
707                                         goto done;
708                                 }
709                                 ret = ltdb_index_add_element(module, msg2->dn, el);
710                                 if (ret != LDB_SUCCESS) {
711                                         goto done;
712                                 }
713                         } else {
714                                 /* We cannot add another value on a existing one
715                                    if the attribute is single-valued */
716                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
717                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
718                                                                el->name, ldb_dn_get_linearized(msg2->dn));
719                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
720                                         goto done;
721                                 }
722
723                                 el2 = &(msg2->elements[idx]);
724
725                                 /* Check that values don't exist yet on multi-
726                                    valued attributes or aren't provided twice */
727                                 for (j = 0; j < el->num_values; j++) {
728                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
729                                                 if (control_permissive) {
730                                                         /* remove this one as if it was never added */
731                                                         el->num_values--;
732                                                         for (k = j; k < el->num_values; k++) {
733                                                                 el->values[k] = el->values[k + 1];
734                                                         }
735                                                         j--; /* rewind */
736
737                                                         continue;
738                                                 }
739
740                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
741                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
742                                                 goto done;
743                                         }
744                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
745                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
746                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
747                                                 goto done;
748                                         }
749                                 }
750
751                                 /* Now combine existing and new values to a new
752                                    attribute record */
753                                 vals = talloc_realloc(msg2->elements,
754                                                       el2->values, struct ldb_val,
755                                                       el2->num_values + el->num_values);
756                                 if (vals == NULL) {
757                                         ldb_oom(ldb);
758                                         ret = LDB_ERR_OTHER;
759                                         goto done;
760                                 }
761
762                                 for (j=0; j<el->num_values; j++) {
763                                         vals[el2->num_values + j] =
764                                                 ldb_val_dup(vals, &el->values[j]);
765                                 }
766
767                                 el2->values = vals;
768                                 el2->num_values += el->num_values;
769
770                                 ret = ltdb_index_add_element(module, msg2->dn, el);
771                                 if (ret != LDB_SUCCESS) {
772                                         goto done;
773                                 }
774                         }
775
776                         break;
777
778                 case LDB_FLAG_MOD_REPLACE:
779
780                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
781                                 /* the RELAX control overrides this
782                                    check for replace. This is needed as
783                                    DRS replication can produce multiple
784                                    values here for a single valued
785                                    attribute when the values are deleted
786                                    links
787                                 */
788                                 if (el->num_values > 1 &&
789                                     (!req || !ldb_request_get_control(req, LDB_CONTROL_RELAX_OID))) {
790                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
791                                                                el->name, ldb_dn_get_linearized(msg2->dn));
792                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
793                                         goto done;
794                                 }
795                         }
796
797                         /* TODO: This is O(n^2) - replace with more efficient check */
798                         for (j=0; j<el->num_values; j++) {
799                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
800                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
801                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
802                                         goto done;
803                                 }
804                         }
805
806                         idx = find_element(msg2, el->name);
807                         if (idx != -1) {
808                                 el2 = &(msg2->elements[idx]);
809                                 if (ldb_msg_element_compare(el, el2) == 0) {
810                                         /* we are replacing with the same values */
811                                         continue;
812                                 }
813                         
814                                 /* Delete the attribute if it exists in the DB */
815                                 if (msg_delete_attribute(module, ldb, msg2, el->name) != 0) {
816                                         ret = LDB_ERR_OTHER;
817                                         goto done;
818                                 }
819                         }
820
821                         /* Recreate it with the new values */
822                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
823                                 ret = LDB_ERR_OTHER;
824                                 goto done;
825                         }
826
827                         ret = ltdb_index_add_element(module, msg2->dn, el);
828                         if (ret != LDB_SUCCESS) {
829                                 goto done;
830                         }
831
832                         break;
833
834                 case LDB_FLAG_MOD_DELETE:
835                         dn = ldb_dn_get_linearized(msg2->dn);
836                         if (dn == NULL) {
837                                 ret = LDB_ERR_OTHER;
838                                 goto done;
839                         }
840
841                         if (msg->elements[i].num_values == 0) {
842                                 /* Delete the whole attribute */
843                                 ret = msg_delete_attribute(module, ldb, msg2,
844                                                            msg->elements[i].name);
845                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
846                                     control_permissive) {
847                                         ret = LDB_SUCCESS;
848                                 } else {
849                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
850                                                                msg->elements[i].name, dn);
851                                 }
852                                 if (ret != LDB_SUCCESS) {
853                                         goto done;
854                                 }
855                         } else {
856                                 /* Delete specified values from an attribute */
857                                 for (j=0; j < msg->elements[i].num_values; j++) {
858                                         ret = msg_delete_element(module,
859                                                                  msg2,
860                                                                  msg->elements[i].name,
861                                                                  &msg->elements[i].values[j]);
862                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
863                                             control_permissive) {
864                                                 ret = LDB_SUCCESS;
865                                         } else {
866                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
867                                                                        msg->elements[i].name, dn);
868                                         }
869                                         if (ret != LDB_SUCCESS) {
870                                                 goto done;
871                                         }
872                                 }
873                         }
874                         break;
875                 default:
876                         ldb_asprintf_errstring(ldb,
877                                 "Invalid ldb_modify flags on %s: 0x%x",
878                                 msg->elements[i].name,
879                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
880                         ret = LDB_ERR_PROTOCOL_ERROR;
881                         goto done;
882                 }
883         }
884
885         ret = ltdb_store(module, msg2, TDB_MODIFY);
886         if (ret != LDB_SUCCESS) {
887                 goto done;
888         }
889
890         ret = ltdb_modified(module, msg2->dn);
891         if (ret != LDB_SUCCESS) {
892                 goto done;
893         }
894
895 done:
896         talloc_free(tdb_key.dptr);
897         return ret;
898 }
899
900 /*
901   modify a record
902 */
903 static int ltdb_modify(struct ltdb_context *ctx)
904 {
905         struct ldb_module *module = ctx->module;
906         struct ldb_request *req = ctx->req;
907         int ret = LDB_SUCCESS;
908
909         ret = ltdb_check_special_dn(module, req->op.mod.message);
910         if (ret != LDB_SUCCESS) {
911                 return ret;
912         }
913
914         ldb_request_set_state(req, LDB_ASYNC_PENDING);
915
916         if (ltdb_cache_load(module) != 0) {
917                 return LDB_ERR_OPERATIONS_ERROR;
918         }
919
920         ret = ltdb_modify_internal(module, req->op.mod.message, req);
921
922         return ret;
923 }
924
925 /*
926   rename a record
927 */
928 static int ltdb_rename(struct ltdb_context *ctx)
929 {
930         struct ldb_module *module = ctx->module;
931         struct ldb_request *req = ctx->req;
932         struct ldb_message *msg;
933         int ret = LDB_SUCCESS;
934
935         ldb_request_set_state(req, LDB_ASYNC_PENDING);
936
937         if (ltdb_cache_load(ctx->module) != 0) {
938                 return LDB_ERR_OPERATIONS_ERROR;
939         }
940
941         msg = talloc(ctx, struct ldb_message);
942         if (msg == NULL) {
943                 return LDB_ERR_OPERATIONS_ERROR;
944         }
945
946         /* in case any attribute of the message was indexed, we need
947            to fetch the old record */
948         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
949         if (ret != LDB_SUCCESS) {
950                 /* not finding the old record is an error */
951                 return ret;
952         }
953
954         /* Always delete first then add, to avoid conflicts with
955          * unique indexes. We rely on the transaction to make this
956          * atomic
957          */
958         ret = ltdb_delete_internal(module, msg->dn);
959         if (ret != LDB_SUCCESS) {
960                 return ret;
961         }
962
963         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
964         if (msg->dn == NULL) {
965                 return LDB_ERR_OPERATIONS_ERROR;
966         }
967
968         ret = ltdb_add_internal(module, msg);
969
970         return ret;
971 }
972
973 static int ltdb_start_trans(struct ldb_module *module)
974 {
975         void *data = ldb_module_get_private(module);
976         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
977
978         if (tdb_transaction_start(ltdb->tdb) != 0) {
979                 return ltdb_err_map(tdb_error(ltdb->tdb));
980         }
981
982         ltdb->in_transaction++;
983
984         ltdb_index_transaction_start(module);
985
986         return LDB_SUCCESS;
987 }
988
989 static int ltdb_prepare_commit(struct ldb_module *module)
990 {
991         void *data = ldb_module_get_private(module);
992         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
993
994         if (ltdb->in_transaction != 1) {
995                 return LDB_SUCCESS;
996         }
997
998         if (ltdb_index_transaction_commit(module) != 0) {
999                 tdb_transaction_cancel(ltdb->tdb);
1000                 ltdb->in_transaction--;
1001                 return ltdb_err_map(tdb_error(ltdb->tdb));
1002         }
1003
1004         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1005                 ltdb->in_transaction--;
1006                 return ltdb_err_map(tdb_error(ltdb->tdb));
1007         }
1008
1009         ltdb->prepared_commit = true;
1010
1011         return LDB_SUCCESS;
1012 }
1013
1014 static int ltdb_end_trans(struct ldb_module *module)
1015 {
1016         void *data = ldb_module_get_private(module);
1017         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1018
1019         if (!ltdb->prepared_commit) {
1020                 int ret = ltdb_prepare_commit(module);
1021                 if (ret != LDB_SUCCESS) {
1022                         return ret;
1023                 }
1024         }
1025
1026         ltdb->in_transaction--;
1027         ltdb->prepared_commit = false;
1028
1029         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1030                 return ltdb_err_map(tdb_error(ltdb->tdb));
1031         }
1032
1033         return LDB_SUCCESS;
1034 }
1035
1036 static int ltdb_del_trans(struct ldb_module *module)
1037 {
1038         void *data = ldb_module_get_private(module);
1039         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1040
1041         ltdb->in_transaction--;
1042
1043         if (ltdb_index_transaction_cancel(module) != 0) {
1044                 tdb_transaction_cancel(ltdb->tdb);
1045                 return ltdb_err_map(tdb_error(ltdb->tdb));
1046         }
1047
1048         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
1049                 return ltdb_err_map(tdb_error(ltdb->tdb));
1050         }
1051
1052         return LDB_SUCCESS;
1053 }
1054
1055 /*
1056   return sequenceNumber from @BASEINFO
1057 */
1058 static int ltdb_sequence_number(struct ltdb_context *ctx,
1059                                 struct ldb_extended **ext)
1060 {
1061         struct ldb_context *ldb;
1062         struct ldb_module *module = ctx->module;
1063         struct ldb_request *req = ctx->req;
1064         TALLOC_CTX *tmp_ctx;
1065         struct ldb_seqnum_request *seq;
1066         struct ldb_seqnum_result *res;
1067         struct ldb_message *msg = NULL;
1068         struct ldb_dn *dn;
1069         const char *date;
1070         int ret = LDB_SUCCESS;
1071
1072         ldb = ldb_module_get_ctx(module);
1073
1074         seq = talloc_get_type(req->op.extended.data,
1075                                 struct ldb_seqnum_request);
1076         if (seq == NULL) {
1077                 return LDB_ERR_OPERATIONS_ERROR;
1078         }
1079
1080         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1081
1082         if (ltdb_lock_read(module) != 0) {
1083                 return LDB_ERR_OPERATIONS_ERROR;
1084         }
1085
1086         res = talloc_zero(req, struct ldb_seqnum_result);
1087         if (res == NULL) {
1088                 ret = LDB_ERR_OPERATIONS_ERROR;
1089                 goto done;
1090         }
1091         tmp_ctx = talloc_new(req);
1092         if (tmp_ctx == NULL) {
1093                 ret = LDB_ERR_OPERATIONS_ERROR;
1094                 goto done;
1095         }
1096
1097         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1098
1099         msg = talloc(tmp_ctx, struct ldb_message);
1100         if (msg == NULL) {
1101                 ret = LDB_ERR_OPERATIONS_ERROR;
1102                 goto done;
1103         }
1104
1105         ret = ltdb_search_dn1(module, dn, msg);
1106         if (ret != LDB_SUCCESS) {
1107                 goto done;
1108         }
1109
1110         switch (seq->type) {
1111         case LDB_SEQ_HIGHEST_SEQ:
1112                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1113                 break;
1114         case LDB_SEQ_NEXT:
1115                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1116                 res->seq_num++;
1117                 break;
1118         case LDB_SEQ_HIGHEST_TIMESTAMP:
1119                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1120                 if (date) {
1121                         res->seq_num = ldb_string_to_time(date);
1122                 } else {
1123                         res->seq_num = 0;
1124                         /* zero is as good as anything when we don't know */
1125                 }
1126                 break;
1127         }
1128
1129         *ext = talloc_zero(req, struct ldb_extended);
1130         if (*ext == NULL) {
1131                 ret = LDB_ERR_OPERATIONS_ERROR;
1132                 goto done;
1133         }
1134         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1135         (*ext)->data = talloc_steal(*ext, res);
1136
1137 done:
1138         talloc_free(tmp_ctx);
1139         ltdb_unlock_read(module);
1140         return ret;
1141 }
1142
1143 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1144 {
1145         struct ldb_context *ldb;
1146         struct ldb_request *req;
1147         struct ldb_reply *ares;
1148
1149         ldb = ldb_module_get_ctx(ctx->module);
1150         req = ctx->req;
1151
1152         /* if we already returned an error just return */
1153         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1154                 return;
1155         }
1156
1157         ares = talloc_zero(req, struct ldb_reply);
1158         if (!ares) {
1159                 ldb_oom(ldb);
1160                 req->callback(req, NULL);
1161                 return;
1162         }
1163         ares->type = LDB_REPLY_DONE;
1164         ares->error = error;
1165
1166         req->callback(req, ares);
1167 }
1168
1169 static void ltdb_timeout(struct tevent_context *ev,
1170                           struct tevent_timer *te,
1171                           struct timeval t,
1172                           void *private_data)
1173 {
1174         struct ltdb_context *ctx;
1175         ctx = talloc_get_type(private_data, struct ltdb_context);
1176
1177         if (!ctx->request_terminated) {
1178                 /* request is done now */
1179                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1180         }
1181
1182         if (!ctx->request_terminated) {
1183                 /* neutralize the spy */
1184                 ctx->spy->ctx = NULL;
1185         }
1186         talloc_free(ctx);
1187 }
1188
1189 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1190                                         struct ldb_extended *ext,
1191                                         int error)
1192 {
1193         struct ldb_context *ldb;
1194         struct ldb_request *req;
1195         struct ldb_reply *ares;
1196
1197         ldb = ldb_module_get_ctx(ctx->module);
1198         req = ctx->req;
1199
1200         /* if we already returned an error just return */
1201         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1202                 return;
1203         }
1204
1205         ares = talloc_zero(req, struct ldb_reply);
1206         if (!ares) {
1207                 ldb_oom(ldb);
1208                 req->callback(req, NULL);
1209                 return;
1210         }
1211         ares->type = LDB_REPLY_DONE;
1212         ares->response = ext;
1213         ares->error = error;
1214
1215         req->callback(req, ares);
1216 }
1217
1218 static void ltdb_handle_extended(struct ltdb_context *ctx)
1219 {
1220         struct ldb_extended *ext = NULL;
1221         int ret;
1222
1223         if (strcmp(ctx->req->op.extended.oid,
1224                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1225                 /* get sequence number */
1226                 ret = ltdb_sequence_number(ctx, &ext);
1227         } else {
1228                 /* not recognized */
1229                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1230         }
1231
1232         ltdb_request_extended_done(ctx, ext, ret);
1233 }
1234
1235 static void ltdb_callback(struct tevent_context *ev,
1236                           struct tevent_timer *te,
1237                           struct timeval t,
1238                           void *private_data)
1239 {
1240         struct ltdb_context *ctx;
1241         int ret;
1242
1243         ctx = talloc_get_type(private_data, struct ltdb_context);
1244
1245         if (ctx->request_terminated) {
1246                 goto done;
1247         }
1248
1249         switch (ctx->req->operation) {
1250         case LDB_SEARCH:
1251                 ret = ltdb_search(ctx);
1252                 break;
1253         case LDB_ADD:
1254                 ret = ltdb_add(ctx);
1255                 break;
1256         case LDB_MODIFY:
1257                 ret = ltdb_modify(ctx);
1258                 break;
1259         case LDB_DELETE:
1260                 ret = ltdb_delete(ctx);
1261                 break;
1262         case LDB_RENAME:
1263                 ret = ltdb_rename(ctx);
1264                 break;
1265         case LDB_EXTENDED:
1266                 ltdb_handle_extended(ctx);
1267                 goto done;
1268         default:
1269                 /* no other op supported */
1270                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1271         }
1272
1273         if (!ctx->request_terminated) {
1274                 /* request is done now */
1275                 ltdb_request_done(ctx, ret);
1276         }
1277
1278 done:
1279         if (!ctx->request_terminated) {
1280                 /* neutralize the spy */
1281                 ctx->spy->ctx = NULL;
1282         }
1283         talloc_free(ctx);
1284 }
1285
1286 static int ltdb_request_destructor(void *ptr)
1287 {
1288         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1289
1290         if (spy->ctx != NULL) {
1291                 spy->ctx->request_terminated = true;
1292         }
1293
1294         return 0;
1295 }
1296
1297 static int ltdb_handle_request(struct ldb_module *module,
1298                                struct ldb_request *req)
1299 {
1300         struct ldb_control *control_permissive;
1301         struct ldb_context *ldb;
1302         struct tevent_context *ev;
1303         struct ltdb_context *ac;
1304         struct tevent_timer *te;
1305         struct timeval tv;
1306         int i;
1307
1308         ldb = ldb_module_get_ctx(module);
1309
1310         control_permissive = ldb_request_get_control(req,
1311                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1312
1313         for (i = 0; req->controls && req->controls[i]; i++) {
1314                 if (req->controls[i]->critical &&
1315                     req->controls[i] != control_permissive) {
1316                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1317                                                req->controls[i]->oid);
1318                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1319                 }
1320         }
1321
1322         if (req->starttime == 0 || req->timeout == 0) {
1323                 ldb_set_errstring(ldb, "Invalid timeout settings");
1324                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1325         }
1326
1327         ev = ldb_get_event_context(ldb);
1328
1329         ac = talloc_zero(ldb, struct ltdb_context);
1330         if (ac == NULL) {
1331                 ldb_oom(ldb);
1332                 return LDB_ERR_OPERATIONS_ERROR;
1333         }
1334
1335         ac->module = module;
1336         ac->req = req;
1337
1338         tv.tv_sec = 0;
1339         tv.tv_usec = 0;
1340         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1341         if (NULL == te) {
1342                 talloc_free(ac);
1343                 return LDB_ERR_OPERATIONS_ERROR;
1344         }
1345
1346         tv.tv_sec = req->starttime + req->timeout;
1347         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1348         if (NULL == ac->timeout_event) {
1349                 talloc_free(ac);
1350                 return LDB_ERR_OPERATIONS_ERROR;
1351         }
1352
1353         /* set a spy so that we do not try to use the request context
1354          * if it is freed before ltdb_callback fires */
1355         ac->spy = talloc(req, struct ltdb_req_spy);
1356         if (NULL == ac->spy) {
1357                 talloc_free(ac);
1358                 return LDB_ERR_OPERATIONS_ERROR;
1359         }
1360         ac->spy->ctx = ac;
1361
1362         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1363
1364         return LDB_SUCCESS;
1365 }
1366
1367 static int ltdb_init_rootdse(struct ldb_module *module)
1368 {
1369         struct ldb_context *ldb;
1370         int ret;
1371
1372         ldb = ldb_module_get_ctx(module);
1373
1374         ret = ldb_mod_register_control(module,
1375                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1376         if (ret != LDB_SUCCESS) {
1377                 ldb_debug(ldb, LDB_DEBUG_TRACE, "ldb_tdb: "
1378                           "Unable to register control with rootdse!");
1379         }
1380
1381         /* there can be no module beyond the backend, just return */
1382         return LDB_SUCCESS;
1383 }
1384
1385 static const struct ldb_module_ops ltdb_ops = {
1386         .name              = "tdb",
1387         .init_context      = ltdb_init_rootdse,
1388         .search            = ltdb_handle_request,
1389         .add               = ltdb_handle_request,
1390         .modify            = ltdb_handle_request,
1391         .del               = ltdb_handle_request,
1392         .rename            = ltdb_handle_request,
1393         .extended          = ltdb_handle_request,
1394         .start_transaction = ltdb_start_trans,
1395         .end_transaction   = ltdb_end_trans,
1396         .prepare_commit    = ltdb_prepare_commit,
1397         .del_transaction   = ltdb_del_trans,
1398 };
1399
1400 /*
1401   connect to the database
1402 */
1403 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1404                         unsigned int flags, const char *options[],
1405                         struct ldb_module **_module)
1406 {
1407         struct ldb_module *module;
1408         const char *path;
1409         int tdb_flags, open_flags;
1410         struct ltdb_private *ltdb;
1411
1412         /* parse the url */
1413         if (strchr(url, ':')) {
1414                 if (strncmp(url, "tdb://", 6) != 0) {
1415                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1416                                   "Invalid tdb URL '%s'", url);
1417                         return LDB_ERR_OPERATIONS_ERROR;
1418                 }
1419                 path = url+6;
1420         } else {
1421                 path = url;
1422         }
1423
1424         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1425
1426         /* check for the 'nosync' option */
1427         if (flags & LDB_FLG_NOSYNC) {
1428                 tdb_flags |= TDB_NOSYNC;
1429         }
1430
1431         /* and nommap option */
1432         if (flags & LDB_FLG_NOMMAP) {
1433                 tdb_flags |= TDB_NOMMAP;
1434         }
1435
1436         if (flags & LDB_FLG_RDONLY) {
1437                 open_flags = O_RDONLY;
1438         } else {
1439                 open_flags = O_CREAT | O_RDWR;
1440         }
1441
1442         ltdb = talloc_zero(ldb, struct ltdb_private);
1443         if (!ltdb) {
1444                 ldb_oom(ldb);
1445                 return LDB_ERR_OPERATIONS_ERROR;
1446         }
1447
1448         /* note that we use quite a large default hash size */
1449         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1450                                    tdb_flags, open_flags,
1451                                    ldb_get_create_perms(ldb), ldb);
1452         if (!ltdb->tdb) {
1453                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1454                           "Unable to open tdb '%s'", path);
1455                 talloc_free(ltdb);
1456                 return LDB_ERR_OPERATIONS_ERROR;
1457         }
1458
1459         ltdb->sequence_number = 0;
1460
1461         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1462         if (!module) {
1463                 talloc_free(ltdb);
1464                 return LDB_ERR_OPERATIONS_ERROR;
1465         }
1466         ldb_module_set_private(module, ltdb);
1467         talloc_steal(module, ltdb);
1468
1469         if (ltdb_cache_load(module) != 0) {
1470                 talloc_free(module);
1471                 talloc_free(ltdb);
1472                 return LDB_ERR_OPERATIONS_ERROR;
1473         }
1474
1475         *_module = module;
1476         return LDB_SUCCESS;
1477 }
1478
1479 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1480         .name = "tdb",
1481         .connect_fn = ltdb_connect,
1482 };