r20106: Optional ONE Level indexing for ldb_tdb
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /* 
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006
7    
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12    
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 2 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, write to the Free Software
25    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26 */
27
28 /*
29  *  Name: ldb_tdb
30  *
31  *  Component: ldb tdb backend
32  *
33  *  Description: core functions for tdb backend
34  *
35  *  Author: Andrew Tridgell
36  *  Author: Stefan Metzmacher
37  *
38  *  Modifications:
39  *
40  *  - description: make the module use asyncronous calls
41  *    date: Feb 2006
42  *    Author: Simo Sorce
43  */
44
45 #include "includes.h"
46 #include "ldb/include/includes.h"
47
48 #include "ldb/ldb_tdb/ldb_tdb.h"
49
50
51 /*
52   map a tdb error code to a ldb error code
53 */
54 static int ltdb_err_map(enum TDB_ERROR tdb_code)
55 {
56         switch (tdb_code) {
57         case TDB_SUCCESS:
58                 return LDB_SUCCESS;
59         case TDB_ERR_CORRUPT:
60         case TDB_ERR_OOM:
61         case TDB_ERR_EINVAL:
62                 return LDB_ERR_OPERATIONS_ERROR;
63         case TDB_ERR_IO:
64                 return LDB_ERR_PROTOCOL_ERROR;
65         case TDB_ERR_LOCK:
66         case TDB_ERR_NOLOCK:
67                 return LDB_ERR_BUSY;
68         case TDB_ERR_LOCK_TIMEOUT:
69                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
70         case TDB_ERR_EXISTS:
71                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
72         case TDB_ERR_NOEXIST:
73                 return LDB_ERR_NO_SUCH_OBJECT;
74         case TDB_ERR_RDONLY:
75                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
76         }
77         return LDB_ERR_OTHER;
78 }
79
80
81 struct ldb_handle *init_ltdb_handle(struct ltdb_private *ltdb, struct ldb_module *module,
82                                     struct ldb_request *req)
83 {
84         struct ltdb_context *ac;
85         struct ldb_handle *h;
86
87         h = talloc_zero(req, struct ldb_handle);
88         if (h == NULL) {
89                 ldb_set_errstring(module->ldb, "Out of Memory");
90                 return NULL;
91         }
92
93         h->module = module;
94
95         ac = talloc_zero(h, struct ltdb_context);
96         if (ac == NULL) {
97                 ldb_set_errstring(module->ldb, "Out of Memory");
98                 talloc_free(h);
99                 return NULL;
100         }
101
102         h->private_data = (void *)ac;
103
104         h->state = LDB_ASYNC_INIT;
105         h->status = LDB_SUCCESS;
106
107         ac->module = module;
108         ac->context = req->context;
109         ac->callback = req->callback;
110
111         return h;
112 }
113
114 /*
115   form a TDB_DATA for a record key
116   caller frees
117
118   note that the key for a record can depend on whether the 
119   dn refers to a case sensitive index record or not
120 */
121 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
122 {
123         struct ldb_context *ldb = module->ldb;
124         TDB_DATA key;
125         char *key_str = NULL;
126         const char *dn_folded = NULL;
127
128         /*
129           most DNs are case insensitive. The exception is index DNs for
130           case sensitive attributes
131
132           there are 3 cases dealt with in this code:
133
134           1) if the dn doesn't start with @ then uppercase the attribute
135              names and the attributes values of case insensitive attributes
136           2) if the dn starts with @ then leave it alone - the indexing code handles
137              the rest
138         */
139
140         dn_folded = ldb_dn_get_casefold(dn);
141         if (!dn_folded) {
142                 goto failed;
143         }
144
145         key_str = talloc_strdup(ldb, "DN=");
146         if (!key_str) {
147                 goto failed;
148         }
149
150         key_str = talloc_append_string(ldb, key_str, dn_folded);
151         if (!key_str) {
152                 goto failed;
153         }
154
155         key.dptr = (uint8_t *)key_str;
156         key.dsize = strlen(key_str) + 1;
157
158         return key;
159
160 failed:
161         errno = ENOMEM;
162         key.dptr = NULL;
163         key.dsize = 0;
164         return key;
165 }
166
167 /*
168   check special dn's have valid attributes
169   currently only @ATTRIBUTES is checked
170 */
171 int ltdb_check_special_dn(struct ldb_module *module, const struct ldb_message *msg)
172 {
173         int i, j;
174  
175         if (! ldb_dn_is_special(msg->dn) ||
176             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
177                 return 0;
178         }
179
180         /* we have @ATTRIBUTES, let's check attributes are fine */
181         /* should we check that we deny multivalued attributes ? */
182         for (i = 0; i < msg->num_elements; i++) {
183                 for (j = 0; j < msg->elements[i].num_values; j++) {
184                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
185                                 ldb_set_errstring(module->ldb, "Invalid attribute value in an @ATTRIBUTES entry");
186                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
187                         }
188                 }
189         }
190
191         return 0;
192 }
193
194
195 /*
196   we've made a modification to a dn - possibly reindex and 
197   update sequence number
198 */
199 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
200 {
201         int ret = 0;
202
203         if (ldb_dn_is_special(dn) &&
204             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
205              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
206                 ret = ltdb_reindex(module);
207         }
208
209         if (ret == 0 &&
210             !(ldb_dn_is_special(dn) &&
211               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
212                 ret = ltdb_increase_sequence_number(module);
213         }
214
215         return ret;
216 }
217
218 /*
219   store a record into the db
220 */
221 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
222 {
223         struct ltdb_private *ltdb =
224                 talloc_get_type(module->private_data, struct ltdb_private);
225         TDB_DATA tdb_key, tdb_data;
226         int ret;
227
228         tdb_key = ltdb_key(module, msg->dn);
229         if (!tdb_key.dptr) {
230                 return LDB_ERR_OTHER;
231         }
232
233         ret = ltdb_pack_data(module, msg, &tdb_data);
234         if (ret == -1) {
235                 talloc_free(tdb_key.dptr);
236                 return LDB_ERR_OTHER;
237         }
238
239         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
240         if (ret == -1) {
241                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
242                 goto done;
243         }
244         
245         ret = ltdb_index_add(module, msg);
246         if (ret == -1) {
247                 tdb_delete(ltdb->tdb, tdb_key);
248         }
249
250 done:
251         talloc_free(tdb_key.dptr);
252         talloc_free(tdb_data.dptr);
253
254         return ret;
255 }
256
257
258 static int ltdb_add_internal(struct ldb_module *module, const struct ldb_message *msg)
259 {
260         int ret;
261         
262         ret = ltdb_check_special_dn(module, msg);
263         if (ret != LDB_SUCCESS) {
264                 return ret;
265         }
266         
267         if (ltdb_cache_load(module) != 0) {
268                 return LDB_ERR_OPERATIONS_ERROR;
269         }
270
271         ret = ltdb_store(module, msg, TDB_INSERT);
272
273         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
274                 ldb_asprintf_errstring(module->ldb, "Entry %s already exists", ldb_dn_get_linearized(msg->dn));
275                 return ret;
276         }
277         
278         if (ret == LDB_SUCCESS) {
279                 ret = ltdb_index_one(module, msg, 1);
280                 if (ret != LDB_SUCCESS) {
281                         return LDB_ERR_OPERATIONS_ERROR;
282                 }
283
284                 ret = ltdb_modified(module, msg->dn);
285                 if (ret != LDB_SUCCESS) {
286                         return LDB_ERR_OPERATIONS_ERROR;
287                 }
288         }
289
290         return ret;
291 }
292
293 /*
294   add a record to the database
295 */
296 static int ltdb_add(struct ldb_module *module, struct ldb_request *req)
297 {
298         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
299         struct ltdb_context *ltdb_ac;
300         int tret, ret = LDB_SUCCESS;
301
302         if (req->controls != NULL) {
303                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
304                 if (check_critical_controls(req->controls)) {
305                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
306                 }
307         }
308         
309         req->handle = init_ltdb_handle(ltdb, module, req);
310         if (req->handle == NULL) {
311                 return LDB_ERR_OPERATIONS_ERROR;
312         }
313         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
314
315         tret = ltdb_add_internal(module, req->op.add.message);
316         if (tret != LDB_SUCCESS) {
317                 req->handle->status = tret;
318                 goto done;
319         }
320         
321         if (ltdb_ac->callback) {
322                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
323         }
324 done:
325         req->handle->state = LDB_ASYNC_DONE;
326         return ret;
327 }
328
329 /*
330   delete a record from the database, not updating indexes (used for deleting
331   index records)
332 */
333 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
334 {
335         struct ltdb_private *ltdb =
336                 talloc_get_type(module->private_data, struct ltdb_private);
337         TDB_DATA tdb_key;
338         int ret;
339
340         tdb_key = ltdb_key(module, dn);
341         if (!tdb_key.dptr) {
342                 return LDB_ERR_OTHER;
343         }
344
345         ret = tdb_delete(ltdb->tdb, tdb_key);
346         talloc_free(tdb_key.dptr);
347
348         if (ret != 0) {
349                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
350         }
351
352         return ret;
353 }
354
355 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
356 {
357         struct ldb_message *msg;
358         int ret;
359
360         msg = talloc(module, struct ldb_message);
361         if (msg == NULL) {
362                 return LDB_ERR_OPERATIONS_ERROR;
363         }
364
365         /* in case any attribute of the message was indexed, we need
366            to fetch the old record */
367         ret = ltdb_search_dn1(module, dn, msg);
368         if (ret != 1) {
369                 /* not finding the old record is an error */
370                 talloc_free(msg);
371                 return LDB_ERR_NO_SUCH_OBJECT;
372         }
373
374         ret = ltdb_delete_noindex(module, dn);
375         if (ret != LDB_SUCCESS) {
376                 talloc_free(msg);
377                 return LDB_ERR_NO_SUCH_OBJECT;
378         }
379
380         /* remove one level attribute */
381         ret = ltdb_index_one(module, msg, 0);
382         if (ret != LDB_SUCCESS) {
383                 talloc_free(msg);
384                 return LDB_ERR_OPERATIONS_ERROR;
385         }
386
387         /* remove any indexed attributes */
388         ret = ltdb_index_del(module, msg);
389         if (ret != LDB_SUCCESS) {
390                 talloc_free(msg);
391                 return LDB_ERR_OPERATIONS_ERROR;
392         }
393
394         ret = ltdb_modified(module, dn);
395         if (ret != LDB_SUCCESS) {
396                 talloc_free(msg);
397                 return LDB_ERR_OPERATIONS_ERROR;
398         }
399
400         talloc_free(msg);
401         return LDB_SUCCESS;
402 }
403
404 /*
405   delete a record from the database
406 */
407 static int ltdb_delete(struct ldb_module *module, struct ldb_request *req)
408 {
409         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
410         struct ltdb_context *ltdb_ac;
411         int tret, ret = LDB_SUCCESS;
412
413         if (req->controls != NULL) {
414                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
415                 if (check_critical_controls(req->controls)) {
416                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
417                 }
418         }
419         
420         req->handle = NULL;
421
422         if (ltdb_cache_load(module) != 0) {
423                 return LDB_ERR_OPERATIONS_ERROR;
424         }
425
426         req->handle = init_ltdb_handle(ltdb, module, req);
427         if (req->handle == NULL) {
428                 return LDB_ERR_OPERATIONS_ERROR;
429         }
430         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
431
432         tret = ltdb_delete_internal(module, req->op.del.dn);
433         if (tret != LDB_SUCCESS) {
434                 req->handle->status = tret; 
435                 goto done;
436         }
437
438         if (ltdb_ac->callback) {
439                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
440         }
441 done:
442         req->handle->state = LDB_ASYNC_DONE;
443         return ret;
444 }
445
446 /*
447   find an element by attribute name. At the moment this does a linear search, it should
448   be re-coded to use a binary search once all places that modify records guarantee
449   sorted order
450
451   return the index of the first matching element if found, otherwise -1
452 */
453 static int find_element(const struct ldb_message *msg, const char *name)
454 {
455         unsigned int i;
456         for (i=0;i<msg->num_elements;i++) {
457                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
458                         return i;
459                 }
460         }
461         return -1;
462 }
463
464
465 /*
466   add an element to an existing record. Assumes a elements array that we
467   can call re-alloc on, and assumed that we can re-use the data pointers from the 
468   passed in additional values. Use with care!
469
470   returns 0 on success, -1 on failure (and sets errno)
471 */
472 static int msg_add_element(struct ldb_context *ldb,
473                            struct ldb_message *msg, struct ldb_message_element *el)
474 {
475         struct ldb_message_element *e2;
476         unsigned int i;
477
478         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element, 
479                               msg->num_elements+1);
480         if (!e2) {
481                 errno = ENOMEM;
482                 return -1;
483         }
484
485         msg->elements = e2;
486
487         e2 = &msg->elements[msg->num_elements];
488
489         e2->name = el->name;
490         e2->flags = el->flags;
491         e2->values = NULL;
492         if (el->num_values != 0) {
493                 e2->values = talloc_array(msg->elements, struct ldb_val, el->num_values);
494                 if (!e2->values) {
495                         errno = ENOMEM;
496                         return -1;
497                 }
498         }
499         for (i=0;i<el->num_values;i++) {
500                 e2->values[i] = el->values[i];
501         }
502         e2->num_values = el->num_values;
503
504         msg->num_elements++;
505
506         return 0;
507 }
508
509 /*
510   delete all elements having a specified attribute name
511 */
512 static int msg_delete_attribute(struct ldb_module *module,
513                                 struct ldb_context *ldb,
514                                 struct ldb_message *msg, const char *name)
515 {
516         const char *dn;
517         unsigned int i, j;
518
519         dn = ldb_dn_get_linearized(msg->dn);
520         if (dn == NULL) {
521                 return -1;
522         }
523
524         for (i=0;i<msg->num_elements;i++) {
525                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
526                         for (j=0;j<msg->elements[i].num_values;j++) {
527                                 ltdb_index_del_value(module, dn, &msg->elements[i], j);
528                         }
529                         talloc_free(msg->elements[i].values);
530                         if (msg->num_elements > (i+1)) {
531                                 memmove(&msg->elements[i], 
532                                         &msg->elements[i+1], 
533                                         sizeof(struct ldb_message_element)*
534                                         (msg->num_elements - (i+1)));
535                         }
536                         msg->num_elements--;
537                         i--;
538                         msg->elements = talloc_realloc(msg, msg->elements, 
539                                                          struct ldb_message_element, 
540                                                          msg->num_elements);
541                 }
542         }
543
544         return 0;
545 }
546
547 /*
548   delete all elements matching an attribute name/value 
549
550   return 0 on success, -1 on failure
551 */
552 static int msg_delete_element(struct ldb_module *module,
553                               struct ldb_message *msg, 
554                               const char *name,
555                               const struct ldb_val *val)
556 {
557         struct ldb_context *ldb = module->ldb;
558         unsigned int i;
559         int found;
560         struct ldb_message_element *el;
561         const struct ldb_attrib_handler *h;
562
563         found = find_element(msg, name);
564         if (found == -1) {
565                 return -1;
566         }
567
568         el = &msg->elements[found];
569
570         h = ldb_attrib_handler(ldb, el->name);
571
572         for (i=0;i<el->num_values;i++) {
573                 if (h->comparison_fn(ldb, ldb, &el->values[i], val) == 0) {
574                         if (i<el->num_values-1) {
575                                 memmove(&el->values[i], &el->values[i+1],
576                                         sizeof(el->values[i])*(el->num_values-(i+1)));
577                         }
578                         el->num_values--;
579                         if (el->num_values == 0) {
580                                 return msg_delete_attribute(module, ldb, msg, name);
581                         }
582                         return 0;
583                 }
584         }
585
586         return -1;
587 }
588
589
590 /*
591   modify a record - internal interface
592
593   yuck - this is O(n^2). Luckily n is usually small so we probably
594   get away with it, but if we ever have really large attribute lists 
595   then we'll need to look at this again
596 */
597 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg)
598 {
599         struct ldb_context *ldb = module->ldb;
600         struct ltdb_private *ltdb =
601                 talloc_get_type(module->private_data, struct ltdb_private);
602         TDB_DATA tdb_key, tdb_data;
603         struct ldb_message *msg2;
604         unsigned i, j;
605         int ret;
606
607         tdb_key = ltdb_key(module, msg->dn);
608         if (!tdb_key.dptr) {
609                 return LDB_ERR_OTHER;
610         }
611
612         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
613         if (!tdb_data.dptr) {
614                 talloc_free(tdb_key.dptr);
615                 return ltdb_err_map(tdb_error(ltdb->tdb));
616         }
617
618         msg2 = talloc(tdb_key.dptr, struct ldb_message);
619         if (msg2 == NULL) {
620                 talloc_free(tdb_key.dptr);
621                 return LDB_ERR_OTHER;
622         }
623
624         ret = ltdb_unpack_data(module, &tdb_data, msg2);
625         if (ret == -1) {
626                 ret = LDB_ERR_OTHER;
627                 goto failed;
628         }
629
630         if (!msg2->dn) {
631                 msg2->dn = msg->dn;
632         }
633
634         for (i=0;i<msg->num_elements;i++) {
635                 struct ldb_message_element *el = &msg->elements[i];
636                 struct ldb_message_element *el2;
637                 struct ldb_val *vals;
638                 const char *dn;
639
640                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
641
642                 case LDB_FLAG_MOD_ADD:
643                         /* add this element to the message. fail if it
644                            already exists */
645                         ret = find_element(msg2, el->name);
646
647                         if (ret == -1) {
648                                 if (msg_add_element(ldb, msg2, el) != 0) {
649                                         ret = LDB_ERR_OTHER;
650                                         goto failed;
651                                 }
652                                 continue;
653                         }
654
655                         el2 = &msg2->elements[ret];
656
657                         /* An attribute with this name already exists, add all
658                          * values if they don't already exist. */
659
660                         for (j=0;j<el->num_values;j++) {
661                                 if (ldb_msg_find_val(el2, &el->values[j])) {
662                                         ldb_set_errstring(module->ldb, "Type or value exists");
663                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
664                                         goto failed;
665                                 }
666                         }
667
668                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
669                                                 el2->num_values + el->num_values);
670
671                         if (vals == NULL) {
672                                 ret = LDB_ERR_OTHER;
673                                 goto failed;
674                         }
675
676                         for (j=0;j<el->num_values;j++) {
677                                 vals[el2->num_values + j] =
678                                         ldb_val_dup(vals, &el->values[j]);
679                         }
680
681                         el2->values = vals;
682                         el2->num_values += el->num_values;
683
684                         break;
685
686                 case LDB_FLAG_MOD_REPLACE:
687                         /* replace all elements of this attribute name with the elements
688                            listed. The attribute not existing is not an error */
689                         msg_delete_attribute(module, ldb, msg2, msg->elements[i].name);
690
691                         /* add the replacement element, if not empty */
692                         if (msg->elements[i].num_values != 0 &&
693                             msg_add_element(ldb, msg2, &msg->elements[i]) != 0) {
694                                 ret = LDB_ERR_OTHER;
695                                 goto failed;
696                         }
697                         break;
698
699                 case LDB_FLAG_MOD_DELETE:
700
701                         dn = ldb_dn_get_linearized(msg->dn);
702                         if (dn == NULL) {
703                                 ret = LDB_ERR_OTHER;
704                                 goto failed;
705                         }
706
707                         /* we could be being asked to delete all
708                            values or just some values */
709                         if (msg->elements[i].num_values == 0) {
710                                 if (msg_delete_attribute(module, ldb, msg2, 
711                                                          msg->elements[i].name) != 0) {
712                                         ldb_asprintf_errstring(module->ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
713                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
714                                         goto failed;
715                                 }
716                                 break;
717                         }
718                         for (j=0;j<msg->elements[i].num_values;j++) {
719                                 if (msg_delete_element(module,
720                                                        msg2, 
721                                                        msg->elements[i].name,
722                                                        &msg->elements[i].values[j]) != 0) {
723                                         ldb_asprintf_errstring(module->ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
724                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
725                                         goto failed;
726                                 }
727                                 if (ltdb_index_del_value(module, dn, &msg->elements[i], j) != 0) {
728                                         ret = LDB_ERR_OTHER;
729                                         goto failed;
730                                 }
731                         }
732                         break;
733                 default:
734                         ldb_asprintf_errstring(module->ldb, "Invalid ldb_modify flags on %s: 0x%x", 
735                                                              msg->elements[i].name, 
736                                                              msg->elements[i].flags & LDB_FLAG_MOD_MASK);
737                         ret = LDB_ERR_PROTOCOL_ERROR;
738                         goto failed;
739                 }
740         }
741
742         /* we've made all the mods - save the modified record back into the database */
743         ret = ltdb_store(module, msg2, TDB_MODIFY);
744         if (ret != LDB_SUCCESS) {
745                 goto failed;
746         }
747
748         if (ltdb_modified(module, msg->dn) != LDB_SUCCESS) {
749                 ret = LDB_ERR_OPERATIONS_ERROR;
750                 goto failed;
751         }
752
753         talloc_free(tdb_key.dptr);
754         free(tdb_data.dptr);
755         return ret;
756
757 failed:
758         talloc_free(tdb_key.dptr);
759         free(tdb_data.dptr);
760         return ret;
761 }
762
763 /*
764   modify a record
765 */
766 static int ltdb_modify(struct ldb_module *module, struct ldb_request *req)
767 {
768         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
769         struct ltdb_context *ltdb_ac;
770         int tret, ret = LDB_SUCCESS;
771
772         if (req->controls != NULL) {
773                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
774                 if (check_critical_controls(req->controls)) {
775                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
776                 }
777         }
778         
779         req->handle = NULL;
780
781         req->handle = init_ltdb_handle(ltdb, module, req);
782         if (req->handle == NULL) {
783                 return LDB_ERR_OPERATIONS_ERROR;
784         }
785         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
786
787         tret = ltdb_check_special_dn(module, req->op.mod.message);
788         if (tret != LDB_SUCCESS) {
789                 req->handle->status = tret;
790                 goto done;
791         }
792         
793         if (ltdb_cache_load(module) != 0) {
794                 ret = LDB_ERR_OPERATIONS_ERROR;
795                 goto done;
796         }
797
798         tret = ltdb_modify_internal(module, req->op.mod.message);
799         if (tret != LDB_SUCCESS) {
800                 req->handle->status = tret;
801                 goto done;
802         }
803
804         if (ltdb_ac->callback) {
805                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
806         }
807 done:
808         req->handle->state = LDB_ASYNC_DONE;
809         return ret;
810 }
811
812 /*
813   rename a record
814 */
815 static int ltdb_rename(struct ldb_module *module, struct ldb_request *req)
816 {
817         struct ltdb_private *ltdb = talloc_get_type(module->private_data, struct ltdb_private);
818         struct ltdb_context *ltdb_ac;
819         struct ldb_message *msg;
820         int tret, ret = LDB_SUCCESS;
821
822         if (req->controls != NULL) {
823                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
824                 if (check_critical_controls(req->controls)) {
825                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
826                 }
827         }
828         
829         req->handle = NULL;
830
831         if (ltdb_cache_load(module) != 0) {
832                 return LDB_ERR_OPERATIONS_ERROR;
833         }
834
835         req->handle = init_ltdb_handle(ltdb, module, req);
836         if (req->handle == NULL) {
837                 return LDB_ERR_OPERATIONS_ERROR;
838         }
839         ltdb_ac = talloc_get_type(req->handle->private_data, struct ltdb_context);
840
841         msg = talloc(ltdb_ac, struct ldb_message);
842         if (msg == NULL) {
843                 ret = LDB_ERR_OPERATIONS_ERROR;
844                 goto done;
845         }
846
847         /* in case any attribute of the message was indexed, we need
848            to fetch the old record */
849         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
850         if (tret != 1) {
851                 /* not finding the old record is an error */
852                 req->handle->status = LDB_ERR_NO_SUCH_OBJECT;
853                 goto done;
854         }
855
856         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
857         if (!msg->dn) {
858                 ret = LDB_ERR_OPERATIONS_ERROR;
859                 goto done;
860         }
861
862         tret = ltdb_add_internal(module, msg);
863         if (tret != LDB_SUCCESS) {
864                 ret = LDB_ERR_OPERATIONS_ERROR;
865                 goto done;
866         }
867
868         tret = ltdb_delete_internal(module, req->op.rename.olddn);
869         if (tret != LDB_SUCCESS) {
870                 ltdb_delete_internal(module, req->op.rename.newdn);
871                 ret = LDB_ERR_OPERATIONS_ERROR;
872                 goto done;
873         }
874
875         if (ltdb_ac->callback) {
876                 ret = ltdb_ac->callback(module->ldb, ltdb_ac->context, NULL);
877         }
878 done:
879         req->handle->state = LDB_ASYNC_DONE;
880         return ret;
881 }
882
883 static int ltdb_start_trans(struct ldb_module *module)
884 {
885         struct ltdb_private *ltdb =
886                 talloc_get_type(module->private_data, struct ltdb_private);
887
888         if (tdb_transaction_start(ltdb->tdb) != 0) {
889                 return ltdb_err_map(tdb_error(ltdb->tdb));
890         }
891
892         return LDB_SUCCESS;
893 }
894
895 static int ltdb_end_trans(struct ldb_module *module)
896 {
897         struct ltdb_private *ltdb =
898                 talloc_get_type(module->private_data, struct ltdb_private);
899
900         if (tdb_transaction_commit(ltdb->tdb) != 0) {
901                 return ltdb_err_map(tdb_error(ltdb->tdb));
902         }
903
904         return LDB_SUCCESS;
905 }
906
907 static int ltdb_del_trans(struct ldb_module *module)
908 {
909         struct ltdb_private *ltdb =
910                 talloc_get_type(module->private_data, struct ltdb_private);
911
912         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
913                 return ltdb_err_map(tdb_error(ltdb->tdb));
914         }
915
916         return LDB_SUCCESS;
917 }
918
919 static int ltdb_wait(struct ldb_handle *handle, enum ldb_wait_type type)
920 {
921         return handle->status;
922 }
923
924 static int ltdb_request(struct ldb_module *module, struct ldb_request *req)
925 {
926         /* check for oustanding critical controls and return an error if found */
927         if (req->controls != NULL) {
928                 ldb_debug(module->ldb, LDB_DEBUG_WARNING, "Controls should not reach the ldb_tdb backend!\n");
929                 if (check_critical_controls(req->controls)) {
930                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
931                 }
932         }
933         
934         /* search, add, modify, delete, rename are handled by their own, no other op supported */
935         return LDB_ERR_OPERATIONS_ERROR;
936 }
937
938 /*
939   return sequenceNumber from @BASEINFO
940 */
941 static int ltdb_sequence_number(struct ldb_module *module, struct ldb_request *req)
942 {
943         TALLOC_CTX *tmp_ctx = talloc_new(req);
944         struct ldb_message *msg = NULL;
945         struct ldb_dn *dn = ldb_dn_new(tmp_ctx, module->ldb, LTDB_BASEINFO);
946         int tret;
947
948         if (tmp_ctx == NULL) {
949                 talloc_free(tmp_ctx);
950                 return LDB_ERR_OPERATIONS_ERROR;
951         }
952
953         msg = talloc(tmp_ctx, struct ldb_message);
954         if (msg == NULL) {
955                 talloc_free(tmp_ctx);
956                 return LDB_ERR_OPERATIONS_ERROR;
957         }
958
959         req->op.seq_num.flags = 0;
960
961         tret = ltdb_search_dn1(module, dn, msg);
962         if (tret != 1) {
963                 talloc_free(tmp_ctx);
964                 req->op.seq_num.seq_num = 0;
965                 /* zero is as good as anything when we don't know */
966                 return LDB_SUCCESS;
967         }
968
969         switch (req->op.seq_num.type) {
970         case LDB_SEQ_HIGHEST_SEQ:
971                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
972                 break;
973         case LDB_SEQ_NEXT:
974                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
975                 req->op.seq_num.seq_num++;
976                 break;
977         case LDB_SEQ_HIGHEST_TIMESTAMP:
978         {
979                 const char *date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
980                 if (date) {
981                         req->op.seq_num.seq_num = ldb_string_to_time(date);
982                 } else {
983                         req->op.seq_num.seq_num = 0;
984                         /* zero is as good as anything when we don't know */
985                 }
986                 break;
987         }
988         }
989         talloc_free(tmp_ctx);
990         return LDB_SUCCESS;
991 }
992
993 static const struct ldb_module_ops ltdb_ops = {
994         .name              = "tdb",
995         .search            = ltdb_search,
996         .add               = ltdb_add,
997         .modify            = ltdb_modify,
998         .del               = ltdb_delete,
999         .rename            = ltdb_rename,
1000         .request           = ltdb_request,
1001         .start_transaction = ltdb_start_trans,
1002         .end_transaction   = ltdb_end_trans,
1003         .del_transaction   = ltdb_del_trans,
1004         .wait              = ltdb_wait,
1005         .sequence_number   = ltdb_sequence_number
1006 };
1007
1008 /*
1009   connect to the database
1010 */
1011 static int ltdb_connect(struct ldb_context *ldb, const char *url, 
1012                         unsigned int flags, const char *options[],
1013                         struct ldb_module **module)
1014 {
1015         const char *path;
1016         int tdb_flags, open_flags;
1017         struct ltdb_private *ltdb;
1018
1019         /* parse the url */
1020         if (strchr(url, ':')) {
1021                 if (strncmp(url, "tdb://", 6) != 0) {
1022                         ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid tdb URL '%s'", url);
1023                         return -1;
1024                 }
1025                 path = url+6;
1026         } else {
1027                 path = url;
1028         }
1029
1030         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1031
1032         /* check for the 'nosync' option */
1033         if (flags & LDB_FLG_NOSYNC) {
1034                 tdb_flags |= TDB_NOSYNC;
1035         }
1036
1037         if (flags & LDB_FLG_RDONLY) {
1038                 open_flags = O_RDONLY;
1039         } else {
1040                 open_flags = O_CREAT | O_RDWR;
1041         }
1042
1043         ltdb = talloc_zero(ldb, struct ltdb_private);
1044         if (!ltdb) {
1045                 ldb_oom(ldb);
1046                 return -1;
1047         }
1048
1049         /* note that we use quite a large default hash size */
1050         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000, 
1051                                    tdb_flags, open_flags, 
1052                                    ldb->create_perms, ldb);
1053         if (!ltdb->tdb) {
1054                 ldb_debug(ldb, LDB_DEBUG_ERROR, "Unable to open tdb '%s'\n", path);
1055                 talloc_free(ltdb);
1056                 return -1;
1057         }
1058
1059         ltdb->sequence_number = 0;
1060
1061         *module = talloc(ldb, struct ldb_module);
1062         if (!module) {
1063                 ldb_oom(ldb);
1064                 talloc_free(ltdb);
1065                 return -1;
1066         }
1067         talloc_set_name_const(*module, "ldb_tdb backend");
1068         (*module)->ldb = ldb;
1069         (*module)->prev = (*module)->next = NULL;
1070         (*module)->private_data = ltdb;
1071         (*module)->ops = &ltdb_ops;
1072
1073         if (ltdb_cache_load(*module) != 0) {
1074                 talloc_free(*module);
1075                 talloc_free(ltdb);
1076                 return -1;
1077         }
1078
1079         return 0;
1080 }
1081
1082 int ldb_tdb_init(void)
1083 {
1084         return ldb_register_backend("tdb", ltdb_connect);
1085 }