7f8342b06777365f5c55a836c00c04d34cccbbb3
[kai/samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         default:
81                 break;
82         }
83         return LDB_ERR_OTHER;
84 }
85
86 /*
87   lock the database for read - use by ltdb_search and ltdb_sequence_number
88 */
89 int ltdb_lock_read(struct ldb_module *module)
90 {
91         void *data = ldb_module_get_private(module);
92         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
93         int ret = 0;
94
95         if (ltdb->in_transaction == 0 &&
96             ltdb->read_lock_count == 0) {
97                 ret = tdb_lockall_read(ltdb->tdb);
98         }
99         if (ret == 0) {
100                 ltdb->read_lock_count++;
101         }
102         return ret;
103 }
104
105 /*
106   unlock the database after a ltdb_lock_read()
107 */
108 int ltdb_unlock_read(struct ldb_module *module)
109 {
110         void *data = ldb_module_get_private(module);
111         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
112         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
113                 return tdb_unlockall_read(ltdb->tdb);
114         }
115         ltdb->read_lock_count--;
116         return 0;
117 }
118
119
120 /*
121   form a TDB_DATA for a record key
122   caller frees
123
124   note that the key for a record can depend on whether the
125   dn refers to a case sensitive index record or not
126 */
127 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
128 {
129         struct ldb_context *ldb = ldb_module_get_ctx(module);
130         TDB_DATA key;
131         char *key_str = NULL;
132         const char *dn_folded = NULL;
133
134         /*
135           most DNs are case insensitive. The exception is index DNs for
136           case sensitive attributes
137
138           there are 3 cases dealt with in this code:
139
140           1) if the dn doesn't start with @ then uppercase the attribute
141              names and the attributes values of case insensitive attributes
142           2) if the dn starts with @ then leave it alone -
143              the indexing code handles the rest
144         */
145
146         dn_folded = ldb_dn_get_casefold(dn);
147         if (!dn_folded) {
148                 goto failed;
149         }
150
151         key_str = talloc_strdup(ldb, "DN=");
152         if (!key_str) {
153                 goto failed;
154         }
155
156         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
157         if (!key_str) {
158                 goto failed;
159         }
160
161         key.dptr = (uint8_t *)key_str;
162         key.dsize = strlen(key_str) + 1;
163
164         return key;
165
166 failed:
167         errno = ENOMEM;
168         key.dptr = NULL;
169         key.dsize = 0;
170         return key;
171 }
172
173 /*
174   check special dn's have valid attributes
175   currently only @ATTRIBUTES is checked
176 */
177 static int ltdb_check_special_dn(struct ldb_module *module,
178                           const struct ldb_message *msg)
179 {
180         struct ldb_context *ldb = ldb_module_get_ctx(module);
181         unsigned int i, j;
182
183         if (! ldb_dn_is_special(msg->dn) ||
184             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
185                 return LDB_SUCCESS;
186         }
187
188         /* we have @ATTRIBUTES, let's check attributes are fine */
189         /* should we check that we deny multivalued attributes ? */
190         for (i = 0; i < msg->num_elements; i++) {
191                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
192
193                 for (j = 0; j < msg->elements[i].num_values; j++) {
194                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
195                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
196                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
197                         }
198                 }
199         }
200
201         return LDB_SUCCESS;
202 }
203
204
205 /*
206   we've made a modification to a dn - possibly reindex and
207   update sequence number
208 */
209 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
210 {
211         int ret = LDB_SUCCESS;
212         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
213
214         /* only allow modifies inside a transaction, otherwise the
215          * ldb is unsafe */
216         if (ltdb->in_transaction == 0) {
217                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
218                 return LDB_ERR_OPERATIONS_ERROR;
219         }
220
221         if (ldb_dn_is_special(dn) &&
222             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
223              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
224                 ret = ltdb_reindex(module);
225         }
226
227         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
228         if (ret == LDB_SUCCESS &&
229             !(ldb_dn_is_special(dn) &&
230               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
231                 ret = ltdb_increase_sequence_number(module);
232         }
233
234         /* If the modify was to @OPTIONS, reload the cache */
235         if (ret == LDB_SUCCESS &&
236             ldb_dn_is_special(dn) &&
237             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
238                 ret = ltdb_cache_reload(module);
239         }
240
241         return ret;
242 }
243
244 /*
245   store a record into the db
246 */
247 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
248 {
249         void *data = ldb_module_get_private(module);
250         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
251         TDB_DATA tdb_key, tdb_data;
252         int ret = LDB_SUCCESS;
253
254         tdb_key = ltdb_key(module, msg->dn);
255         if (tdb_key.dptr == NULL) {
256                 return LDB_ERR_OTHER;
257         }
258
259         ret = ltdb_pack_data(module, msg, &tdb_data);
260         if (ret == -1) {
261                 talloc_free(tdb_key.dptr);
262                 return LDB_ERR_OTHER;
263         }
264
265         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
266         if (ret == -1) {
267                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
268                 goto done;
269         }
270
271 done:
272         talloc_free(tdb_key.dptr);
273         talloc_free(tdb_data.dptr);
274
275         return ret;
276 }
277
278
279 static int ltdb_add_internal(struct ldb_module *module,
280                              const struct ldb_message *msg)
281 {
282         struct ldb_context *ldb = ldb_module_get_ctx(module);
283         int ret = LDB_SUCCESS;
284         unsigned int i;
285
286         ret = ltdb_check_special_dn(module, msg);
287         if (ret != LDB_SUCCESS) {
288                 return ret;
289         }
290
291         if (ltdb_cache_load(module) != 0) {
292                 return LDB_ERR_OPERATIONS_ERROR;
293         }
294
295         for (i=0;i<msg->num_elements;i++) {
296                 struct ldb_message_element *el = &msg->elements[i];
297
298                 if (el->num_values == 0) {
299                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
300                                                el->name, ldb_dn_get_linearized(msg->dn));
301                         return LDB_ERR_CONSTRAINT_VIOLATION;
302                 }
303         }
304
305         ret = ltdb_store(module, msg, TDB_INSERT);
306         if (ret != LDB_SUCCESS) {
307                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
308                         ldb_asprintf_errstring(ldb,
309                                                "Entry %s already exists",
310                                                ldb_dn_get_linearized(msg->dn));
311                 }
312                 return ret;
313         }
314
315         ret = ltdb_index_add_new(module, msg);
316         if (ret != LDB_SUCCESS) {
317                 return ret;
318         }
319
320         ret = ltdb_modified(module, msg->dn);
321
322         return ret;
323 }
324
325 /*
326   add a record to the database
327 */
328 static int ltdb_add(struct ltdb_context *ctx)
329 {
330         struct ldb_module *module = ctx->module;
331         struct ldb_request *req = ctx->req;
332         int ret = LDB_SUCCESS;
333
334         ldb_request_set_state(req, LDB_ASYNC_PENDING);
335
336         if (ltdb_cache_load(module) != 0) {
337                 return LDB_ERR_OPERATIONS_ERROR;
338         }
339
340         ret = ltdb_add_internal(module, req->op.add.message);
341
342         return ret;
343 }
344
345 /*
346   delete a record from the database, not updating indexes (used for deleting
347   index records)
348 */
349 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
350 {
351         void *data = ldb_module_get_private(module);
352         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
353         TDB_DATA tdb_key;
354         int ret;
355
356         tdb_key = ltdb_key(module, dn);
357         if (!tdb_key.dptr) {
358                 return LDB_ERR_OTHER;
359         }
360
361         ret = tdb_delete(ltdb->tdb, tdb_key);
362         talloc_free(tdb_key.dptr);
363
364         if (ret != 0) {
365                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
366         }
367
368         return ret;
369 }
370
371 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
372 {
373         struct ldb_message *msg;
374         int ret = LDB_SUCCESS;
375
376         msg = talloc(module, struct ldb_message);
377         if (msg == NULL) {
378                 return LDB_ERR_OPERATIONS_ERROR;
379         }
380
381         /* in case any attribute of the message was indexed, we need
382            to fetch the old record */
383         ret = ltdb_search_dn1(module, dn, msg);
384         if (ret != LDB_SUCCESS) {
385                 /* not finding the old record is an error */
386                 goto done;
387         }
388
389         ret = ltdb_delete_noindex(module, dn);
390         if (ret != LDB_SUCCESS) {
391                 goto done;
392         }
393
394         /* remove any indexed attributes */
395         ret = ltdb_index_delete(module, msg);
396         if (ret != LDB_SUCCESS) {
397                 goto done;
398         }
399
400         ret = ltdb_modified(module, dn);
401         if (ret != LDB_SUCCESS) {
402                 goto done;
403         }
404
405 done:
406         talloc_free(msg);
407         return ret;
408 }
409
410 /*
411   delete a record from the database
412 */
413 static int ltdb_delete(struct ltdb_context *ctx)
414 {
415         struct ldb_module *module = ctx->module;
416         struct ldb_request *req = ctx->req;
417         int ret = LDB_SUCCESS;
418
419         ldb_request_set_state(req, LDB_ASYNC_PENDING);
420
421         if (ltdb_cache_load(module) != 0) {
422                 return LDB_ERR_OPERATIONS_ERROR;
423         }
424
425         ret = ltdb_delete_internal(module, req->op.del.dn);
426
427         return ret;
428 }
429
430 /*
431   find an element by attribute name. At the moment this does a linear search,
432   it should be re-coded to use a binary search once all places that modify
433   records guarantee sorted order
434
435   return the index of the first matching element if found, otherwise -1
436 */
437 static int find_element(const struct ldb_message *msg, const char *name)
438 {
439         unsigned int i;
440         for (i=0;i<msg->num_elements;i++) {
441                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
442                         return i;
443                 }
444         }
445         return -1;
446 }
447
448
449 /*
450   add an element to an existing record. Assumes a elements array that we
451   can call re-alloc on, and assumed that we can re-use the data pointers from
452   the passed in additional values. Use with care!
453
454   returns 0 on success, -1 on failure (and sets errno)
455 */
456 static int ltdb_msg_add_element(struct ldb_context *ldb,
457                                 struct ldb_message *msg,
458                                 struct ldb_message_element *el)
459 {
460         struct ldb_message_element *e2;
461         unsigned int i;
462
463         if (el->num_values == 0) {
464                 /* nothing to do here - we don't add empty elements */
465                 return 0;
466         }
467
468         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
469                               msg->num_elements+1);
470         if (!e2) {
471                 errno = ENOMEM;
472                 return -1;
473         }
474
475         msg->elements = e2;
476
477         e2 = &msg->elements[msg->num_elements];
478
479         e2->name = el->name;
480         e2->flags = el->flags;
481         e2->values = talloc_array(msg->elements,
482                                   struct ldb_val, el->num_values);
483         if (!e2->values) {
484                 errno = ENOMEM;
485                 return -1;
486         }
487         for (i=0;i<el->num_values;i++) {
488                 e2->values[i] = el->values[i];
489         }
490         e2->num_values = el->num_values;
491
492         ++msg->num_elements;
493
494         return 0;
495 }
496
497 /*
498   delete all elements having a specified attribute name
499 */
500 static int msg_delete_attribute(struct ldb_module *module,
501                                 struct ldb_context *ldb,
502                                 struct ldb_message *msg, const char *name)
503 {
504         unsigned int i;
505         int ret;
506         struct ldb_message_element *el;
507
508         el = ldb_msg_find_element(msg, name);
509         if (el == NULL) {
510                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
511         }
512         i = el - msg->elements;
513
514         ret = ltdb_index_del_element(module, msg->dn, el);
515         if (ret != LDB_SUCCESS) {
516                 return ret;
517         }
518
519         talloc_free(el->values);
520         if (msg->num_elements > (i+1)) {
521                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
522         }
523         msg->num_elements--;
524         msg->elements = talloc_realloc(msg, msg->elements,
525                                        struct ldb_message_element,
526                                        msg->num_elements);
527         return LDB_SUCCESS;
528 }
529
530 /*
531   delete all elements matching an attribute name/value
532
533   return LDB Error on failure
534 */
535 static int msg_delete_element(struct ldb_module *module,
536                               struct ldb_message *msg,
537                               const char *name,
538                               const struct ldb_val *val)
539 {
540         struct ldb_context *ldb = ldb_module_get_ctx(module);
541         unsigned int i;
542         int found, ret;
543         struct ldb_message_element *el;
544         const struct ldb_schema_attribute *a;
545
546         found = find_element(msg, name);
547         if (found == -1) {
548                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
549         }
550
551         el = &msg->elements[found];
552
553         a = ldb_schema_attribute_by_name(ldb, el->name);
554
555         for (i=0;i<el->num_values;i++) {
556                 if (a->syntax->comparison_fn(ldb, ldb,
557                                              &el->values[i], val) == 0) {
558                         if (el->num_values == 1) {
559                                 return msg_delete_attribute(module, ldb, msg, name);
560                         }
561
562                         ret = ltdb_index_del_value(module, msg->dn, el, i);
563                         if (ret != LDB_SUCCESS) {
564                                 return ret;
565                         }
566
567                         if (i<el->num_values-1) {
568                                 memmove(&el->values[i], &el->values[i+1],
569                                         sizeof(el->values[i])*
570                                                 (el->num_values-(i+1)));
571                         }
572                         el->num_values--;
573
574                         /* per definition we find in a canonicalised message an
575                            attribute value only once. So we are finished here */
576                         return LDB_SUCCESS;
577                 }
578         }
579
580         /* Not found */
581         return LDB_ERR_NO_SUCH_ATTRIBUTE;
582 }
583
584
585 /*
586   modify a record - internal interface
587
588   yuck - this is O(n^2). Luckily n is usually small so we probably
589   get away with it, but if we ever have really large attribute lists
590   then we'll need to look at this again
591
592   'req' is optional, and is used to specify controls if supplied
593 */
594 int ltdb_modify_internal(struct ldb_module *module,
595                          const struct ldb_message *msg,
596                          struct ldb_request *req)
597 {
598         struct ldb_context *ldb = ldb_module_get_ctx(module);
599         void *data = ldb_module_get_private(module);
600         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
601         TDB_DATA tdb_key, tdb_data;
602         struct ldb_message *msg2;
603         unsigned i, j, k;
604         int ret = LDB_SUCCESS, idx;
605         struct ldb_control *control_permissive = NULL;
606
607         if (req) {
608                 control_permissive = ldb_request_get_control(req,
609                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
610         }
611
612         tdb_key = ltdb_key(module, msg->dn);
613         if (!tdb_key.dptr) {
614                 return LDB_ERR_OTHER;
615         }
616
617         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
618         if (!tdb_data.dptr) {
619                 talloc_free(tdb_key.dptr);
620                 return ltdb_err_map(tdb_error(ltdb->tdb));
621         }
622
623         msg2 = talloc(tdb_key.dptr, struct ldb_message);
624         if (msg2 == NULL) {
625                 free(tdb_data.dptr);
626                 ret = LDB_ERR_OTHER;
627                 goto done;
628         }
629
630         ret = ltdb_unpack_data(module, &tdb_data, msg2);
631         free(tdb_data.dptr);
632         if (ret == -1) {
633                 ret = LDB_ERR_OTHER;
634                 goto done;
635         }
636
637         if (!msg2->dn) {
638                 msg2->dn = msg->dn;
639         }
640
641         for (i=0; i<msg->num_elements; i++) {
642                 struct ldb_message_element *el = &msg->elements[i], *el2;
643                 struct ldb_val *vals;
644                 const char *dn;
645
646                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
647                 case LDB_FLAG_MOD_ADD:
648
649                         if (el->num_values == 0) {
650                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
651                                                        el->name, ldb_dn_get_linearized(msg2->dn));
652                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
653                                 goto done;
654                         }
655
656                         /* make a copy of the array so that a permissive
657                          * control can remove duplicates without changing the
658                          * original values, but do not copy data as we do not
659                          * need to keep it around once the operation is
660                          * finished */
661                         if (control_permissive) {
662                                 el = talloc(msg2, struct ldb_message_element);
663                                 if (!el) {
664                                         ret = LDB_ERR_OTHER;
665                                         goto done;
666                                 }
667                                 el->name = msg->elements[i].name;
668                                 el->num_values = msg->elements[i].num_values;
669                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
670                                 if (el->values == NULL) {
671                                         ret = LDB_ERR_OTHER;
672                                         goto done;
673                                 }
674                                 for (j = 0; j < el->num_values; j++) {
675                                         el->values[j] = msg->elements[i].values[j];
676                                 }
677                         }
678
679                         /* Checks if element already exists */
680                         idx = find_element(msg2, el->name);
681                         if (idx == -1) {
682                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
683                                         ret = LDB_ERR_OTHER;
684                                         goto done;
685                                 }
686                                 ret = ltdb_index_add_element(module, msg2->dn, el);
687                                 if (ret != LDB_SUCCESS) {
688                                         goto done;
689                                 }
690                         } else {
691                                 el2 = &(msg2->elements[idx]);
692
693                                 /* Check that values don't exist yet on multi-
694                                    valued attributes or aren't provided twice */
695                                 for (j = 0; j < el->num_values; j++) {
696                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
697                                                 if (control_permissive) {
698                                                         /* remove this one as if it was never added */
699                                                         el->num_values--;
700                                                         for (k = j; k < el->num_values; k++) {
701                                                                 el->values[k] = el->values[k + 1];
702                                                         }
703                                                         j--; /* rewind */
704
705                                                         continue;
706                                                 }
707
708                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
709                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
710                                                 goto done;
711                                         }
712                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
713                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
714                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
715                                                 goto done;
716                                         }
717                                 }
718
719                                 /* Now combine existing and new values to a new
720                                    attribute record */
721                                 vals = talloc_realloc(msg2->elements,
722                                                       el2->values, struct ldb_val,
723                                                       el2->num_values + el->num_values);
724                                 if (vals == NULL) {
725                                         ldb_oom(ldb);
726                                         ret = LDB_ERR_OTHER;
727                                         goto done;
728                                 }
729
730                                 for (j=0; j<el->num_values; j++) {
731                                         vals[el2->num_values + j] =
732                                                 ldb_val_dup(vals, &el->values[j]);
733                                 }
734
735                                 el2->values = vals;
736                                 el2->num_values += el->num_values;
737
738                                 ret = ltdb_index_add_element(module, msg2->dn, el);
739                                 if (ret != LDB_SUCCESS) {
740                                         goto done;
741                                 }
742                         }
743
744                         break;
745
746                 case LDB_FLAG_MOD_REPLACE:
747
748                         /* TODO: This is O(n^2) - replace with more efficient check */
749                         for (j=0; j<el->num_values; j++) {
750                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
751                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
752                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
753                                         goto done;
754                                 }
755                         }
756
757                         idx = find_element(msg2, el->name);
758                         if (idx != -1) {
759                                 el2 = &(msg2->elements[idx]);
760                                 if (ldb_msg_element_compare(el, el2) == 0) {
761                                         /* we are replacing with the same values */
762                                         continue;
763                                 }
764                         
765                                 /* Delete the attribute if it exists in the DB */
766                                 if (msg_delete_attribute(module, ldb, msg2, el->name) != 0) {
767                                         ret = LDB_ERR_OTHER;
768                                         goto done;
769                                 }
770                         }
771
772                         /* Recreate it with the new values */
773                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
774                                 ret = LDB_ERR_OTHER;
775                                 goto done;
776                         }
777
778                         ret = ltdb_index_add_element(module, msg2->dn, el);
779                         if (ret != LDB_SUCCESS) {
780                                 goto done;
781                         }
782
783                         break;
784
785                 case LDB_FLAG_MOD_DELETE:
786                         dn = ldb_dn_get_linearized(msg2->dn);
787                         if (dn == NULL) {
788                                 ret = LDB_ERR_OTHER;
789                                 goto done;
790                         }
791
792                         if (msg->elements[i].num_values == 0) {
793                                 /* Delete the whole attribute */
794                                 ret = msg_delete_attribute(module, ldb, msg2,
795                                                            msg->elements[i].name);
796                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
797                                     control_permissive) {
798                                         ret = LDB_SUCCESS;
799                                 } else {
800                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
801                                                                msg->elements[i].name, dn);
802                                 }
803                                 if (ret != LDB_SUCCESS) {
804                                         goto done;
805                                 }
806                         } else {
807                                 /* Delete specified values from an attribute */
808                                 for (j=0; j < msg->elements[i].num_values; j++) {
809                                         ret = msg_delete_element(module,
810                                                                  msg2,
811                                                                  msg->elements[i].name,
812                                                                  &msg->elements[i].values[j]);
813                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
814                                             control_permissive) {
815                                                 ret = LDB_SUCCESS;
816                                         } else {
817                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
818                                                                        msg->elements[i].name, dn);
819                                         }
820                                         if (ret != LDB_SUCCESS) {
821                                                 goto done;
822                                         }
823                                 }
824                         }
825                         break;
826                 default:
827                         ldb_asprintf_errstring(ldb,
828                                 "Invalid ldb_modify flags on %s: 0x%x",
829                                 msg->elements[i].name,
830                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
831                         ret = LDB_ERR_PROTOCOL_ERROR;
832                         goto done;
833                 }
834         }
835
836         ret = ltdb_store(module, msg2, TDB_MODIFY);
837         if (ret != LDB_SUCCESS) {
838                 goto done;
839         }
840
841         ret = ltdb_modified(module, msg2->dn);
842         if (ret != LDB_SUCCESS) {
843                 goto done;
844         }
845
846 done:
847         talloc_free(tdb_key.dptr);
848         return ret;
849 }
850
851 /*
852   modify a record
853 */
854 static int ltdb_modify(struct ltdb_context *ctx)
855 {
856         struct ldb_module *module = ctx->module;
857         struct ldb_request *req = ctx->req;
858         int ret = LDB_SUCCESS;
859
860         ret = ltdb_check_special_dn(module, req->op.mod.message);
861         if (ret != LDB_SUCCESS) {
862                 return ret;
863         }
864
865         ldb_request_set_state(req, LDB_ASYNC_PENDING);
866
867         if (ltdb_cache_load(module) != 0) {
868                 return LDB_ERR_OPERATIONS_ERROR;
869         }
870
871         ret = ltdb_modify_internal(module, req->op.mod.message, req);
872
873         return ret;
874 }
875
876 /*
877   rename a record
878 */
879 static int ltdb_rename(struct ltdb_context *ctx)
880 {
881         struct ldb_module *module = ctx->module;
882         struct ldb_request *req = ctx->req;
883         struct ldb_message *msg;
884         int ret = LDB_SUCCESS;
885
886         ldb_request_set_state(req, LDB_ASYNC_PENDING);
887
888         if (ltdb_cache_load(ctx->module) != 0) {
889                 return LDB_ERR_OPERATIONS_ERROR;
890         }
891
892         msg = talloc(ctx, struct ldb_message);
893         if (msg == NULL) {
894                 return LDB_ERR_OPERATIONS_ERROR;
895         }
896
897         /* in case any attribute of the message was indexed, we need
898            to fetch the old record */
899         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
900         if (ret != LDB_SUCCESS) {
901                 /* not finding the old record is an error */
902                 return ret;
903         }
904
905         /* Always delete first then add, to avoid conflicts with
906          * unique indexes. We rely on the transaction to make this
907          * atomic
908          */
909         ret = ltdb_delete_internal(module, msg->dn);
910         if (ret != LDB_SUCCESS) {
911                 return ret;
912         }
913
914         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
915         if (msg->dn == NULL) {
916                 return LDB_ERR_OPERATIONS_ERROR;
917         }
918
919         ret = ltdb_add_internal(module, msg);
920
921         return ret;
922 }
923
924 static int ltdb_start_trans(struct ldb_module *module)
925 {
926         void *data = ldb_module_get_private(module);
927         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
928
929         if (tdb_transaction_start(ltdb->tdb) != 0) {
930                 return ltdb_err_map(tdb_error(ltdb->tdb));
931         }
932
933         ltdb->in_transaction++;
934
935         ltdb_index_transaction_start(module);
936
937         return LDB_SUCCESS;
938 }
939
940 static int ltdb_prepare_commit(struct ldb_module *module)
941 {
942         void *data = ldb_module_get_private(module);
943         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
944
945         if (ltdb->in_transaction != 1) {
946                 return LDB_SUCCESS;
947         }
948
949         if (ltdb_index_transaction_commit(module) != 0) {
950                 tdb_transaction_cancel(ltdb->tdb);
951                 ltdb->in_transaction--;
952                 return ltdb_err_map(tdb_error(ltdb->tdb));
953         }
954
955         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
956                 ltdb->in_transaction--;
957                 return ltdb_err_map(tdb_error(ltdb->tdb));
958         }
959
960         ltdb->prepared_commit = true;
961
962         return LDB_SUCCESS;
963 }
964
965 static int ltdb_end_trans(struct ldb_module *module)
966 {
967         void *data = ldb_module_get_private(module);
968         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
969
970         if (!ltdb->prepared_commit) {
971                 int ret = ltdb_prepare_commit(module);
972                 if (ret != LDB_SUCCESS) {
973                         return ret;
974                 }
975         }
976
977         ltdb->in_transaction--;
978         ltdb->prepared_commit = false;
979
980         if (tdb_transaction_commit(ltdb->tdb) != 0) {
981                 return ltdb_err_map(tdb_error(ltdb->tdb));
982         }
983
984         return LDB_SUCCESS;
985 }
986
987 static int ltdb_del_trans(struct ldb_module *module)
988 {
989         void *data = ldb_module_get_private(module);
990         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
991
992         ltdb->in_transaction--;
993
994         if (ltdb_index_transaction_cancel(module) != 0) {
995                 tdb_transaction_cancel(ltdb->tdb);
996                 return ltdb_err_map(tdb_error(ltdb->tdb));
997         }
998
999         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
1000                 return ltdb_err_map(tdb_error(ltdb->tdb));
1001         }
1002
1003         return LDB_SUCCESS;
1004 }
1005
1006 /*
1007   return sequenceNumber from @BASEINFO
1008 */
1009 static int ltdb_sequence_number(struct ltdb_context *ctx,
1010                                 struct ldb_extended **ext)
1011 {
1012         struct ldb_context *ldb;
1013         struct ldb_module *module = ctx->module;
1014         struct ldb_request *req = ctx->req;
1015         TALLOC_CTX *tmp_ctx;
1016         struct ldb_seqnum_request *seq;
1017         struct ldb_seqnum_result *res;
1018         struct ldb_message *msg = NULL;
1019         struct ldb_dn *dn;
1020         const char *date;
1021         int ret = LDB_SUCCESS;
1022
1023         ldb = ldb_module_get_ctx(module);
1024
1025         seq = talloc_get_type(req->op.extended.data,
1026                                 struct ldb_seqnum_request);
1027         if (seq == NULL) {
1028                 return LDB_ERR_OPERATIONS_ERROR;
1029         }
1030
1031         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1032
1033         if (ltdb_lock_read(module) != 0) {
1034                 return LDB_ERR_OPERATIONS_ERROR;
1035         }
1036
1037         res = talloc_zero(req, struct ldb_seqnum_result);
1038         if (res == NULL) {
1039                 ret = LDB_ERR_OPERATIONS_ERROR;
1040                 goto done;
1041         }
1042         tmp_ctx = talloc_new(req);
1043         if (tmp_ctx == NULL) {
1044                 ret = LDB_ERR_OPERATIONS_ERROR;
1045                 goto done;
1046         }
1047
1048         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1049
1050         msg = talloc(tmp_ctx, struct ldb_message);
1051         if (msg == NULL) {
1052                 ret = LDB_ERR_OPERATIONS_ERROR;
1053                 goto done;
1054         }
1055
1056         ret = ltdb_search_dn1(module, dn, msg);
1057         if (ret != LDB_SUCCESS) {
1058                 goto done;
1059         }
1060
1061         switch (seq->type) {
1062         case LDB_SEQ_HIGHEST_SEQ:
1063                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1064                 break;
1065         case LDB_SEQ_NEXT:
1066                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1067                 res->seq_num++;
1068                 break;
1069         case LDB_SEQ_HIGHEST_TIMESTAMP:
1070                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1071                 if (date) {
1072                         res->seq_num = ldb_string_to_time(date);
1073                 } else {
1074                         res->seq_num = 0;
1075                         /* zero is as good as anything when we don't know */
1076                 }
1077                 break;
1078         }
1079
1080         *ext = talloc_zero(req, struct ldb_extended);
1081         if (*ext == NULL) {
1082                 ret = LDB_ERR_OPERATIONS_ERROR;
1083                 goto done;
1084         }
1085         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1086         (*ext)->data = talloc_steal(*ext, res);
1087
1088 done:
1089         talloc_free(tmp_ctx);
1090         ltdb_unlock_read(module);
1091         return ret;
1092 }
1093
1094 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1095 {
1096         struct ldb_context *ldb;
1097         struct ldb_request *req;
1098         struct ldb_reply *ares;
1099
1100         ldb = ldb_module_get_ctx(ctx->module);
1101         req = ctx->req;
1102
1103         /* if we already returned an error just return */
1104         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1105                 return;
1106         }
1107
1108         ares = talloc_zero(req, struct ldb_reply);
1109         if (!ares) {
1110                 ldb_oom(ldb);
1111                 req->callback(req, NULL);
1112                 return;
1113         }
1114         ares->type = LDB_REPLY_DONE;
1115         ares->error = error;
1116
1117         req->callback(req, ares);
1118 }
1119
1120 static void ltdb_timeout(struct tevent_context *ev,
1121                           struct tevent_timer *te,
1122                           struct timeval t,
1123                           void *private_data)
1124 {
1125         struct ltdb_context *ctx;
1126         ctx = talloc_get_type(private_data, struct ltdb_context);
1127
1128         if (!ctx->request_terminated) {
1129                 /* request is done now */
1130                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1131         }
1132
1133         if (!ctx->request_terminated) {
1134                 /* neutralize the spy */
1135                 ctx->spy->ctx = NULL;
1136         }
1137         talloc_free(ctx);
1138 }
1139
1140 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1141                                         struct ldb_extended *ext,
1142                                         int error)
1143 {
1144         struct ldb_context *ldb;
1145         struct ldb_request *req;
1146         struct ldb_reply *ares;
1147
1148         ldb = ldb_module_get_ctx(ctx->module);
1149         req = ctx->req;
1150
1151         /* if we already returned an error just return */
1152         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1153                 return;
1154         }
1155
1156         ares = talloc_zero(req, struct ldb_reply);
1157         if (!ares) {
1158                 ldb_oom(ldb);
1159                 req->callback(req, NULL);
1160                 return;
1161         }
1162         ares->type = LDB_REPLY_DONE;
1163         ares->response = ext;
1164         ares->error = error;
1165
1166         req->callback(req, ares);
1167 }
1168
1169 static void ltdb_handle_extended(struct ltdb_context *ctx)
1170 {
1171         struct ldb_extended *ext = NULL;
1172         int ret;
1173
1174         if (strcmp(ctx->req->op.extended.oid,
1175                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1176                 /* get sequence number */
1177                 ret = ltdb_sequence_number(ctx, &ext);
1178         } else {
1179                 /* not recognized */
1180                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1181         }
1182
1183         ltdb_request_extended_done(ctx, ext, ret);
1184 }
1185
1186 static void ltdb_callback(struct tevent_context *ev,
1187                           struct tevent_timer *te,
1188                           struct timeval t,
1189                           void *private_data)
1190 {
1191         struct ltdb_context *ctx;
1192         int ret;
1193
1194         ctx = talloc_get_type(private_data, struct ltdb_context);
1195
1196         if (ctx->request_terminated) {
1197                 goto done;
1198         }
1199
1200         switch (ctx->req->operation) {
1201         case LDB_SEARCH:
1202                 ret = ltdb_search(ctx);
1203                 break;
1204         case LDB_ADD:
1205                 ret = ltdb_add(ctx);
1206                 break;
1207         case LDB_MODIFY:
1208                 ret = ltdb_modify(ctx);
1209                 break;
1210         case LDB_DELETE:
1211                 ret = ltdb_delete(ctx);
1212                 break;
1213         case LDB_RENAME:
1214                 ret = ltdb_rename(ctx);
1215                 break;
1216         case LDB_EXTENDED:
1217                 ltdb_handle_extended(ctx);
1218                 goto done;
1219         default:
1220                 /* no other op supported */
1221                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1222         }
1223
1224         if (!ctx->request_terminated) {
1225                 /* request is done now */
1226                 ltdb_request_done(ctx, ret);
1227         }
1228
1229 done:
1230         if (!ctx->request_terminated) {
1231                 /* neutralize the spy */
1232                 ctx->spy->ctx = NULL;
1233         }
1234         talloc_free(ctx);
1235 }
1236
1237 static int ltdb_request_destructor(void *ptr)
1238 {
1239         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1240
1241         if (spy->ctx != NULL) {
1242                 spy->ctx->request_terminated = true;
1243         }
1244
1245         return 0;
1246 }
1247
1248 static int ltdb_handle_request(struct ldb_module *module,
1249                                struct ldb_request *req)
1250 {
1251         struct ldb_control *control_permissive;
1252         struct ldb_context *ldb;
1253         struct tevent_context *ev;
1254         struct ltdb_context *ac;
1255         struct tevent_timer *te;
1256         struct timeval tv;
1257         int i;
1258
1259         ldb = ldb_module_get_ctx(module);
1260
1261         control_permissive = ldb_request_get_control(req,
1262                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1263
1264         for (i = 0; req->controls && req->controls[i]; i++) {
1265                 if (req->controls[i]->critical &&
1266                     req->controls[i] != control_permissive) {
1267                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1268                                                req->controls[i]->oid);
1269                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1270                 }
1271         }
1272
1273         if (req->starttime == 0 || req->timeout == 0) {
1274                 ldb_set_errstring(ldb, "Invalid timeout settings");
1275                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1276         }
1277
1278         ev = ldb_get_event_context(ldb);
1279
1280         ac = talloc_zero(ldb, struct ltdb_context);
1281         if (ac == NULL) {
1282                 ldb_oom(ldb);
1283                 return LDB_ERR_OPERATIONS_ERROR;
1284         }
1285
1286         ac->module = module;
1287         ac->req = req;
1288
1289         tv.tv_sec = 0;
1290         tv.tv_usec = 0;
1291         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1292         if (NULL == te) {
1293                 talloc_free(ac);
1294                 return LDB_ERR_OPERATIONS_ERROR;
1295         }
1296
1297         tv.tv_sec = req->starttime + req->timeout;
1298         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1299         if (NULL == ac->timeout_event) {
1300                 talloc_free(ac);
1301                 return LDB_ERR_OPERATIONS_ERROR;
1302         }
1303
1304         /* set a spy so that we do not try to use the request context
1305          * if it is freed before ltdb_callback fires */
1306         ac->spy = talloc(req, struct ltdb_req_spy);
1307         if (NULL == ac->spy) {
1308                 talloc_free(ac);
1309                 return LDB_ERR_OPERATIONS_ERROR;
1310         }
1311         ac->spy->ctx = ac;
1312
1313         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1314
1315         return LDB_SUCCESS;
1316 }
1317
1318 static int ltdb_init_rootdse(struct ldb_module *module)
1319 {
1320         struct ldb_context *ldb;
1321         int ret;
1322
1323         ldb = ldb_module_get_ctx(module);
1324
1325         ret = ldb_mod_register_control(module,
1326                                        LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1327         /* ignore errors on this - we expect it for non-sam databases */
1328
1329         /* there can be no module beyond the backend, just return */
1330         return LDB_SUCCESS;
1331 }
1332
1333 static const struct ldb_module_ops ltdb_ops = {
1334         .name              = "tdb",
1335         .init_context      = ltdb_init_rootdse,
1336         .search            = ltdb_handle_request,
1337         .add               = ltdb_handle_request,
1338         .modify            = ltdb_handle_request,
1339         .del               = ltdb_handle_request,
1340         .rename            = ltdb_handle_request,
1341         .extended          = ltdb_handle_request,
1342         .start_transaction = ltdb_start_trans,
1343         .end_transaction   = ltdb_end_trans,
1344         .prepare_commit    = ltdb_prepare_commit,
1345         .del_transaction   = ltdb_del_trans,
1346 };
1347
1348 /*
1349   connect to the database
1350 */
1351 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1352                         unsigned int flags, const char *options[],
1353                         struct ldb_module **_module)
1354 {
1355         struct ldb_module *module;
1356         const char *path;
1357         int tdb_flags, open_flags;
1358         struct ltdb_private *ltdb;
1359
1360         /* parse the url */
1361         if (strchr(url, ':')) {
1362                 if (strncmp(url, "tdb://", 6) != 0) {
1363                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1364                                   "Invalid tdb URL '%s'", url);
1365                         return LDB_ERR_OPERATIONS_ERROR;
1366                 }
1367                 path = url+6;
1368         } else {
1369                 path = url;
1370         }
1371
1372         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1373
1374         /* check for the 'nosync' option */
1375         if (flags & LDB_FLG_NOSYNC) {
1376                 tdb_flags |= TDB_NOSYNC;
1377         }
1378
1379         /* and nommap option */
1380         if (flags & LDB_FLG_NOMMAP) {
1381                 tdb_flags |= TDB_NOMMAP;
1382         }
1383
1384         if (flags & LDB_FLG_RDONLY) {
1385                 open_flags = O_RDONLY;
1386         } else {
1387                 open_flags = O_CREAT | O_RDWR;
1388         }
1389
1390         ltdb = talloc_zero(ldb, struct ltdb_private);
1391         if (!ltdb) {
1392                 ldb_oom(ldb);
1393                 return LDB_ERR_OPERATIONS_ERROR;
1394         }
1395
1396         /* note that we use quite a large default hash size */
1397         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1398                                    tdb_flags, open_flags,
1399                                    ldb_get_create_perms(ldb), ldb);
1400         if (!ltdb->tdb) {
1401                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1402                           "Unable to open tdb '%s'", path);
1403                 talloc_free(ltdb);
1404                 return LDB_ERR_OPERATIONS_ERROR;
1405         }
1406
1407         ltdb->sequence_number = 0;
1408
1409         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1410         if (!module) {
1411                 talloc_free(ltdb);
1412                 return LDB_ERR_OPERATIONS_ERROR;
1413         }
1414         ldb_module_set_private(module, ltdb);
1415         talloc_steal(module, ltdb);
1416
1417         if (ltdb_cache_load(module) != 0) {
1418                 talloc_free(module);
1419                 talloc_free(ltdb);
1420                 return LDB_ERR_OPERATIONS_ERROR;
1421         }
1422
1423         *_module = module;
1424         return LDB_SUCCESS;
1425 }
1426
1427 _PRIVATE_ const struct ldb_backend_ops ldb_tdb_backend_ops = {
1428         .name = "tdb",
1429         .connect_fn = ltdb_connect,
1430 };