r8601: fixed null termination in ltdb connect error
[ira/wip.git] / source / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    
7
8      ** NOTE! The following LGPL license applies to the ldb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  */
37
38 #include "includes.h"
39 #include "ldb/include/ldb.h"
40 #include "ldb/include/ldb_private.h"
41 #include "ldb/ldb_tdb/ldb_tdb.h"
42
43 #define LDBLOCK "@INT_LDBLOCK"
44
45
46 /*
47   form a TDB_DATA for a record key
48   caller frees
49
50   note that the key for a record can depend on whether the 
51   dn refers to a case sensitive index record or not
52 */
53 struct TDB_DATA ltdb_key(struct ldb_module *module, const char *dn)
54 {
55         struct ldb_context *ldb = module->ldb;
56         TDB_DATA key;
57         char *key_str = NULL;
58         char *dn_folded = NULL;
59         struct ldb_dn *edn = NULL;
60         struct ldb_dn *cedn = NULL;
61
62         /*
63           most DNs are case insensitive. The exception is index DNs for
64           case sensitive attributes
65
66           there are 3 cases dealt with in this code:
67
68           1) if the dn doesn't start with @ then uppercase the attribute
69              names and the attributes values of case insensitive attributes
70           2) if the dn starts with @ then leave it alone - the indexing code handles
71              the rest
72         */
73         if (*dn == '@') {
74                 dn_folded = talloc_strdup(ldb, dn);
75         } else {
76                 edn = ldb_dn_explode(ldb, dn);
77                 if (!edn)
78                         goto failed;
79
80                 cedn = ldb_dn_casefold(ldb, edn);
81                 if (!cedn)
82                         goto failed;
83
84                 dn_folded = ldb_dn_linearize(ldb, cedn);
85                 if (!dn_folded)
86                         goto failed;
87
88                 talloc_free(edn);
89                 talloc_free(cedn);
90         }
91
92         key_str = talloc_asprintf(ldb, "DN=%s", dn_folded);
93         talloc_free(dn_folded);
94
95         if (!key_str) {
96                 goto failed;
97         }
98
99         key.dptr = key_str;
100         key.dsize = strlen(key_str) + 1;
101
102         return key;
103
104 failed:
105         talloc_free(edn);
106         talloc_free(cedn);
107         errno = ENOMEM;
108         key.dptr = NULL;
109         key.dsize = 0;
110         return key;
111 }
112
113 /*
114   lock the database for write - currently a single lock is used
115 */
116 static int ltdb_lock(struct ldb_module *module, const char *lockname)
117 {
118         struct ltdb_private *ltdb = module->private_data;
119         char *lock_dn;
120         TDB_DATA key;
121         int ret;
122
123         if (lockname == NULL) {
124                 return -1;
125         }
126
127         lock_dn = talloc_asprintf(module->ldb, "%s_%s", LDBLOCK, lockname);     
128         if (lock_dn == NULL) {
129                 return -1;
130         }
131
132         key = ltdb_key(module, lock_dn);
133         if (!key.dptr) {
134                 talloc_free(lock_dn);
135                 return -1;
136         }
137
138         ret = tdb_chainlock(ltdb->tdb, key);
139
140         talloc_free(key.dptr);
141         talloc_free(lock_dn);
142
143         return ret;
144 }
145
146 /*
147   unlock the database after a ltdb_lock()
148 */
149 static int ltdb_unlock(struct ldb_module *module, const char *lockname)
150 {
151         struct ltdb_private *ltdb = module->private_data;
152         char *lock_dn;
153         TDB_DATA key;
154
155         if (lockname == NULL) {
156                 return -1;
157         }
158
159         lock_dn = talloc_asprintf(module->ldb, "%s_%s", LDBLOCK, lockname);     
160         if (lock_dn == NULL) {
161                 return -1;
162         }
163
164         key = ltdb_key(module, lock_dn);
165         if (!key.dptr) {
166                 talloc_free(lock_dn);
167                 return -1;
168         }
169
170         tdb_chainunlock(ltdb->tdb, key);
171
172         talloc_free(key.dptr);
173         talloc_free(lock_dn);
174
175         return 0;
176 }
177
178
179 /*
180   lock the database for read - use by ltdb_search
181 */
182 int ltdb_lock_read(struct ldb_module *module)
183 {
184         struct ltdb_private *ltdb = module->private_data;
185         TDB_DATA key;
186         int ret;
187         key = ltdb_key(module, LDBLOCK);
188         if (!key.dptr) {
189                 return -1;
190         }
191         ret = tdb_chainlock_read(ltdb->tdb, key);
192         talloc_free(key.dptr);
193         return ret;
194 }
195
196 /*
197   unlock the database after a ltdb_lock_read()
198 */
199 int ltdb_unlock_read(struct ldb_module *module)
200 {
201         struct ltdb_private *ltdb = module->private_data;
202         TDB_DATA key;
203         key = ltdb_key(module, LDBLOCK);
204         if (!key.dptr) {
205                 return -1;
206         }
207         tdb_chainunlock_read(ltdb->tdb, key);
208         talloc_free(key.dptr);
209         return 0;
210 }
211
212 /*
213   check special dn's have valid attributes
214   currently only @ATTRIBUTES is checked
215 */
216 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
217 {
218         struct ltdb_private *ltdb = module->private_data;
219         int i, j;
220
221         if (strcmp(msg->dn, LTDB_ATTRIBUTES) != 0) {
222                 return 0;
223         }
224
225         /* we have @ATTRIBUTES, let's check attributes are fine */
226         /* should we check that we deny multivalued attributes ? */
227         for (i = 0; i < msg->num_elements; i++) {
228                 for (j = 0; j < msg->elements[i].num_values; j++) {
229                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
230                                 ltdb->last_err_string = "Invalid attribute value in an @ATTRIBUTES entry";
231                                 return -1;
232                         }
233                 }
234         }
235
236         return 0;
237 }
238
239
240 /*
241   we've made a modification to a dn - possibly reindex and 
242   update sequence number
243 */
244 static int ltdb_modified(struct ldb_module *module, const char *dn)
245 {
246         int ret = 0;
247
248         if (strcmp(dn, LTDB_INDEXLIST) == 0 ||
249             strcmp(dn, LTDB_ATTRIBUTES) == 0) {
250                 ret = ltdb_reindex(module);
251         }
252
253         if (ret == 0 &&
254             strcmp(dn, LTDB_BASEINFO) != 0) {
255                 ret = ltdb_increase_sequence_number(module);
256         }
257
258         return ret;
259 }
260
261 /*
262   store a record into the db
263 */
264 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
265 {
266         struct ltdb_private *ltdb = module->private_data;
267         TDB_DATA tdb_key, tdb_data;
268         int ret;
269
270         tdb_key = ltdb_key(module, msg->dn);
271         if (!tdb_key.dptr) {
272                 return -1;
273         }
274
275         ret = ltdb_pack_data(module, msg, &tdb_data);
276         if (ret == -1) {
277                 talloc_free(tdb_key.dptr);
278                 return -1;
279         }
280
281         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
282         if (ret == -1) {
283                 goto done;
284         }
285         
286         ret = ltdb_index_add(module, msg);
287         if (ret == -1) {
288                 tdb_delete(ltdb->tdb, tdb_key);
289         }
290
291 done:
292         talloc_free(tdb_key.dptr);
293         talloc_free(tdb_data.dptr);
294
295         return ret;
296 }
297
298
299 /*
300   add a record to the database
301 */
302 static int ltdb_add(struct ldb_module *module, const struct ldb_message *msg)
303 {
304         struct ltdb_private *ltdb = module->private_data;
305         int ret;
306
307         ltdb->last_err_string = NULL;
308
309         ret = ltdb_check_special_dn(module, msg);
310         if (ret != 0) {
311                 return ret;
312         }
313         
314         if (ltdb_lock(module, LDBLOCK) != 0) {
315                 return -1;
316         }
317
318         if (ltdb_cache_load(module) != 0) {
319                 ltdb_unlock(module, LDBLOCK);
320                 return -1;
321         }
322
323         ret = ltdb_store(module, msg, TDB_INSERT);
324
325         if (ret == 0) {
326                 ltdb_modified(module, msg->dn);
327         }
328
329         ltdb_unlock(module, LDBLOCK);
330         return ret;
331 }
332
333
334 /*
335   delete a record from the database, not updating indexes (used for deleting
336   index records)
337 */
338 int ltdb_delete_noindex(struct ldb_module *module, const char *dn)
339 {
340         struct ltdb_private *ltdb = module->private_data;
341         TDB_DATA tdb_key;
342         int ret;
343
344         tdb_key = ltdb_key(module, dn);
345         if (!tdb_key.dptr) {
346                 return -1;
347         }
348
349         ret = tdb_delete(ltdb->tdb, tdb_key);
350         talloc_free(tdb_key.dptr);
351
352         return ret;
353 }
354
355 /*
356   delete a record from the database
357 */
358 static int ltdb_delete(struct ldb_module *module, const char *dn)
359 {
360         struct ltdb_private *ltdb = module->private_data;
361         int ret;
362         struct ldb_message *msg = NULL;
363
364         ltdb->last_err_string = NULL;
365
366         if (ltdb_lock(module, LDBLOCK) != 0) {
367                 return -1;
368         }
369
370         if (ltdb_cache_load(module) != 0) {
371                 goto failed;
372         }
373
374         msg = talloc(module, struct ldb_message);
375         if (msg == NULL) {
376                 goto failed;
377         }
378
379         /* in case any attribute of the message was indexed, we need
380            to fetch the old record */
381         ret = ltdb_search_dn1(module, dn, msg);
382         if (ret != 1) {
383                 /* not finding the old record is an error */
384                 goto failed;
385         }
386
387         ret = ltdb_delete_noindex(module, dn);
388         if (ret == -1) {
389                 goto failed;
390         }
391
392         /* remove any indexed attributes */
393         ret = ltdb_index_del(module, msg);
394
395         if (ret == 0) {
396                 ltdb_modified(module, dn);
397         }
398
399         talloc_free(msg);
400         ltdb_unlock(module, LDBLOCK);
401         return ret;
402
403 failed:
404         talloc_free(msg);
405         ltdb_unlock(module, LDBLOCK);
406         return -1;
407 }
408
409
410 /*
411   find an element by attribute name. At the moment this does a linear search, it should
412   be re-coded to use a binary search once all places that modify records guarantee
413   sorted order
414
415   return the index of the first matching element if found, otherwise -1
416 */
417 static int find_element(const struct ldb_message *msg, const char *name)
418 {
419         unsigned int i;
420         for (i=0;i<msg->num_elements;i++) {
421                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
422                         return i;
423                 }
424         }
425         return -1;
426 }
427
428
429 /*
430   add an element to an existing record. Assumes a elements array that we
431   can call re-alloc on, and assumed that we can re-use the data pointers from the 
432   passed in additional values. Use with care!
433
434   returns 0 on success, -1 on failure (and sets errno)
435 */
436 static int msg_add_element(struct ldb_context *ldb,
437                            struct ldb_message *msg, struct ldb_message_element *el)
438 {
439         struct ldb_message_element *e2;
440         unsigned int i;
441
442         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
443                               msg->num_elements+1);
444         if (!e2) {
445                 errno = ENOMEM;
446                 return -1;
447         }
448
449         msg->elements = e2;
450
451         e2 = &msg->elements[msg->num_elements];
452
453         e2->name = el->name;
454         e2->flags = el->flags;
455         e2->values = NULL;
456         if (el->num_values != 0) {
457                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
458                 if (!e2->values) {
459                         errno = ENOMEM;
460                         return -1;
461                 }
462         }
463         for (i=0;i<el->num_values;i++) {
464                 e2->values[i] = el->values[i];
465         }
466         e2->num_values = el->num_values;
467
468         msg->num_elements++;
469
470         return 0;
471 }
472
473 /*
474   delete all elements having a specified attribute name
475 */
476 static int msg_delete_attribute(struct ldb_module *module,
477                                 struct ldb_context *ldb,
478                                 struct ldb_message *msg, const char *name)
479 {
480         unsigned int i, j;
481
482         for (i=0;i<msg->num_elements;i++) {
483                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
484                         for (j=0;j<msg->elements[i].num_values;j++) {
485                                 ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
486                         }
487                         talloc_free(msg->elements[i].values);
488                         if (msg->num_elements > (i+1)) {
489                                 memmove(&msg->elements[i], 
490                                         &msg->elements[i+1], 
491                                         sizeof(struct ldb_message_element)*
492                                         (msg->num_elements - (i+1)));
493                         }
494                         msg->num_elements--;
495                         i--;
496                         msg->elements = talloc_realloc(msg, msg->elements, 
497                                                          struct ldb_message_element, 
498                                                          msg->num_elements);
499                 }
500         }
501
502         return 0;
503 }
504
505 /*
506   delete all elements matching an attribute name/value 
507
508   return 0 on success, -1 on failure
509 */
510 static int msg_delete_element(struct ldb_module *module,
511                               struct ldb_message *msg, 
512                               const char *name,
513                               const struct ldb_val *val)
514 {
515         struct ldb_context *ldb = module->ldb;
516         unsigned int i;
517         int found;
518         struct ldb_message_element *el;
519         const struct ldb_attrib_handler *h;
520
521         found = find_element(msg, name);
522         if (found == -1) {
523                 return -1;
524         }
525
526         el = &msg->elements[found];
527
528         h = ldb_attrib_handler(ldb, el->name);
529
530         for (i=0;i<el->num_values;i++) {
531                 if (h->comparison_fn(ldb, ldb, &el->values[i], val) == 0) {
532                         if (i<el->num_values-1) {
533                                 memmove(&el->values[i], &el->values[i+1],
534                                         sizeof(el->values[i])*(el->num_values-(i+1)));
535                         }
536                         el->num_values--;
537                         if (el->num_values == 0) {
538                                 return msg_delete_attribute(module, ldb, msg, name);
539                         }
540                         return 0;
541                 }
542         }
543
544         return -1;
545 }
546
547
548 /*
549   modify a record - internal interface
550
551   yuck - this is O(n^2). Luckily n is usually small so we probably
552   get away with it, but if we ever have really large attribute lists 
553   then we'll need to look at this again
554 */
555 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
556 {
557         struct ldb_context *ldb = module->ldb;
558         struct ltdb_private *ltdb = module->private_data;
559         TDB_DATA tdb_key, tdb_data;
560         struct ldb_message *msg2;
561         unsigned i, j;
562         int ret;
563
564         tdb_key = ltdb_key(module, msg->dn);
565         if (!tdb_key.dptr) {
566                 return -1;
567         }
568
569         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
570         if (!tdb_data.dptr) {
571                 talloc_free(tdb_key.dptr);
572                 return -1;
573         }
574
575         msg2 = talloc(tdb_key.dptr, struct ldb_message);
576         if (msg2 == NULL) {
577                 talloc_free(tdb_key.dptr);
578                 return -1;
579         }
580
581         ret = ltdb_unpack_data(module, &tdb_data, msg2);
582         if (ret == -1) {
583                 talloc_free(tdb_key.dptr);
584                 free(tdb_data.dptr);
585                 return -1;
586         }
587
588         if (!msg2->dn) {
589                 msg2->dn = msg->dn;
590         }
591
592         for (i=0;i<msg->num_elements;i++) {
593                 struct ldb_message_element *el = &msg->elements[i];
594                 struct ldb_message_element *el2;
595                 struct ldb_val *vals;
596
597                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
598
599                 case LDB_FLAG_MOD_ADD:
600                         /* add this element to the message. fail if it
601                            already exists */
602                         ret = find_element(msg2, el->name);
603
604                         if (ret == -1) {
605                                 if (msg_add_element(ldb, msg2, el) != 0) {
606                                         goto failed;
607                                 }
608                                 continue;
609                         }
610
611                         el2 = &msg2->elements[ret];
612
613                         /* An attribute with this name already exists, add all
614                          * values if they don't already exist. */
615
616                         for (j=0;j<el->num_values;j++) {
617                                 if (ldb_msg_find_val(el2, &el->values[j])) {
618                                         ltdb->last_err_string =
619                                                 "Type or value exists";
620                                         goto failed;
621                                 }
622                         }
623
624                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
625                                                 el2->num_values + el->num_values);
626
627                         if (vals == NULL)
628                                 goto failed;
629
630                         for (j=0;j<el->num_values;j++) {
631                                 vals[el2->num_values + j] =
632                                         ldb_val_dup(vals, &el->values[j]);
633                         }
634
635                         el2->values = vals;
636                         el2->num_values += el->num_values;
637
638                         break;
639
640                 case LDB_FLAG_MOD_REPLACE:
641                         /* replace all elements of this attribute name with the elements
642                            listed. The attribute not existing is not an error */
643                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
644
645                         /* add the replacement element, if not empty */
646                         if (msg->elements[i].num_values != 0 &&
647                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
648                                 goto failed;
649                         }
650                         break;
651
652                 case LDB_FLAG_MOD_DELETE:
653                         /* we could be being asked to delete all
654                            values or just some values */
655                         if (msg->elements[i].num_values == 0) {
656                                 if (msg_delete_attribute(module, ldb, msg2, 
657                                                          msg->elements[i].name) != 0) {
658                                         ltdb->last_err_string = "No such attribute";
659                                         goto failed;
660                                 }
661                                 break;
662                         }
663                         for (j=0;j<msg->elements[i].num_values;j++) {
664                                 if (msg_delete_element(module,
665                                                        msg2, 
666                                                        msg->elements[i].name,
667                                                        &msg->elements[i].values[j]) != 0) {
668                                         ltdb->last_err_string = "No such attribute";
669                                         goto failed;
670                                 }
671                                 if (ltdb_index_del_value(module, msg->dn, &msg->elements[i], j) != 0) {
672                                         goto failed;
673                                 }
674                         }
675                         break;
676                 default:
677                         ltdb->last_err_string = "Invalid ldb_modify flags";
678                         goto failed;
679                 }
680         }
681
682         /* we've made all the mods - save the modified record back into the database */
683         ret = ltdb_store(module, msg2, TDB_MODIFY);
684
685         talloc_free(tdb_key.dptr);
686         free(tdb_data.dptr);
687         return ret;
688
689 failed:
690         talloc_free(tdb_key.dptr);
691         free(tdb_data.dptr);
692         return -1;
693 }
694
695 /*
696   modify a record
697 */
698 static int ltdb_modify(struct ldb_module *module, const struct ldb_message *msg)
699 {
700         struct ltdb_private *ltdb = module->private_data;
701         int ret;
702
703         ltdb->last_err_string = NULL;
704
705         ret = ltdb_check_special_dn(module, msg);
706         if (ret != 0) {
707                 return ret;
708         }
709         
710         if (ltdb_lock(module, LDBLOCK) != 0) {
711                 return -1;
712         }
713
714         if (ltdb_cache_load(module) != 0) {
715                 ltdb_unlock(module, LDBLOCK);
716                 return -1;
717         }
718
719         ret = ltdb_modify_internal(module, msg);
720
721         if (ret == 0) {
722                 ltdb_modified(module, msg->dn);
723         }
724
725         ltdb_unlock(module, LDBLOCK);
726
727         return ret;
728 }
729
730 /*
731   rename a record
732 */
733 static int ltdb_rename(struct ldb_module *module, const char *olddn, const char *newdn)
734 {
735         struct ltdb_private *ltdb = module->private_data;
736         int ret;
737         struct ldb_message *msg;
738         const char *error_str;
739
740         ltdb->last_err_string = NULL;
741
742         if (ltdb_lock(module, LDBLOCK) != 0) {
743                 return -1;
744         }
745
746         if (ltdb_cache_load(module) != 0) {
747                 ltdb_unlock(module, LDBLOCK);
748                 return -1;
749         }
750
751         msg = talloc(module, struct ldb_message);
752         if (msg == NULL) {
753                 goto failed;
754         }
755
756         /* in case any attribute of the message was indexed, we need
757            to fetch the old record */
758         ret = ltdb_search_dn1(module, olddn, msg);
759         if (ret != 1) {
760                 /* not finding the old record is an error */
761                 goto failed;
762         }
763
764         msg->dn = talloc_strdup(msg, newdn);
765         if (!msg->dn) {
766                 goto failed;
767         }
768
769         ret = ltdb_add(module, msg);
770         if (ret == -1) {
771                 goto failed;
772         }
773
774         ret = ltdb_delete(module, olddn);
775         error_str = ltdb->last_err_string;
776         if (ret == -1) {
777                 ltdb_delete(module, newdn);
778         }
779
780         ltdb->last_err_string = error_str;
781
782         talloc_free(msg);
783         ltdb_unlock(module, LDBLOCK);
784
785         return ret;
786
787 failed:
788         talloc_free(msg);
789         ltdb_unlock(module, LDBLOCK);
790         return -1;
791 }
792
793
794 /*
795   return extended error information
796 */
797 static const char *ltdb_errstring(struct ldb_module *module)
798 {
799         struct ltdb_private *ltdb = module->private_data;
800         if (ltdb->last_err_string) {
801                 return ltdb->last_err_string;
802         }
803         return tdb_errorstr(ltdb->tdb);
804 }
805
806
807 static const struct ldb_module_ops ltdb_ops = {
808         .name          = "tdb",
809         .search        = ltdb_search,
810         .search_bytree = ltdb_search_bytree,
811         .add_record    = ltdb_add,
812         .modify_record = ltdb_modify,
813         .delete_record = ltdb_delete,
814         .rename_record = ltdb_rename,
815         .named_lock    = ltdb_lock,
816         .named_unlock  = ltdb_unlock,
817         .errstring     = ltdb_errstring
818 };
819
820
821 /*
822   destroy the ltdb context
823 */
824 static int ltdb_destructor(void *p)
825 {
826         struct ltdb_private *ltdb = p;
827         tdb_close(ltdb->tdb);
828         return 0;
829 }
830
831 /*
832   connect to the database
833 */
834 int ltdb_connect(struct ldb_context *ldb, const char *url, 
835                  unsigned int flags, const char *options[])
836 {
837         const char *path;
838         int tdb_flags, open_flags;
839         struct ltdb_private *ltdb;
840         TDB_CONTEXT *tdb;
841
842         /* parse the url */
843         if (strchr(url, ':')) {
844                 if (strncmp(url, "tdb://", 6) != 0) {
845                         ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid tdb URL '%s'", url);
846                         return -1;
847                 }
848                 path = url+6;
849         } else {
850                 path = url;
851         }
852
853         tdb_flags = TDB_DEFAULT;
854
855         if (flags & LDB_FLG_RDONLY) {
856                 open_flags = O_RDONLY;
857         } else {
858                 open_flags = O_CREAT | O_RDWR;
859         }
860
861         /* note that we use quite a large default hash size */
862         tdb = tdb_open(path, 10000, tdb_flags, open_flags, 0666);
863         if (!tdb) {
864                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Unable to open tdb '%s'\n", path);
865                 return -1;
866         }
867
868         ltdb = talloc_zero(ldb, struct ltdb_private);
869         if (!ltdb) {
870                 tdb_close(tdb);
871                 ldb_oom(ldb);
872                 return -1;
873         }
874
875         ltdb->tdb = tdb;
876         ltdb->sequence_number = 0;
877
878         talloc_set_destructor(ltdb, ltdb_destructor);
879
880         ldb->modules = talloc(ldb, struct ldb_module);
881         if (!ldb->modules) {
882                 ldb_oom(ldb);
883                 talloc_free(ltdb);
884                 return -1;
885         }
886         ldb->modules->ldb = ldb;
887         ldb->modules->prev = ldb->modules->next = NULL;
888         ldb->modules->private_data = ltdb;
889         ldb->modules->ops = &ltdb_ops;
890
891         return 0;
892 }