r10405: added transactions into tdb, and hook them into ldb. See my
[ira/wip.git] / source / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    
7
8      ** NOTE! The following LGPL license applies to the ldb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 /*
28  *  Name: ldb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  */
37
38 #include "includes.h"
39 #include "ldb/include/ldb.h"
40 #include "ldb/include/ldb_errors.h"
41 #include "ldb/include/ldb_private.h"
42 #include "ldb/ldb_tdb/ldb_tdb.h"
43
44 #define LDBLOCK "@INT_LDBLOCK"
45
46
47 /*
48   form a TDB_DATA for a record key
49   caller frees
50
51   note that the key for a record can depend on whether the 
52   dn refers to a case sensitive index record or not
53 */
54 struct TDB_DATA ltdb_key(struct ldb_module *module, const struct ldb_dn *dn)
55 {
56         struct ldb_context *ldb = module->ldb;
57         TDB_DATA key;
58         char *key_str = NULL;
59         char *dn_folded = NULL;
60
61         /*
62           most DNs are case insensitive. The exception is index DNs for
63           case sensitive attributes
64
65           there are 3 cases dealt with in this code:
66
67           1) if the dn doesn't start with @ then uppercase the attribute
68              names and the attributes values of case insensitive attributes
69           2) if the dn starts with @ then leave it alone - the indexing code handles
70              the rest
71         */
72
73         dn_folded = ldb_dn_linearize_casefold(ldb, dn);
74         if (!dn_folded) {
75                 goto failed;
76         }
77
78         key_str = talloc_asprintf(ldb, "DN=%s", dn_folded);
79
80         talloc_free(dn_folded);
81
82         if (!key_str) {
83                 goto failed;
84         }
85
86         key.dptr = key_str;
87         key.dsize = strlen(key_str) + 1;
88
89         return key;
90
91 failed:
92         errno = ENOMEM;
93         key.dptr = NULL;
94         key.dsize = 0;
95         return key;
96 }
97
98 /*
99   lock the database for write - currently a single lock is used
100 */
101 static int ltdb_lock(struct ldb_module *module, const char *lockname)
102 {
103         struct ltdb_private *ltdb = module->private_data;
104         struct ldb_dn *lock_dn;
105         char *ldn;
106         TDB_DATA key;
107         int ret;
108
109         if (lockname == NULL) {
110                 return -1;
111         }
112
113         ldn = talloc_asprintf(module->ldb, "%s_%s", LDBLOCK, lockname); 
114         if (ldn == NULL) {
115                 return -1;
116         }
117
118         lock_dn = ldb_dn_explode(module->ldb, ldn);
119         if (lock_dn == NULL) {
120                 talloc_free(ldn);
121                 return -1;
122         }
123         talloc_free(ldn);
124
125         key = ltdb_key(module, lock_dn);
126         if (!key.dptr) {
127                 talloc_free(lock_dn);
128                 return -1;
129         }
130
131         ret = tdb_chainlock(ltdb->tdb, key);
132
133         talloc_free(key.dptr);
134         talloc_free(lock_dn);
135
136         return ret;
137 }
138
139 /*
140   unlock the database after a ltdb_lock()
141 */
142 static int ltdb_unlock(struct ldb_module *module, const char *lockname)
143 {
144         struct ltdb_private *ltdb = module->private_data;
145         struct ldb_dn *lock_dn;
146         char *ldn;
147         TDB_DATA key;
148
149         if (lockname == NULL) {
150                 return -1;
151         }
152
153         ldn = talloc_asprintf(module->ldb, "%s_%s", LDBLOCK, lockname); 
154         if (ldn == NULL) {
155                 return -1;
156         }
157
158         lock_dn = ldb_dn_explode(module->ldb, ldn);
159         if (lock_dn == NULL) {
160                 talloc_free(ldn);
161                 return -1;
162         }
163         talloc_free(ldn);
164
165         key = ltdb_key(module, lock_dn);
166         if (!key.dptr) {
167                 talloc_free(lock_dn);
168                 return -1;
169         }
170
171         tdb_chainunlock(ltdb->tdb, key);
172
173         talloc_free(key.dptr);
174         talloc_free(lock_dn);
175
176         return 0;
177 }
178
179
180 /*
181   lock the database for read - use by ltdb_search
182 */
183 int ltdb_lock_read(struct ldb_module *module)
184 {
185         struct ltdb_private *ltdb = module->private_data;
186         TDB_DATA key;
187         struct ldb_dn *lock_dn;
188         int ret;
189
190         lock_dn = ldb_dn_explode(module, LDBLOCK);
191         if (lock_dn == NULL) {
192                 return -1;
193         }
194
195         key = ltdb_key(module, lock_dn);
196         if (!key.dptr) {
197                 talloc_free(lock_dn);
198                 return -1;
199         }
200         talloc_free(lock_dn);
201
202         ret = tdb_chainlock_read(ltdb->tdb, key);
203         talloc_free(key.dptr);
204         return ret;
205 }
206
207 /*
208   unlock the database after a ltdb_lock_read()
209 */
210 int ltdb_unlock_read(struct ldb_module *module)
211 {
212         struct ltdb_private *ltdb = module->private_data;
213         struct ldb_dn *lock_dn;
214         TDB_DATA key;
215
216         lock_dn = ldb_dn_explode(module, LDBLOCK);
217         if (lock_dn == NULL) {
218                 return -1;
219         }
220
221         key = ltdb_key(module, lock_dn);
222         if (!key.dptr) {
223                 talloc_free(lock_dn);
224                 return -1;
225         }
226         talloc_free(lock_dn);
227
228         tdb_chainunlock_read(ltdb->tdb, key);
229         talloc_free(key.dptr);
230         return 0;
231 }
232
233 /*
234   check special dn's have valid attributes
235   currently only @ATTRIBUTES is checked
236 */
237 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
238 {
239         int i, j;
240  
241         if (! ldb_dn_is_special(msg->dn) ||
242             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
243                 return 0;
244         }
245
246         /* we have @ATTRIBUTES, let's check attributes are fine */
247         /* should we check that we deny multivalued attributes ? */
248         for (i = 0; i < msg->num_elements; i++) {
249                 for (j = 0; j < msg->elements[i].num_values; j++) {
250                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
251                                 char *err_string = talloc_strdup(module, "Invalid attribute value in an @ATTRIBUTES entry");
252                                 if (err_string) {
253                                         ldb_set_errstring(module, err_string);
254                                 }
255                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
256                         }
257                 }
258         }
259
260         return 0;
261 }
262
263
264 /*
265   we've made a modification to a dn - possibly reindex and 
266   update sequence number
267 */
268 static int ltdb_modified(struct ldb_module *module, const struct ldb_dn *dn)
269 {
270         int ret = 0;
271
272         if (ldb_dn_is_special(dn) &&
273             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
274              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
275                 ret = ltdb_reindex(module);
276         }
277
278         if (ret == 0 &&
279             !(ldb_dn_is_special(dn) &&
280               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
281                 ret = ltdb_increase_sequence_number(module);
282         }
283
284         return ret;
285 }
286
287 /*
288   store a record into the db
289 */
290 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
291 {
292         struct ltdb_private *ltdb = module->private_data;
293         TDB_DATA tdb_key, tdb_data;
294         int ret;
295
296         tdb_key = ltdb_key(module, msg->dn);
297         if (!tdb_key.dptr) {
298                 return LDB_ERR_OTHER;
299         }
300
301         ret = ltdb_pack_data(module, msg, &tdb_data);
302         if (ret == -1) {
303                 talloc_free(tdb_key.dptr);
304                 return LDB_ERR_OTHER;
305         }
306
307         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
308         if (ret == -1) {
309                 ret = LDB_ERR_OTHER;
310                 goto done;
311         }
312         
313         ret = ltdb_index_add(module, msg);
314         if (ret == -1) {
315                 tdb_delete(ltdb->tdb, tdb_key);
316         }
317
318 done:
319         talloc_free(tdb_key.dptr);
320         talloc_free(tdb_data.dptr);
321
322         return ret;
323 }
324
325
326 /*
327   add a record to the database
328 */
329 static int ltdb_add(struct ldb_module *module, const struct ldb_message *msg)
330 {
331         int ret;
332
333         ret = ltdb_check_special_dn(module, msg);
334         if (ret != LDB_ERR_SUCCESS) {
335                 return ret;
336         }
337         
338         if (ltdb_lock(module, LDBLOCK) != 0) {
339                 return LDB_ERR_OTHER;
340         }
341
342         if (ltdb_cache_load(module) != 0) {
343                 ltdb_unlock(module, LDBLOCK);
344                 return LDB_ERR_OTHER;
345         }
346
347         ret = ltdb_store(module, msg, TDB_INSERT);
348
349         if (ret == LDB_ERR_SUCCESS) {
350                 ltdb_modified(module, msg->dn);
351         }
352
353         ltdb_unlock(module, LDBLOCK);
354         return ret;
355 }
356
357
358 /*
359   delete a record from the database, not updating indexes (used for deleting
360   index records)
361 */
362 int ltdb_delete_noindex(struct ldb_module *module, const struct ldb_dn *dn)
363 {
364         struct ltdb_private *ltdb = module->private_data;
365         TDB_DATA tdb_key;
366         int ret;
367
368         tdb_key = ltdb_key(module, dn);
369         if (!tdb_key.dptr) {
370                 return LDB_ERR_OTHER;
371         }
372
373         ret = tdb_delete(ltdb->tdb, tdb_key);
374         talloc_free(tdb_key.dptr);
375
376         if (ret != 0) ret = LDB_ERR_OTHER;
377
378         return ret;
379 }
380
381 /*
382   delete a record from the database
383 */
384 static int ltdb_delete(struct ldb_module *module, const struct ldb_dn *dn)
385 {
386         struct ldb_message *msg = NULL;
387         int ret = LDB_ERR_OTHER;
388
389         if (ltdb_lock(module, LDBLOCK) != 0) {
390                 return ret;
391         }
392
393         if (ltdb_cache_load(module) != 0) {
394                 goto failed;
395         }
396
397         msg = talloc(module, struct ldb_message);
398         if (msg == NULL) {
399                 goto failed;
400         }
401
402         /* in case any attribute of the message was indexed, we need
403            to fetch the old record */
404         ret = ltdb_search_dn1(module, dn, msg);
405         if (ret != 1) {
406                 /* not finding the old record is an error */
407                 ret = LDB_ERR_NO_SUCH_OBJECT;
408                 goto failed;
409         }
410
411         ret = ltdb_delete_noindex(module, dn);
412         if (ret != LDB_ERR_SUCCESS) {
413                 goto failed;
414         }
415
416         /* remove any indexed attributes */
417         ret = ltdb_index_del(module, msg);
418         if (ret == LDB_ERR_SUCCESS) {
419                 ltdb_modified(module, dn);
420         } else
421                 ret = LDB_ERR_OTHER;
422
423         talloc_free(msg);
424         ltdb_unlock(module, LDBLOCK);
425         return ret;
426
427 failed:
428         talloc_free(msg);
429         ltdb_unlock(module, LDBLOCK);
430         return ret;
431 }
432
433
434 /*
435   find an element by attribute name. At the moment this does a linear search, it should
436   be re-coded to use a binary search once all places that modify records guarantee
437   sorted order
438
439   return the index of the first matching element if found, otherwise -1
440 */
441 static int find_element(const struct ldb_message *msg, const char *name)
442 {
443         unsigned int i;
444         for (i=0;i<msg->num_elements;i++) {
445                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
446                         return i;
447                 }
448         }
449         return -1;
450 }
451
452
453 /*
454   add an element to an existing record. Assumes a elements array that we
455   can call re-alloc on, and assumed that we can re-use the data pointers from the 
456   passed in additional values. Use with care!
457
458   returns 0 on success, -1 on failure (and sets errno)
459 */
460 static int msg_add_element(struct ldb_context *ldb,
461                            struct ldb_message *msg, struct ldb_message_element *el)
462 {
463         struct ldb_message_element *e2;
464         unsigned int i;
465
466         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
467                               msg->num_elements+1);
468         if (!e2) {
469                 errno = ENOMEM;
470                 return -1;
471         }
472
473         msg->elements = e2;
474
475         e2 = &msg->elements[msg->num_elements];
476
477         e2->name = el->name;
478         e2->flags = el->flags;
479         e2->values = NULL;
480         if (el->num_values != 0) {
481                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
482                 if (!e2->values) {
483                         errno = ENOMEM;
484                         return -1;
485                 }
486         }
487         for (i=0;i<el->num_values;i++) {
488                 e2->values[i] = el->values[i];
489         }
490         e2->num_values = el->num_values;
491
492         msg->num_elements++;
493
494         return 0;
495 }
496
497 /*
498   delete all elements having a specified attribute name
499 */
500 static int msg_delete_attribute(struct ldb_module *module,
501                                 struct ldb_context *ldb,
502                                 struct ldb_message *msg, const char *name)
503 {
504         char *dn;
505         unsigned int i, j;
506
507         dn = ldb_dn_linearize(ldb, msg->dn);
508         if (dn == NULL) {
509                 return -1;
510         }
511
512         for (i=0;i<msg->num_elements;i++) {
513                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
514                         for (j=0;j<msg->elements[i].num_values;j++) {
515                                 ltdb_index_del_value(module, dn, &msg->elements[i], j);
516                         }
517                         talloc_free(msg->elements[i].values);
518                         if (msg->num_elements > (i+1)) {
519                                 memmove(&msg->elements[i], 
520                                         &msg->elements[i+1], 
521                                         sizeof(struct ldb_message_element)*
522                                         (msg->num_elements - (i+1)));
523                         }
524                         msg->num_elements--;
525                         i--;
526                         msg->elements = talloc_realloc(msg, msg->elements, 
527                                                          struct ldb_message_element, 
528                                                          msg->num_elements);
529                 }
530         }
531
532         talloc_free(dn);
533         return 0;
534 }
535
536 /*
537   delete all elements matching an attribute name/value 
538
539   return 0 on success, -1 on failure
540 */
541 static int msg_delete_element(struct ldb_module *module,
542                               struct ldb_message *msg, 
543                               const char *name,
544                               const struct ldb_val *val)
545 {
546         struct ldb_context *ldb = module->ldb;
547         unsigned int i;
548         int found;
549         struct ldb_message_element *el;
550         const struct ldb_attrib_handler *h;
551
552         found = find_element(msg, name);
553         if (found == -1) {
554                 return -1;
555         }
556
557         el = &msg->elements[found];
558
559         h = ldb_attrib_handler(ldb, el->name);
560
561         for (i=0;i<el->num_values;i++) {
562                 if (h->comparison_fn(ldb, ldb, &el->values[i], val) == 0) {
563                         if (i<el->num_values-1) {
564                                 memmove(&el->values[i], &el->values[i+1],
565                                         sizeof(el->values[i])*(el->num_values-(i+1)));
566                         }
567                         el->num_values--;
568                         if (el->num_values == 0) {
569                                 return msg_delete_attribute(module, ldb, msg, name);
570                         }
571                         return 0;
572                 }
573         }
574
575         return -1;
576 }
577
578
579 /*
580   modify a record - internal interface
581
582   yuck - this is O(n^2). Luckily n is usually small so we probably
583   get away with it, but if we ever have really large attribute lists 
584   then we'll need to look at this again
585 */
586 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
587 {
588         struct ldb_context *ldb = module->ldb;
589         struct ltdb_private *ltdb = module->private_data;
590         TDB_DATA tdb_key, tdb_data;
591         struct ldb_message *msg2;
592         unsigned i, j;
593         int ret;
594
595         tdb_key = ltdb_key(module, msg->dn);
596         if (!tdb_key.dptr) {
597                 return LDB_ERR_OTHER;
598         }
599
600         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
601         if (!tdb_data.dptr) {
602                 talloc_free(tdb_key.dptr);
603                 return LDB_ERR_OTHER;
604         }
605
606         msg2 = talloc(tdb_key.dptr, struct ldb_message);
607         if (msg2 == NULL) {
608                 talloc_free(tdb_key.dptr);
609                 return LDB_ERR_OTHER;
610         }
611
612         ret = ltdb_unpack_data(module, &tdb_data, msg2);
613         if (ret == -1) {
614                 talloc_free(tdb_key.dptr);
615                 free(tdb_data.dptr);
616                 return LDB_ERR_OTHER;
617         }
618
619         if (!msg2->dn) {
620                 msg2->dn = msg->dn;
621         }
622
623         for (i=0;i<msg->num_elements;i++) {
624                 struct ldb_message_element *el = &msg->elements[i];
625                 struct ldb_message_element *el2;
626                 struct ldb_val *vals;
627                 char *err_string;
628                 char *dn;
629
630                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
631
632                 case LDB_FLAG_MOD_ADD:
633                         /* add this element to the message. fail if it
634                            already exists */
635                         ret = find_element(msg2, el->name);
636
637                         if (ret == -1) {
638                                 if (msg_add_element(ldb, msg2, el) != 0) {
639                                         ret = LDB_ERR_OTHER;
640                                         goto failed;
641                                 }
642                                 continue;
643                         }
644
645                         el2 = &msg2->elements[ret];
646
647                         /* An attribute with this name already exists, add all
648                          * values if they don't already exist. */
649
650                         for (j=0;j<el->num_values;j++) {
651                                 if (ldb_msg_find_val(el2, &el->values[j])) {
652                                         err_string = talloc_strdup(module, "Type or value exists");
653                                         if (err_string) ldb_set_errstring(module, err_string);
654                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
655                                         goto failed;
656                                 }
657                         }
658
659                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
660                                                 el2->num_values + el->num_values);
661
662                         if (vals == NULL)
663                                 goto failed;
664
665                         for (j=0;j<el->num_values;j++) {
666                                 vals[el2->num_values + j] =
667                                         ldb_val_dup(vals, &el->values[j]);
668                         }
669
670                         el2->values = vals;
671                         el2->num_values += el->num_values;
672
673                         break;
674
675                 case LDB_FLAG_MOD_REPLACE:
676                         /* replace all elements of this attribute name with the elements
677                            listed. The attribute not existing is not an error */
678                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
679
680                         /* add the replacement element, if not empty */
681                         if (msg->elements[i].num_values != 0 &&
682                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
683                                 goto failed;
684                         }
685                         break;
686
687                 case LDB_FLAG_MOD_DELETE:
688
689                         dn = ldb_dn_linearize(msg2, msg->dn);
690                         if (dn == NULL) goto failed;
691
692                         /* we could be being asked to delete all
693                            values or just some values */
694                         if (msg->elements[i].num_values == 0) {
695                                 if (msg_delete_attribute(module, ldb, msg2, 
696                                                          msg->elements[i].name) != 0) {
697                                         err_string = talloc_strdup(module, "No such attribute");
698                                         if (err_string) ldb_set_errstring(module, err_string);
699                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
700                                         goto failed;
701                                 }
702                                 break;
703                         }
704                         for (j=0;j<msg->elements[i].num_values;j++) {
705                                 if (msg_delete_element(module,
706                                                        msg2, 
707                                                        msg->elements[i].name,
708                                                        &msg->elements[i].values[j]) != 0) {
709                                         err_string = talloc_strdup(module, "No such attribute");
710                                         if (err_string) ldb_set_errstring(module, err_string);
711                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
712                                         goto failed;
713                                 }
714                                 if (ltdb_index_del_value(module, dn, &msg->elements[i], j) != 0) {
715                                         goto failed;
716                                 }
717                         }
718                         break;
719                 default:
720                         err_string = talloc_strdup(module, "Invalid ldb_modify flags");
721                         if (err_string) ldb_set_errstring(module, err_string);
722                         ret = LDB_ERR_PROTOCOL_ERROR;
723                         goto failed;
724                 }
725         }
726
727         /* we've made all the mods - save the modified record back into the database */
728         ret = ltdb_store(module, msg2, TDB_MODIFY);
729
730         talloc_free(tdb_key.dptr);
731         free(tdb_data.dptr);
732         return ret;
733
734 failed:
735         talloc_free(tdb_key.dptr);
736         free(tdb_data.dptr);
737         return ret;
738 }
739
740 /*
741   modify a record
742 */
743 static int ltdb_modify(struct ldb_module *module, const struct ldb_message *msg)
744 {
745         int ret;
746
747         ret = ltdb_check_special_dn(module, msg);
748         if (ret != 0) {
749                 return ret;
750         }
751         
752         if (ltdb_lock(module, LDBLOCK) != 0) {
753                 return -1;
754         }
755
756         if (ltdb_cache_load(module) != 0) {
757                 ltdb_unlock(module, LDBLOCK);
758                 return -1;
759         }
760
761         ret = ltdb_modify_internal(module, msg);
762
763         if (ret == LDB_ERR_SUCCESS) {
764                 ltdb_modified(module, msg->dn);
765         }
766
767         ltdb_unlock(module, LDBLOCK);
768
769         return ret;
770 }
771
772 /*
773   rename a record
774 */
775 static int ltdb_rename(struct ldb_module *module, const struct ldb_dn *olddn, const struct ldb_dn *newdn)
776 {
777         struct ldb_message *msg;
778         char *error_str;
779         int ret = LDB_ERR_OTHER;
780
781         if (ltdb_lock(module, LDBLOCK) != 0) {
782                 return ret;
783         }
784
785         if (ltdb_cache_load(module) != 0) {
786                 ltdb_unlock(module, LDBLOCK);
787                 return ret;
788         }
789
790         msg = talloc(module, struct ldb_message);
791         if (msg == NULL) {
792                 goto failed;
793         }
794
795         /* in case any attribute of the message was indexed, we need
796            to fetch the old record */
797         ret = ltdb_search_dn1(module, olddn, msg);
798         if (ret != 1) {
799                 /* not finding the old record is an error */
800                 ret = LDB_ERR_NO_SUCH_OBJECT;
801                 goto failed;
802         }
803
804         msg->dn = ldb_dn_copy(msg, newdn);
805         if (!msg->dn) {
806                 ret = LDB_ERR_OTHER;
807                 goto failed;
808         }
809
810         ret = ltdb_add(module, msg);
811         if (ret != LDB_ERR_SUCCESS) {
812                 goto failed;
813         }
814
815         ret = ltdb_delete(module, olddn);
816         error_str = talloc_strdup(module, ldb_errstring(module->ldb));
817         if (ret != LDB_ERR_SUCCESS) {
818                 ltdb_delete(module, newdn);
819         }
820
821         ldb_set_errstring(module, error_str);
822
823         talloc_free(msg);
824         ltdb_unlock(module, LDBLOCK);
825
826         return ret;
827
828 failed:
829         talloc_free(msg);
830         ltdb_unlock(module, LDBLOCK);
831         return ret;
832 }
833
834 static int ltdb_start_trans(struct ldb_module *module)
835 {
836         struct ltdb_private *ltdb = module->private_data;
837
838         if (tdb_transaction_start(ltdb->tdb) != 0) {
839                 return LDB_ERR_OPERATIONS_ERROR;
840         }
841
842         return LDB_ERR_SUCCESS;
843 }
844
845 static int ltdb_end_trans(struct ldb_module *module, int status)
846 {
847         struct ltdb_private *ltdb = module->private_data;
848
849         if (status != LDB_ERR_SUCCESS) {
850                 if (tdb_transaction_cancel(ltdb->tdb) != 0) {
851                         return LDB_ERR_OPERATIONS_ERROR;
852                 }
853         } else {
854                 if (tdb_transaction_commit(ltdb->tdb) != 0) {
855                         return LDB_ERR_OPERATIONS_ERROR;
856                 }
857         }
858
859         return status;
860 }
861
862 static const struct ldb_module_ops ltdb_ops = {
863         .name              = "tdb",
864         .search            = ltdb_search,
865         .search_bytree     = ltdb_search_bytree,
866         .add_record        = ltdb_add,
867         .modify_record     = ltdb_modify,
868         .delete_record     = ltdb_delete,
869         .rename_record     = ltdb_rename,
870         .start_transaction = ltdb_start_trans,
871         .end_transaction   = ltdb_end_trans
872 };
873
874
875 /*
876   connect to the database
877 */
878 int ltdb_connect(struct ldb_context *ldb, const char *url, 
879                  unsigned int flags, const char *options[])
880 {
881         const char *path;
882         int tdb_flags, open_flags;
883         struct ltdb_private *ltdb;
884
885         /* parse the url */
886         if (strchr(url, ':')) {
887                 if (strncmp(url, "tdb://", 6) != 0) {
888                         ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid tdb URL '%s'", url);
889                         return -1;
890                 }
891                 path = url+6;
892         } else {
893                 path = url;
894         }
895
896         tdb_flags = TDB_DEFAULT;
897
898 #if 0
899         /* set this to run tdb without disk sync, but still with
900            transactions */
901         tdb_flags |= TDB_NOSYNC;
902 #endif
903
904         if (flags & LDB_FLG_RDONLY) {
905                 open_flags = O_RDONLY;
906         } else {
907                 open_flags = O_CREAT | O_RDWR;
908         }
909
910         ltdb = talloc_zero(ldb, struct ltdb_private);
911         if (!ltdb) {
912                 ldb_oom(ldb);
913                 return -1;
914         }
915
916         /* note that we use quite a large default hash size */
917         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000, tdb_flags, open_flags, 0666);
918         if (!ltdb->tdb) {
919                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Unable to open tdb '%s'\n", path);
920                 talloc_free(ltdb);
921                 return -1;
922         }
923
924         ltdb->sequence_number = 0;
925
926         ldb->modules = talloc(ldb, struct ldb_module);
927         if (!ldb->modules) {
928                 ldb_oom(ldb);
929                 talloc_free(ltdb);
930                 return -1;
931         }
932         ldb->modules->ldb = ldb;
933         ldb->modules->prev = ldb->modules->next = NULL;
934         ldb->modules->private_data = ltdb;
935         ldb->modules->ops = &ltdb_ops;
936
937         return 0;
938 }