Merge branch 'master' of ssh://git.samba.org/data/git/samba into regsrv
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006-2008
7
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  */
47
48 #include "ldb_includes.h"
49
50 #include "ldb_tdb.h"
51
52
53 /*
54   map a tdb error code to a ldb error code
55 */
56 static int ltdb_err_map(enum TDB_ERROR tdb_code)
57 {
58         switch (tdb_code) {
59         case TDB_SUCCESS:
60                 return LDB_SUCCESS;
61         case TDB_ERR_CORRUPT:
62         case TDB_ERR_OOM:
63         case TDB_ERR_EINVAL:
64                 return LDB_ERR_OPERATIONS_ERROR;
65         case TDB_ERR_IO:
66                 return LDB_ERR_PROTOCOL_ERROR;
67         case TDB_ERR_LOCK:
68         case TDB_ERR_NOLOCK:
69                 return LDB_ERR_BUSY;
70         case TDB_ERR_LOCK_TIMEOUT:
71                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
72         case TDB_ERR_EXISTS:
73                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
74         case TDB_ERR_NOEXIST:
75                 return LDB_ERR_NO_SUCH_OBJECT;
76         case TDB_ERR_RDONLY:
77                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
78         }
79         return LDB_ERR_OTHER;
80 }
81
82 /*
83   lock the database for read - use by ltdb_search and ltdb_sequence_number
84 */
85 int ltdb_lock_read(struct ldb_module *module)
86 {
87         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
88         if (ltdb->in_transaction == 0) {
89                 return tdb_lockall_read(ltdb->tdb);
90         }
91         return 0;
92 }
93
94 /*
95   unlock the database after a ltdb_lock_read()
96 */
97 int ltdb_unlock_read(struct ldb_module *module)
98 {
99         struct ltdb_private *ltdb = (struct ltdb_private *)module->private_data;
100         if (ltdb->in_transaction == 0) {
101                 return tdb_unlockall_read(ltdb->tdb);
102         }
103         return 0;
104 }
105
106
107 /*
108   form a TDB_DATA for a record key
109   caller frees
110
111   note that the key for a record can depend on whether the
112   dn refers to a case sensitive index record or not
113 */
114 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
115 {
116         struct ldb_context *ldb = module->ldb;
117         TDB_DATA key;
118         char *key_str = NULL;
119         const char *dn_folded = NULL;
120
121         /*
122           most DNs are case insensitive. The exception is index DNs for
123           case sensitive attributes
124
125           there are 3 cases dealt with in this code:
126
127           1) if the dn doesn't start with @ then uppercase the attribute
128              names and the attributes values of case insensitive attributes
129           2) if the dn starts with @ then leave it alone -
130              the indexing code handles the rest
131         */
132
133         dn_folded = ldb_dn_get_casefold(dn);
134         if (!dn_folded) {
135                 goto failed;
136         }
137
138         key_str = talloc_strdup(ldb, "DN=");
139         if (!key_str) {
140                 goto failed;
141         }
142
143         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
144         if (!key_str) {
145                 goto failed;
146         }
147
148         key.dptr = (uint8_t *)key_str;
149         key.dsize = strlen(key_str) + 1;
150
151         return key;
152
153 failed:
154         errno = ENOMEM;
155         key.dptr = NULL;
156         key.dsize = 0;
157         return key;
158 }
159
160 /*
161   check special dn's have valid attributes
162   currently only @ATTRIBUTES is checked
163 */
164 static int ltdb_check_special_dn(struct ldb_module *module,
165                           const struct ldb_message *msg)
166 {
167         int i, j;
168
169         if (! ldb_dn_is_special(msg->dn) ||
170             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
171                 return 0;
172         }
173
174         /* we have @ATTRIBUTES, let's check attributes are fine */
175         /* should we check that we deny multivalued attributes ? */
176         for (i = 0; i < msg->num_elements; i++) {
177                 for (j = 0; j < msg->elements[i].num_values; j++) {
178                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
179                                 ldb_set_errstring(module->ldb, "Invalid attribute value in an @ATTRIBUTES entry");
180                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
181                         }
182                 }
183         }
184
185         return 0;
186 }
187
188
189 /*
190   we've made a modification to a dn - possibly reindex and
191   update sequence number
192 */
193 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
194 {
195         int ret = LDB_SUCCESS;
196
197         if (ldb_dn_is_special(dn) &&
198             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
199              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
200                 ret = ltdb_reindex(module);
201         }
202
203         if (ret == LDB_SUCCESS &&
204             !(ldb_dn_is_special(dn) &&
205               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
206                 ret = ltdb_increase_sequence_number(module);
207         }
208
209         return ret;
210 }
211
212 /*
213   store a record into the db
214 */
215 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
216 {
217         struct ltdb_private *ltdb =
218                 talloc_get_type(module->private_data, struct ltdb_private);
219         TDB_DATA tdb_key, tdb_data;
220         int ret;
221
222         tdb_key = ltdb_key(module, msg->dn);
223         if (!tdb_key.dptr) {
224                 return LDB_ERR_OTHER;
225         }
226
227         ret = ltdb_pack_data(module, msg, &tdb_data);
228         if (ret == -1) {
229                 talloc_free(tdb_key.dptr);
230                 return LDB_ERR_OTHER;
231         }
232
233         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
234         if (ret == -1) {
235                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
236                 goto done;
237         }
238
239         ret = ltdb_index_add(module, msg);
240         if (ret != LDB_SUCCESS) {
241                 tdb_delete(ltdb->tdb, tdb_key);
242         }
243
244 done:
245         talloc_free(tdb_key.dptr);
246         talloc_free(tdb_data.dptr);
247
248         return ret;
249 }
250
251
252 static int ltdb_add_internal(struct ldb_module *module,
253                              const struct ldb_message *msg)
254 {
255         int ret;
256
257         ret = ltdb_check_special_dn(module, msg);
258         if (ret != LDB_SUCCESS) {
259                 return ret;
260         }
261
262         if (ltdb_cache_load(module) != 0) {
263                 return LDB_ERR_OPERATIONS_ERROR;
264         }
265
266         ret = ltdb_store(module, msg, TDB_INSERT);
267
268         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
269                 ldb_asprintf_errstring(module->ldb,
270                                         "Entry %s already exists",
271                                         ldb_dn_get_linearized(msg->dn));
272                 return ret;
273         }
274
275         if (ret == LDB_SUCCESS) {
276                 ret = ltdb_index_one(module, msg, 1);
277                 if (ret != LDB_SUCCESS) {
278                         return ret;
279                 }
280
281                 ret = ltdb_modified(module, msg->dn);
282                 if (ret != LDB_SUCCESS) {
283                         return ret;
284                 }
285         }
286
287         return ret;
288 }
289
290 /*
291   add a record to the database
292 */
293 static int ltdb_add(struct ltdb_context *ctx)
294 {
295         struct ldb_module *module = ctx->module;
296         struct ldb_request *req = ctx->req;
297         int tret;
298
299         req->handle->state = LDB_ASYNC_PENDING;
300
301         tret = ltdb_add_internal(module, req->op.add.message);
302         if (tret != LDB_SUCCESS) {
303                 return tret;
304         }
305
306         return LDB_SUCCESS;
307 }
308
309 /*
310   delete a record from the database, not updating indexes (used for deleting
311   index records)
312 */
313 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
314 {
315         struct ltdb_private *ltdb =
316                 talloc_get_type(module->private_data, struct ltdb_private);
317         TDB_DATA tdb_key;
318         int ret;
319
320         tdb_key = ltdb_key(module, dn);
321         if (!tdb_key.dptr) {
322                 return LDB_ERR_OTHER;
323         }
324
325         ret = tdb_delete(ltdb->tdb, tdb_key);
326         talloc_free(tdb_key.dptr);
327
328         if (ret != 0) {
329                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
330         }
331
332         return ret;
333 }
334
335 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
336 {
337         struct ldb_message *msg;
338         int ret;
339
340         msg = talloc(module, struct ldb_message);
341         if (msg == NULL) {
342                 return LDB_ERR_OPERATIONS_ERROR;
343         }
344
345         /* in case any attribute of the message was indexed, we need
346            to fetch the old record */
347         ret = ltdb_search_dn1(module, dn, msg);
348         if (ret != LDB_SUCCESS) {
349                 /* not finding the old record is an error */
350                 goto done;
351         }
352
353         ret = ltdb_delete_noindex(module, dn);
354         if (ret != LDB_SUCCESS) {
355                 goto done;
356         }
357
358         /* remove one level attribute */
359         ret = ltdb_index_one(module, msg, 0);
360         if (ret != LDB_SUCCESS) {
361                 goto done;
362         }
363
364         /* remove any indexed attributes */
365         ret = ltdb_index_del(module, msg);
366         if (ret != LDB_SUCCESS) {
367                 goto done;
368         }
369
370         ret = ltdb_modified(module, dn);
371         if (ret != LDB_SUCCESS) {
372                 goto done;
373         }
374
375 done:
376         talloc_free(msg);
377         return ret;
378 }
379
380 /*
381   delete a record from the database
382 */
383 static int ltdb_delete(struct ltdb_context *ctx)
384 {
385         struct ldb_module *module = ctx->module;
386         struct ldb_request *req = ctx->req;
387         int tret;
388
389         req->handle->state = LDB_ASYNC_PENDING;
390
391         if (ltdb_cache_load(module) != 0) {
392                 return LDB_ERR_OPERATIONS_ERROR;
393         }
394
395         tret = ltdb_delete_internal(module, req->op.del.dn);
396         if (tret != LDB_SUCCESS) {
397                 return tret;
398         }
399
400         return LDB_SUCCESS;
401 }
402
403 /*
404   find an element by attribute name. At the moment this does a linear search,
405   it should be re-coded to use a binary search once all places that modify
406   records guarantee sorted order
407
408   return the index of the first matching element if found, otherwise -1
409 */
410 static int find_element(const struct ldb_message *msg, const char *name)
411 {
412         unsigned int i;
413         for (i=0;i<msg->num_elements;i++) {
414                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
415                         return i;
416                 }
417         }
418         return -1;
419 }
420
421
422 /*
423   add an element to an existing record. Assumes a elements array that we
424   can call re-alloc on, and assumed that we can re-use the data pointers from
425   the passed in additional values. Use with care!
426
427   returns 0 on success, -1 on failure (and sets errno)
428 */
429 static int msg_add_element(struct ldb_context *ldb,
430                            struct ldb_message *msg,
431                            struct ldb_message_element *el)
432 {
433         struct ldb_message_element *e2;
434         unsigned int i;
435
436         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
437                               msg->num_elements+1);
438         if (!e2) {
439                 errno = ENOMEM;
440                 return -1;
441         }
442
443         msg->elements = e2;
444
445         e2 = &msg->elements[msg->num_elements];
446
447         e2->name = el->name;
448         e2->flags = el->flags;
449         e2->values = NULL;
450         if (el->num_values != 0) {
451                 e2->values = talloc_array(msg->elements,
452                                           struct ldb_val, el->num_values);
453                 if (!e2->values) {
454                         errno = ENOMEM;
455                         return -1;
456                 }
457         }
458         for (i=0;i<el->num_values;i++) {
459                 e2->values[i] = el->values[i];
460         }
461         e2->num_values = el->num_values;
462
463         msg->num_elements++;
464
465         return 0;
466 }
467
468 /*
469   delete all elements having a specified attribute name
470 */
471 static int msg_delete_attribute(struct ldb_module *module,
472                                 struct ldb_context *ldb,
473                                 struct ldb_message *msg, const char *name)
474 {
475         const char *dn;
476         unsigned int i, j;
477
478         dn = ldb_dn_get_linearized(msg->dn);
479         if (dn == NULL) {
480                 return -1;
481         }
482
483         for (i=0;i<msg->num_elements;i++) {
484                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
485                         for (j=0;j<msg->elements[i].num_values;j++) {
486                                 ltdb_index_del_value(module, dn,
487                                                      &msg->elements[i], j);
488                         }
489                         talloc_free(msg->elements[i].values);
490                         if (msg->num_elements > (i+1)) {
491                                 memmove(&msg->elements[i],
492                                         &msg->elements[i+1],
493                                         sizeof(struct ldb_message_element)*
494                                         (msg->num_elements - (i+1)));
495                         }
496                         msg->num_elements--;
497                         i--;
498                         msg->elements = talloc_realloc(msg, msg->elements,
499                                                         struct ldb_message_element,
500                                                         msg->num_elements);
501                 }
502         }
503
504         return 0;
505 }
506
507 /*
508   delete all elements matching an attribute name/value
509
510   return 0 on success, -1 on failure
511 */
512 static int msg_delete_element(struct ldb_module *module,
513                               struct ldb_message *msg,
514                               const char *name,
515                               const struct ldb_val *val)
516 {
517         struct ldb_context *ldb = module->ldb;
518         unsigned int i;
519         int found;
520         struct ldb_message_element *el;
521         const struct ldb_schema_attribute *a;
522
523         found = find_element(msg, name);
524         if (found == -1) {
525                 return -1;
526         }
527
528         el = &msg->elements[found];
529
530         a = ldb_schema_attribute_by_name(ldb, el->name);
531
532         for (i=0;i<el->num_values;i++) {
533                 if (a->syntax->comparison_fn(ldb, ldb,
534                                                 &el->values[i], val) == 0) {
535                         if (i<el->num_values-1) {
536                                 memmove(&el->values[i], &el->values[i+1],
537                                         sizeof(el->values[i])*
538                                                 (el->num_values-(i+1)));
539                         }
540                         el->num_values--;
541                         if (el->num_values == 0) {
542                                 return msg_delete_attribute(module, ldb,
543                                                             msg, name);
544                         }
545                         return 0;
546                 }
547         }
548
549         return -1;
550 }
551
552
553 /*
554   modify a record - internal interface
555
556   yuck - this is O(n^2). Luckily n is usually small so we probably
557   get away with it, but if we ever have really large attribute lists
558   then we'll need to look at this again
559 */
560 int ltdb_modify_internal(struct ldb_module *module,
561                          const struct ldb_message *msg)
562 {
563         struct ldb_context *ldb = module->ldb;
564         struct ltdb_private *ltdb =
565                 talloc_get_type(module->private_data, struct ltdb_private);
566         TDB_DATA tdb_key, tdb_data;
567         struct ldb_message *msg2;
568         unsigned i, j;
569         int ret, idx;
570
571         tdb_key = ltdb_key(module, msg->dn);
572         if (!tdb_key.dptr) {
573                 return LDB_ERR_OTHER;
574         }
575
576         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
577         if (!tdb_data.dptr) {
578                 talloc_free(tdb_key.dptr);
579                 return ltdb_err_map(tdb_error(ltdb->tdb));
580         }
581
582         msg2 = talloc(tdb_key.dptr, struct ldb_message);
583         if (msg2 == NULL) {
584                 talloc_free(tdb_key.dptr);
585                 return LDB_ERR_OTHER;
586         }
587
588         ret = ltdb_unpack_data(module, &tdb_data, msg2);
589         if (ret == -1) {
590                 ret = LDB_ERR_OTHER;
591                 goto failed;
592         }
593
594         if (!msg2->dn) {
595                 msg2->dn = msg->dn;
596         }
597
598         for (i=0;i<msg->num_elements;i++) {
599                 struct ldb_message_element *el = &msg->elements[i];
600                 struct ldb_message_element *el2;
601                 struct ldb_val *vals;
602                 const char *dn;
603
604                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
605
606                 case LDB_FLAG_MOD_ADD:
607                         /* add this element to the message. fail if it
608                            already exists */
609                         idx = find_element(msg2, el->name);
610
611                         if (idx == -1) {
612                                 if (msg_add_element(ldb, msg2, el) != 0) {
613                                         ret = LDB_ERR_OTHER;
614                                         goto failed;
615                                 }
616                                 continue;
617                         }
618
619                         el2 = &msg2->elements[idx];
620
621                         /* An attribute with this name already exists,
622                          * add all values if they don't already exist
623                          * (check both the other elements to be added,
624                          * and those already in the db). */
625
626                         for (j=0;j<el->num_values;j++) {
627                                 if (ldb_msg_find_val(el2, &el->values[j])) {
628                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d already exists", el->name, j);
629                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
630                                         goto failed;
631                                 }
632                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
633                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d provided more than once", el->name, j);
634                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
635                                         goto failed;
636                                 }
637                         }
638
639                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
640                                                 el2->num_values + el->num_values);
641
642                         if (vals == NULL) {
643                                 ret = LDB_ERR_OTHER;
644                                 goto failed;
645                         }
646
647                         for (j=0;j<el->num_values;j++) {
648                                 vals[el2->num_values + j] =
649                                         ldb_val_dup(vals, &el->values[j]);
650                         }
651
652                         el2->values = vals;
653                         el2->num_values += el->num_values;
654
655                         break;
656
657                 case LDB_FLAG_MOD_REPLACE:
658                         /* replace all elements of this attribute name with the elements
659                            listed. The attribute not existing is not an error */
660                         msg_delete_attribute(module, ldb, msg2, el->name);
661
662                         for (j=0;j<el->num_values;j++) {
663                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
664                                         ldb_asprintf_errstring(module->ldb, "%s: value #%d provided more than once", el->name, j);
665                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
666                                         goto failed;
667                                 }
668                         }
669
670                         /* add the replacement element, if not empty */
671                         if (el->num_values != 0 &&
672                             msg_add_element(ldb, msg2, el) != 0) {
673                                 ret = LDB_ERR_OTHER;
674                                 goto failed;
675                         }
676                         break;
677
678                 case LDB_FLAG_MOD_DELETE:
679
680                         dn = ldb_dn_get_linearized(msg->dn);
681                         if (dn == NULL) {
682                                 ret = LDB_ERR_OTHER;
683                                 goto failed;
684                         }
685
686                         /* we could be being asked to delete all
687                            values or just some values */
688                         if (msg->elements[i].num_values == 0) {
689                                 if (msg_delete_attribute(module, ldb, msg2, 
690                                                          msg->elements[i].name) != 0) {
691                                         ldb_asprintf_errstring(module->ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
692                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
693                                         goto failed;
694                                 }
695                                 break;
696                         }
697                         for (j=0;j<msg->elements[i].num_values;j++) {
698                                 if (msg_delete_element(module,
699                                                        msg2, 
700                                                        msg->elements[i].name,
701                                                        &msg->elements[i].values[j]) != 0) {
702                                         ldb_asprintf_errstring(module->ldb, "No matching attribute value when deleting attribute: %s on %s", msg->elements[i].name, dn);
703                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
704                                         goto failed;
705                                 }
706                                 ret = ltdb_index_del_value(module, dn, &msg->elements[i], j);
707                                 if (ret != LDB_SUCCESS) {
708                                         goto failed;
709                                 }
710                         }
711                         break;
712                 default:
713                         ldb_asprintf_errstring(module->ldb,
714                                 "Invalid ldb_modify flags on %s: 0x%x",
715                                 msg->elements[i].name,
716                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
717                         ret = LDB_ERR_PROTOCOL_ERROR;
718                         goto failed;
719                 }
720         }
721
722         /* we've made all the mods
723          * save the modified record back into the database */
724         ret = ltdb_store(module, msg2, TDB_MODIFY);
725         if (ret != LDB_SUCCESS) {
726                 goto failed;
727         }
728
729         ret = ltdb_modified(module, msg->dn);
730         if (ret != LDB_SUCCESS) {
731                 goto failed;
732         }
733
734         talloc_free(tdb_key.dptr);
735         free(tdb_data.dptr);
736         return ret;
737
738 failed:
739         talloc_free(tdb_key.dptr);
740         free(tdb_data.dptr);
741         return ret;
742 }
743
744 /*
745   modify a record
746 */
747 static int ltdb_modify(struct ltdb_context *ctx)
748 {
749         struct ldb_module *module = ctx->module;
750         struct ldb_request *req = ctx->req;
751         int tret;
752
753         req->handle->state = LDB_ASYNC_PENDING;
754
755         tret = ltdb_check_special_dn(module, req->op.mod.message);
756         if (tret != LDB_SUCCESS) {
757                 return tret;
758         }
759
760         if (ltdb_cache_load(module) != 0) {
761                 return LDB_ERR_OPERATIONS_ERROR;
762         }
763
764         tret = ltdb_modify_internal(module, req->op.mod.message);
765         if (tret != LDB_SUCCESS) {
766                 return tret;
767         }
768
769         return LDB_SUCCESS;
770 }
771
772 /*
773   rename a record
774 */
775 static int ltdb_rename(struct ltdb_context *ctx)
776 {
777         struct ldb_module *module = ctx->module;
778         struct ldb_request *req = ctx->req;
779         struct ldb_message *msg;
780         int tret;
781
782         req->handle->state = LDB_ASYNC_PENDING;
783
784         if (ltdb_cache_load(ctx->module) != 0) {
785                 return LDB_ERR_OPERATIONS_ERROR;
786         }
787
788         msg = talloc(ctx, struct ldb_message);
789         if (msg == NULL) {
790                 return LDB_ERR_OPERATIONS_ERROR;
791         }
792
793         /* in case any attribute of the message was indexed, we need
794            to fetch the old record */
795         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
796         if (tret != LDB_SUCCESS) {
797                 /* not finding the old record is an error */
798                 return tret;
799         }
800
801         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
802         if (!msg->dn) {
803                 return LDB_ERR_OPERATIONS_ERROR;
804         }
805
806         if (ldb_dn_compare(req->op.rename.olddn, req->op.rename.newdn) == 0) {
807                 /* The rename operation is apparently only changing case -
808                    the DNs are the same.  Delete the old DN before adding
809                    the new one to avoid a TDB_ERR_EXISTS error.
810
811                    The only drawback to this is that if the delete
812                    succeeds but the add fails, we rely on the
813                    transaction to roll this all back. */
814                 tret = ltdb_delete_internal(module, req->op.rename.olddn);
815                 if (tret != LDB_SUCCESS) {
816                         return tret;
817                 }
818
819                 tret = ltdb_add_internal(module, msg);
820                 if (tret != LDB_SUCCESS) {
821                         return tret;
822                 }
823         } else {
824                 /* The rename operation is changing DNs.  Try to add the new
825                    DN first to avoid clobbering another DN not related to
826                    this rename operation. */
827                 tret = ltdb_add_internal(module, msg);
828                 if (tret != LDB_SUCCESS) {
829                         return tret;
830                 }
831
832                 tret = ltdb_delete_internal(module, req->op.rename.olddn);
833                 if (tret != LDB_SUCCESS) {
834                         ltdb_delete_internal(module, req->op.rename.newdn);
835                         return LDB_ERR_OPERATIONS_ERROR;
836                 }
837         }
838
839         return LDB_SUCCESS;
840 }
841
842 static int ltdb_start_trans(struct ldb_module *module)
843 {
844         struct ltdb_private *ltdb =
845                 talloc_get_type(module->private_data, struct ltdb_private);
846
847         if (tdb_transaction_start(ltdb->tdb) != 0) {
848                 return ltdb_err_map(tdb_error(ltdb->tdb));
849         }
850
851         ltdb->in_transaction++;
852
853         return LDB_SUCCESS;
854 }
855
856 static int ltdb_end_trans(struct ldb_module *module)
857 {
858         struct ltdb_private *ltdb =
859                 talloc_get_type(module->private_data, struct ltdb_private);
860
861         ltdb->in_transaction--;
862
863         if (tdb_transaction_commit(ltdb->tdb) != 0) {
864                 return ltdb_err_map(tdb_error(ltdb->tdb));
865         }
866
867         return LDB_SUCCESS;
868 }
869
870 static int ltdb_del_trans(struct ldb_module *module)
871 {
872         struct ltdb_private *ltdb =
873                 talloc_get_type(module->private_data, struct ltdb_private);
874
875         ltdb->in_transaction--;
876
877         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
878                 return ltdb_err_map(tdb_error(ltdb->tdb));
879         }
880
881         return LDB_SUCCESS;
882 }
883
884 /*
885   return sequenceNumber from @BASEINFO
886 */
887 static int ltdb_sequence_number(struct ltdb_context *ctx,
888                                 struct ldb_extended **ext)
889 {
890         struct ldb_module *module = ctx->module;
891         struct ldb_request *req = ctx->req;
892         TALLOC_CTX *tmp_ctx;
893         struct ldb_seqnum_request *seq;
894         struct ldb_seqnum_result *res;
895         struct ldb_message *msg = NULL;
896         struct ldb_dn *dn;
897         const char *date;
898         int ret;
899
900         seq = talloc_get_type(req->op.extended.data,
901                                 struct ldb_seqnum_request);
902         if (seq == NULL) {
903                 return LDB_ERR_OPERATIONS_ERROR;
904         }
905
906         req->handle->state = LDB_ASYNC_PENDING;
907
908         if (ltdb_lock_read(module) != 0) {
909                 return LDB_ERR_OPERATIONS_ERROR;
910         }
911
912         res = talloc_zero(req, struct ldb_seqnum_result);
913         if (res == NULL) {
914                 ret = LDB_ERR_OPERATIONS_ERROR;
915                 goto done;
916         }
917         tmp_ctx = talloc_new(req);
918         if (tmp_ctx == NULL) {
919                 ret = LDB_ERR_OPERATIONS_ERROR;
920                 goto done;
921         }
922
923         dn = ldb_dn_new(tmp_ctx, module->ldb, LTDB_BASEINFO);
924
925         msg = talloc(tmp_ctx, struct ldb_message);
926         if (msg == NULL) {
927                 ret = LDB_ERR_OPERATIONS_ERROR;
928                 goto done;
929         }
930
931         ret = ltdb_search_dn1(module, dn, msg);
932         if (ret != LDB_SUCCESS) {
933                 goto done;
934         }
935
936         switch (seq->type) {
937         case LDB_SEQ_HIGHEST_SEQ:
938                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
939                 break;
940         case LDB_SEQ_NEXT:
941                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
942                 res->seq_num++;
943                 break;
944         case LDB_SEQ_HIGHEST_TIMESTAMP:
945                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
946                 if (date) {
947                         res->seq_num = ldb_string_to_time(date);
948                 } else {
949                         res->seq_num = 0;
950                         /* zero is as good as anything when we don't know */
951                 }
952                 break;
953         }
954
955         *ext = talloc_zero(req, struct ldb_extended);
956         if (*ext == NULL) {
957                 ret = LDB_ERR_OPERATIONS_ERROR;
958                 goto done;
959         }
960         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
961         (*ext)->data = talloc_steal(*ext, res);
962
963         ret = LDB_SUCCESS;
964
965 done:
966         talloc_free(tmp_ctx);
967         ltdb_unlock_read(module);
968         return ret;
969 }
970
971 static void ltdb_request_done(struct ldb_request *req, int error)
972 {
973         struct ldb_reply *ares;
974
975         /* if we already returned an error just return */
976         if (req->handle->status != LDB_SUCCESS) {
977                 return;
978         }
979
980         ares = talloc_zero(req, struct ldb_reply);
981         if (!ares) {
982                 ldb_oom(req->handle->ldb);
983                 req->callback(req, NULL);
984                 return;
985         }
986         ares->type = LDB_REPLY_DONE;
987         ares->error = error;
988
989         req->callback(req, ares);
990 }
991
992 static void ltdb_timeout(struct event_context *ev,
993                           struct timed_event *te,
994                           struct timeval t,
995                           void *private_data)
996 {
997         struct ltdb_context *ctx;
998         ctx = talloc_get_type(private_data, struct ltdb_context);
999
1000         ltdb_request_done(ctx->req, LDB_ERR_TIME_LIMIT_EXCEEDED);
1001 }
1002
1003 static void ltdb_request_extended_done(struct ldb_request *req,
1004                                         struct ldb_extended *ext,
1005                                         int error)
1006 {
1007         struct ldb_reply *ares;
1008
1009         /* if we already returned an error just return */
1010         if (req->handle->status != LDB_SUCCESS) {
1011                 return;
1012         }
1013
1014         ares = talloc_zero(req, struct ldb_reply);
1015         if (!ares) {
1016                 ldb_oom(req->handle->ldb);
1017                 req->callback(req, NULL);
1018                 return;
1019         }
1020         ares->type = LDB_REPLY_DONE;
1021         ares->response = ext;
1022         ares->error = error;
1023
1024         req->callback(req, ares);
1025 }
1026
1027 static void ltdb_handle_extended(struct ltdb_context *ctx)
1028 {
1029         struct ldb_extended *ext = NULL;
1030         int ret;
1031
1032         if (strcmp(ctx->req->op.extended.oid,
1033                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1034                 /* get sequence number */
1035                 ret = ltdb_sequence_number(ctx, &ext);
1036         } else {
1037                 /* not recognized */
1038                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1039         }
1040
1041         ltdb_request_extended_done(ctx->req, ext, ret);
1042 }
1043
1044 static void ltdb_callback(struct event_context *ev,
1045                           struct timed_event *te,
1046                           struct timeval t,
1047                           void *private_data)
1048 {
1049         struct ltdb_context *ctx;
1050         int ret;
1051
1052         ctx = talloc_get_type(private_data, struct ltdb_context);
1053
1054         switch (ctx->req->operation) {
1055         case LDB_SEARCH:
1056                 ret = ltdb_search(ctx);
1057                 break;
1058         case LDB_ADD:
1059                 ret = ltdb_add(ctx);
1060                 break;
1061         case LDB_MODIFY:
1062                 ret = ltdb_modify(ctx);
1063                 break;
1064         case LDB_DELETE:
1065                 ret = ltdb_delete(ctx);
1066                 break;
1067         case LDB_RENAME:
1068                 ret = ltdb_rename(ctx);
1069                 break;
1070         case LDB_EXTENDED:
1071                 ltdb_handle_extended(ctx);
1072                 return;
1073         default:
1074                 /* no other op supported */
1075                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1076         }
1077
1078         if (!ctx->callback_failed) {
1079                 ltdb_request_done(ctx->req, ret);
1080         }
1081 }
1082
1083 static int ltdb_handle_request(struct ldb_module *module,
1084                                 struct ldb_request *req)
1085 {
1086         struct event_context *ev;
1087         struct ltdb_context *ac;
1088         struct timed_event *te;
1089         struct timeval tv;
1090
1091         if (check_critical_controls(req->controls)) {
1092                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1093         }
1094
1095         if (req->starttime == 0 || req->timeout == 0) {
1096                 ldb_set_errstring(module->ldb, "Invalid timeout settings");
1097                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1098         }
1099
1100         ev = ldb_get_event_context(module->ldb);
1101
1102         ac = talloc_zero(req, struct ltdb_context);
1103         if (ac == NULL) {
1104                 ldb_set_errstring(module->ldb, "Out of Memory");
1105                 return LDB_ERR_OPERATIONS_ERROR;
1106         }
1107
1108         ac->module = module;
1109         ac->req = req;
1110
1111         tv.tv_sec = 0;
1112         tv.tv_usec = 0;
1113         te = event_add_timed(ev, ac, tv, ltdb_callback, ac);
1114         if (NULL == te) {
1115                 return LDB_ERR_OPERATIONS_ERROR;
1116         }
1117
1118
1119         tv.tv_sec = req->starttime + req->timeout;
1120         te = event_add_timed(ev, ac, tv, ltdb_timeout, ac);
1121         if (NULL == te) {
1122                 return LDB_ERR_OPERATIONS_ERROR;
1123         }
1124
1125         return LDB_SUCCESS;
1126 }
1127
1128 static const struct ldb_module_ops ltdb_ops = {
1129         .name              = "tdb",
1130         .search            = ltdb_handle_request,
1131         .add               = ltdb_handle_request,
1132         .modify            = ltdb_handle_request,
1133         .del               = ltdb_handle_request,
1134         .rename            = ltdb_handle_request,
1135         .extended          = ltdb_handle_request,
1136         .start_transaction = ltdb_start_trans,
1137         .end_transaction   = ltdb_end_trans,
1138         .del_transaction   = ltdb_del_trans,
1139 };
1140
1141 /*
1142   connect to the database
1143 */
1144 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1145                         unsigned int flags, const char *options[],
1146                         struct ldb_module **module)
1147 {
1148         const char *path;
1149         int tdb_flags, open_flags;
1150         struct ltdb_private *ltdb;
1151
1152         /* parse the url */
1153         if (strchr(url, ':')) {
1154                 if (strncmp(url, "tdb://", 6) != 0) {
1155                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1156                                   "Invalid tdb URL '%s'", url);
1157                         return -1;
1158                 }
1159                 path = url+6;
1160         } else {
1161                 path = url;
1162         }
1163
1164         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1165
1166         /* check for the 'nosync' option */
1167         if (flags & LDB_FLG_NOSYNC) {
1168                 tdb_flags |= TDB_NOSYNC;
1169         }
1170
1171         /* and nommap option */
1172         if (flags & LDB_FLG_NOMMAP) {
1173                 tdb_flags |= TDB_NOMMAP;
1174         }
1175
1176         if (flags & LDB_FLG_RDONLY) {
1177                 open_flags = O_RDONLY;
1178         } else {
1179                 open_flags = O_CREAT | O_RDWR;
1180         }
1181
1182         ltdb = talloc_zero(ldb, struct ltdb_private);
1183         if (!ltdb) {
1184                 ldb_oom(ldb);
1185                 return -1;
1186         }
1187
1188         /* note that we use quite a large default hash size */
1189         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1190                                    tdb_flags, open_flags,
1191                                    ldb->create_perms, ldb);
1192         if (!ltdb->tdb) {
1193                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1194                           "Unable to open tdb '%s'\n", path);
1195                 talloc_free(ltdb);
1196                 return -1;
1197         }
1198
1199         ltdb->sequence_number = 0;
1200
1201         *module = talloc(ldb, struct ldb_module);
1202         if ((*module) == NULL) {
1203                 ldb_oom(ldb);
1204                 talloc_free(ltdb);
1205                 return -1;
1206         }
1207         talloc_set_name_const(*module, "ldb_tdb backend");
1208         (*module)->ldb = ldb;
1209         (*module)->prev = (*module)->next = NULL;
1210         (*module)->private_data = ltdb;
1211         (*module)->ops = &ltdb_ops;
1212
1213         if (ltdb_cache_load(*module) != 0) {
1214                 talloc_free(*module);
1215                 talloc_free(ltdb);
1216                 return -1;
1217         }
1218
1219         return 0;
1220 }
1221
1222 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1223         .name = "tdb",
1224         .connect_fn = ltdb_connect
1225 };