r7601: ldb_sqlite3 work in progress
[jra/samba/.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    
7
8      ** NOTE! The following LGPL license applies to the ldb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  */
37
38 #include "includes.h"
39 #include "ldb/include/ldb.h"
40 #include "ldb/include/ldb_private.h"
41 #include "ldb/ldb_tdb/ldb_tdb.h"
42
43 #define LDBLOCK "INT_LDBLOCK"
44
45
46 /*
47   callback function used in call to ldb_dn_fold() for determining whether an
48   attribute type requires case folding.
49 */
50 static int ltdb_case_fold_attr_required(void * user_data, char *attr)
51 {
52     struct ldb_module *module = user_data;
53
54     return ltdb_attribute_flags(module, attr) & LTDB_FLAG_CASE_INSENSITIVE;
55 }
56
57 /*
58   form a TDB_DATA for a record key
59   caller frees
60
61   note that the key for a record can depend on whether the 
62   dn refers to a case sensitive index record or not
63 */
64 struct TDB_DATA ltdb_key(struct ldb_module *module, const char *dn)
65 {
66         struct ldb_context *ldb = module->ldb;
67         TDB_DATA key;
68         char *key_str = NULL;
69         char *dn_folded = NULL;
70         const char *prefix = LTDB_INDEX ":";
71         const char *s;
72         int flags;
73
74         /*
75           most DNs are case insensitive. The exception is index DNs for
76           case sensitive attributes
77
78           there are 3 cases dealt with in this code:
79
80           1) if the dn doesn't start with @INDEX: then uppercase the attribute
81              names and the attributes values of case insensitive attributes
82           2) if the dn starts with @INDEX:attr and 'attr' is a case insensitive
83              attribute then uppercase whole dn
84           3) if the dn starts with @INDEX:attr and 'attr' is a case sensitive
85              attribute then uppercase up to the value of the attribute, but 
86              not the value itself
87         */
88         if (strncmp(dn, prefix, strlen(prefix)) == 0 &&
89             (s = strchr(dn+strlen(prefix), ':'))) {
90                 char *attr_name, *attr_name_folded;
91                 attr_name = talloc_strndup(ldb, dn+strlen(prefix), (s-(dn+strlen(prefix))));
92                 if (!attr_name) {
93                         goto failed;
94                 }
95                 flags = ltdb_attribute_flags(module, attr_name);
96                 
97                 if (flags & LTDB_FLAG_CASE_INSENSITIVE) {
98                         dn_folded = ldb_casefold(ldb, dn);
99                 } else {
100                         attr_name_folded = ldb_casefold(ldb, attr_name);
101                         if (!attr_name_folded) {
102                                 goto failed;
103                         }
104                         dn_folded = talloc_asprintf(ldb, "%s:%s:%s",
105                                                     prefix, attr_name_folded,
106                                                     s+1);
107                         talloc_free(attr_name_folded);
108                 }
109                 talloc_free(attr_name);
110         } else {
111                 dn_folded = ldb_dn_fold(module->ldb, dn,
112                                         module, ltdb_case_fold_attr_required);
113         }
114
115         if (!dn_folded) {
116                 goto failed;
117         }
118
119         key_str = talloc_asprintf(ldb, "DN=%s", dn_folded);
120         talloc_free(dn_folded);
121
122         if (!key_str) {
123                 goto failed;
124         }
125
126         key.dptr = key_str;
127         key.dsize = strlen(key_str)+1;
128
129         return key;
130
131 failed:
132         errno = ENOMEM;
133         key.dptr = NULL;
134         key.dsize = 0;
135         return key;
136 }
137
138 /*
139   lock the database for write - currently a single lock is used
140 */
141 static int ltdb_lock(struct ldb_module *module, const char *lockname)
142 {
143         struct ltdb_private *ltdb = module->private_data;
144         TDB_DATA key;
145         int ret;
146
147         if (lockname == NULL) {
148                 return -1;
149         }
150
151         key = ltdb_key(module, lockname);
152         if (!key.dptr) {
153                 return -1;
154         }
155
156         ret = tdb_chainlock(ltdb->tdb, key);
157
158         talloc_free(key.dptr);
159
160         return ret;
161 }
162
163 /*
164   unlock the database after a ltdb_lock()
165 */
166 static int ltdb_unlock(struct ldb_module *module, const char *lockname)
167 {
168         struct ltdb_private *ltdb = module->private_data;
169         TDB_DATA key;
170
171         if (lockname == NULL) {
172                 return -1;
173         }
174
175         key = ltdb_key(module, lockname);
176         if (!key.dptr) {
177                 return -1;
178         }
179
180         tdb_chainunlock(ltdb->tdb, key);
181
182         talloc_free(key.dptr);
183
184         return 0;
185 }
186
187
188 /*
189   lock the database for read - use by ltdb_search
190 */
191 int ltdb_lock_read(struct ldb_module *module)
192 {
193         struct ltdb_private *ltdb = module->private_data;
194         TDB_DATA key;
195         int ret;
196         key = ltdb_key(module, LDBLOCK);
197         if (!key.dptr) {
198                 return -1;
199         }
200         ret = tdb_chainlock_read(ltdb->tdb, key);
201         talloc_free(key.dptr);
202         return ret;
203 }
204
205 /*
206   unlock the database after a ltdb_lock_read()
207 */
208 int ltdb_unlock_read(struct ldb_module *module)
209 {
210         struct ltdb_private *ltdb = module->private_data;
211         TDB_DATA key;
212         key = ltdb_key(module, LDBLOCK);
213         if (!key.dptr) {
214                 return -1;
215         }
216         tdb_chainunlock_read(ltdb->tdb, key);
217         talloc_free(key.dptr);
218         return 0;
219 }
220
221 /*
222   check special dn's have valid attributes
223   currently only @ATTRIBUTES is checked
224 */
225 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
226 {
227         struct ltdb_private *ltdb = module->private_data;
228         int i, j;
229
230         if (strcmp(msg->dn, LTDB_ATTRIBUTES) != 0) {
231                 return 0;
232         }
233
234         /* we have @ATTRIBUTES, let's check attributes are fine */
235         /* should we check that we deny multivalued attributes ? */
236         for (i = 0; i < msg->num_elements; i++) {
237                 for (j = 0; j < msg->elements[i].num_values; j++) {
238                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
239                                 ltdb->last_err_string = "Invalid attribute value in an @ATTRIBUTES entry";
240                                 return -1;
241                         }
242                 }
243         }
244
245         return 0;
246 }
247
248
249 /*
250   we've made a modification to a dn - possibly reindex and 
251   update sequence number
252 */
253 static int ltdb_modified(struct ldb_module *module, const char *dn)
254 {
255         int ret = 0;
256
257         if (strcmp(dn, LTDB_INDEXLIST) == 0 ||
258             strcmp(dn, LTDB_ATTRIBUTES) == 0) {
259                 ret = ltdb_reindex(module);
260         }
261
262         if (ret == 0 &&
263             strcmp(dn, LTDB_BASEINFO) != 0) {
264                 ret = ltdb_increase_sequence_number(module);
265         }
266
267         return ret;
268 }
269
270 /*
271   store a record into the db
272 */
273 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
274 {
275         struct ltdb_private *ltdb = module->private_data;
276         TDB_DATA tdb_key, tdb_data;
277         int ret;
278
279         tdb_key = ltdb_key(module, msg->dn);
280         if (!tdb_key.dptr) {
281                 return -1;
282         }
283
284         ret = ltdb_pack_data(module, msg, &tdb_data);
285         if (ret == -1) {
286                 talloc_free(tdb_key.dptr);
287                 return -1;
288         }
289
290         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
291         if (ret == -1) {
292                 goto done;
293         }
294         
295         ret = ltdb_index_add(module, msg);
296         if (ret == -1) {
297                 tdb_delete(ltdb->tdb, tdb_key);
298         }
299
300 done:
301         talloc_free(tdb_key.dptr);
302         talloc_free(tdb_data.dptr);
303
304         return ret;
305 }
306
307
308 /*
309   add a record to the database
310 */
311 static int ltdb_add(struct ldb_module *module, const struct ldb_message *msg)
312 {
313         struct ltdb_private *ltdb = module->private_data;
314         int ret;
315
316         ltdb->last_err_string = NULL;
317
318         ret = ltdb_check_special_dn(module, msg);
319         if (ret != 0) {
320                 return ret;
321         }
322         
323         if (ltdb_lock(module, LDBLOCK) != 0) {
324                 return -1;
325         }
326
327         if (ltdb_cache_load(module) != 0) {
328                 ltdb_unlock(module, LDBLOCK);
329                 return -1;
330         }
331
332         ret = ltdb_store(module, msg, TDB_INSERT);
333
334         if (ret == 0) {
335                 ltdb_modified(module, msg->dn);
336         }
337
338         ltdb_unlock(module, LDBLOCK);
339         return ret;
340 }
341
342
343 /*
344   delete a record from the database, not updating indexes (used for deleting
345   index records)
346 */
347 int ltdb_delete_noindex(struct ldb_module *module, const char *dn)
348 {
349         struct ltdb_private *ltdb = module->private_data;
350         TDB_DATA tdb_key;
351         int ret;
352
353         tdb_key = ltdb_key(module, dn);
354         if (!tdb_key.dptr) {
355                 return -1;
356         }
357
358         ret = tdb_delete(ltdb->tdb, tdb_key);
359         talloc_free(tdb_key.dptr);
360
361         return ret;
362 }
363
364 /*
365   delete a record from the database
366 */
367 static int ltdb_delete(struct ldb_module *module, const char *dn)
368 {
369         struct ltdb_private *ltdb = module->private_data;
370         int ret;
371         struct ldb_message *msg = NULL;
372
373         ltdb->last_err_string = NULL;
374
375         if (ltdb_lock(module, LDBLOCK) != 0) {
376                 return -1;
377         }
378
379         if (ltdb_cache_load(module) != 0) {
380                 goto failed;
381         }
382
383         msg = talloc(module, struct ldb_message);
384         if (msg == NULL) {
385                 goto failed;
386         }
387
388         /* in case any attribute of the message was indexed, we need
389            to fetch the old record */
390         ret = ltdb_search_dn1(module, dn, msg);
391         if (ret != 1) {
392                 /* not finding the old record is an error */
393                 goto failed;
394         }
395
396         ret = ltdb_delete_noindex(module, dn);
397         if (ret == -1) {
398                 goto failed;
399         }
400
401         /* remove any indexed attributes */
402         ret = ltdb_index_del(module, msg);
403
404         if (ret == 0) {
405                 ltdb_modified(module, dn);
406         }
407
408         talloc_free(msg);
409         ltdb_unlock(module, LDBLOCK);
410         return ret;
411
412 failed:
413         talloc_free(msg);
414         ltdb_unlock(module, LDBLOCK);
415         return -1;
416 }
417
418
419 /*
420   find an element by attribute name. At the moment this does a linear search, it should
421   be re-coded to use a binary search once all places that modify records guarantee
422   sorted order
423
424   return the index of the first matching element if found, otherwise -1
425 */
426 static int find_element(const struct ldb_message *msg, const char *name)
427 {
428         unsigned int i;
429         for (i=0;i<msg->num_elements;i++) {
430                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
431                         return i;
432                 }
433         }
434         return -1;
435 }
436
437
438 /*
439   add an element to an existing record. Assumes a elements array that we
440   can call re-alloc on, and assumed that we can re-use the data pointers from the 
441   passed in additional values. Use with care!
442
443   returns 0 on success, -1 on failure (and sets errno)
444 */
445 static int msg_add_element(struct ldb_context *ldb,
446                            struct ldb_message *msg, struct ldb_message_element *el)
447 {
448         struct ldb_message_element *e2;
449         unsigned int i;
450
451         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
452                               msg->num_elements+1);
453         if (!e2) {
454                 errno = ENOMEM;
455                 return -1;
456         }
457
458         msg->elements = e2;
459
460         e2 = &msg->elements[msg->num_elements];
461
462         e2->name = el->name;
463         e2->flags = el->flags;
464         e2->values = NULL;
465         if (el->num_values != 0) {
466                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
467                 if (!e2->values) {
468                         errno = ENOMEM;
469                         return -1;
470                 }
471         }
472         for (i=0;i<el->num_values;i++) {
473                 e2->values[i] = el->values[i];
474         }
475         e2->num_values = el->num_values;
476
477         msg->num_elements++;
478
479         return 0;
480 }
481
482 /*
483   delete all elements having a specified attribute name
484 */
485 static int msg_delete_attribute(struct ldb_module *module,
486                                 struct ldb_context *ldb,
487                                 struct ldb_message *msg, const char *name)
488 {
489         unsigned int i, j;
490
491         for (i=0;i<msg->num_elements;i++) {
492                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
493                         for (j=0;j<msg->elements[i].num_values;j++) {
494                                 ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
495                         }
496                         talloc_free(msg->elements[i].values);
497                         if (msg->num_elements > (i+1)) {
498                                 memmove(&msg->elements[i], 
499                                         &msg->elements[i+1], 
500                                         sizeof(struct ldb_message_element)*
501                                         (msg->num_elements - (i+1)));
502                         }
503                         msg->num_elements--;
504                         i--;
505                         msg->elements = talloc_realloc(msg, msg->elements, 
506                                                          struct ldb_message_element, 
507                                                          msg->num_elements);
508                 }
509         }
510
511         return 0;
512 }
513
514 /*
515   delete all elements matching an attribute name/value 
516
517   return 0 on success, -1 on failure
518 */
519 static int msg_delete_element(struct ldb_module *module,
520                               struct ldb_message *msg, 
521                               const char *name,
522                               const struct ldb_val *val)
523 {
524         struct ldb_context *ldb = module->ldb;
525         unsigned int i;
526         int found;
527         struct ldb_message_element *el;
528
529         found = find_element(msg, name);
530         if (found == -1) {
531                 return -1;
532         }
533
534         el = &msg->elements[found];
535
536         for (i=0;i<el->num_values;i++) {
537                 if (ltdb_val_equal(module, msg->elements[i].name, &el->values[i], val)) {
538                         if (i<el->num_values-1) {
539                                 memmove(&el->values[i], &el->values[i+1],
540                                         sizeof(el->values[i])*(el->num_values-(i+1)));
541                         }
542                         el->num_values--;
543                         if (el->num_values == 0) {
544                                 return msg_delete_attribute(module, ldb, msg, name);
545                         }
546                         return 0;
547                 }
548         }
549
550         return -1;
551 }
552
553
554 /*
555   modify a record - internal interface
556
557   yuck - this is O(n^2). Luckily n is usually small so we probably
558   get away with it, but if we ever have really large attribute lists 
559   then we'll need to look at this again
560 */
561 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
562 {
563         struct ldb_context *ldb = module->ldb;
564         struct ltdb_private *ltdb = module->private_data;
565         TDB_DATA tdb_key, tdb_data;
566         struct ldb_message *msg2;
567         unsigned i, j;
568         int ret;
569
570         tdb_key = ltdb_key(module, msg->dn);
571         if (!tdb_key.dptr) {
572                 return -1;
573         }
574
575         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
576         if (!tdb_data.dptr) {
577                 talloc_free(tdb_key.dptr);
578                 return -1;
579         }
580
581         msg2 = talloc(tdb_key.dptr, struct ldb_message);
582         if (msg2 == NULL) {
583                 talloc_free(tdb_key.dptr);
584                 return -1;
585         }
586
587         ret = ltdb_unpack_data(module, &tdb_data, msg2);
588         if (ret == -1) {
589                 talloc_free(tdb_key.dptr);
590                 free(tdb_data.dptr);
591                 return -1;
592         }
593
594         if (!msg2->dn) {
595                 msg2->dn = msg->dn;
596         }
597
598         for (i=0;i<msg->num_elements;i++) {
599                 struct ldb_message_element *el = &msg->elements[i];
600                 struct ldb_message_element *el2;
601                 struct ldb_val *vals;
602
603                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
604
605                 case LDB_FLAG_MOD_ADD:
606                         /* add this element to the message. fail if it
607                            already exists */
608                         ret = find_element(msg2, el->name);
609
610                         if (ret == -1) {
611                                 if (msg_add_element(ldb, msg2, el) != 0) {
612                                         goto failed;
613                                 }
614                                 continue;
615                         }
616
617                         el2 = &msg2->elements[ret];
618
619                         /* An attribute with this name already exists, add all
620                          * values if they don't already exist. */
621
622                         for (j=0;j<el->num_values;j++) {
623                                 if (ldb_msg_find_val(el2, &el->values[j])) {
624                                         ltdb->last_err_string =
625                                                 "Type or value exists";
626                                         goto failed;
627                                 }
628                         }
629
630                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
631                                                 el2->num_values + el->num_values);
632
633                         if (vals == NULL)
634                                 goto failed;
635
636                         for (j=0;j<el->num_values;j++) {
637                                 vals[el2->num_values + j] =
638                                         ldb_val_dup(vals, &el->values[j]);
639                         }
640
641                         el2->values = vals;
642                         el2->num_values += el->num_values;
643
644                         break;
645
646                 case LDB_FLAG_MOD_REPLACE:
647                         /* replace all elements of this attribute name with the elements
648                            listed. The attribute not existing is not an error */
649                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
650
651                         /* add the replacement element, if not empty */
652                         if (msg->elements[i].num_values != 0 &&
653                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
654                                 goto failed;
655                         }
656                         break;
657
658                 case LDB_FLAG_MOD_DELETE:
659                         /* we could be being asked to delete all
660                            values or just some values */
661                         if (msg->elements[i].num_values == 0) {
662                                 if (msg_delete_attribute(module, ldb, msg2, 
663                                                          msg->elements[i].name) != 0) {
664                                         ltdb->last_err_string = "No such attribute";
665                                         goto failed;
666                                 }
667                                 break;
668                         }
669                         for (j=0;j<msg->elements[i].num_values;j++) {
670                                 if (msg_delete_element(module,
671                                                        msg2, 
672                                                        msg->elements[i].name,
673                                                        &msg->elements[i].values[j]) != 0) {
674                                         ltdb->last_err_string = "No such attribute";
675                                         goto failed;
676                                 }
677                                 if (ltdb_index_del_value(module, msg->dn, &msg->elements[i], j) != 0) {
678                                         goto failed;
679                                 }
680                         }
681                         break;
682                 }
683         }
684
685         /* we've made all the mods - save the modified record back into the database */
686         ret = ltdb_store(module, msg2, TDB_MODIFY);
687
688         talloc_free(tdb_key.dptr);
689         free(tdb_data.dptr);
690         return ret;
691
692 failed:
693         talloc_free(tdb_key.dptr);
694         free(tdb_data.dptr);
695         return -1;
696 }
697
698 /*
699   modify a record
700 */
701 static int ltdb_modify(struct ldb_module *module, const struct ldb_message *msg)
702 {
703         struct ltdb_private *ltdb = module->private_data;
704         int ret;
705
706         ltdb->last_err_string = NULL;
707
708         ret = ltdb_check_special_dn(module, msg);
709         if (ret != 0) {
710                 return ret;
711         }
712         
713         if (ltdb_lock(module, LDBLOCK) != 0) {
714                 return -1;
715         }
716
717         if (ltdb_cache_load(module) != 0) {
718                 ltdb_unlock(module, LDBLOCK);
719                 return -1;
720         }
721
722         ret = ltdb_modify_internal(module, msg);
723
724         if (ret == 0) {
725                 ltdb_modified(module, msg->dn);
726         }
727
728         ltdb_unlock(module, LDBLOCK);
729
730         return ret;
731 }
732
733 /*
734   rename a record
735 */
736 static int ltdb_rename(struct ldb_module *module, const char *olddn, const char *newdn)
737 {
738         struct ltdb_private *ltdb = module->private_data;
739         int ret;
740         struct ldb_message *msg;
741         const char *error_str;
742
743         ltdb->last_err_string = NULL;
744
745         if (ltdb_lock(module, LDBLOCK) != 0) {
746                 return -1;
747         }
748
749         msg = talloc(module, struct ldb_message);
750         if (msg == NULL) {
751                 goto failed;
752         }
753
754         /* in case any attribute of the message was indexed, we need
755            to fetch the old record */
756         ret = ltdb_search_dn1(module, olddn, msg);
757         if (ret != 1) {
758                 /* not finding the old record is an error */
759                 goto failed;
760         }
761
762         msg->dn = talloc_strdup(msg, newdn);
763         if (!msg->dn) {
764                 goto failed;
765         }
766
767         ret = ltdb_add(module, msg);
768         if (ret == -1) {
769                 goto failed;
770         }
771
772         ret = ltdb_delete(module, olddn);
773         error_str = ltdb->last_err_string;
774         if (ret == -1) {
775                 ltdb_delete(module, newdn);
776         }
777
778         ltdb->last_err_string = error_str;
779
780         talloc_free(msg);
781         ltdb_unlock(module, LDBLOCK);
782
783         return ret;
784
785 failed:
786         talloc_free(msg);
787         ltdb_unlock(module, LDBLOCK);
788         return -1;
789 }
790
791
792 /*
793   return extended error information
794 */
795 static const char *ltdb_errstring(struct ldb_module *module)
796 {
797         struct ltdb_private *ltdb = module->private_data;
798         if (ltdb->last_err_string) {
799                 return ltdb->last_err_string;
800         }
801         return tdb_errorstr(ltdb->tdb);
802 }
803
804
805 static const struct ldb_module_ops ltdb_ops = {
806         .name          = "tdb",
807         .search        = ltdb_search,
808         .search_bytree = ltdb_search_bytree,
809         .add_record    = ltdb_add,
810         .modify_record = ltdb_modify,
811         .delete_record = ltdb_delete,
812         .rename_record = ltdb_rename,
813         .named_lock    = ltdb_lock,
814         .named_unlock  = ltdb_unlock,
815         .errstring     = ltdb_errstring
816 };
817
818
819 /*
820   destroy the ltdb context
821 */
822 static int ltdb_destructor(void *p)
823 {
824         struct ltdb_private *ltdb = p;
825         tdb_close(ltdb->tdb);
826         return 0;
827 }
828
829 /*
830   connect to the database
831 */
832 struct ldb_context *ltdb_connect(const char *url, 
833                                  unsigned int flags, 
834                                  const char *options[])
835 {
836         const char *path;
837         int tdb_flags, open_flags;
838         struct ltdb_private *ltdb;
839         TDB_CONTEXT *tdb;
840         struct ldb_context *ldb;
841
842         ldb = talloc_zero(NULL, struct ldb_context);
843         if (!ldb) {
844                 errno = ENOMEM;
845                 return NULL;
846         }
847
848         /* parse the url */
849         if (strchr(url, ':')) {
850                 if (strncmp(url, "tdb://", 6) != 0) {
851                         errno = EINVAL;
852                         talloc_free(ldb);
853                         return NULL;
854                 }
855                 path = url+6;
856         } else {
857                 path = url;
858         }
859
860         tdb_flags = TDB_DEFAULT;
861
862         if (flags & LDB_FLG_RDONLY) {
863                 open_flags = O_RDONLY;
864         } else {
865                 open_flags = O_CREAT | O_RDWR;
866         }
867
868         /* note that we use quite a large default hash size */
869         tdb = tdb_open(path, 10000, tdb_flags, open_flags, 0666);
870         if (!tdb) {
871                 talloc_free(ldb);
872                 return NULL;
873         }
874
875         ltdb = talloc_zero(ldb, struct ltdb_private);
876         if (!ltdb) {
877                 tdb_close(tdb);
878                 talloc_free(ldb);
879                 errno = ENOMEM;
880                 return NULL;
881         }
882
883         ltdb->tdb = tdb;
884         ltdb->sequence_number = 0;
885
886         talloc_set_destructor(ltdb, ltdb_destructor);
887
888         ldb->modules = talloc(ldb, struct ldb_module);
889         if (!ldb->modules) {
890                 talloc_free(ldb);
891                 errno = ENOMEM;
892                 return NULL;
893         }
894         ldb->modules->ldb = ldb;
895         ldb->modules->prev = ldb->modules->next = NULL;
896         ldb->modules->private_data = ltdb;
897         ldb->modules->ops = &ltdb_ops;
898
899         return ldb;
900 }