63ce6e7aecc2706f6ca400845bfd17b323693591
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 static int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         if (ltdb->in_transaction == 0) {
92                 return tdb_lockall_read(ltdb->tdb);
93         }
94         return 0;
95 }
96
97 /*
98   unlock the database after a ltdb_lock_read()
99 */
100 int ltdb_unlock_read(struct ldb_module *module)
101 {
102         void *data = ldb_module_get_private(module);
103         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
104         if (ltdb->in_transaction == 0) {
105                 return tdb_unlockall_read(ltdb->tdb);
106         }
107         return 0;
108 }
109
110
111 /*
112   form a TDB_DATA for a record key
113   caller frees
114
115   note that the key for a record can depend on whether the
116   dn refers to a case sensitive index record or not
117 */
118 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
119 {
120         struct ldb_context *ldb = ldb_module_get_ctx(module);
121         TDB_DATA key;
122         char *key_str = NULL;
123         const char *dn_folded = NULL;
124
125         /*
126           most DNs are case insensitive. The exception is index DNs for
127           case sensitive attributes
128
129           there are 3 cases dealt with in this code:
130
131           1) if the dn doesn't start with @ then uppercase the attribute
132              names and the attributes values of case insensitive attributes
133           2) if the dn starts with @ then leave it alone -
134              the indexing code handles the rest
135         */
136
137         dn_folded = ldb_dn_get_casefold(dn);
138         if (!dn_folded) {
139                 goto failed;
140         }
141
142         key_str = talloc_strdup(ldb, "DN=");
143         if (!key_str) {
144                 goto failed;
145         }
146
147         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
148         if (!key_str) {
149                 goto failed;
150         }
151
152         key.dptr = (uint8_t *)key_str;
153         key.dsize = strlen(key_str) + 1;
154
155         return key;
156
157 failed:
158         errno = ENOMEM;
159         key.dptr = NULL;
160         key.dsize = 0;
161         return key;
162 }
163
164 /*
165   check special dn's have valid attributes
166   currently only @ATTRIBUTES is checked
167 */
168 static int ltdb_check_special_dn(struct ldb_module *module,
169                           const struct ldb_message *msg)
170 {
171         struct ldb_context *ldb = ldb_module_get_ctx(module);
172         int i, j;
173
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return LDB_SUCCESS;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
183
184                 for (j = 0; j < msg->elements[i].num_values; j++) {
185                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
186                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
187                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
188                         }
189                 }
190         }
191
192         return LDB_SUCCESS;
193 }
194
195
196 /*
197   we've made a modification to a dn - possibly reindex and
198   update sequence number
199 */
200 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
201 {
202         int ret = LDB_SUCCESS;
203
204         if (ldb_dn_is_special(dn) &&
205             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
206              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
207                 ret = ltdb_reindex(module);
208         }
209
210         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
211         if (ret == LDB_SUCCESS &&
212             !(ldb_dn_is_special(dn) &&
213               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
214                 ret = ltdb_increase_sequence_number(module);
215         }
216
217         /* If the modify was to @OPTIONS, reload the cache */
218         if (ldb_dn_is_special(dn) &&
219             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
220                 ret = ltdb_cache_reload(module);
221         }
222
223         return ret;
224 }
225
226 /*
227   store a record into the db
228 */
229 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
230 {
231         void *data = ldb_module_get_private(module);
232         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
233         TDB_DATA tdb_key, tdb_data;
234         int ret = LDB_SUCCESS;
235
236         tdb_key = ltdb_key(module, msg->dn);
237         if (tdb_key.dptr == NULL) {
238                 return LDB_ERR_OTHER;
239         }
240
241         ret = ltdb_pack_data(module, msg, &tdb_data);
242         if (ret == -1) {
243                 talloc_free(tdb_key.dptr);
244                 return LDB_ERR_OTHER;
245         }
246
247         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
248         if (ret == -1) {
249                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
250                 goto done;
251         }
252
253         ret = ltdb_index_add(module, msg);
254         if (ret != LDB_SUCCESS) {
255                 tdb_delete(ltdb->tdb, tdb_key);
256         }
257
258 done:
259         talloc_free(tdb_key.dptr);
260         talloc_free(tdb_data.dptr);
261
262         return ret;
263 }
264
265
266 static int ltdb_add_internal(struct ldb_module *module,
267                              const struct ldb_message *msg)
268 {
269         struct ldb_context *ldb = ldb_module_get_ctx(module);
270         int ret = LDB_SUCCESS, i;
271
272         ret = ltdb_check_special_dn(module, msg);
273         if (ret != LDB_SUCCESS) {
274                 return ret;
275         }
276
277         if (ltdb_cache_load(module) != 0) {
278                 return LDB_ERR_OPERATIONS_ERROR;
279         }
280
281         for (i=0;i<msg->num_elements;i++) {
282                 struct ldb_message_element *el = &msg->elements[i];
283                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
284
285                 if (el->num_values == 0) {
286                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
287                                                el->name, ldb_dn_get_linearized(msg->dn));
288                         return LDB_ERR_CONSTRAINT_VIOLATION;
289                 }
290                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
291                         if (el->num_values > 1) {
292                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
293                                                        el->name, ldb_dn_get_linearized(msg->dn));
294                                 return LDB_ERR_CONSTRAINT_VIOLATION;
295                         }
296                 }
297         }
298
299         ret = ltdb_store(module, msg, TDB_INSERT);
300         if (ret != LDB_SUCCESS) {
301                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
302                         ldb_asprintf_errstring(ldb,
303                                                "Entry %s already exists",
304                                                ldb_dn_get_linearized(msg->dn));
305                 }
306                 return ret;
307         }
308
309         ret = ltdb_index_one(module, msg, 1);
310         if (ret != LDB_SUCCESS) {
311                 return ret;
312         }
313
314         ret = ltdb_modified(module, msg->dn);
315
316         return ret;
317 }
318
319 /*
320   add a record to the database
321 */
322 static int ltdb_add(struct ltdb_context *ctx)
323 {
324         struct ldb_module *module = ctx->module;
325         struct ldb_request *req = ctx->req;
326         int ret = LDB_SUCCESS;
327
328         ldb_request_set_state(req, LDB_ASYNC_PENDING);
329
330         if (ltdb_cache_load(module) != 0) {
331                 return LDB_ERR_OPERATIONS_ERROR;
332         }
333
334         ret = ltdb_add_internal(module, req->op.add.message);
335
336         return ret;
337 }
338
339 /*
340   delete a record from the database, not updating indexes (used for deleting
341   index records)
342 */
343 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
344 {
345         void *data = ldb_module_get_private(module);
346         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
347         TDB_DATA tdb_key;
348         int ret;
349
350         tdb_key = ltdb_key(module, dn);
351         if (!tdb_key.dptr) {
352                 return LDB_ERR_OTHER;
353         }
354
355         ret = tdb_delete(ltdb->tdb, tdb_key);
356         talloc_free(tdb_key.dptr);
357
358         if (ret != 0) {
359                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
360         }
361
362         return ret;
363 }
364
365 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
366 {
367         struct ldb_message *msg;
368         int ret = LDB_SUCCESS;
369
370         msg = talloc(module, struct ldb_message);
371         if (msg == NULL) {
372                 return LDB_ERR_OPERATIONS_ERROR;
373         }
374
375         /* in case any attribute of the message was indexed, we need
376            to fetch the old record */
377         ret = ltdb_search_dn1(module, dn, msg);
378         if (ret != LDB_SUCCESS) {
379                 /* not finding the old record is an error */
380                 goto done;
381         }
382
383         ret = ltdb_delete_noindex(module, dn);
384         if (ret != LDB_SUCCESS) {
385                 goto done;
386         }
387
388         /* remove one level attribute */
389         ret = ltdb_index_one(module, msg, 0);
390         if (ret != LDB_SUCCESS) {
391                 goto done;
392         }
393
394         /* remove any indexed attributes */
395         ret = ltdb_index_del(module, msg);
396         if (ret != LDB_SUCCESS) {
397                 goto done;
398         }
399
400         ret = ltdb_modified(module, dn);
401         if (ret != LDB_SUCCESS) {
402                 goto done;
403         }
404
405 done:
406         talloc_free(msg);
407         return ret;
408 }
409
410 /*
411   delete a record from the database
412 */
413 static int ltdb_delete(struct ltdb_context *ctx)
414 {
415         struct ldb_module *module = ctx->module;
416         struct ldb_request *req = ctx->req;
417         int ret = LDB_SUCCESS;
418
419         ldb_request_set_state(req, LDB_ASYNC_PENDING);
420
421         if (ltdb_cache_load(module) != 0) {
422                 return LDB_ERR_OPERATIONS_ERROR;
423         }
424
425         ret = ltdb_delete_internal(module, req->op.del.dn);
426
427         return ret;
428 }
429
430 /*
431   find an element by attribute name. At the moment this does a linear search,
432   it should be re-coded to use a binary search once all places that modify
433   records guarantee sorted order
434
435   return the index of the first matching element if found, otherwise -1
436 */
437 static int find_element(const struct ldb_message *msg, const char *name)
438 {
439         unsigned int i;
440         for (i=0;i<msg->num_elements;i++) {
441                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
442                         return i;
443                 }
444         }
445         return -1;
446 }
447
448
449 /*
450   add an element to an existing record. Assumes a elements array that we
451   can call re-alloc on, and assumed that we can re-use the data pointers from
452   the passed in additional values. Use with care!
453
454   returns 0 on success, -1 on failure (and sets errno)
455 */
456 static int msg_add_element(struct ldb_context *ldb,
457                            struct ldb_message *msg,
458                            struct ldb_message_element *el)
459 {
460         struct ldb_message_element *e2;
461         unsigned int i;
462
463         if (el->num_values == 0) {
464                 /* nothing to do here - we don't add empty elements */
465                 return 0;
466         }
467
468         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
469                               msg->num_elements+1);
470         if (!e2) {
471                 errno = ENOMEM;
472                 return -1;
473         }
474
475         msg->elements = e2;
476
477         e2 = &msg->elements[msg->num_elements];
478
479         e2->name = el->name;
480         e2->flags = el->flags;
481         e2->values = talloc_array(msg->elements,
482                                   struct ldb_val, el->num_values);
483         if (!e2->values) {
484                 errno = ENOMEM;
485                 return -1;
486         }
487         for (i=0;i<el->num_values;i++) {
488                 e2->values[i] = el->values[i];
489         }
490         e2->num_values = el->num_values;
491
492         ++msg->num_elements;
493
494         return 0;
495 }
496
497 /*
498   delete all elements having a specified attribute name
499 */
500 static int msg_delete_attribute(struct ldb_module *module,
501                                 struct ldb_context *ldb,
502                                 struct ldb_message *msg, const char *name)
503 {
504         const char *dn;
505         unsigned int i, j;
506
507         dn = ldb_dn_get_linearized(msg->dn);
508         if (dn == NULL) {
509                 return -1;
510         }
511
512         for (i=0;i<msg->num_elements;i++) {
513                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
514                         for (j=0;j<msg->elements[i].num_values;j++) {
515                                 ltdb_index_del_value(module, dn,
516                                                      &msg->elements[i], j);
517                         }
518                         talloc_free(msg->elements[i].values);
519                         if (msg->num_elements > (i+1)) {
520                                 memmove(&msg->elements[i],
521                                         &msg->elements[i+1],
522                                         sizeof(struct ldb_message_element)*
523                                         (msg->num_elements - (i+1)));
524                         }
525                         msg->num_elements--;
526                         i--;
527                         msg->elements = talloc_realloc(msg, msg->elements,
528                                                        struct ldb_message_element,
529                                                        msg->num_elements);
530
531                         /* per definition we find in a canonicalised message an
532                            attribute only once. So we are finished here. */
533                         return 0;
534                 }
535         }
536
537         /* Not found */
538         return -1;
539 }
540
541 /*
542   delete all elements matching an attribute name/value
543
544   return 0 on success, -1 on failure
545 */
546 static int msg_delete_element(struct ldb_module *module,
547                               struct ldb_message *msg,
548                               const char *name,
549                               const struct ldb_val *val)
550 {
551         struct ldb_context *ldb = ldb_module_get_ctx(module);
552         unsigned int i;
553         int found;
554         struct ldb_message_element *el;
555         const struct ldb_schema_attribute *a;
556
557         found = find_element(msg, name);
558         if (found == -1) {
559                 return -1;
560         }
561
562         el = &msg->elements[found];
563
564         a = ldb_schema_attribute_by_name(ldb, el->name);
565
566         for (i=0;i<el->num_values;i++) {
567                 if (a->syntax->comparison_fn(ldb, ldb,
568                                                 &el->values[i], val) == 0) {
569                         if (i<el->num_values-1) {
570                                 memmove(&el->values[i], &el->values[i+1],
571                                         sizeof(el->values[i])*
572                                                 (el->num_values-(i+1)));
573                         }
574                         el->num_values--;
575                         if (el->num_values == 0) {
576                                 return msg_delete_attribute(module, ldb,
577                                                             msg, name);
578                         }
579
580                         /* per definition we find in a canonicalised message an
581                            attribute value only once. So we are finished here */
582                         return 0;
583                 }
584         }
585
586         /* Not found */
587         return -1;
588 }
589
590
591 /*
592   modify a record - internal interface
593
594   yuck - this is O(n^2). Luckily n is usually small so we probably
595   get away with it, but if we ever have really large attribute lists
596   then we'll need to look at this again
597 */
598 int ltdb_modify_internal(struct ldb_module *module,
599                          const struct ldb_message *msg)
600 {
601         struct ldb_context *ldb = ldb_module_get_ctx(module);
602         void *data = ldb_module_get_private(module);
603         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
604         TDB_DATA tdb_key, tdb_data;
605         struct ldb_message *msg2;
606         unsigned i, j;
607         int ret = LDB_SUCCESS, idx;
608
609         tdb_key = ltdb_key(module, msg->dn);
610         if (!tdb_key.dptr) {
611                 return LDB_ERR_OTHER;
612         }
613
614         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
615         if (!tdb_data.dptr) {
616                 talloc_free(tdb_key.dptr);
617                 return ltdb_err_map(tdb_error(ltdb->tdb));
618         }
619
620         msg2 = talloc(tdb_key.dptr, struct ldb_message);
621         if (msg2 == NULL) {
622                 free(tdb_data.dptr);
623                 ret = LDB_ERR_OTHER;
624                 goto done;
625         }
626
627         ret = ltdb_unpack_data(module, &tdb_data, msg2);
628         free(tdb_data.dptr);
629         if (ret == -1) {
630                 ret = LDB_ERR_OTHER;
631                 goto done;
632         }
633
634         if (!msg2->dn) {
635                 msg2->dn = msg->dn;
636         }
637
638         for (i=0; i<msg->num_elements; i++) {
639                 struct ldb_message_element *el = &msg->elements[i], *el2;
640                 struct ldb_val *vals;
641                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
642                 const char *dn;
643
644                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
645                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
646                                                ldb_dn_get_linearized(msg->dn));
647                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
648                         goto done;
649                 }
650
651                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
652                 case LDB_FLAG_MOD_ADD:
653                         if (el->num_values == 0) {
654                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
655                                                        el->name, ldb_dn_get_linearized(msg->dn));
656                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
657                                 goto done;
658                         }
659
660                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
661                                 if (el->num_values > 1) {
662                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
663                                                                el->name, ldb_dn_get_linearized(msg->dn));
664                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
665                                         goto done;
666                                 }
667                         }
668
669                         /* Checks if element already exists */
670                         idx = find_element(msg2, el->name);
671                         if (idx == -1) {
672                                 if (msg_add_element(ldb, msg2, el) != 0) {
673                                         ret = LDB_ERR_OTHER;
674                                         goto done;
675                                 }
676                         } else {
677                                 /* We cannot add another value on a existing one
678                                    if the attribute is single-valued */
679                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
680                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
681                                                                el->name, ldb_dn_get_linearized(msg->dn));
682                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
683                                         goto done;
684                                 }
685
686                                 el2 = &(msg2->elements[idx]);
687
688                                 /* Check that values don't exist yet on multi-
689                                    valued attributes or aren't provided twice */
690                                 for (j=0; j<el->num_values; j++) {
691                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
692                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
693                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
694                                                 goto done;
695                                         }
696                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
697                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
698                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
699                                                 goto done;
700                                         }
701                                 }
702
703                                 /* Now combine existing and new values to a new
704                                    attribute record */
705                                 vals = talloc_realloc(msg2->elements,
706                                         el2->values, struct ldb_val,
707                                         el2->num_values + el->num_values);
708                                 if (vals == NULL) {
709                                         ldb_oom(ldb);
710                                         ret = LDB_ERR_OTHER;
711                                         goto done;
712                                 }
713
714                                 for (j=0; j<el->num_values; j++) {
715                                         vals[el2->num_values + j] =
716                                                 ldb_val_dup(vals, &el->values[j]);
717                                 }
718
719                                 el2->values = vals;
720                                 el2->num_values += el->num_values;
721                         }
722
723                         break;
724
725                 case LDB_FLAG_MOD_REPLACE:
726                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
727                                 if (el->num_values > 1) {
728                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
729                                                                el->name, ldb_dn_get_linearized(msg->dn));
730                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
731                                         goto done;
732                                 }
733                         }
734
735                         for (j=0; j<el->num_values; j++) {
736                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
737                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
738                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
739                                         goto done;
740                                 }
741                         }
742
743                         /* Delete the attribute if it exists in the DB */
744                         msg_delete_attribute(module, ldb, msg2, el->name);
745
746                         /* Recreate it with the new values */
747                         if (msg_add_element(ldb, msg2, el) != 0) {
748                                 ret = LDB_ERR_OTHER;
749                                 goto done;
750                         }
751
752                         break;
753
754                 case LDB_FLAG_MOD_DELETE:
755                         dn = ldb_dn_get_linearized(msg->dn);
756                         if (dn == NULL) {
757                                 ret = LDB_ERR_OTHER;
758                                 goto done;
759                         }
760
761                         if (msg->elements[i].num_values == 0) {
762                                 /* Delete the whole attribute */
763                                 if (msg_delete_attribute(module, ldb, msg2,
764                                                          msg->elements[i].name) != 0) {
765                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
766                                                                msg->elements[i].name, dn);
767                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
768                                         goto done;
769                                 }
770                         } else {
771                                 /* Delete specified values from an attribute */
772                                 for (j=0; j < msg->elements[i].num_values; j++) {
773                                         if (msg_delete_element(module,
774                                                                msg2,
775                                                                msg->elements[i].name,
776                                                                &msg->elements[i].values[j]) != 0) {
777                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
778                                                                        msg->elements[i].name, dn);
779                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
780                                                 goto done;
781                                         }
782
783                                         ret = ltdb_index_del_value(module, dn,
784                                                 &msg->elements[i], j);
785                                         if (ret != LDB_SUCCESS) {
786                                                 goto done;
787                                         }
788                                 }
789                         }
790
791                         break;
792                 default:
793                         ldb_asprintf_errstring(ldb,
794                                 "Invalid ldb_modify flags on %s: 0x%x",
795                                 msg->elements[i].name,
796                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
797                         ret = LDB_ERR_PROTOCOL_ERROR;
798                         goto done;
799                 }
800         }
801
802         ret = ltdb_store(module, msg2, TDB_MODIFY);
803         if (ret != LDB_SUCCESS) {
804                 goto done;
805         }
806
807         ret = ltdb_modified(module, msg->dn);
808         if (ret != LDB_SUCCESS) {
809                 goto done;
810         }
811
812 done:
813         talloc_free(tdb_key.dptr);
814         return ret;
815 }
816
817 /*
818   modify a record
819 */
820 static int ltdb_modify(struct ltdb_context *ctx)
821 {
822         struct ldb_module *module = ctx->module;
823         struct ldb_request *req = ctx->req;
824         int ret = LDB_SUCCESS;
825
826         ret = ltdb_check_special_dn(module, req->op.mod.message);
827         if (ret != LDB_SUCCESS) {
828                 return ret;
829         }
830
831         ldb_request_set_state(req, LDB_ASYNC_PENDING);
832
833         if (ltdb_cache_load(module) != 0) {
834                 return LDB_ERR_OPERATIONS_ERROR;
835         }
836
837         ret = ltdb_modify_internal(module, req->op.mod.message);
838
839         return ret;
840 }
841
842 /*
843   rename a record
844 */
845 static int ltdb_rename(struct ltdb_context *ctx)
846 {
847         struct ldb_module *module = ctx->module;
848         struct ldb_request *req = ctx->req;
849         struct ldb_message *msg;
850         int ret = LDB_SUCCESS;
851
852         ldb_request_set_state(req, LDB_ASYNC_PENDING);
853
854         if (ltdb_cache_load(ctx->module) != 0) {
855                 return LDB_ERR_OPERATIONS_ERROR;
856         }
857
858         msg = talloc(ctx, struct ldb_message);
859         if (msg == NULL) {
860                 return LDB_ERR_OPERATIONS_ERROR;
861         }
862
863         /* in case any attribute of the message was indexed, we need
864            to fetch the old record */
865         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
866         if (ret != LDB_SUCCESS) {
867                 /* not finding the old record is an error */
868                 return ret;
869         }
870
871         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
872         if (msg->dn == NULL) {
873                 return LDB_ERR_OPERATIONS_ERROR;
874         }
875
876         /* Always delete first then add, to avoid conflicts with
877          * unique indexes. We rely on the transaction to make this
878          * atomic
879          */
880         ret = ltdb_delete_internal(module, req->op.rename.olddn);
881         if (ret != LDB_SUCCESS) {
882                 return ret;
883         }
884
885         ret = ltdb_add_internal(module, msg);
886
887         return ret;
888 }
889
890 static int ltdb_start_trans(struct ldb_module *module)
891 {
892         void *data = ldb_module_get_private(module);
893         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
894
895         if (tdb_transaction_start(ltdb->tdb) != 0) {
896                 return ltdb_err_map(tdb_error(ltdb->tdb));
897         }
898
899         ltdb->in_transaction++;
900
901         ltdb_index_transaction_start(module);
902
903         return LDB_SUCCESS;
904 }
905
906 static int ltdb_prepare_commit(struct ldb_module *module)
907 {
908         void *data = ldb_module_get_private(module);
909         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
910
911         if (ltdb->in_transaction != 1) {
912                 return LDB_SUCCESS;
913         }
914
915         if (ltdb_index_transaction_commit(module) != 0) {
916                 tdb_transaction_cancel(ltdb->tdb);
917                 ltdb->in_transaction--;
918                 return ltdb_err_map(tdb_error(ltdb->tdb));
919         }
920
921         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
922                 ltdb->in_transaction--;
923                 return ltdb_err_map(tdb_error(ltdb->tdb));
924         }
925
926         ltdb->prepared_commit = true;
927
928         return LDB_SUCCESS;
929 }
930
931 static int ltdb_end_trans(struct ldb_module *module)
932 {
933         void *data = ldb_module_get_private(module);
934         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
935
936         if (!ltdb->prepared_commit) {
937                 int ret = ltdb_prepare_commit(module);
938                 if (ret != LDB_SUCCESS) {
939                         return ret;
940                 }
941         }
942
943         ltdb->in_transaction--;
944         ltdb->prepared_commit = false;
945
946         if (tdb_transaction_commit(ltdb->tdb) != 0) {
947                 return ltdb_err_map(tdb_error(ltdb->tdb));
948         }
949
950         return LDB_SUCCESS;
951 }
952
953 static int ltdb_del_trans(struct ldb_module *module)
954 {
955         void *data = ldb_module_get_private(module);
956         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
957
958         ltdb->in_transaction--;
959
960         if (ltdb_index_transaction_cancel(module) != 0) {
961                 tdb_transaction_cancel(ltdb->tdb);
962                 return ltdb_err_map(tdb_error(ltdb->tdb));
963         }
964
965         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
966                 return ltdb_err_map(tdb_error(ltdb->tdb));
967         }
968
969         return LDB_SUCCESS;
970 }
971
972 /*
973   return sequenceNumber from @BASEINFO
974 */
975 static int ltdb_sequence_number(struct ltdb_context *ctx,
976                                 struct ldb_extended **ext)
977 {
978         struct ldb_context *ldb;
979         struct ldb_module *module = ctx->module;
980         struct ldb_request *req = ctx->req;
981         TALLOC_CTX *tmp_ctx;
982         struct ldb_seqnum_request *seq;
983         struct ldb_seqnum_result *res;
984         struct ldb_message *msg = NULL;
985         struct ldb_dn *dn;
986         const char *date;
987         int ret = LDB_SUCCESS;
988
989         ldb = ldb_module_get_ctx(module);
990
991         seq = talloc_get_type(req->op.extended.data,
992                                 struct ldb_seqnum_request);
993         if (seq == NULL) {
994                 return LDB_ERR_OPERATIONS_ERROR;
995         }
996
997         ldb_request_set_state(req, LDB_ASYNC_PENDING);
998
999         if (ltdb_lock_read(module) != 0) {
1000                 return LDB_ERR_OPERATIONS_ERROR;
1001         }
1002
1003         res = talloc_zero(req, struct ldb_seqnum_result);
1004         if (res == NULL) {
1005                 ret = LDB_ERR_OPERATIONS_ERROR;
1006                 goto done;
1007         }
1008         tmp_ctx = talloc_new(req);
1009         if (tmp_ctx == NULL) {
1010                 ret = LDB_ERR_OPERATIONS_ERROR;
1011                 goto done;
1012         }
1013
1014         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1015
1016         msg = talloc(tmp_ctx, struct ldb_message);
1017         if (msg == NULL) {
1018                 ret = LDB_ERR_OPERATIONS_ERROR;
1019                 goto done;
1020         }
1021
1022         ret = ltdb_search_dn1(module, dn, msg);
1023         if (ret != LDB_SUCCESS) {
1024                 goto done;
1025         }
1026
1027         switch (seq->type) {
1028         case LDB_SEQ_HIGHEST_SEQ:
1029                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1030                 break;
1031         case LDB_SEQ_NEXT:
1032                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1033                 res->seq_num++;
1034                 break;
1035         case LDB_SEQ_HIGHEST_TIMESTAMP:
1036                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1037                 if (date) {
1038                         res->seq_num = ldb_string_to_time(date);
1039                 } else {
1040                         res->seq_num = 0;
1041                         /* zero is as good as anything when we don't know */
1042                 }
1043                 break;
1044         }
1045
1046         *ext = talloc_zero(req, struct ldb_extended);
1047         if (*ext == NULL) {
1048                 ret = LDB_ERR_OPERATIONS_ERROR;
1049                 goto done;
1050         }
1051         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1052         (*ext)->data = talloc_steal(*ext, res);
1053
1054 done:
1055         talloc_free(tmp_ctx);
1056         ltdb_unlock_read(module);
1057         return ret;
1058 }
1059
1060 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1061 {
1062         struct ldb_context *ldb;
1063         struct ldb_request *req;
1064         struct ldb_reply *ares;
1065
1066         ldb = ldb_module_get_ctx(ctx->module);
1067         req = ctx->req;
1068
1069         /* if we already returned an error just return */
1070         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1071                 return;
1072         }
1073
1074         ares = talloc_zero(req, struct ldb_reply);
1075         if (!ares) {
1076                 ldb_oom(ldb);
1077                 req->callback(req, NULL);
1078                 return;
1079         }
1080         ares->type = LDB_REPLY_DONE;
1081         ares->error = error;
1082
1083         req->callback(req, ares);
1084 }
1085
1086 static void ltdb_timeout(struct tevent_context *ev,
1087                           struct tevent_timer *te,
1088                           struct timeval t,
1089                           void *private_data)
1090 {
1091         struct ltdb_context *ctx;
1092         ctx = talloc_get_type(private_data, struct ltdb_context);
1093
1094         if (!ctx->request_terminated) {
1095                 /* request is done now */
1096                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1097         }
1098
1099         if (!ctx->request_terminated) {
1100                 /* neutralize the spy */
1101                 ctx->spy->ctx = NULL;
1102         }
1103         talloc_free(ctx);
1104 }
1105
1106 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1107                                         struct ldb_extended *ext,
1108                                         int error)
1109 {
1110         struct ldb_context *ldb;
1111         struct ldb_request *req;
1112         struct ldb_reply *ares;
1113
1114         ldb = ldb_module_get_ctx(ctx->module);
1115         req = ctx->req;
1116
1117         /* if we already returned an error just return */
1118         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1119                 return;
1120         }
1121
1122         ares = talloc_zero(req, struct ldb_reply);
1123         if (!ares) {
1124                 ldb_oom(ldb);
1125                 req->callback(req, NULL);
1126                 return;
1127         }
1128         ares->type = LDB_REPLY_DONE;
1129         ares->response = ext;
1130         ares->error = error;
1131
1132         req->callback(req, ares);
1133 }
1134
1135 static void ltdb_handle_extended(struct ltdb_context *ctx)
1136 {
1137         struct ldb_extended *ext = NULL;
1138         int ret;
1139
1140         if (strcmp(ctx->req->op.extended.oid,
1141                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1142                 /* get sequence number */
1143                 ret = ltdb_sequence_number(ctx, &ext);
1144         } else {
1145                 /* not recognized */
1146                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1147         }
1148
1149         ltdb_request_extended_done(ctx, ext, ret);
1150 }
1151
1152 static void ltdb_callback(struct tevent_context *ev,
1153                           struct tevent_timer *te,
1154                           struct timeval t,
1155                           void *private_data)
1156 {
1157         struct ltdb_context *ctx;
1158         int ret;
1159
1160         ctx = talloc_get_type(private_data, struct ltdb_context);
1161
1162         if (ctx->request_terminated) {
1163                 goto done;
1164         }
1165
1166         switch (ctx->req->operation) {
1167         case LDB_SEARCH:
1168                 ret = ltdb_search(ctx);
1169                 break;
1170         case LDB_ADD:
1171                 ret = ltdb_add(ctx);
1172                 break;
1173         case LDB_MODIFY:
1174                 ret = ltdb_modify(ctx);
1175                 break;
1176         case LDB_DELETE:
1177                 ret = ltdb_delete(ctx);
1178                 break;
1179         case LDB_RENAME:
1180                 ret = ltdb_rename(ctx);
1181                 break;
1182         case LDB_EXTENDED:
1183                 ltdb_handle_extended(ctx);
1184                 goto done;
1185         default:
1186                 /* no other op supported */
1187                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1188         }
1189
1190         if (!ctx->request_terminated) {
1191                 /* request is done now */
1192                 ltdb_request_done(ctx, ret);
1193         }
1194
1195 done:
1196         if (!ctx->request_terminated) {
1197                 /* neutralize the spy */
1198                 ctx->spy->ctx = NULL;
1199         }
1200         talloc_free(ctx);
1201 }
1202
1203 static int ltdb_request_destructor(void *ptr)
1204 {
1205         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1206
1207         if (spy->ctx != NULL) {
1208                 spy->ctx->request_terminated = true;
1209         }
1210
1211         return 0;
1212 }
1213
1214 static int ltdb_handle_request(struct ldb_module *module,
1215                                struct ldb_request *req)
1216 {
1217         struct ldb_context *ldb;
1218         struct tevent_context *ev;
1219         struct ltdb_context *ac;
1220         struct tevent_timer *te;
1221         struct timeval tv;
1222
1223         if (check_critical_controls(req->controls)) {
1224                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1225         }
1226
1227         ldb = ldb_module_get_ctx(module);
1228
1229         if (req->starttime == 0 || req->timeout == 0) {
1230                 ldb_set_errstring(ldb, "Invalid timeout settings");
1231                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1232         }
1233
1234         ev = ldb_get_event_context(ldb);
1235
1236         ac = talloc_zero(ldb, struct ltdb_context);
1237         if (ac == NULL) {
1238                 ldb_oom(ldb);
1239                 return LDB_ERR_OPERATIONS_ERROR;
1240         }
1241
1242         ac->module = module;
1243         ac->req = req;
1244
1245         tv.tv_sec = 0;
1246         tv.tv_usec = 0;
1247         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1248         if (NULL == te) {
1249                 talloc_free(ac);
1250                 return LDB_ERR_OPERATIONS_ERROR;
1251         }
1252
1253         tv.tv_sec = req->starttime + req->timeout;
1254         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1255         if (NULL == ac->timeout_event) {
1256                 talloc_free(ac);
1257                 return LDB_ERR_OPERATIONS_ERROR;
1258         }
1259
1260         /* set a spy so that we do not try to use the request context
1261          * if it is freed before ltdb_callback fires */
1262         ac->spy = talloc(req, struct ltdb_req_spy);
1263         if (NULL == ac->spy) {
1264                 talloc_free(ac);
1265                 return LDB_ERR_OPERATIONS_ERROR;
1266         }
1267         ac->spy->ctx = ac;
1268
1269         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1270
1271         return LDB_SUCCESS;
1272 }
1273
1274 static const struct ldb_module_ops ltdb_ops = {
1275         .name              = "tdb",
1276         .search            = ltdb_handle_request,
1277         .add               = ltdb_handle_request,
1278         .modify            = ltdb_handle_request,
1279         .del               = ltdb_handle_request,
1280         .rename            = ltdb_handle_request,
1281         .extended          = ltdb_handle_request,
1282         .start_transaction = ltdb_start_trans,
1283         .end_transaction   = ltdb_end_trans,
1284         .prepare_commit    = ltdb_prepare_commit,
1285         .del_transaction   = ltdb_del_trans,
1286 };
1287
1288 /*
1289   connect to the database
1290 */
1291 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1292                         unsigned int flags, const char *options[],
1293                         struct ldb_module **_module)
1294 {
1295         struct ldb_module *module;
1296         const char *path;
1297         int tdb_flags, open_flags;
1298         struct ltdb_private *ltdb;
1299
1300         /* parse the url */
1301         if (strchr(url, ':')) {
1302                 if (strncmp(url, "tdb://", 6) != 0) {
1303                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1304                                   "Invalid tdb URL '%s'", url);
1305                         return -1;
1306                 }
1307                 path = url+6;
1308         } else {
1309                 path = url;
1310         }
1311
1312         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1313
1314         /* check for the 'nosync' option */
1315         if (flags & LDB_FLG_NOSYNC) {
1316                 tdb_flags |= TDB_NOSYNC;
1317         }
1318
1319         /* and nommap option */
1320         if (flags & LDB_FLG_NOMMAP) {
1321                 tdb_flags |= TDB_NOMMAP;
1322         }
1323
1324         if (flags & LDB_FLG_RDONLY) {
1325                 open_flags = O_RDONLY;
1326         } else {
1327                 open_flags = O_CREAT | O_RDWR;
1328         }
1329
1330         ltdb = talloc_zero(ldb, struct ltdb_private);
1331         if (!ltdb) {
1332                 ldb_oom(ldb);
1333                 return -1;
1334         }
1335
1336         /* note that we use quite a large default hash size */
1337         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1338                                    tdb_flags, open_flags,
1339                                    ldb_get_create_perms(ldb), ldb);
1340         if (!ltdb->tdb) {
1341                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1342                           "Unable to open tdb '%s'", path);
1343                 talloc_free(ltdb);
1344                 return -1;
1345         }
1346
1347         ltdb->sequence_number = 0;
1348
1349         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1350         if (!module) {
1351                 talloc_free(ltdb);
1352                 return -1;
1353         }
1354         ldb_module_set_private(module, ltdb);
1355
1356         if (ltdb_cache_load(module) != 0) {
1357                 talloc_free(module);
1358                 talloc_free(ltdb);
1359                 return -1;
1360         }
1361
1362         *_module = module;
1363         return 0;
1364 }
1365
1366 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1367         .name = "tdb",
1368         .connect_fn = ltdb_connect
1369 };