LDB ASYNC: ldb_tdb backend
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006-2008
7
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  */
47
48 #include "ldb_includes.h"
49
50 #include "ldb_tdb.h"
51
52
53 /*
54   map a tdb error code to a ldb error code
55 */
56 static int ltdb_err_map(enum TDB_ERROR tdb_code)
57 {
58         switch (tdb_code) {
59         case TDB_SUCCESS:
60                 return LDB_SUCCESS;
61         case TDB_ERR_CORRUPT:
62         case TDB_ERR_OOM:
63         case TDB_ERR_EINVAL:
64                 return LDB_ERR_OPERATIONS_ERROR;
65         case TDB_ERR_IO:
66                 return LDB_ERR_PROTOCOL_ERROR;
67         case TDB_ERR_LOCK:
68         case TDB_ERR_NOLOCK:
69                 return LDB_ERR_BUSY;
70         case TDB_ERR_LOCK_TIMEOUT:
71                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
72         case TDB_ERR_EXISTS:
73                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
74         case TDB_ERR_NOEXIST:
75                 return LDB_ERR_NO_SUCH_OBJECT;
76         case TDB_ERR_RDONLY:
77                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
78         }
79         return LDB_ERR_OTHER;
80 }
81
82
83 /*
84   form a TDB_DATA for a record key
85   caller frees
86
87   note that the key for a record can depend on whether the
88   dn refers to a case sensitive index record or not
89 */
90 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
91 {
92         struct ldb_context *ldb = module->ldb;
93         TDB_DATA key;
94         char *key_str = NULL;
95         const char *dn_folded = NULL;
96
97         /*
98           most DNs are case insensitive. The exception is index DNs for
99           case sensitive attributes
100
101           there are 3 cases dealt with in this code:
102
103           1) if the dn doesn't start with @ then uppercase the attribute
104              names and the attributes values of case insensitive attributes
105           2) if the dn starts with @ then leave it alone -
106              the indexing code handles the rest
107         */
108
109         dn_folded = ldb_dn_get_casefold(dn);
110         if (!dn_folded) {
111                 goto failed;
112         }
113
114         key_str = talloc_strdup(ldb, "DN=");
115         if (!key_str) {
116                 goto failed;
117         }
118
119         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
120         if (!key_str) {
121                 goto failed;
122         }
123
124         key.dptr = (uint8_t *)key_str;
125         key.dsize = strlen(key_str) + 1;
126
127         return key;
128
129 failed:
130         errno = ENOMEM;
131         key.dptr = NULL;
132         key.dsize = 0;
133         return key;
134 }
135
136 /*
137   check special dn's have valid attributes
138   currently only @ATTRIBUTES is checked
139 */
140 int ltdb_check_special_dn(struct ldb_module *module,
141                           const struct ldb_message *msg)
142 {
143         int i, j;
144
145         if (! ldb_dn_is_special(msg->dn) ||
146             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
147                 return 0;
148         }
149
150         /* we have @ATTRIBUTES, let's check attributes are fine */
151         /* should we check that we deny multivalued attributes ? */
152         for (i = 0; i < msg->num_elements; i++) {
153                 for (j = 0; j < msg->elements[i].num_values; j++) {
154                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
155                                 ldb_set_errstring(module->ldb, "Invalid attribute value in an @ATTRIBUTES entry");
156                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
157                         }
158                 }
159         }
160
161         return 0;
162 }
163
164
165 /*
166   we've made a modification to a dn - possibly reindex and
167   update sequence number
168 */
169 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
170 {
171         int ret = LDB_SUCCESS;
172
173         if (ldb_dn_is_special(dn) &&
174             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
175              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
176                 ret = ltdb_reindex(module);
177         }
178
179         if (ret == LDB_SUCCESS &&
180             !(ldb_dn_is_special(dn) &&
181               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
182                 ret = ltdb_increase_sequence_number(module);
183         }
184
185         return ret;
186 }
187
188 /*
189   store a record into the db
190 */
191 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
192 {
193         struct ltdb_private *ltdb =
194                 talloc_get_type(module->private_data, struct ltdb_private);
195         TDB_DATA tdb_key, tdb_data;
196         int ret;
197
198         tdb_key = ltdb_key(module, msg->dn);
199         if (!tdb_key.dptr) {
200                 return LDB_ERR_OTHER;
201         }
202
203         ret = ltdb_pack_data(module, msg, &tdb_data);
204         if (ret == -1) {
205                 talloc_free(tdb_key.dptr);
206                 return LDB_ERR_OTHER;
207         }
208
209         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
210         if (ret == -1) {
211                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
212                 goto done;
213         }
214
215         ret = ltdb_index_add(module, msg);
216         if (ret != LDB_SUCCESS) {
217                 tdb_delete(ltdb->tdb, tdb_key);
218         }
219
220 done:
221         talloc_free(tdb_key.dptr);
222         talloc_free(tdb_data.dptr);
223
224         return ret;
225 }
226
227
228 static int ltdb_add_internal(struct ldb_module *module,
229                              const struct ldb_message *msg)
230 {
231         int ret;
232
233         ret = ltdb_check_special_dn(module, msg);
234         if (ret != LDB_SUCCESS) {
235                 return ret;
236         }
237
238         if (ltdb_cache_load(module) != 0) {
239                 return LDB_ERR_OPERATIONS_ERROR;
240         }
241
242         ret = ltdb_store(module, msg, TDB_INSERT);
243
244         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
245                 ldb_asprintf_errstring(module->ldb,
246                                         "Entry %s already exists",
247                                         ldb_dn_get_linearized(msg->dn));
248                 return ret;
249         }
250
251         if (ret == LDB_SUCCESS) {
252                 ret = ltdb_index_one(module, msg, 1);
253                 if (ret != LDB_SUCCESS) {
254                         return ret;
255                 }
256
257                 ret = ltdb_modified(module, msg->dn);
258                 if (ret != LDB_SUCCESS) {
259                         return ret;
260                 }
261         }
262
263         return ret;
264 }
265
266 /*
267   add a record to the database
268 */
269 static int ltdb_add(struct ltdb_context *ctx)
270 {
271         struct ldb_module *module = ctx->module;
272         struct ldb_request *req = ctx->req;
273         int tret;
274
275         req->handle->state = LDB_ASYNC_PENDING;
276
277         tret = ltdb_add_internal(module, req->op.add.message);
278         if (tret != LDB_SUCCESS) {
279                 return tret;
280         }
281
282         return LDB_SUCCESS;
283 }
284
285 /*
286   delete a record from the database, not updating indexes (used for deleting
287   index records)
288 */
289 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
290 {
291         struct ltdb_private *ltdb =
292                 talloc_get_type(module->private_data, struct ltdb_private);
293         TDB_DATA tdb_key;
294         int ret;
295
296         tdb_key = ltdb_key(module, dn);
297         if (!tdb_key.dptr) {
298                 return LDB_ERR_OTHER;
299         }
300
301         ret = tdb_delete(ltdb->tdb, tdb_key);
302         talloc_free(tdb_key.dptr);
303
304         if (ret != 0) {
305                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
306         }
307
308         return ret;
309 }
310
311 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
312 {
313         struct ldb_message *msg;
314         int ret;
315
316         msg = talloc(module, struct ldb_message);
317         if (msg == NULL) {
318                 return LDB_ERR_OPERATIONS_ERROR;
319         }
320
321         /* in case any attribute of the message was indexed, we need
322            to fetch the old record */
323         ret = ltdb_search_dn1(module, dn, msg);
324         if (ret != LDB_SUCCESS) {
325                 /* not finding the old record is an error */
326                 goto done;
327         }
328
329         ret = ltdb_delete_noindex(module, dn);
330         if (ret != LDB_SUCCESS) {
331                 goto done;
332         }
333
334         /* remove one level attribute */
335         ret = ltdb_index_one(module, msg, 0);
336         if (ret != LDB_SUCCESS) {
337                 goto done;
338         }
339
340         /* remove any indexed attributes */
341         ret = ltdb_index_del(module, msg);
342         if (ret != LDB_SUCCESS) {
343                 goto done;
344         }
345
346         ret = ltdb_modified(module, dn);
347         if (ret != LDB_SUCCESS) {
348                 goto done;
349         }
350
351 done:
352         talloc_free(msg);
353         return ret;
354 }
355
356 /*
357   delete a record from the database
358 */
359 static int ltdb_delete(struct ltdb_context *ctx)
360 {
361         struct ldb_module *module = ctx->module;
362         struct ldb_request *req = ctx->req;
363         int tret;
364
365         req->handle->state = LDB_ASYNC_PENDING;
366
367         if (ltdb_cache_load(module) != 0) {
368                 return LDB_ERR_OPERATIONS_ERROR;
369         }
370
371         tret = ltdb_delete_internal(module, req->op.del.dn);
372         if (tret != LDB_SUCCESS) {
373                 return tret;
374         }
375
376         return LDB_SUCCESS;
377 }
378
379 /*
380   find an element by attribute name. At the moment this does a linear search,
381   it should be re-coded to use a binary search once all places that modify
382   records guarantee sorted order
383
384   return the index of the first matching element if found, otherwise -1
385 */
386 static int find_element(const struct ldb_message *msg, const char *name)
387 {
388         unsigned int i;
389         for (i=0;i<msg->num_elements;i++) {
390                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
391                         return i;
392                 }
393         }
394         return -1;
395 }
396
397
398 /*
399   add an element to an existing record. Assumes a elements array that we
400   can call re-alloc on, and assumed that we can re-use the data pointers from
401   the passed in additional values. Use with care!
402
403   returns 0 on success, -1 on failure (and sets errno)
404 */
405 static int msg_add_element(struct ldb_context *ldb,
406                            struct ldb_message *msg,
407                            struct ldb_message_element *el)
408 {
409         struct ldb_message_element *e2;
410         unsigned int i;
411
412         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
413                               msg->num_elements+1);
414         if (!e2) {
415                 errno = ENOMEM;
416                 return -1;
417         }
418
419         msg->elements = e2;
420
421         e2 = &msg->elements[msg->num_elements];
422
423         e2->name = el->name;
424         e2->flags = el->flags;
425         e2->values = NULL;
426         if (el->num_values != 0) {
427                 e2->values = talloc_array(msg->elements,
428                                           struct ldb_val, el->num_values);
429                 if (!e2->values) {
430                         errno = ENOMEM;
431                         return -1;
432                 }
433         }
434         for (i=0;i<el->num_values;i++) {
435                 e2->values[i] = el->values[i];
436         }
437         e2->num_values = el->num_values;
438
439         msg->num_elements++;
440
441         return 0;
442 }
443
444 /*
445   delete all elements having a specified attribute name
446 */
447 static int msg_delete_attribute(struct ldb_module *module,
448                                 struct ldb_context *ldb,
449                                 struct ldb_message *msg, const char *name)
450 {
451         const char *dn;
452         unsigned int i, j;
453
454         dn = ldb_dn_get_linearized(msg->dn);
455         if (dn == NULL) {
456                 return -1;
457         }
458
459         for (i=0;i<msg->num_elements;i++) {
460                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
461                         for (j=0;j<msg->elements[i].num_values;j++) {
462                                 ltdb_index_del_value(module, dn,
463                                                      &msg->elements[i], j);
464                         }
465                         talloc_free(msg->elements[i].values);
466                         if (msg->num_elements > (i+1)) {
467                                 memmove(&msg->elements[i],
468                                         &msg->elements[i+1],
469                                         sizeof(struct ldb_message_element)*
470                                         (msg->num_elements - (i+1)));
471                         }
472                         msg->num_elements--;
473                         i--;
474                         msg->elements = talloc_realloc(msg, msg->elements,
475                                                         struct ldb_message_element,
476                                                         msg->num_elements);
477                 }
478         }
479
480         return 0;
481 }
482
483 /*
484   delete all elements matching an attribute name/value
485
486   return 0 on success, -1 on failure
487 */
488 static int msg_delete_element(struct ldb_module *module,
489                               struct ldb_message *msg,
490                               const char *name,
491                               const struct ldb_val *val)
492 {
493         struct ldb_context *ldb = module->ldb;
494         unsigned int i;
495         int found;
496         struct ldb_message_element *el;
497         const struct ldb_schema_attribute *a;
498
499         found = find_element(msg, name);
500         if (found == -1) {
501                 return -1;
502         }
503
504         el = &msg->elements[found];
505
506         a = ldb_schema_attribute_by_name(ldb, el->name);
507
508         for (i=0;i<el->num_values;i++) {
509                 if (a->syntax->comparison_fn(ldb, ldb,
510                                                 &el->values[i], val) == 0) {
511                         if (i<el->num_values-1) {
512                                 memmove(&el->values[i], &el->values[i+1],
513                                         sizeof(el->values[i])*
514                                                 (el->num_values-(i+1)));
515                         }
516                         el->num_values--;
517                         if (el->num_values == 0) {
518                                 return msg_delete_attribute(module, ldb,
519                                                             msg, name);
520                         }
521                         return 0;
522                 }
523         }
524
525         return -1;
526 }
527
528
529 /*
530   modify a record - internal interface
531
532   yuck - this is O(n^2). Luckily n is usually small so we probably
533   get away with it, but if we ever have really large attribute lists
534   then we'll need to look at this again
535 */
536 int ltdb_modify_internal(struct ldb_module *module,
537                          const struct ldb_message *msg)
538 {
539         struct ldb_context *ldb = module->ldb;
540         struct ltdb_private *ltdb =
541                 talloc_get_type(module->private_data, struct ltdb_private);
542         TDB_DATA tdb_key, tdb_data;
543         struct ldb_message *msg2;
544         unsigned i, j;
545         int ret, idx;
546
547         tdb_key = ltdb_key(module, msg->dn);
548         if (!tdb_key.dptr) {
549                 return LDB_ERR_OTHER;
550         }
551
552         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
553         if (!tdb_data.dptr) {
554                 talloc_free(tdb_key.dptr);
555                 return ltdb_err_map(tdb_error(ltdb->tdb));
556         }
557
558         msg2 = talloc(tdb_key.dptr, struct ldb_message);
559         if (msg2 == NULL) {
560                 talloc_free(tdb_key.dptr);
561                 return LDB_ERR_OTHER;
562         }
563
564         ret = ltdb_unpack_data(module, &tdb_data, msg2);
565         if (ret == -1) {
566                 ret = LDB_ERR_OTHER;
567                 goto failed;
568         }
569
570         if (!msg2->dn) {
571                 msg2->dn = msg->dn;
572         }
573
574         for (i=0;i<msg->num_elements;i++) {
575                 struct ldb_message_element *el = &msg->elements[i];
576                 struct ldb_message_element *el2;
577                 struct ldb_val *vals;
578                 const char *dn;
579
580                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
581
582                 case LDB_FLAG_MOD_ADD:
583                         /* add this element to the message. fail if it
584                            already exists */
585                         idx = find_element(msg2, el->name);
586
587                         if (idx == -1) {
588                                 if (msg_add_element(ldb, msg2, el) != 0) {
589                                         ret = LDB_ERR_OTHER;
590                                         goto failed;
591                                 }
592                                 continue;
593                         }
594
595                         el2 = &msg2->elements[idx];
596
597                         /* An attribute with this name already exists,
598                          * add all values if they don't already exist
599                          * (check both the other elements to be added,
600                          * and those already in the db). */
601
602                         for (j=0;j<el->num_values;j++) {
603                                 if (ldb_msg_find_val(el2, &el->values[j])) {
604                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d already exists", el->name, j);
605                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
606                                         goto failed;
607                                 }
608                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
609                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d provided more than once", el->name, j);
610                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
611                                         goto failed;
612                                 }
613                         }
614
615                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
616                                                 el2->num_values + el->num_values);
617
618                         if (vals == NULL) {
619                                 ret = LDB_ERR_OTHER;
620                                 goto failed;
621                         }
622
623                         for (j=0;j<el->num_values;j++) {
624                                 vals[el2->num_values + j] =
625                                         ldb_val_dup(vals, &el->values[j]);
626                         }
627
628                         el2->values = vals;
629                         el2->num_values += el->num_values;
630
631                         break;
632
633                 case LDB_FLAG_MOD_REPLACE:
634                         /* replace all elements of this attribute name with the elements
635                            listed. The attribute not existing is not an error */
636                         msg_delete_attribute(module, ldb, msg2, el->name);
637
638                         for (j=0;j<el->num_values;j++) {
639                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
640                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d provided more than once", el->name, j);
641                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
642                                         goto failed;
643                                 }
644                         }
645
646                         /* add the replacement element, if not empty */
647                         if (el->num_values != 0 &&
648                             msg_add_element(ldb, msg2, el) != 0) {
649                                 ret = LDB_ERR_OTHER;
650                                 goto failed;
651                         }
652                         break;
653
654                 case LDB_FLAG_MOD_DELETE:
655
656                         dn = ldb_dn_get_linearized(msg->dn);
657                         if (dn == NULL) {
658                                 ret = LDB_ERR_OTHER;
659                                 goto failed;
660                         }
661
662                         /* we could be being asked to delete all
663                            values or just some values */
664                         if (msg->elements[i].num_values == 0) {
665                                 if (msg_delete_attribute(module, ldb, msg2, 
666                                                          msg->elements[i].name) != 0) {
667                                         ldb_asprintf_errstring(module->ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
668                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
669                                         goto failed;
670                                 }
671                                 break;
672                         }
673                         for (j=0;j<msg->elements[i].num_values;j++) {
674                                 if (msg_delete_element(module,
675                                                        msg2, 
676                                                        msg->elements[i].name,
677                                                        &msg->elements[i].values[j]) != 0) {
678                                         ldb_asprintf_errstring(module->ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
679                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
680                                         goto failed;
681                                 }
682                                 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
683                                 if (ret != LDB_SUCCESS) {
684                                         goto failed;
685                                 }
686                         }
687                         break;
688                 default:
689                         ldb_asprintf_errstring(module->ldb,
690                                 "Invalid ldb_modify flags on %s: 0x%x",
691                                 msg->elements[i].name,
692                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
693                         ret = LDB_ERR_PROTOCOL_ERROR;
694                         goto failed;
695                 }
696         }
697
698         /* we've made all the mods
699          * save the modified record back into the database */
700         ret = ltdb_store(module, msg2, TDB_MODIFY);
701         if (ret != LDB_SUCCESS) {
702                 goto failed;
703         }
704
705         ret = ltdb_modified(module, msg->dn);
706         if (ret != LDB_SUCCESS) {
707                 goto failed;
708         }
709
710         talloc_free(tdb_key.dptr);
711         free(tdb_data.dptr);
712         return ret;
713
714 failed:
715         talloc_free(tdb_key.dptr);
716         free(tdb_data.dptr);
717         return ret;
718 }
719
720 /*
721   modify a record
722 */
723 static int ltdb_modify(struct ltdb_context *ctx)
724 {
725         struct ldb_module *module = ctx->module;
726         struct ldb_request *req = ctx->req;
727         int tret;
728
729         req->handle->state = LDB_ASYNC_PENDING;
730
731         tret = ltdb_check_special_dn(module, req->op.mod.message);
732         if (tret != LDB_SUCCESS) {
733                 return tret;
734         }
735
736         if (ltdb_cache_load(module) != 0) {
737                 return LDB_ERR_OPERATIONS_ERROR;
738         }
739
740         tret = ltdb_modify_internal(module, req->op.mod.message);
741         if (tret != LDB_SUCCESS) {
742                 return tret;
743         }
744
745         return LDB_SUCCESS;
746 }
747
748 /*
749   rename a record
750 */
751 static int ltdb_rename(struct ltdb_context *ctx)
752 {
753         struct ldb_module *module = ctx->module;
754         struct ldb_request *req = ctx->req;
755         struct ldb_message *msg;
756         int tret;
757
758         req->handle->state = LDB_ASYNC_PENDING;
759
760         if (ltdb_cache_load(ctx->module) != 0) {
761                 return LDB_ERR_OPERATIONS_ERROR;
762         }
763
764         msg = talloc(ctx, struct ldb_message);
765         if (msg == NULL) {
766                 return LDB_ERR_OPERATIONS_ERROR;
767         }
768
769         /* in case any attribute of the message was indexed, we need
770            to fetch the old record */
771         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
772         if (tret != LDB_SUCCESS) {
773                 /* not finding the old record is an error */
774                 return tret;
775         }
776
777         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
778         if (!msg->dn) {
779                 return LDB_ERR_OPERATIONS_ERROR;
780         }
781
782         if (ldb_dn_compare(req->op.rename.olddn, req->op.rename.newdn) == 0) {
783                 /* The rename operation is apparently only changing case -
784                    the DNs are the same.  Delete the old DN before adding
785                    the new one to avoid a TDB_ERR_EXISTS error.
786
787                    The only drawback to this is that if the delete
788                    succeeds but the add fails, we rely on the
789                    transaction to roll this all back. */
790                 tret = ltdb_delete_internal(module, req->op.rename.olddn);
791                 if (tret != LDB_SUCCESS) {
792                         return tret;
793                 }
794
795                 tret = ltdb_add_internal(module, msg);
796                 if (tret != LDB_SUCCESS) {
797                         return tret;
798                 }
799         } else {
800                 /* The rename operation is changing DNs.  Try to add the new
801                    DN first to avoid clobbering another DN not related to
802                    this rename operation. */
803                 tret = ltdb_add_internal(module, msg);
804                 if (tret != LDB_SUCCESS) {
805                         return tret;
806                 }
807
808                 tret = ltdb_delete_internal(module, req->op.rename.olddn);
809                 if (tret != LDB_SUCCESS) {
810                         ltdb_delete_internal(module, req->op.rename.newdn);
811                         return LDB_ERR_OPERATIONS_ERROR;
812                 }
813         }
814
815         return LDB_SUCCESS;
816 }
817
818 static int ltdb_start_trans(struct ldb_module *module)
819 {
820         struct ltdb_private *ltdb =
821                 talloc_get_type(module->private_data, struct ltdb_private);
822
823         if (tdb_transaction_start(ltdb->tdb) != 0) {
824                 return ltdb_err_map(tdb_error(ltdb->tdb));
825         }
826
827         ltdb->in_transaction++;
828
829         return LDB_SUCCESS;
830 }
831
832 static int ltdb_end_trans(struct ldb_module *module)
833 {
834         struct ltdb_private *ltdb =
835                 talloc_get_type(module->private_data, struct ltdb_private);
836
837         ltdb->in_transaction--;
838
839         if (tdb_transaction_commit(ltdb->tdb) != 0) {
840                 return ltdb_err_map(tdb_error(ltdb->tdb));
841         }
842
843         return LDB_SUCCESS;
844 }
845
846 static int ltdb_del_trans(struct ldb_module *module)
847 {
848         struct ltdb_private *ltdb =
849                 talloc_get_type(module->private_data, struct ltdb_private);
850
851         ltdb->in_transaction--;
852
853         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
854                 return ltdb_err_map(tdb_error(ltdb->tdb));
855         }
856
857         return LDB_SUCCESS;
858 }
859
860 /*
861   return sequenceNumber from @BASEINFO
862 */
863 static int ltdb_sequence_number(struct ldb_module *module,
864                                 struct ldb_request *req)
865 {
866         TALLOC_CTX *tmp_ctx;
867         struct ldb_message *msg = NULL;
868         struct ldb_dn *dn;
869         const char *date;
870         int tret;
871
872         tmp_ctx = talloc_new(req);
873         if (tmp_ctx == NULL) {
874                 talloc_free(tmp_ctx);
875                 return LDB_ERR_OPERATIONS_ERROR;
876         }
877
878         dn = ldb_dn_new(tmp_ctx, module->ldb, LTDB_BASEINFO);
879
880         msg = talloc(tmp_ctx, struct ldb_message);
881         if (msg == NULL) {
882                 talloc_free(tmp_ctx);
883                 return LDB_ERR_OPERATIONS_ERROR;
884         }
885
886         req->op.seq_num.flags = 0;
887
888         tret = ltdb_search_dn1(module, dn, msg);
889         if (tret != LDB_SUCCESS) {
890                 talloc_free(tmp_ctx);
891                 /* zero is as good as anything when we don't know */
892                 req->op.seq_num.seq_num = 0;
893                 return tret;
894         }
895
896         switch (req->op.seq_num.type) {
897         case LDB_SEQ_HIGHEST_SEQ:
898                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
899                 break;
900         case LDB_SEQ_NEXT:
901                 req->op.seq_num.seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
902                 req->op.seq_num.seq_num++;
903                 break;
904         case LDB_SEQ_HIGHEST_TIMESTAMP:
905                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
906                 if (date) {
907                         req->op.seq_num.seq_num = ldb_string_to_time(date);
908                 } else {
909                         req->op.seq_num.seq_num = 0;
910                         /* zero is as good as anything when we don't know */
911                 }
912                 break;
913         }
914
915         talloc_free(tmp_ctx);
916
917         return LDB_SUCCESS;
918 }
919
920 void ltdb_request_done(struct ldb_request *req, int error)
921 {
922         struct ldb_reply *ares;
923
924         /* if we already returned an error just return */
925         if (req->handle->status != LDB_SUCCESS) {
926                 return;
927         }
928
929         ares = talloc_zero(req, struct ldb_reply);
930         if (!ares) {
931                 ldb_oom(req->handle->ldb);
932                 req->callback(req, NULL);
933                 return;
934         }
935         ares->type = LDB_REPLY_DONE;
936         ares->error = error;
937
938         req->callback(req, ares);
939 }
940
941 static void ltdb_timeout(struct event_context *ev,
942                           struct timed_event *te,
943                           struct timeval t,
944                           void *private_data)
945 {
946         struct ltdb_context *ctx;
947         ctx = talloc_get_type(private_data, struct ltdb_context);
948
949         ltdb_request_done(ctx->req, LDB_ERR_TIME_LIMIT_EXCEEDED);
950 }
951
952 static void ltdb_callback(struct event_context *ev,
953                           struct timed_event *te,
954                           struct timeval t,
955                           void *private_data)
956 {
957         struct ltdb_context *ctx;
958         int ret;
959
960         ctx = talloc_get_type(private_data, struct ltdb_context);
961
962         switch (ctx->req->operation) {
963         case LDB_SEARCH:
964                 ret = ltdb_search(ctx);
965                 break;
966         case LDB_ADD:
967                 ret = ltdb_add(ctx);
968                 break;
969         case LDB_MODIFY:
970                 ret = ltdb_modify(ctx);
971                 break;
972         case LDB_DELETE:
973                 ret = ltdb_delete(ctx);
974                 break;
975         case LDB_RENAME:
976                 ret = ltdb_rename(ctx);
977                 break;
978         default:
979                 /* no other op supported */
980                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
981         }
982
983         if (!ctx->callback_failed) {
984                 ltdb_request_done(ctx->req, ret);
985         }
986 }
987
988 static int ltdb_handle_request(struct ldb_module *module,
989                                 struct ldb_request *req)
990 {
991         struct event_context *ev;
992         struct ltdb_context *ac;
993         struct timed_event *te;
994         struct timeval tv;
995
996         if (check_critical_controls(req->controls)) {
997                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
998         }
999
1000         if (req->starttime == 0 || req->timeout == 0) {
1001                 ldb_set_errstring(module->ldb, "Invalid timeout settings");
1002                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1003         }
1004
1005         ev = ldb_get_event_context(module->ldb);
1006
1007         ac = talloc_zero(req, struct ltdb_context);
1008         if (ac == NULL) {
1009                 ldb_set_errstring(module->ldb, "Out of Memory");
1010                 return LDB_ERR_OPERATIONS_ERROR;
1011         }
1012
1013         ac->module = module;
1014         ac->req = req;
1015
1016         tv.tv_sec = 0;
1017         tv.tv_usec = 0;
1018         te = event_add_timed(ev, ac, tv, ltdb_callback, ac);
1019         if (NULL == te) {
1020                 return LDB_ERR_OPERATIONS_ERROR;
1021         }
1022
1023
1024         tv.tv_sec = req->starttime + req->timeout;
1025         te = event_add_timed(ev, ac, tv, ltdb_timeout, ac);
1026         if (NULL == te) {
1027                 return LDB_ERR_OPERATIONS_ERROR;
1028         }
1029
1030         return LDB_SUCCESS;
1031 }
1032
1033 static const struct ldb_module_ops ltdb_ops = {
1034         .name              = "tdb",
1035         .search            = ltdb_handle_request,
1036         .add               = ltdb_handle_request,
1037         .modify            = ltdb_handle_request,
1038         .del               = ltdb_handle_request,
1039         .rename            = ltdb_handle_request,
1040 /*      .request           = ltdb_handle_request, */
1041         .start_transaction = ltdb_start_trans,
1042         .end_transaction   = ltdb_end_trans,
1043         .del_transaction   = ltdb_del_trans,
1044         .sequence_number   = ltdb_sequence_number
1045 };
1046
1047 /*
1048   connect to the database
1049 */
1050 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1051                         unsigned int flags, const char *options[],
1052                         struct ldb_module **module)
1053 {
1054         const char *path;
1055         int tdb_flags, open_flags;
1056         struct ltdb_private *ltdb;
1057
1058         /* parse the url */
1059         if (strchr(url, ':')) {
1060                 if (strncmp(url, "tdb://", 6) != 0) {
1061                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1062                                   "Invalid tdb URL '%s'", url);
1063                         return -1;
1064                 }
1065                 path = url+6;
1066         } else {
1067                 path = url;
1068         }
1069
1070         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1071
1072         /* check for the 'nosync' option */
1073         if (flags & LDB_FLG_NOSYNC) {
1074                 tdb_flags |= TDB_NOSYNC;
1075         }
1076
1077         /* and nommap option */
1078         if (flags & LDB_FLG_NOMMAP) {
1079                 tdb_flags |= TDB_NOMMAP;
1080         }
1081
1082         if (flags & LDB_FLG_RDONLY) {
1083                 open_flags = O_RDONLY;
1084         } else {
1085                 open_flags = O_CREAT | O_RDWR;
1086         }
1087
1088         ltdb = talloc_zero(ldb, struct ltdb_private);
1089         if (!ltdb) {
1090                 ldb_oom(ldb);
1091                 return -1;
1092         }
1093
1094         /* note that we use quite a large default hash size */
1095         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1096                                    tdb_flags, open_flags,
1097                                    ldb->create_perms, ldb);
1098         if (!ltdb->tdb) {
1099                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1100                           "Unable to open tdb '%s'\n", path);
1101                 talloc_free(ltdb);
1102                 return -1;
1103         }
1104
1105         ltdb->sequence_number = 0;
1106
1107         *module = talloc(ldb, struct ldb_module);
1108         if ((*module) == NULL) {
1109                 ldb_oom(ldb);
1110                 talloc_free(ltdb);
1111                 return -1;
1112         }
1113         talloc_set_name_const(*module, "ldb_tdb backend");
1114         (*module)->ldb = ldb;
1115         (*module)->prev = (*module)->next = NULL;
1116         (*module)->private_data = ltdb;
1117         (*module)->ops = &ltdb_ops;
1118
1119         if (ltdb_cache_load(*module) != 0) {
1120                 talloc_free(*module);
1121                 talloc_free(ltdb);
1122                 return -1;
1123         }
1124
1125         return 0;
1126 }
1127
1128 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1129         .name = "tdb",
1130         .connect_fn = ltdb_connect
1131 };