r6560: added a tdb_chainlock_read() call in ldb_search(). This guarantees
[jra/samba/.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    
7
8      ** NOTE! The following LGPL license applies to the ldb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  */
37
38 #include "includes.h"
39 #include "ldb/include/ldb.h"
40 #include "ldb/include/ldb_private.h"
41 #include "ldb/ldb_tdb/ldb_tdb.h"
42
43 #define LDBLOCK "INT_LDBLOCK"
44
45
46 /*
47   casefold a dn. We need to uppercase the attribute names, and the 
48   attribute values of case insensitive attributes. We also need to remove
49   extraneous spaces between elements
50 */
51 static char *ltdb_dn_fold(struct ldb_module *module, const char *dn)
52 {
53         const char *dn_orig = dn;
54         struct ldb_context *ldb = module->ldb;
55         TALLOC_CTX *tmp_ctx = talloc_new(ldb);
56         char *ret;
57         size_t len;
58
59         ret = talloc_strdup(tmp_ctx, "");
60         if (ret == NULL) goto failed;
61
62         while ((len = strcspn(dn, ",")) > 0) {
63                 char *p = strchr(dn, '=');
64                 char *attr, *value;
65                 int flags;
66
67                 if (p == NULL || (p-dn) > len) goto failed;
68
69                 attr = talloc_strndup(tmp_ctx, dn, p-dn);
70                 if (attr == NULL) goto failed;
71
72                 /* trim spaces from the attribute name */
73                 while (' ' == *attr) attr++;
74                 while (' ' == attr[strlen(attr)-1]) {
75                         attr[strlen(attr)-1] = 0;
76                 }
77                 if (*attr == 0) goto failed;
78
79                 value = talloc_strndup(tmp_ctx, p+1, len-(p+1-dn));
80                 if (value == NULL) goto failed;
81
82                 /* trim spaces from the value */
83                 while (' ' == *value) value++;
84                 while (' ' == value[strlen(value)-1]) {
85                         value[strlen(value)-1] = 0;
86                 }
87                 if (*value == 0) goto failed;
88
89                 flags = ltdb_attribute_flags(module, attr);
90
91                 attr = ldb_casefold(ldb, attr);
92                 if (attr == NULL) goto failed;
93                 talloc_steal(tmp_ctx, attr);
94
95                 if (flags & LTDB_FLAG_CASE_INSENSITIVE) {
96                         value = ldb_casefold(ldb, value);
97                         if (value == NULL) goto failed;
98                         talloc_steal(tmp_ctx, value);
99                 }               
100
101                 if (dn[len] == ',') {
102                         ret = talloc_asprintf_append(ret, "%s=%s,", attr, value);
103                 } else {
104                         ret = talloc_asprintf_append(ret, "%s=%s", attr, value);
105                 }
106                 if (ret == NULL) goto failed;
107
108                 dn += len;
109                 if (*dn == ',') dn++;
110         }
111
112         talloc_steal(ldb, ret);
113         talloc_free(tmp_ctx);
114         return ret;
115
116 failed:
117         talloc_free(tmp_ctx);
118         return ldb_casefold(ldb, dn_orig);
119 }
120
121 /*
122   form a TDB_DATA for a record key
123   caller frees
124
125   note that the key for a record can depend on whether the 
126   dn refers to a case sensitive index record or not
127 */
128 struct TDB_DATA ltdb_key(struct ldb_module *module, const char *dn)
129 {
130         struct ldb_context *ldb = module->ldb;
131         TDB_DATA key;
132         char *key_str = NULL;
133         char *dn_folded = NULL;
134         const char *prefix = LTDB_INDEX ":";
135         const char *s;
136         int flags;
137
138         /*
139           most DNs are case insensitive. The exception is index DNs for
140           case sensitive attributes
141
142           there are 3 cases dealt with in this code:
143
144           1) if the dn doesn't start with @INDEX: then uppercase the attribute
145              names and the attributes values of case insensitive attributes
146           2) if the dn starts with @INDEX:attr and 'attr' is a case insensitive
147              attribute then uppercase whole dn
148           3) if the dn starts with @INDEX:attr and 'attr' is a case sensitive
149              attribute then uppercase up to the value of the attribute, but 
150              not the value itself
151         */
152         if (strncmp(dn, prefix, strlen(prefix)) == 0 &&
153             (s = strchr(dn+strlen(prefix), ':'))) {
154                 char *attr_name, *attr_name_folded;
155                 attr_name = talloc_strndup(ldb, dn+strlen(prefix), (s-(dn+strlen(prefix))));
156                 if (!attr_name) {
157                         goto failed;
158                 }
159                 flags = ltdb_attribute_flags(module, attr_name);
160                 
161                 if (flags & LTDB_FLAG_CASE_INSENSITIVE) {
162                         dn_folded = ldb_casefold(ldb, dn);
163                 } else {
164                         attr_name_folded = ldb_casefold(ldb, attr_name);
165                         if (!attr_name_folded) {
166                                 goto failed;
167                         }
168                         dn_folded = talloc_asprintf(ldb, "%s:%s:%s",
169                                                     prefix, attr_name_folded,
170                                                     s+1);
171                         talloc_free(attr_name_folded);
172                 }
173                 talloc_free(attr_name);
174         } else {
175                 dn_folded = ltdb_dn_fold(module, dn);
176         }
177
178         if (!dn_folded) {
179                 goto failed;
180         }
181
182         key_str = talloc_asprintf(ldb, "DN=%s", dn_folded);
183         talloc_free(dn_folded);
184
185         if (!key_str) {
186                 goto failed;
187         }
188
189         key.dptr = key_str;
190         key.dsize = strlen(key_str)+1;
191
192         return key;
193
194 failed:
195         errno = ENOMEM;
196         key.dptr = NULL;
197         key.dsize = 0;
198         return key;
199 }
200
201 /*
202   lock the database for write - currently a single lock is used
203 */
204 static int ltdb_lock(struct ldb_module *module, const char *lockname)
205 {
206         struct ltdb_private *ltdb = module->private_data;
207         TDB_DATA key;
208         int ret;
209
210         if (lockname == NULL) {
211                 return -1;
212         }
213
214         key = ltdb_key(module, lockname);
215         if (!key.dptr) {
216                 return -1;
217         }
218
219         ret = tdb_chainlock(ltdb->tdb, key);
220
221         talloc_free(key.dptr);
222
223         return ret;
224 }
225
226 /*
227   unlock the database after a ltdb_lock()
228 */
229 static int ltdb_unlock(struct ldb_module *module, const char *lockname)
230 {
231         struct ltdb_private *ltdb = module->private_data;
232         TDB_DATA key;
233
234         if (lockname == NULL) {
235                 return -1;
236         }
237
238         key = ltdb_key(module, lockname);
239         if (!key.dptr) {
240                 return -1;
241         }
242
243         tdb_chainunlock(ltdb->tdb, key);
244
245         talloc_free(key.dptr);
246
247         return 0;
248 }
249
250
251 /*
252   lock the database for read - use by ltdb_search
253 */
254 int ltdb_lock_read(struct ldb_module *module)
255 {
256         struct ltdb_private *ltdb = module->private_data;
257         TDB_DATA key;
258         int ret;
259         key = ltdb_key(module, LDBLOCK);
260         if (!key.dptr) {
261                 return -1;
262         }
263         ret = tdb_chainlock_read(ltdb->tdb, key);
264         talloc_free(key.dptr);
265         return ret;
266 }
267
268 /*
269   unlock the database after a ltdb_lock_read()
270 */
271 int ltdb_unlock_read(struct ldb_module *module)
272 {
273         struct ltdb_private *ltdb = module->private_data;
274         TDB_DATA key;
275         key = ltdb_key(module, LDBLOCK);
276         if (!key.dptr) {
277                 return -1;
278         }
279         tdb_chainunlock_read(ltdb->tdb, key);
280         talloc_free(key.dptr);
281         return 0;
282 }
283
284
285 /*
286   we've made a modification to a dn - possibly reindex and 
287   update sequence number
288 */
289 static int ltdb_modified(struct ldb_module *module, const char *dn)
290 {
291         int ret = 0;
292
293         if (strcmp(dn, LTDB_INDEXLIST) == 0 ||
294             strcmp(dn, LTDB_ATTRIBUTES) == 0) {
295                 ret = ltdb_reindex(module);
296         }
297
298         if (ret == 0 &&
299             strcmp(dn, LTDB_BASEINFO) != 0) {
300                 ret = ltdb_increase_sequence_number(module);
301         }
302
303         return ret;
304 }
305
306 /*
307   store a record into the db
308 */
309 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
310 {
311         struct ltdb_private *ltdb = module->private_data;
312         TDB_DATA tdb_key, tdb_data;
313         int ret;
314
315         tdb_key = ltdb_key(module, msg->dn);
316         if (!tdb_key.dptr) {
317                 return -1;
318         }
319
320         ret = ltdb_pack_data(module, msg, &tdb_data);
321         if (ret == -1) {
322                 talloc_free(tdb_key.dptr);
323                 return -1;
324         }
325
326         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
327         if (ret == -1) {
328                 goto done;
329         }
330         
331         ret = ltdb_index_add(module, msg);
332         if (ret == -1) {
333                 tdb_delete(ltdb->tdb, tdb_key);
334         }
335
336 done:
337         talloc_free(tdb_key.dptr);
338         talloc_free(tdb_data.dptr);
339
340         return ret;
341 }
342
343
344 /*
345   add a record to the database
346 */
347 static int ltdb_add(struct ldb_module *module, const struct ldb_message *msg)
348 {
349         struct ltdb_private *ltdb = module->private_data;
350         int ret;
351
352         ltdb->last_err_string = NULL;
353
354         if (ltdb_lock(module, LDBLOCK) != 0) {
355                 return -1;
356         }
357
358         if (ltdb_cache_load(module) != 0) {
359                 ltdb_unlock(module, LDBLOCK);
360                 return -1;
361         }
362         
363         ret = ltdb_store(module, msg, TDB_INSERT);
364
365         if (ret == 0) {
366                 ltdb_modified(module, msg->dn);
367         }
368
369         ltdb_unlock(module, LDBLOCK);
370         return ret;
371 }
372
373
374 /*
375   delete a record from the database, not updating indexes (used for deleting
376   index records)
377 */
378 int ltdb_delete_noindex(struct ldb_module *module, const char *dn)
379 {
380         struct ltdb_private *ltdb = module->private_data;
381         TDB_DATA tdb_key;
382         int ret;
383
384         tdb_key = ltdb_key(module, dn);
385         if (!tdb_key.dptr) {
386                 return -1;
387         }
388
389         ret = tdb_delete(ltdb->tdb, tdb_key);
390         talloc_free(tdb_key.dptr);
391
392         return ret;
393 }
394
395 /*
396   delete a record from the database
397 */
398 static int ltdb_delete(struct ldb_module *module, const char *dn)
399 {
400         struct ltdb_private *ltdb = module->private_data;
401         int ret;
402         struct ldb_message *msg = NULL;
403
404         ltdb->last_err_string = NULL;
405
406         if (ltdb_lock(module, LDBLOCK) != 0) {
407                 return -1;
408         }
409
410         if (ltdb_cache_load(module) != 0) {
411                 goto failed;
412         }
413
414         msg = talloc(module, struct ldb_message);
415         if (msg == NULL) {
416                 goto failed;
417         }
418
419         /* in case any attribute of the message was indexed, we need
420            to fetch the old record */
421         ret = ltdb_search_dn1(module, dn, msg);
422         if (ret != 1) {
423                 /* not finding the old record is an error */
424                 goto failed;
425         }
426
427         ret = ltdb_delete_noindex(module, dn);
428         if (ret == -1) {
429                 goto failed;
430         }
431
432         /* remove any indexed attributes */
433         ret = ltdb_index_del(module, msg);
434
435         if (ret == 0) {
436                 ltdb_modified(module, dn);
437         }
438
439         talloc_free(msg);
440         ltdb_unlock(module, LDBLOCK);
441         return ret;
442
443 failed:
444         talloc_free(msg);
445         ltdb_unlock(module, LDBLOCK);
446         return -1;
447 }
448
449
450 /*
451   find an element by attribute name. At the moment this does a linear search, it should
452   be re-coded to use a binary search once all places that modify records guarantee
453   sorted order
454
455   return the index of the first matching element if found, otherwise -1
456 */
457 static int find_element(const struct ldb_message *msg, const char *name)
458 {
459         unsigned int i;
460         for (i=0;i<msg->num_elements;i++) {
461                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
462                         return i;
463                 }
464         }
465         return -1;
466 }
467
468
469 /*
470   add an element to an existing record. Assumes a elements array that we
471   can call re-alloc on, and assumed that we can re-use the data pointers from the 
472   passed in additional values. Use with care!
473
474   returns 0 on success, -1 on failure (and sets errno)
475 */
476 static int msg_add_element(struct ldb_context *ldb,
477                            struct ldb_message *msg, struct ldb_message_element *el)
478 {
479         struct ldb_message_element *e2;
480         unsigned int i;
481
482         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
483                               msg->num_elements+1);
484         if (!e2) {
485                 errno = ENOMEM;
486                 return -1;
487         }
488
489         msg->elements = e2;
490
491         e2 = &msg->elements[msg->num_elements];
492
493         e2->name = el->name;
494         e2->flags = el->flags;
495         e2->values = NULL;
496         if (el->num_values != 0) {
497                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
498                 if (!e2->values) {
499                         errno = ENOMEM;
500                         return -1;
501                 }
502         }
503         for (i=0;i<el->num_values;i++) {
504                 e2->values[i] = el->values[i];
505         }
506         e2->num_values = el->num_values;
507
508         msg->num_elements++;
509
510         return 0;
511 }
512
513 /*
514   delete all elements having a specified attribute name
515 */
516 static int msg_delete_attribute(struct ldb_module *module,
517                                 struct ldb_context *ldb,
518                                 struct ldb_message *msg, const char *name)
519 {
520         unsigned int i, j;
521
522         for (i=0;i<msg->num_elements;i++) {
523                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
524                         for (j=0;j<msg->elements[i].num_values;j++) {
525                                 ltdb_index_del_value(module, msg->dn, &msg->elements[i], j);
526                         }
527                         talloc_free(msg->elements[i].values);
528                         if (msg->num_elements > (i+1)) {
529                                 memmove(&msg->elements[i], 
530                                         &msg->elements[i+1], 
531                                         sizeof(struct ldb_message_element)*
532                                         (msg->num_elements - (i+1)));
533                         }
534                         msg->num_elements--;
535                         i--;
536                         msg->elements = talloc_realloc(msg, msg->elements, 
537                                                          struct ldb_message_element, 
538                                                          msg->num_elements);
539                 }
540         }
541
542         return 0;
543 }
544
545 /*
546   delete all elements matching an attribute name/value 
547
548   return 0 on success, -1 on failure
549 */
550 static int msg_delete_element(struct ldb_module *module,
551                               struct ldb_message *msg, 
552                               const char *name,
553                               const struct ldb_val *val)
554 {
555         struct ldb_context *ldb = module->ldb;
556         unsigned int i;
557         int found;
558         struct ldb_message_element *el;
559
560         found = find_element(msg, name);
561         if (found == -1) {
562                 return -1;
563         }
564
565         el = &msg->elements[found];
566
567         for (i=0;i<el->num_values;i++) {
568                 if (ltdb_val_equal(module, msg->elements[i].name, &el->values[i], val)) {
569                         if (i<el->num_values-1) {
570                                 memmove(&el->values[i], &el->values[i+1],
571                                         sizeof(el->values[i])*(el->num_values-(i+1)));
572                         }
573                         el->num_values--;
574                         if (el->num_values == 0) {
575                                 return msg_delete_attribute(module, ldb, msg, name);
576                         }
577                         return 0;
578                 }
579         }
580
581         return -1;
582 }
583
584
585 /*
586   modify a record - internal interface
587
588   yuck - this is O(n^2). Luckily n is usually small so we probably
589   get away with it, but if we ever have really large attribute lists 
590   then we'll need to look at this again
591 */
592 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
593 {
594         struct ldb_context *ldb = module->ldb;
595         struct ltdb_private *ltdb = module->private_data;
596         TDB_DATA tdb_key, tdb_data;
597         struct ldb_message *msg2;
598         unsigned i, j;
599         int ret;
600
601         tdb_key = ltdb_key(module, msg->dn);
602         if (!tdb_key.dptr) {
603                 return -1;
604         }
605
606         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
607         if (!tdb_data.dptr) {
608                 talloc_free(tdb_key.dptr);
609                 return -1;
610         }
611
612         msg2 = talloc(tdb_key.dptr, struct ldb_message);
613         if (msg2 == NULL) {
614                 talloc_free(tdb_key.dptr);
615                 return -1;
616         }
617
618         ret = ltdb_unpack_data(module, &tdb_data, msg2);
619         if (ret == -1) {
620                 talloc_free(tdb_key.dptr);
621                 free(tdb_data.dptr);
622                 return -1;
623         }
624
625         if (!msg2->dn) {
626                 msg2->dn = msg->dn;
627         }
628
629         for (i=0;i<msg->num_elements;i++) {
630                 struct ldb_message_element *el = &msg->elements[i];
631                 struct ldb_message_element *el2;
632                 struct ldb_val *vals;
633
634                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
635
636                 case LDB_FLAG_MOD_ADD:
637                         /* add this element to the message. fail if it
638                            already exists */
639                         ret = find_element(msg2, el->name);
640
641                         if (ret == -1) {
642                                 if (msg_add_element(ldb, msg2, el) != 0) {
643                                         goto failed;
644                                 }
645                                 continue;
646                         }
647
648                         el2 = &msg2->elements[ret];
649
650                         /* An attribute with this name already exists, add all
651                          * values if they don't already exist. */
652
653                         for (j=0;j<el->num_values;j++) {
654                                 if (ldb_msg_find_val(el2, &el->values[j])) {
655                                         ltdb->last_err_string =
656                                                 "Type or value exists";
657                                         goto failed;
658                                 }
659                         }
660
661                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
662                                                 el2->num_values + el->num_values);
663
664                         if (vals == NULL)
665                                 goto failed;
666
667                         for (j=0;j<el->num_values;j++) {
668                                 vals[el2->num_values + j] =
669                                         ldb_val_dup(vals, &el->values[j]);
670                         }
671
672                         el2->values = vals;
673                         el2->num_values += el->num_values;
674
675                         break;
676
677                 case LDB_FLAG_MOD_REPLACE:
678                         /* replace all elements of this attribute name with the elements
679                            listed. The attribute not existing is not an error */
680                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
681
682                         /* add the replacement element, if not empty */
683                         if (msg->elements[i].num_values != 0 &&
684                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
685                                 goto failed;
686                         }
687                         break;
688
689                 case LDB_FLAG_MOD_DELETE:
690                         /* we could be being asked to delete all
691                            values or just some values */
692                         if (msg->elements[i].num_values == 0) {
693                                 if (msg_delete_attribute(module, ldb, msg2, 
694                                                          msg->elements[i].name) != 0) {
695                                         ltdb->last_err_string = "No such attribute";
696                                         goto failed;
697                                 }
698                                 break;
699                         }
700                         for (j=0;j<msg->elements[i].num_values;j++) {
701                                 if (msg_delete_element(module,
702                                                        msg2, 
703                                                        msg->elements[i].name,
704                                                        &msg->elements[i].values[j]) != 0) {
705                                         ltdb->last_err_string = "No such attribute";
706                                         goto failed;
707                                 }
708                                 if (ltdb_index_del_value(module, msg->dn, &msg->elements[i], j) != 0) {
709                                         goto failed;
710                                 }
711                         }
712                         break;
713                 }
714         }
715
716         /* we've made all the mods - save the modified record back into the database */
717         ret = ltdb_store(module, msg2, TDB_MODIFY);
718
719         talloc_free(tdb_key.dptr);
720         free(tdb_data.dptr);
721         return ret;
722
723 failed:
724         talloc_free(tdb_key.dptr);
725         free(tdb_data.dptr);
726         return -1;
727 }
728
729 /*
730   modify a record
731 */
732 static int ltdb_modify(struct ldb_module *module, const struct ldb_message *msg)
733 {
734         struct ltdb_private *ltdb = module->private_data;
735         int ret;
736
737         ltdb->last_err_string = NULL;
738
739         if (ltdb_lock(module, LDBLOCK) != 0) {
740                 return -1;
741         }
742
743         if (ltdb_cache_load(module) != 0) {
744                 ltdb_unlock(module, LDBLOCK);
745                 return -1;
746         }
747
748         ret = ltdb_modify_internal(module, msg);
749
750         if (ret == 0) {
751                 ltdb_modified(module, msg->dn);
752         }
753
754         ltdb_unlock(module, LDBLOCK);
755
756         return ret;
757 }
758
759 /*
760   rename a record
761 */
762 static int ltdb_rename(struct ldb_module *module, const char *olddn, const char *newdn)
763 {
764         struct ltdb_private *ltdb = module->private_data;
765         int ret;
766         struct ldb_message *msg;
767         const char *error_str;
768
769         ltdb->last_err_string = NULL;
770
771         if (ltdb_lock(module, LDBLOCK) != 0) {
772                 return -1;
773         }
774
775         msg = talloc(module, struct ldb_message);
776         if (msg == NULL) {
777                 goto failed;
778         }
779
780         /* in case any attribute of the message was indexed, we need
781            to fetch the old record */
782         ret = ltdb_search_dn1(module, olddn, msg);
783         if (ret != 1) {
784                 /* not finding the old record is an error */
785                 goto failed;
786         }
787
788         msg->dn = talloc_strdup(msg, newdn);
789         if (!msg->dn) {
790                 goto failed;
791         }
792
793         ret = ltdb_add(module, msg);
794         if (ret == -1) {
795                 goto failed;
796         }
797
798         ret = ltdb_delete(module, olddn);
799         error_str = ltdb->last_err_string;
800         if (ret == -1) {
801                 ltdb_delete(module, newdn);
802         }
803
804         ltdb->last_err_string = error_str;
805
806         talloc_free(msg);
807         ltdb_unlock(module, LDBLOCK);
808
809         return ret;
810
811 failed:
812         talloc_free(msg);
813         ltdb_unlock(module, LDBLOCK);
814         return -1;
815 }
816
817
818 /*
819   return extended error information
820 */
821 static const char *ltdb_errstring(struct ldb_module *module)
822 {
823         struct ltdb_private *ltdb = module->private_data;
824         if (ltdb->last_err_string) {
825                 return ltdb->last_err_string;
826         }
827         return tdb_errorstr(ltdb->tdb);
828 }
829
830
831 static const struct ldb_module_ops ltdb_ops = {
832         "tdb",
833         ltdb_search,
834         ltdb_add,
835         ltdb_modify,
836         ltdb_delete,
837         ltdb_rename,
838         ltdb_lock,
839         ltdb_unlock,
840         ltdb_errstring
841 };
842
843
844 /*
845   destroy the ltdb context
846 */
847 static int ltdb_destructor(void *p)
848 {
849         struct ltdb_private *ltdb = p;
850         tdb_close(ltdb->tdb);
851         return 0;
852 }
853
854 /*
855   connect to the database
856 */
857 struct ldb_context *ltdb_connect(const char *url, 
858                                  unsigned int flags, 
859                                  const char *options[])
860 {
861         const char *path;
862         int tdb_flags, open_flags;
863         struct ltdb_private *ltdb;
864         TDB_CONTEXT *tdb;
865         struct ldb_context *ldb;
866
867         ldb = talloc_zero(NULL, struct ldb_context);
868         if (!ldb) {
869                 errno = ENOMEM;
870                 return NULL;
871         }
872
873         /* parse the url */
874         if (strchr(url, ':')) {
875                 if (strncmp(url, "tdb://", 6) != 0) {
876                         errno = EINVAL;
877                         talloc_free(ldb);
878                         return NULL;
879                 }
880                 path = url+6;
881         } else {
882                 path = url;
883         }
884
885         tdb_flags = TDB_DEFAULT;
886
887         if (flags & LDB_FLG_RDONLY) {
888                 open_flags = O_RDONLY;
889         } else {
890                 open_flags = O_CREAT | O_RDWR;
891         }
892
893         /* note that we use quite a large default hash size */
894         tdb = tdb_open(path, 10000, tdb_flags, open_flags, 0666);
895         if (!tdb) {
896                 talloc_free(ldb);
897                 return NULL;
898         }
899
900         ltdb = talloc_zero(ldb, struct ltdb_private);
901         if (!ltdb) {
902                 tdb_close(tdb);
903                 talloc_free(ldb);
904                 errno = ENOMEM;
905                 return NULL;
906         }
907
908         ltdb->tdb = tdb;
909         ltdb->sequence_number = 0;
910
911         talloc_set_destructor(ltdb, ltdb_destructor);
912
913         ldb->modules = talloc(ldb, struct ldb_module);
914         if (!ldb->modules) {
915                 talloc_free(ldb);
916                 errno = ENOMEM;
917                 return NULL;
918         }
919         ldb->modules->ldb = ldb;
920         ldb->modules->prev = ldb->modules->next = NULL;
921         ldb->modules->private_data = ltdb;
922         ldb->modules->ops = &ltdb_ops;
923
924         return ldb;
925 }