r23364: add LDB_FLG_NOMMAP flag
[samba.git] / source / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006
7    
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb_tdb
30  *
31  *  Component: ldb tdb backend
32  *
33  *  Description: core functions for tdb backend
34  *
35  *  Author: Andrew Tridgell
36  *  Author: Stefan Metzmacher
37  *
38  *  Modifications:
39  *
40  *  - description: make the module use asyncronous calls
41  *    date: Feb 2006
42  *    Author: Simo Sorce
43  */
44
45 #include "ldb_includes.h"
46
47 #include "ldb_tdb.h"
48
49
50 /*
51   map a tdb error code to a ldb error code
52 */
53 static int ltdb_err_map(enum TDB_ERROR tdb_code)
54 {
55         switch (tdb_code) {
56         case TDB_SUCCESS:
57                 return LDB_SUCCESS;
58         case TDB_ERR_CORRUPT:
59         case TDB_ERR_OOM:
60         case TDB_ERR_EINVAL:
61                 return LDB_ERR_OPERATIONS_ERROR;
62         case TDB_ERR_IO:
63                 return LDB_ERR_PROTOCOL_ERROR;
64         case TDB_ERR_LOCK:
65         case TDB_ERR_NOLOCK:
66                 return LDB_ERR_BUSY;
67         case TDB_ERR_LOCK_TIMEOUT:
68                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
69         case TDB_ERR_EXISTS:
70                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
71         case TDB_ERR_NOEXIST:
72                 return LDB_ERR_NO_SUCH_OBJECT;
73         case TDB_ERR_RDONLY:
74                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
75         }
76         return LDB_ERR_OTHER;
77 }
78
79
80 struct ldb_handle *init_ltdb_handle(struct ltdb_private *ltdb, struct ldb_module *module,
81                                     struct ldb_request *req)
82 {
83         struct ltdb_context *ac;
84         struct ldb_handle *h;
85
86         h = talloc_zero(req, struct ldb_handle);
87         if (h == NULL) {
88                 ldb_set_errstring(module->ldb, "Out of Memory");
89                 return NULL;
90         }
91
92         h->module = module;
93
94         ac = talloc_zero(h, struct ltdb_context);
95         if (ac == NULL) {
96                 ldb_set_errstring(module->ldb, "Out of Memory");
97                 talloc_free(h);
98                 return NULL;
99         }
100
101         h->private_data = (void *)ac;
102
103         h->state = LDB_ASYNC_INIT;
104         h->status = LDB_SUCCESS;
105
106         ac->module = module;
107         ac->context = req->context;
108         ac->callback = req->callback;
109
110         return h;
111 }
112
113 /*
114   form a TDB_DATA for a record key
115   caller frees
116
117   note that the key for a record can depend on whether the 
118   dn refers to a case sensitive index record or not
119 */
120 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
121 {
122         struct ldb_context *ldb = module->ldb;
123         TDB_DATA key;
124         char *key_str = NULL;
125         const char *dn_folded = NULL;
126
127         /*
128           most DNs are case insensitive. The exception is index DNs for
129           case sensitive attributes
130
131           there are 3 cases dealt with in this code:
132
133           1) if the dn doesn't start with @ then uppercase the attribute
134              names and the attributes values of case insensitive attributes
135           2) if the dn starts with @ then leave it alone - the indexing code handles
136              the rest
137         */
138
139         dn_folded = ldb_dn_get_casefold(dn);
140         if (!dn_folded) {
141                 goto failed;
142         }
143
144         key_str = talloc_strdup(ldb, "DN=");
145         if (!key_str) {
146                 goto failed;
147         }
148
149         key_str = talloc_append_string(ldb, key_str, dn_folded);
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key.dptr = (uint8_t *)key_str;
155         key.dsize = strlen(key_str) + 1;
156
157         return key;
158
159 failed:
160         errno = ENOMEM;
161         key.dptr = NULL;
162         key.dsize = 0;
163         return key;
164 }
165
166 /*
167   check special dn's have valid attributes
168   currently only @ATTRIBUTES is checked
169 */
170 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
171 {
172         int i, j;
173  
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return 0;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 for (j = 0; j < msg->elements[i].num_values; j++) {
183                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
184                                 ldb_set_errstring(module->ldb, "Invalid attribute value in an @ATTRIBUTES entry");
185                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
186                         }
187                 }
188         }
189
190         return 0;
191 }
192
193
194 /*
195   we've made a modification to a dn - possibly reindex and 
196   update sequence number
197 */
198 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
199 {
200         int ret = LDB_SUCCESS;
201
202         if (ldb_dn_is_special(dn) &&
203             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
204              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
205                 ret = ltdb_reindex(module);
206         }
207
208         if (ret == LDB_SUCCESS &&
209             !(ldb_dn_is_special(dn) &&
210               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
211                 ret = ltdb_increase_sequence_number(module);
212         }
213
214         return ret;
215 }
216
217 /*
218   store a record into the db
219 */
220 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
221 {
222         struct ltdb_private *ltdb =
223                 talloc_get_type(module->private_data, struct ltdb_private);
224         TDB_DATA tdb_key, tdb_data;
225         int ret;
226
227         tdb_key = ltdb_key(module, msg->dn);
228         if (!tdb_key.dptr) {
229                 return LDB_ERR_OTHER;
230         }
231
232         ret = ltdb_pack_data(module, msg, &tdb_data);
233         if (ret == -1) {
234                 talloc_free(tdb_key.dptr);
235                 return LDB_ERR_OTHER;
236         }
237
238         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
239         if (ret == -1) {
240                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
241                 goto done;
242         }
243         
244         ret = ltdb_index_add(module, msg);
245         if (ret != LDB_SUCCESS) {
246                 tdb_delete(ltdb->tdb, tdb_key);
247         }
248
249 done:
250         talloc_free(tdb_key.dptr);
251         talloc_free(tdb_data.dptr);
252
253         return ret;
254 }
255
256
257 static int ltdb_add_internal(struct ldb_module *module, const struct ldb_message *msg)
258 {
259         int ret;
260         
261         ret = ltdb_check_special_dn(module, msg);
262         if (ret != LDB_SUCCESS) {
263                 return ret;
264         }
265         
266         if (ltdb_cache_load(module) != 0) {
267                 return LDB_ERR_OPERATIONS_ERROR;
268         }
269
270         ret = ltdb_store(module, msg, TDB_INSERT);
271
272         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
273                 ldb_asprintf_errstring(module->ldb, "Entry %s already exists", ldb_dn_get_linearized(msg->dn));
274                 return ret;
275         }
276         
277         if (ret == LDB_SUCCESS) {
278                 ret = ltdb_index_one(module, msg, 1);
279                 if (ret != LDB_SUCCESS) {
280                         return ret;
281                 }
282
283                 ret = ltdb_modified(module, msg->dn);
284                 if (ret != LDB_SUCCESS) {
285                         return ret;
286                 }
287         }
288
289         return ret;
290 }
291
292 /*
293   add a record to the database
294 */
295 static int ltdb_add(struct ldb_module *module, struct ldb_request *req)
296 {
297         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
298         struct ltdb_context *ltdb_ac;
299         int tret, ret = LDB_SUCCESS;
300
301         if (check_critical_controls(req->controls)) {
302                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
303         }
304         
305         req->handle = init_ltdb_handle(ltdb, module, req);
306         if (req->handle == NULL) {
307                 return LDB_ERR_OPERATIONS_ERROR;
308         }
309         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
310
311         tret = ltdb_add_internal(module, req->op.add.message);
312         if (tret != LDB_SUCCESS) {
313                 req->handle->status = tret;
314                 goto done;
315         }
316         
317         if (ltdb_ac->callback) {
318                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
319         }
320 done:
321         req->handle->state = LDB_ASYNC_DONE;
322         return ret;
323 }
324
325 /*
326   delete a record from the database, not updating indexes (used for deleting
327   index records)
328 */
329 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
330 {
331         struct ltdb_private *ltdb =
332                 talloc_get_type(module->private_data, struct ltdb_private);
333         TDB_DATA tdb_key;
334         int ret;
335
336         tdb_key = ltdb_key(module, dn);
337         if (!tdb_key.dptr) {
338                 return LDB_ERR_OTHER;
339         }
340
341         ret = tdb_delete(ltdb->tdb, tdb_key);
342         talloc_free(tdb_key.dptr);
343
344         if (ret != 0) {
345                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
346         }
347
348         return ret;
349 }
350
351 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
352 {
353         struct ldb_message *msg;
354         int ret;
355
356         msg = talloc(module, struct ldb_message);
357         if (msg == NULL) {
358                 return LDB_ERR_OPERATIONS_ERROR;
359         }
360
361         /* in case any attribute of the message was indexed, we need
362            to fetch the old record */
363         ret = ltdb_search_dn1(module, dn, msg);
364         if (ret != LDB_SUCCESS) {
365                 /* not finding the old record is an error */
366                 goto done;
367         }
368
369         ret = ltdb_delete_noindex(module, dn);
370         if (ret != LDB_SUCCESS) {
371                 goto done;
372         }
373
374         /* remove one level attribute */
375         ret = ltdb_index_one(module, msg, 0);
376         if (ret != LDB_SUCCESS) {
377                 goto done;
378         }
379
380         /* remove any indexed attributes */
381         ret = ltdb_index_del(module, msg);
382         if (ret != LDB_SUCCESS) {
383                 goto done;
384         }
385
386         ret = ltdb_modified(module, dn);
387         if (ret != LDB_SUCCESS) {
388                 goto done;
389         }
390
391 done:
392         talloc_free(msg);
393         return ret;
394 }
395
396 /*
397   delete a record from the database
398 */
399 static int ltdb_delete(struct ldb_module *module, struct ldb_request *req)
400 {
401         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
402         struct ltdb_context *ltdb_ac;
403         int tret, ret = LDB_SUCCESS;
404
405         if (check_critical_controls(req->controls)) {
406                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
407         }
408         
409         req->handle = NULL;
410
411         if (ltdb_cache_load(module) != 0) {
412                 return LDB_ERR_OPERATIONS_ERROR;
413         }
414
415         req->handle = init_ltdb_handle(ltdb, module, req);
416         if (req->handle == NULL) {
417                 return LDB_ERR_OPERATIONS_ERROR;
418         }
419         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
420
421         tret = ltdb_delete_internal(module, req->op.del.dn);
422         if (tret != LDB_SUCCESS) {
423                 req->handle->status = tret; 
424                 goto done;
425         }
426
427         if (ltdb_ac->callback) {
428                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
429         }
430 done:
431         req->handle->state = LDB_ASYNC_DONE;
432         return ret;
433 }
434
435 /*
436   find an element by attribute name. At the moment this does a linear search, it should
437   be re-coded to use a binary search once all places that modify records guarantee
438   sorted order
439
440   return the index of the first matching element if found, otherwise -1
441 */
442 static int find_element(const struct ldb_message *msg, const char *name)
443 {
444         unsigned int i;
445         for (i=0;i<msg->num_elements;i++) {
446                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
447                         return i;
448                 }
449         }
450         return -1;
451 }
452
453
454 /*
455   add an element to an existing record. Assumes a elements array that we
456   can call re-alloc on, and assumed that we can re-use the data pointers from the 
457   passed in additional values. Use with care!
458
459   returns 0 on success, -1 on failure (and sets errno)
460 */
461 static int msg_add_element(struct ldb_context *ldb,
462                            struct ldb_message *msg, struct ldb_message_element *el)
463 {
464         struct ldb_message_element *e2;
465         unsigned int i;
466
467         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
468                               msg->num_elements+1);
469         if (!e2) {
470                 errno = ENOMEM;
471                 return -1;
472         }
473
474         msg->elements = e2;
475
476         e2 = &msg->elements[msg->num_elements];
477
478         e2->name = el->name;
479         e2->flags = el->flags;
480         e2->values = NULL;
481         if (el->num_values != 0) {
482                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
483                 if (!e2->values) {
484                         errno = ENOMEM;
485                         return -1;
486                 }
487         }
488         for (i=0;i<el->num_values;i++) {
489                 e2->values[i] = el->values[i];
490         }
491         e2->num_values = el->num_values;
492
493         msg->num_elements++;
494
495         return 0;
496 }
497
498 /*
499   delete all elements having a specified attribute name
500 */
501 static int msg_delete_attribute(struct ldb_module *module,
502                                 struct ldb_context *ldb,
503                                 struct ldb_message *msg, const char *name)
504 {
505         const char *dn;
506         unsigned int i, j;
507
508         dn = ldb_dn_get_linearized(msg->dn);
509         if (dn == NULL) {
510                 return -1;
511         }
512
513         for (i=0;i<msg->num_elements;i++) {
514                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
515                         for (j=0;j<msg->elements[i].num_values;j++) {
516                                 ltdb_index_del_value(module, dn, &msg->elements[i], j);
517                         }
518                         talloc_free(msg->elements[i].values);
519                         if (msg->num_elements > (i+1)) {
520                                 memmove(&msg->elements[i], 
521                                         &msg->elements[i+1], 
522                                         sizeof(struct ldb_message_element)*
523                                         (msg->num_elements - (i+1)));
524                         }
525                         msg->num_elements--;
526                         i--;
527                         msg->elements = talloc_realloc(msg, msg->elements, 
528                                                          struct ldb_message_element, 
529                                                          msg->num_elements);
530                 }
531         }
532
533         return 0;
534 }
535
536 /*
537   delete all elements matching an attribute name/value 
538
539   return 0 on success, -1 on failure
540 */
541 static int msg_delete_element(struct ldb_module *module,
542                               struct ldb_message *msg, 
543                               const char *name,
544                               const struct ldb_val *val)
545 {
546         struct ldb_context *ldb = module->ldb;
547         unsigned int i;
548         int found;
549         struct ldb_message_element *el;
550         const struct ldb_schema_attribute *a;
551
552         found = find_element(msg, name);
553         if (found == -1) {
554                 return -1;
555         }
556
557         el = &msg->elements[found];
558
559         a = ldb_schema_attribute_by_name(ldb, el->name);
560
561         for (i=0;i<el->num_values;i++) {
562                 if (a->syntax->comparison_fn(ldb, ldb, &el->values[i], val) == 0) {
563                         if (i<el->num_values-1) {
564                                 memmove(&el->values[i], &el->values[i+1],
565                                         sizeof(el->values[i])*(el->num_values-(i+1)));
566                         }
567                         el->num_values--;
568                         if (el->num_values == 0) {
569                                 return msg_delete_attribute(module, ldb, msg, name);
570                         }
571                         return 0;
572                 }
573         }
574
575         return -1;
576 }
577
578
579 /*
580   modify a record - internal interface
581
582   yuck - this is O(n^2). Luckily n is usually small so we probably
583   get away with it, but if we ever have really large attribute lists 
584   then we'll need to look at this again
585 */
586 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
587 {
588         struct ldb_context *ldb = module->ldb;
589         struct ltdb_private *ltdb =
590                 talloc_get_type(module->private_data, struct ltdb_private);
591         TDB_DATA tdb_key, tdb_data;
592         struct ldb_message *msg2;
593         unsigned i, j;
594         int ret;
595
596         tdb_key = ltdb_key(module, msg->dn);
597         if (!tdb_key.dptr) {
598                 return LDB_ERR_OTHER;
599         }
600
601         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
602         if (!tdb_data.dptr) {
603                 talloc_free(tdb_key.dptr);
604                 return ltdb_err_map(tdb_error(ltdb->tdb));
605         }
606
607         msg2 = talloc(tdb_key.dptr, struct ldb_message);
608         if (msg2 == NULL) {
609                 talloc_free(tdb_key.dptr);
610                 return LDB_ERR_OTHER;
611         }
612
613         ret = ltdb_unpack_data(module, &tdb_data, msg2);
614         if (ret == -1) {
615                 ret = LDB_ERR_OTHER;
616                 goto failed;
617         }
618
619         if (!msg2->dn) {
620                 msg2->dn = msg->dn;
621         }
622
623         for (i=0;i<msg->num_elements;i++) {
624                 struct ldb_message_element *el = &msg->elements[i];
625                 struct ldb_message_element *el2;
626                 struct ldb_val *vals;
627                 const char *dn;
628
629                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
630
631                 case LDB_FLAG_MOD_ADD:
632                         /* add this element to the message. fail if it
633                            already exists */
634                         ret = find_element(msg2, el->name);
635
636                         if (ret == -1) {
637                                 if (msg_add_element(ldb, msg2, el) != 0) {
638                                         ret = LDB_ERR_OTHER;
639                                         goto failed;
640                                 }
641                                 continue;
642                         }
643
644                         el2 = &msg2->elements[ret];
645
646                         /* An attribute with this name already exists, add all
647                          * values if they don't already exist. */
648
649                         for (j=0;j<el->num_values;j++) {
650                                 if (ldb_msg_find_val(el2, &el->values[j])) {
651                                         ldb_set_errstring(module->ldb, "Type or value exists");
652                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
653                                         goto failed;
654                                 }
655                         }
656
657                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
658                                                 el2->num_values + el->num_values);
659
660                         if (vals == NULL) {
661                                 ret = LDB_ERR_OTHER;
662                                 goto failed;
663                         }
664
665                         for (j=0;j<el->num_values;j++) {
666                                 vals[el2->num_values + j] =
667                                         ldb_val_dup(vals, &el->values[j]);
668                         }
669
670                         el2->values = vals;
671                         el2->num_values += el->num_values;
672
673                         break;
674
675                 case LDB_FLAG_MOD_REPLACE:
676                         /* replace all elements of this attribute name with the elements
677                            listed. The attribute not existing is not an error */
678                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
679
680                         /* add the replacement element, if not empty */
681                         if (msg->elements[i].num_values != 0 &&
682                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
683                                 ret = LDB_ERR_OTHER;
684                                 goto failed;
685                         }
686                         break;
687
688                 case LDB_FLAG_MOD_DELETE:
689
690                         dn = ldb_dn_get_linearized(msg->dn);
691                         if (dn == NULL) {
692                                 ret = LDB_ERR_OTHER;
693                                 goto failed;
694                         }
695
696                         /* we could be being asked to delete all
697                            values or just some values */
698                         if (msg->elements[i].num_values == 0) {
699                                 if (msg_delete_attribute(module, ldb, msg2, 
700                                                          msg->elements[i].name) != 0) {
701                                         ldb_asprintf_errstring(module->ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
702                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
703                                         goto failed;
704                                 }
705                                 break;
706                         }
707                         for (j=0;j<msg->elements[i].num_values;j++) {
708                                 if (msg_delete_element(module,
709                                                        msg2, 
710                                                        msg->elements[i].name,
711                                                        &msg->elements[i].values[j]) != 0) {
712                                         ldb_asprintf_errstring(module->ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
713                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
714                                         goto failed;
715                                 }
716                                 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
717                                 if (ret != LDB_SUCCESS) {
718                                         goto failed;
719                                 }
720                         }
721                         break;
722                 default:
723                         ldb_asprintf_errstring(module->ldb, "Invalid ldb_modify flags on %s: 0x%x", 
724                                                              msg->elements[i].name, 
725                                                              msg->elements[i].flags & LDB_FLAG_MOD_MASK);
726                         ret = LDB_ERR_PROTOCOL_ERROR;
727                         goto failed;
728                 }
729         }
730
731         /* we've made all the mods - save the modified record back into the database */
732         ret = ltdb_store(module, msg2, TDB_MODIFY);
733         if (ret != LDB_SUCCESS) {
734                 goto failed;
735         }
736
737         ret = ltdb_modified(module, msg->dn);
738         if (ret != LDB_SUCCESS) {
739                 goto failed;
740         }
741
742         talloc_free(tdb_key.dptr);
743         free(tdb_data.dptr);
744         return ret;
745
746 failed:
747         talloc_free(tdb_key.dptr);
748         free(tdb_data.dptr);
749         return ret;
750 }
751
752 /*
753   modify a record
754 */
755 static int ltdb_modify(struct ldb_module *module, struct ldb_request *req)
756 {
757         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
758         struct ltdb_context *ltdb_ac;
759         int tret, ret = LDB_SUCCESS;
760
761         if (check_critical_controls(req->controls)) {
762                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
763         }
764         
765         req->handle = NULL;
766
767         req->handle = init_ltdb_handle(ltdb, module, req);
768         if (req->handle == NULL) {
769                 return LDB_ERR_OPERATIONS_ERROR;
770         }
771         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
772
773         tret = ltdb_check_special_dn(module, req->op.mod.message);
774         if (tret != LDB_SUCCESS) {
775                 req->handle->status = tret;
776                 goto done;
777         }
778         
779         if (ltdb_cache_load(module) != 0) {
780                 ret = LDB_ERR_OPERATIONS_ERROR;
781                 goto done;
782         }
783
784         tret = ltdb_modify_internal(module, req->op.mod.message);
785         if (tret != LDB_SUCCESS) {
786                 req->handle->status = tret;
787                 goto done;
788         }
789
790         if (ltdb_ac->callback) {
791                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
792         }
793 done:
794         req->handle->state = LDB_ASYNC_DONE;
795         return ret;
796 }
797
798 /*
799   rename a record
800 */
801 static int ltdb_rename(struct ldb_module *module, struct ldb_request *req)
802 {
803         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
804         struct ltdb_context *ltdb_ac;
805         struct ldb_message *msg;
806         int tret, ret = LDB_SUCCESS;
807
808         if (check_critical_controls(req->controls)) {
809                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
810         }
811         
812         req->handle = NULL;
813
814         if (ltdb_cache_load(module) != 0) {
815                 return LDB_ERR_OPERATIONS_ERROR;
816         }
817
818         req->handle = init_ltdb_handle(ltdb, module, req);
819         if (req->handle == NULL) {
820                 return LDB_ERR_OPERATIONS_ERROR;
821         }
822         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
823
824         msg = talloc(ltdb_ac, struct ldb_message);
825         if (msg == NULL) {
826                 ret = LDB_ERR_OPERATIONS_ERROR;
827                 goto done;
828         }
829
830         /* in case any attribute of the message was indexed, we need
831            to fetch the old record */
832         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
833         if (tret != LDB_SUCCESS) {
834                 /* not finding the old record is an error */
835                 req->handle->status = tret;
836                 goto done;
837         }
838
839         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
840         if (!msg->dn) {
841                 ret = LDB_ERR_OPERATIONS_ERROR;
842                 goto done;
843         }
844
845         tret = ltdb_add_internal(module, msg);
846         if (tret != LDB_SUCCESS) {
847                 ret = LDB_ERR_OPERATIONS_ERROR;
848                 goto done;
849         }
850
851         tret = ltdb_delete_internal(module, req->op.rename.olddn);
852         if (tret != LDB_SUCCESS) {
853                 ltdb_delete_internal(module, req->op.rename.newdn);
854                 ret = LDB_ERR_OPERATIONS_ERROR;
855                 goto done;
856         }
857
858         if (ltdb_ac->callback) {
859                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
860         }
861 done:
862         req->handle->state = LDB_ASYNC_DONE;
863         return ret;
864 }
865
866 static int ltdb_start_trans(struct ldb_module *module)
867 {
868         struct ltdb_private *ltdb =
869                 talloc_get_type(module->private_data, struct ltdb_private);
870
871         if (tdb_transaction_start(ltdb->tdb) != 0) {
872                 return ltdb_err_map(tdb_error(ltdb->tdb));
873         }
874
875         return LDB_SUCCESS;
876 }
877
878 static int ltdb_end_trans(struct ldb_module *module)
879 {
880         struct ltdb_private *ltdb =
881                 talloc_get_type(module->private_data, struct ltdb_private);
882
883         if (tdb_transaction_commit(ltdb->tdb) != 0) {
884                 return ltdb_err_map(tdb_error(ltdb->tdb));
885         }
886
887         return LDB_SUCCESS;
888 }
889
890 static int ltdb_del_trans(struct ldb_module *module)
891 {
892         struct ltdb_private *ltdb =
893                 talloc_get_type(module->private_data, struct ltdb_private);
894
895         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
896                 return ltdb_err_map(tdb_error(ltdb->tdb));
897         }
898
899         return LDB_SUCCESS;
900 }
901
902 static int ltdb_wait(struct ldb_handle *handle, enum ldb_wait_type type)
903 {
904         return handle->status;
905 }
906
907 static int ltdb_request(struct ldb_module *module, struct ldb_request *req)
908 {
909         /* check for oustanding critical controls and return an error if found */
910         if (check_critical_controls(req->controls)) {
911                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
912         }
913         
914         /* search, add, modify, delete, rename are handled by their own, no other op supported */
915         return LDB_ERR_OPERATIONS_ERROR;
916 }
917
918 /*
919   return sequenceNumber from @BASEINFO
920 */
921 static int ltdb_sequence_number(struct ldb_module *module, struct ldb_request *req)
922 {
923         TALLOC_CTX *tmp_ctx = talloc_new(req);
924         struct ldb_message *msg = NULL;
925         struct ldb_dn *dn = ldb_dn_new(tmp_ctx, module->ldb, LTDB_BASEINFO);
926         int tret;
927
928         if (tmp_ctx == NULL) {
929                 talloc_free(tmp_ctx);
930                 return LDB_ERR_OPERATIONS_ERROR;
931         }
932
933         msg = talloc(tmp_ctx, struct ldb_message);
934         if (msg == NULL) {
935                 talloc_free(tmp_ctx);
936                 return LDB_ERR_OPERATIONS_ERROR;
937         }
938
939         req->op.seq_num.flags = 0;
940
941         tret = ltdb_search_dn1(module, dn, msg);
942         if (tret != LDB_SUCCESS) {
943                 talloc_free(tmp_ctx);
944                 /* zero is as good as anything when we don't know */
945                 req->op.seq_num.seq_num = 0;
946                 return LDB_SUCCESS;
947         }
948
949         switch (req->op.seq_num.type) {
950         case LDB_SEQ_HIGHEST_SEQ:
951                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
952                 break;
953         case LDB_SEQ_NEXT:
954                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
955                 req->op.seq_num.seq_num++;
956                 break;
957         case LDB_SEQ_HIGHEST_TIMESTAMP:
958         {
959                 const char *date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
960                 if (date) {
961                         req->op.seq_num.seq_num = ldb_string_to_time(date);
962                 } else {
963                         req->op.seq_num.seq_num = 0;
964                         /* zero is as good as anything when we don't know */
965                 }
966                 break;
967         }
968         }
969         talloc_free(tmp_ctx);
970         return LDB_SUCCESS;
971 }
972
973 static const struct ldb_module_ops ltdb_ops = {
974         .name              = "tdb",
975         .search            = ltdb_search,
976         .add               = ltdb_add,
977         .modify            = ltdb_modify,
978         .del               = ltdb_delete,
979         .rename            = ltdb_rename,
980         .request           = ltdb_request,
981         .start_transaction = ltdb_start_trans,
982         .end_transaction   = ltdb_end_trans,
983         .del_transaction   = ltdb_del_trans,
984         .wait              = ltdb_wait,
985         .sequence_number   = ltdb_sequence_number
986 };
987
988 /*
989   connect to the database
990 */
991 static int ltdb_connect(struct ldb_context *ldb, const char *url, 
992                         unsigned int flags, const char *options[],
993                         struct ldb_module **module)
994 {
995         const char *path;
996         int tdb_flags, open_flags;
997         struct ltdb_private *ltdb;
998
999         /* parse the url */
1000         if (strchr(url, ':')) {
1001                 if (strncmp(url, "tdb://", 6) != 0) {
1002                         ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid tdb URL '%s'", url);
1003                         return -1;
1004                 }
1005                 path = url+6;
1006         } else {
1007                 path = url;
1008         }
1009
1010         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1011
1012         /* check for the 'nosync' option */
1013         if (flags & LDB_FLG_NOSYNC) {
1014                 tdb_flags |= TDB_NOSYNC;
1015         }
1016
1017         /* and nommap option */
1018         if (flags & LDB_FLG_NOMMAP) {
1019                 tdb_flags |= TDB_NOMMAP;
1020         }
1021
1022         if (flags & LDB_FLG_RDONLY) {
1023                 open_flags = O_RDONLY;
1024         } else {
1025                 open_flags = O_CREAT | O_RDWR;
1026         }
1027
1028         ltdb = talloc_zero(ldb, struct ltdb_private);
1029         if (!ltdb) {
1030                 ldb_oom(ldb);
1031                 return -1;
1032         }
1033
1034         /* note that we use quite a large default hash size */
1035         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000, 
1036                                    tdb_flags, open_flags, 
1037                                    ldb->create_perms, ldb);
1038         if (!ltdb->tdb) {
1039                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Unable to open tdb '%s'\n", path);
1040                 talloc_free(ltdb);
1041                 return -1;
1042         }
1043
1044         ltdb->sequence_number = 0;
1045
1046         *module = talloc(ldb, struct ldb_module);
1047         if (!module) {
1048                 ldb_oom(ldb);
1049                 talloc_free(ltdb);
1050                 return -1;
1051         }
1052         talloc_set_name_const(*module, "ldb_tdb backend");
1053         (*module)->ldb = ldb;
1054         (*module)->prev = (*module)->next = NULL;
1055         (*module)->private_data = ltdb;
1056         (*module)->ops = &ltdb_ops;
1057
1058         if (ltdb_cache_load(*module) != 0) {
1059                 talloc_free(*module);
1060                 talloc_free(ltdb);
1061                 return -1;
1062         }
1063
1064         return 0;
1065 }
1066
1067 int ldb_tdb_init(void)
1068 {
1069         return ldb_register_backend("tdb", ltdb_connect);
1070 }