r7527: - added a ldb_search_bytree() interface, which takes a ldb_parse_tree
[kai/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    
7
8      ** NOTE! The following LGPL license applies to the ldb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  */
37
38 #include "includes.h"
39 #include "ldb/include/ldb.h"
40 #include "ldb/include/ldb_private.h"
41 #include "ldb/ldb_tdb/ldb_tdb.h"
42
43 #define LDBLOCK "INT_LDBLOCK"
44
45
46 /*
47   callback function used in call to ldb_dn_fold() for determining whether an
48   attribute type requires case folding.
49 */
50 static int ltdb_case_fold_attr_required(struct ldb_module *module, char *attr)
51 {
52     return ltdb_attribute_flags(module, attr) & LTDB_FLAG_CASE_INSENSITIVE;
53 }
54
55 /*
56   form a TDB_DATA for a record key
57   caller frees
58
59   note that the key for a record can depend on whether the 
60   dn refers to a case sensitive index record or not
61 */
62 struct TDB_DATA ltdb_key(struct ldb_module *module, const char *dn)
63 {
64         struct ldb_context *ldb = module->ldb;
65         TDB_DATA key;
66         char *key_str = NULL;
67         char *dn_folded = NULL;
68         const char *prefix = LTDB_INDEX ":";
69         const char *s;
70         int flags;
71
72         /*
73           most DNs are case insensitive. The exception is index DNs for
74           case sensitive attributes
75
76           there are 3 cases dealt with in this code:
77
78           1) if the dn doesn't start with @INDEX: then uppercase the attribute
79              names and the attributes values of case insensitive attributes
80           2) if the dn starts with @INDEX:attr and 'attr' is a case insensitive
81              attribute then uppercase whole dn
82           3) if the dn starts with @INDEX:attr and 'attr' is a case sensitive
83              attribute then uppercase up to the value of the attribute, but 
84              not the value itself
85         */
86         if (strncmp(dn, prefix, strlen(prefix)) == 0 &&
87             (s = strchr(dn+strlen(prefix), ':'))) {
88                 char *attr_name, *attr_name_folded;
89                 attr_name = talloc_strndup(ldb, dn+strlen(prefix), (s-(dn+strlen(prefix))));
90                 if (!attr_name) {
91                         goto failed;
92                 }
93                 flags = ltdb_attribute_flags(module, attr_name);
94                 
95                 if (flags & LTDB_FLAG_CASE_INSENSITIVE) {
96                         dn_folded = ldb_casefold(ldb, dn);
97                 } else {
98                         attr_name_folded = ldb_casefold(ldb, attr_name);
99                         if (!attr_name_folded) {
100                                 goto failed;
101                         }
102                         dn_folded = talloc_asprintf(ldb, "%s:%s:%s",
103                                                     prefix, attr_name_folded,
104                                                     s+1);
105                         talloc_free(attr_name_folded);
106                 }
107                 talloc_free(attr_name);
108         } else {
109                 dn_folded = ldb_dn_fold(module, dn, ltdb_case_fold_attr_required);
110         }
111
112         if (!dn_folded) {
113                 goto failed;
114         }
115
116         key_str = talloc_asprintf(ldb, "DN=%s", dn_folded);
117         talloc_free(dn_folded);
118
119         if (!key_str) {
120                 goto failed;
121         }
122
123         key.dptr = key_str;
124         key.dsize = strlen(key_str)+1;
125
126         return key;
127
128 failed:
129         errno = ENOMEM;
130         key.dptr = NULL;
131         key.dsize = 0;
132         return key;
133 }
134
135 /*
136   lock the database for write - currently a single lock is used
137 */
138 static int ltdb_lock(struct ldb_module *module, const char *lockname)
139 {
140         struct ltdb_private *ltdb = module->private_data;
141         TDB_DATA key;
142         int ret;
143
144         if (lockname == NULL) {
145                 return -1;
146         }
147
148         key = ltdb_key(module, lockname);
149         if (!key.dptr) {
150                 return -1;
151         }
152
153         ret = tdb_chainlock(ltdb->tdb, key);
154
155         talloc_free(key.dptr);
156
157         return ret;
158 }
159
160 /*
161   unlock the database after a ltdb_lock()
162 */
163 static int ltdb_unlock(struct ldb_module *module, const char *lockname)
164 {
165         struct ltdb_private *ltdb = module->private_data;
166         TDB_DATA key;
167
168         if (lockname == NULL) {
169                 return -1;
170         }
171
172         key = ltdb_key(module, lockname);
173         if (!key.dptr) {
174                 return -1;
175         }
176
177         tdb_chainunlock(ltdb->tdb, key);
178
179         talloc_free(key.dptr);
180
181         return 0;
182 }
183
184
185 /*
186   lock the database for read - use by ltdb_search
187 */
188 int ltdb_lock_read(struct ldb_module *module)
189 {
190         struct ltdb_private *ltdb = module->private_data;
191         TDB_DATA key;
192         int ret;
193         key = ltdb_key(module, LDBLOCK);
194         if (!key.dptr) {
195                 return -1;
196         }
197         ret = tdb_chainlock_read(ltdb->tdb, key);
198         talloc_free(key.dptr);
199         return ret;
200 }
201
202 /*
203   unlock the database after a ltdb_lock_read()
204 */
205 int ltdb_unlock_read(struct ldb_module *module)
206 {
207         struct ltdb_private *ltdb = module->private_data;
208         TDB_DATA key;
209         key = ltdb_key(module, LDBLOCK);
210         if (!key.dptr) {
211                 return -1;
212         }
213         tdb_chainunlock_read(ltdb->tdb, key);
214         talloc_free(key.dptr);
215         return 0;
216 }
217
218 /*
219   check special dn's have valid attributes
220   currently only @ATTRIBUTES is checked
221 */
222 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
223 {
224         struct ltdb_private *ltdb = module->private_data;
225         int i, j;
226
227         if (strcmp(msg->dn, LTDB_ATTRIBUTES) != 0) {
228                 return 0;
229         }
230
231         /* we have @ATTRIBUTES, let's check attributes are fine */
232         /* should we check that we deny multivalued attributes ? */
233         for (i = 0; i < msg->num_elements; i++) {
234                 for (j = 0; j < msg->elements[i].num_values; j++) {
235                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
236                                 ltdb->last_err_string = "Invalid attribute value in an @ATTRIBUTES entry";
237                                 return -1;
238                         }
239                 }
240         }
241
242         return 0;
243 }
244
245
246 /*
247   we've made a modification to a dn - possibly reindex and 
248   update sequence number
249 */
250 static int ltdb_modified(struct ldb_module *module, const char *dn)
251 {
252         int ret = 0;
253
254         if (strcmp(dn, LTDB_INDEXLIST) == 0 ||
255             strcmp(dn, LTDB_ATTRIBUTES) == 0) {
256                 ret = ltdb_reindex(module);
257         }
258
259         if (ret == 0 &&
260             strcmp(dn, LTDB_BASEINFO) != 0) {
261                 ret = ltdb_increase_sequence_number(module);
262         }
263
264         return ret;
265 }
266
267 /*
268   store a record into the db
269 */
270 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
271 {
272         struct ltdb_private *ltdb = module->private_data;
273         TDB_DATA tdb_key, tdb_data;
274         int ret;
275
276         tdb_key = ltdb_key(module, msg->dn);
277         if (!tdb_key.dptr) {
278                 return -1;
279         }
280
281         ret = ltdb_pack_data(module, msg, &tdb_data);
282         if (ret == -1) {
283                 talloc_free(tdb_key.dptr);
284                 return -1;
285         }
286
287         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
288         if (ret == -1) {
289                 goto done;
290         }
291         
292         ret = ltdb_index_add(module, msg);
293         if (ret == -1) {
294                 tdb_delete(ltdb->tdb, tdb_key);
295         }
296
297 done:
298         talloc_free(tdb_key.dptr);
299         talloc_free(tdb_data.dptr);
300
301         return ret;
302 }
303
304
305 /*
306   add a record to the database
307 */
308 static int ltdb_add(struct ldb_module *module, const struct ldb_message *msg)
309 {
310         struct ltdb_private *ltdb = module->private_data;
311         int ret;
312
313         ltdb->last_err_string = NULL;
314
315         ret = ltdb_check_special_dn(module, msg);
316         if (ret != 0) {
317                 return ret;
318         }
319         
320         if (ltdb_lock(module, LDBLOCK) != 0) {
321                 return -1;
322         }
323
324         if (ltdb_cache_load(module) != 0) {
325                 ltdb_unlock(module, LDBLOCK);
326                 return -1;
327         }
328
329         ret = ltdb_store(module, msg, TDB_INSERT);
330
331         if (ret == 0) {
332                 ltdb_modified(module, msg->dn);
333         }
334
335         ltdb_unlock(module, LDBLOCK);
336         return ret;
337 }
338
339
340 /*
341   delete a record from the database, not updating indexes (used for deleting
342   index records)
343 */
344 int ltdb_delete_noindex(struct ldb_module *module, const char *dn)
345 {
346         struct ltdb_private *ltdb = module->private_data;
347         TDB_DATA tdb_key;
348         int ret;
349
350         tdb_key = ltdb_key(module, dn);
351         if (!tdb_key.dptr) {
352                 return -1;
353         }
354
355         ret = tdb_delete(ltdb->tdb, tdb_key);
356         talloc_free(tdb_key.dptr);
357
358         return ret;
359 }
360
361 /*
362   delete a record from the database
363 */
364 static int ltdb_delete(struct ldb_module *module, const char *dn)
365 {
366         struct ltdb_private *ltdb = module->private_data;
367         int ret;
368         struct ldb_message *msg = NULL;
369
370         ltdb->last_err_string = NULL;
371
372         if (ltdb_lock(module, LDBLOCK) != 0) {
373                 return -1;
374         }
375
376         if (ltdb_cache_load(module) != 0) {
377                 goto failed;
378         }
379
380         msg = talloc(module, struct ldb_message);
381         if (msg == NULL) {
382                 goto failed;
383         }
384
385         /* in case any attribute of the message was indexed, we need
386            to fetch the old record */
387         ret = ltdb_search_dn1(module, dn, msg);
388         if (ret != 1) {
389                 /* not finding the old record is an error */
390                 goto failed;
391         }
392
393         ret = ltdb_delete_noindex(module, dn);
394         if (ret == -1) {
395                 goto failed;
396         }
397
398         /* remove any indexed attributes */
399         ret = ltdb_index_del(module, msg);
400
401         if (ret == 0) {
402                 ltdb_modified(module, dn);
403         }
404
405         talloc_free(msg);
406         ltdb_unlock(module, LDBLOCK);
407         return ret;
408
409 failed:
410         talloc_free(msg);
411         ltdb_unlock(module, LDBLOCK);
412         return -1;
413 }
414
415
416 /*
417   find an element by attribute name. At the moment this does a linear search, it should
418   be re-coded to use a binary search once all places that modify records guarantee
419   sorted order
420
421   return the index of the first matching element if found, otherwise -1
422 */
423 static int find_element(const struct ldb_message *msg, const char *name)
424 {
425         unsigned int i;
426         for (i=0;i<msg->num_elements;i++) {
427                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
428                         return i;
429                 }
430         }
431         return -1;
432 }
433
434
435 /*
436   add an element to an existing record. Assumes a elements array that we
437   can call re-alloc on, and assumed that we can re-use the data pointers from the 
438   passed in additional values. Use with care!
439
440   returns 0 on success, -1 on failure (and sets errno)
441 */
442 static int msg_add_element(struct ldb_context *ldb,
443                            struct ldb_message *msg, struct ldb_message_element *el)
444 {
445         struct ldb_message_element *e2;
446         unsigned int i;
447
448         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
449                               msg->num_elements+1);
450         if (!e2) {
451                 errno = ENOMEM;
452                 return -1;
453         }
454
455         msg->elements = e2;
456
457         e2 = &msg->elements[msg->num_elements];
458
459         e2->name = el->name;
460         e2->flags = el->flags;
461         e2->values = NULL;
462         if (el->num_values != 0) {
463                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
464                 if (!e2->values) {
465                         errno = ENOMEM;
466                         return -1;
467                 }
468         }
469         for (i=0;i<el->num_values;i++) {
470                 e2->values[i] = el->values[i];
471         }
472         e2->num_values = el->num_values;
473
474         msg->num_elements++;
475
476         return 0;
477 }
478
479 /*
480   delete all elements having a specified attribute name
481 */
482 static int msg_delete_attribute(struct ldb_module *module,
483                                 struct ldb_context *ldb,
484                                 struct ldb_message *msg, const char *name)
485 {
486         unsigned int i, j;
487
488         for (i=0;i<msg->num_elements;i++) {
489                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
490                         for (j=0;j<msg->elements[i].num_values;j++) {
491                                 ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
492                         }
493                         talloc_free(msg->elements[i].values);
494                         if (msg->num_elements > (i+1)) {
495                                 memmove(&msg->elements[i], 
496                                         &msg->elements[i+1], 
497                                         sizeof(struct ldb_message_element)*
498                                         (msg->num_elements - (i+1)));
499                         }
500                         msg->num_elements--;
501                         i--;
502                         msg->elements = talloc_realloc(msg, msg->elements, 
503                                                          struct ldb_message_element, 
504                                                          msg->num_elements);
505                 }
506         }
507
508         return 0;
509 }
510
511 /*
512   delete all elements matching an attribute name/value 
513
514   return 0 on success, -1 on failure
515 */
516 static int msg_delete_element(struct ldb_module *module,
517                               struct ldb_message *msg, 
518                               const char *name,
519                               const struct ldb_val *val)
520 {
521         struct ldb_context *ldb = module->ldb;
522         unsigned int i;
523         int found;
524         struct ldb_message_element *el;
525
526         found = find_element(msg, name);
527         if (found == -1) {
528                 return -1;
529         }
530
531         el = &msg->elements[found];
532
533         for (i=0;i<el->num_values;i++) {
534                 if (ltdb_val_equal(module, msg->elements[i].name, &el->values[i], val)) {
535                         if (i<el->num_values-1) {
536                                 memmove(&el->values[i], &el->values[i+1],
537                                         sizeof(el->values[i])*(el->num_values-(i+1)));
538                         }
539                         el->num_values--;
540                         if (el->num_values == 0) {
541                                 return msg_delete_attribute(module, ldb, msg, name);
542                         }
543                         return 0;
544                 }
545         }
546
547         return -1;
548 }
549
550
551 /*
552   modify a record - internal interface
553
554   yuck - this is O(n^2). Luckily n is usually small so we probably
555   get away with it, but if we ever have really large attribute lists 
556   then we'll need to look at this again
557 */
558 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
559 {
560         struct ldb_context *ldb = module->ldb;
561         struct ltdb_private *ltdb = module->private_data;
562         TDB_DATA tdb_key, tdb_data;
563         struct ldb_message *msg2;
564         unsigned i, j;
565         int ret;
566
567         tdb_key = ltdb_key(module, msg->dn);
568         if (!tdb_key.dptr) {
569                 return -1;
570         }
571
572         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
573         if (!tdb_data.dptr) {
574                 talloc_free(tdb_key.dptr);
575                 return -1;
576         }
577
578         msg2 = talloc(tdb_key.dptr, struct ldb_message);
579         if (msg2 == NULL) {
580                 talloc_free(tdb_key.dptr);
581                 return -1;
582         }
583
584         ret = ltdb_unpack_data(module, &tdb_data, msg2);
585         if (ret == -1) {
586                 talloc_free(tdb_key.dptr);
587                 free(tdb_data.dptr);
588                 return -1;
589         }
590
591         if (!msg2->dn) {
592                 msg2->dn = msg->dn;
593         }
594
595         for (i=0;i<msg->num_elements;i++) {
596                 struct ldb_message_element *el = &msg->elements[i];
597                 struct ldb_message_element *el2;
598                 struct ldb_val *vals;
599
600                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
601
602                 case LDB_FLAG_MOD_ADD:
603                         /* add this element to the message. fail if it
604                            already exists */
605                         ret = find_element(msg2, el->name);
606
607                         if (ret == -1) {
608                                 if (msg_add_element(ldb, msg2, el) != 0) {
609                                         goto failed;
610                                 }
611                                 continue;
612                         }
613
614                         el2 = &msg2->elements[ret];
615
616                         /* An attribute with this name already exists, add all
617                          * values if they don't already exist. */
618
619                         for (j=0;j<el->num_values;j++) {
620                                 if (ldb_msg_find_val(el2, &el->values[j])) {
621                                         ltdb->last_err_string =
622                                                 "Type or value exists";
623                                         goto failed;
624                                 }
625                         }
626
627                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
628                                                 el2->num_values + el->num_values);
629
630                         if (vals == NULL)
631                                 goto failed;
632
633                         for (j=0;j<el->num_values;j++) {
634                                 vals[el2->num_values + j] =
635                                         ldb_val_dup(vals, &el->values[j]);
636                         }
637
638                         el2->values = vals;
639                         el2->num_values += el->num_values;
640
641                         break;
642
643                 case LDB_FLAG_MOD_REPLACE:
644                         /* replace all elements of this attribute name with the elements
645                            listed. The attribute not existing is not an error */
646                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
647
648                         /* add the replacement element, if not empty */
649                         if (msg->elements[i].num_values != 0 &&
650                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
651                                 goto failed;
652                         }
653                         break;
654
655                 case LDB_FLAG_MOD_DELETE:
656                         /* we could be being asked to delete all
657                            values or just some values */
658                         if (msg->elements[i].num_values == 0) {
659                                 if (msg_delete_attribute(module, ldb, msg2, 
660                                                          msg->elements[i].name) != 0) {
661                                         ltdb->last_err_string = "No such attribute";
662                                         goto failed;
663                                 }
664                                 break;
665                         }
666                         for (j=0;j<msg->elements[i].num_values;j++) {
667                                 if (msg_delete_element(module,
668                                                        msg2, 
669                                                        msg->elements[i].name,
670                                                        &msg->elements[i].values[j]) != 0) {
671                                         ltdb->last_err_string = "No such attribute";
672                                         goto failed;
673                                 }
674                                 if (ltdb_index_del_value(module, msg->dn, &msg->elements[i], j) != 0) {
675                                         goto failed;
676                                 }
677                         }
678                         break;
679                 }
680         }
681
682         /* we've made all the mods - save the modified record back into the database */
683         ret = ltdb_store(module, msg2, TDB_MODIFY);
684
685         talloc_free(tdb_key.dptr);
686         free(tdb_data.dptr);
687         return ret;
688
689 failed:
690         talloc_free(tdb_key.dptr);
691         free(tdb_data.dptr);
692         return -1;
693 }
694
695 /*
696   modify a record
697 */
698 static int ltdb_modify(struct ldb_module *module, const struct ldb_message *msg)
699 {
700         struct ltdb_private *ltdb = module->private_data;
701         int ret;
702
703         ltdb->last_err_string = NULL;
704
705         ret = ltdb_check_special_dn(module, msg);
706         if (ret != 0) {
707                 return ret;
708         }
709         
710         if (ltdb_lock(module, LDBLOCK) != 0) {
711                 return -1;
712         }
713
714         if (ltdb_cache_load(module) != 0) {
715                 ltdb_unlock(module, LDBLOCK);
716                 return -1;
717         }
718
719         ret = ltdb_modify_internal(module, msg);
720
721         if (ret == 0) {
722                 ltdb_modified(module, msg->dn);
723         }
724
725         ltdb_unlock(module, LDBLOCK);
726
727         return ret;
728 }
729
730 /*
731   rename a record
732 */
733 static int ltdb_rename(struct ldb_module *module, const char *olddn, const char *newdn)
734 {
735         struct ltdb_private *ltdb = module->private_data;
736         int ret;
737         struct ldb_message *msg;
738         const char *error_str;
739
740         ltdb->last_err_string = NULL;
741
742         if (ltdb_lock(module, LDBLOCK) != 0) {
743                 return -1;
744         }
745
746         msg = talloc(module, struct ldb_message);
747         if (msg == NULL) {
748                 goto failed;
749         }
750
751         /* in case any attribute of the message was indexed, we need
752            to fetch the old record */
753         ret = ltdb_search_dn1(module, olddn, msg);
754         if (ret != 1) {
755                 /* not finding the old record is an error */
756                 goto failed;
757         }
758
759         msg->dn = talloc_strdup(msg, newdn);
760         if (!msg->dn) {
761                 goto failed;
762         }
763
764         ret = ltdb_add(module, msg);
765         if (ret == -1) {
766                 goto failed;
767         }
768
769         ret = ltdb_delete(module, olddn);
770         error_str = ltdb->last_err_string;
771         if (ret == -1) {
772                 ltdb_delete(module, newdn);
773         }
774
775         ltdb->last_err_string = error_str;
776
777         talloc_free(msg);
778         ltdb_unlock(module, LDBLOCK);
779
780         return ret;
781
782 failed:
783         talloc_free(msg);
784         ltdb_unlock(module, LDBLOCK);
785         return -1;
786 }
787
788
789 /*
790   return extended error information
791 */
792 static const char *ltdb_errstring(struct ldb_module *module)
793 {
794         struct ltdb_private *ltdb = module->private_data;
795         if (ltdb->last_err_string) {
796                 return ltdb->last_err_string;
797         }
798         return tdb_errorstr(ltdb->tdb);
799 }
800
801
802 static const struct ldb_module_ops ltdb_ops = {
803         .name          = "tdb",
804         .search        = ltdb_search,
805         .search_bytree = ltdb_search_bytree,
806         .add_record    = ltdb_add,
807         .modify_record = ltdb_modify,
808         .delete_record = ltdb_delete,
809         .rename_record = ltdb_rename,
810         .named_lock    = ltdb_lock,
811         .named_unlock  = ltdb_unlock,
812         .errstring     = ltdb_errstring
813 };
814
815
816 /*
817   destroy the ltdb context
818 */
819 static int ltdb_destructor(void *p)
820 {
821         struct ltdb_private *ltdb = p;
822         tdb_close(ltdb->tdb);
823         return 0;
824 }
825
826 /*
827   connect to the database
828 */
829 struct ldb_context *ltdb_connect(const char *url, 
830                                  unsigned int flags, 
831                                  const char *options[])
832 {
833         const char *path;
834         int tdb_flags, open_flags;
835         struct ltdb_private *ltdb;
836         TDB_CONTEXT *tdb;
837         struct ldb_context *ldb;
838
839         ldb = talloc_zero(NULL, struct ldb_context);
840         if (!ldb) {
841                 errno = ENOMEM;
842                 return NULL;
843         }
844
845         /* parse the url */
846         if (strchr(url, ':')) {
847                 if (strncmp(url, "tdb://", 6) != 0) {
848                         errno = EINVAL;
849                         talloc_free(ldb);
850                         return NULL;
851                 }
852                 path = url+6;
853         } else {
854                 path = url;
855         }
856
857         tdb_flags = TDB_DEFAULT;
858
859         if (flags & LDB_FLG_RDONLY) {
860                 open_flags = O_RDONLY;
861         } else {
862                 open_flags = O_CREAT | O_RDWR;
863         }
864
865         /* note that we use quite a large default hash size */
866         tdb = tdb_open(path, 10000, tdb_flags, open_flags, 0666);
867         if (!tdb) {
868                 talloc_free(ldb);
869                 return NULL;
870         }
871
872         ltdb = talloc_zero(ldb, struct ltdb_private);
873         if (!ltdb) {
874                 tdb_close(tdb);
875                 talloc_free(ldb);
876                 errno = ENOMEM;
877                 return NULL;
878         }
879
880         ltdb->tdb = tdb;
881         ltdb->sequence_number = 0;
882
883         talloc_set_destructor(ltdb, ltdb_destructor);
884
885         ldb->modules = talloc(ldb, struct ldb_module);
886         if (!ldb->modules) {
887                 talloc_free(ldb);
888                 errno = ENOMEM;
889                 return NULL;
890         }
891         ldb->modules->ldb = ldb;
892         ldb->modules->prev = ldb->modules->next = NULL;
893         ldb->modules->private_data = ltdb;
894         ldb->modules->ops = &ltdb_ops;
895
896         return ldb;
897 }