s4-ldb: fixed re-index during a complex transaction
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         if (ltdb->in_transaction == 0) {
92                 return tdb_lockall_read(ltdb->tdb);
93         }
94         return 0;
95 }
96
97 /*
98   unlock the database after a ltdb_lock_read()
99 */
100 int ltdb_unlock_read(struct ldb_module *module)
101 {
102         void *data = ldb_module_get_private(module);
103         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
104         if (ltdb->in_transaction == 0) {
105                 return tdb_unlockall_read(ltdb->tdb);
106         }
107         return 0;
108 }
109
110
111 /*
112   form a TDB_DATA for a record key
113   caller frees
114
115   note that the key for a record can depend on whether the
116   dn refers to a case sensitive index record or not
117 */
118 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
119 {
120         struct ldb_context *ldb = ldb_module_get_ctx(module);
121         TDB_DATA key;
122         char *key_str = NULL;
123         const char *dn_folded = NULL;
124
125         /*
126           most DNs are case insensitive. The exception is index DNs for
127           case sensitive attributes
128
129           there are 3 cases dealt with in this code:
130
131           1) if the dn doesn't start with @ then uppercase the attribute
132              names and the attributes values of case insensitive attributes
133           2) if the dn starts with @ then leave it alone -
134              the indexing code handles the rest
135         */
136
137         dn_folded = ldb_dn_get_casefold(dn);
138         if (!dn_folded) {
139                 goto failed;
140         }
141
142         key_str = talloc_strdup(ldb, "DN=");
143         if (!key_str) {
144                 goto failed;
145         }
146
147         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
148         if (!key_str) {
149                 goto failed;
150         }
151
152         key.dptr = (uint8_t *)key_str;
153         key.dsize = strlen(key_str) + 1;
154
155         return key;
156
157 failed:
158         errno = ENOMEM;
159         key.dptr = NULL;
160         key.dsize = 0;
161         return key;
162 }
163
164 /*
165   check special dn's have valid attributes
166   currently only @ATTRIBUTES is checked
167 */
168 static int ltdb_check_special_dn(struct ldb_module *module,
169                           const struct ldb_message *msg)
170 {
171         struct ldb_context *ldb = ldb_module_get_ctx(module);
172         int i, j;
173
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return LDB_SUCCESS;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
183
184                 for (j = 0; j < msg->elements[i].num_values; j++) {
185                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
186                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
187                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
188                         }
189                 }
190         }
191
192         return LDB_SUCCESS;
193 }
194
195
196 /*
197   we've made a modification to a dn - possibly reindex and
198   update sequence number
199 */
200 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
201 {
202         int ret = LDB_SUCCESS;
203
204         if (ldb_dn_is_special(dn) &&
205             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
206              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
207                 ret = ltdb_reindex(module);
208         }
209
210         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
211         if (ret == LDB_SUCCESS &&
212             !(ldb_dn_is_special(dn) &&
213               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
214                 ret = ltdb_increase_sequence_number(module);
215         }
216
217         /* If the modify was to @OPTIONS, reload the cache */
218         if (ldb_dn_is_special(dn) &&
219             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
220                 ret = ltdb_cache_reload(module);
221         }
222
223         return ret;
224 }
225
226 /*
227   store a record into the db
228 */
229 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
230 {
231         void *data = ldb_module_get_private(module);
232         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
233         TDB_DATA tdb_key, tdb_data;
234         int ret = LDB_SUCCESS;
235
236         tdb_key = ltdb_key(module, msg->dn);
237         if (tdb_key.dptr == NULL) {
238                 return LDB_ERR_OTHER;
239         }
240
241         ret = ltdb_pack_data(module, msg, &tdb_data);
242         if (ret == -1) {
243                 talloc_free(tdb_key.dptr);
244                 return LDB_ERR_OTHER;
245         }
246
247         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
248         if (ret == -1) {
249                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
250                 goto done;
251         }
252
253 done:
254         talloc_free(tdb_key.dptr);
255         talloc_free(tdb_data.dptr);
256
257         return ret;
258 }
259
260
261 static int ltdb_add_internal(struct ldb_module *module,
262                              const struct ldb_message *msg)
263 {
264         struct ldb_context *ldb = ldb_module_get_ctx(module);
265         int ret = LDB_SUCCESS, i;
266
267         ret = ltdb_check_special_dn(module, msg);
268         if (ret != LDB_SUCCESS) {
269                 return ret;
270         }
271
272         if (ltdb_cache_load(module) != 0) {
273                 return LDB_ERR_OPERATIONS_ERROR;
274         }
275
276         for (i=0;i<msg->num_elements;i++) {
277                 struct ldb_message_element *el = &msg->elements[i];
278                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
279
280                 if (el->num_values == 0) {
281                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
282                                                el->name, ldb_dn_get_linearized(msg->dn));
283                         return LDB_ERR_CONSTRAINT_VIOLATION;
284                 }
285                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
286                         if (el->num_values > 1) {
287                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
288                                                        el->name, ldb_dn_get_linearized(msg->dn));
289                                 return LDB_ERR_CONSTRAINT_VIOLATION;
290                         }
291                 }
292         }
293
294         ret = ltdb_store(module, msg, TDB_INSERT);
295         if (ret != LDB_SUCCESS) {
296                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
297                         ldb_asprintf_errstring(ldb,
298                                                "Entry %s already exists",
299                                                ldb_dn_get_linearized(msg->dn));
300                 }
301                 return ret;
302         }
303
304         ret = ltdb_index_add_new(module, msg);
305         if (ret != LDB_SUCCESS) {
306                 return ret;
307         }
308
309         ret = ltdb_modified(module, msg->dn);
310
311         return ret;
312 }
313
314 /*
315   add a record to the database
316 */
317 static int ltdb_add(struct ltdb_context *ctx)
318 {
319         struct ldb_module *module = ctx->module;
320         struct ldb_request *req = ctx->req;
321         int ret = LDB_SUCCESS;
322
323         ldb_request_set_state(req, LDB_ASYNC_PENDING);
324
325         if (ltdb_cache_load(module) != 0) {
326                 return LDB_ERR_OPERATIONS_ERROR;
327         }
328
329         ret = ltdb_add_internal(module, req->op.add.message);
330
331         return ret;
332 }
333
334 /*
335   delete a record from the database, not updating indexes (used for deleting
336   index records)
337 */
338 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
339 {
340         void *data = ldb_module_get_private(module);
341         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
342         TDB_DATA tdb_key;
343         int ret;
344
345         tdb_key = ltdb_key(module, dn);
346         if (!tdb_key.dptr) {
347                 return LDB_ERR_OTHER;
348         }
349
350         ret = tdb_delete(ltdb->tdb, tdb_key);
351         talloc_free(tdb_key.dptr);
352
353         if (ret != 0) {
354                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
355         }
356
357         return ret;
358 }
359
360 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
361 {
362         struct ldb_message *msg;
363         int ret = LDB_SUCCESS;
364
365         msg = talloc(module, struct ldb_message);
366         if (msg == NULL) {
367                 return LDB_ERR_OPERATIONS_ERROR;
368         }
369
370         /* in case any attribute of the message was indexed, we need
371            to fetch the old record */
372         ret = ltdb_search_dn1(module, dn, msg);
373         if (ret != LDB_SUCCESS) {
374                 /* not finding the old record is an error */
375                 goto done;
376         }
377
378         ret = ltdb_delete_noindex(module, dn);
379         if (ret != LDB_SUCCESS) {
380                 goto done;
381         }
382
383         /* remove any indexed attributes */
384         ret = ltdb_index_delete(module, msg);
385         if (ret != LDB_SUCCESS) {
386                 goto done;
387         }
388
389         ret = ltdb_modified(module, dn);
390         if (ret != LDB_SUCCESS) {
391                 goto done;
392         }
393
394 done:
395         talloc_free(msg);
396         return ret;
397 }
398
399 /*
400   delete a record from the database
401 */
402 static int ltdb_delete(struct ltdb_context *ctx)
403 {
404         struct ldb_module *module = ctx->module;
405         struct ldb_request *req = ctx->req;
406         int ret = LDB_SUCCESS;
407
408         ldb_request_set_state(req, LDB_ASYNC_PENDING);
409
410         if (ltdb_cache_load(module) != 0) {
411                 return LDB_ERR_OPERATIONS_ERROR;
412         }
413
414         ret = ltdb_delete_internal(module, req->op.del.dn);
415
416         return ret;
417 }
418
419 /*
420   find an element by attribute name. At the moment this does a linear search,
421   it should be re-coded to use a binary search once all places that modify
422   records guarantee sorted order
423
424   return the index of the first matching element if found, otherwise -1
425 */
426 static int find_element(const struct ldb_message *msg, const char *name)
427 {
428         unsigned int i;
429         for (i=0;i<msg->num_elements;i++) {
430                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
431                         return i;
432                 }
433         }
434         return -1;
435 }
436
437
438 /*
439   add an element to an existing record. Assumes a elements array that we
440   can call re-alloc on, and assumed that we can re-use the data pointers from
441   the passed in additional values. Use with care!
442
443   returns 0 on success, -1 on failure (and sets errno)
444 */
445 static int ltdb_msg_add_element(struct ldb_context *ldb,
446                                 struct ldb_message *msg,
447                                 struct ldb_message_element *el)
448 {
449         struct ldb_message_element *e2;
450         unsigned int i;
451
452         if (el->num_values == 0) {
453                 /* nothing to do here - we don't add empty elements */
454                 return 0;
455         }
456
457         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
458                               msg->num_elements+1);
459         if (!e2) {
460                 errno = ENOMEM;
461                 return -1;
462         }
463
464         msg->elements = e2;
465
466         e2 = &msg->elements[msg->num_elements];
467
468         e2->name = el->name;
469         e2->flags = el->flags;
470         e2->values = talloc_array(msg->elements,
471                                   struct ldb_val, el->num_values);
472         if (!e2->values) {
473                 errno = ENOMEM;
474                 return -1;
475         }
476         for (i=0;i<el->num_values;i++) {
477                 e2->values[i] = el->values[i];
478         }
479         e2->num_values = el->num_values;
480
481         ++msg->num_elements;
482
483         return 0;
484 }
485
486 /*
487   delete all elements having a specified attribute name
488 */
489 static int msg_delete_attribute(struct ldb_module *module,
490                                 struct ldb_context *ldb,
491                                 struct ldb_message *msg, const char *name)
492 {
493         const char *dn;
494         unsigned int i;
495         int ret;
496         struct ldb_message_element *el;
497
498         dn = ldb_dn_get_linearized(msg->dn);
499         if (dn == NULL) {
500                 return -1;
501         }
502
503         el = ldb_msg_find_element(msg, name);
504         if (el == NULL) {
505                 return -1;
506         }
507         i = el - msg->elements;
508
509         ret = ltdb_index_del_element(module, dn, el);
510         if (ret != LDB_SUCCESS) {
511                 return ret;
512         }
513
514         talloc_free(el->values);
515         if (msg->num_elements > (i+1)) {
516                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
517         }
518         msg->num_elements--;
519         msg->elements = talloc_realloc(msg, msg->elements,
520                                        struct ldb_message_element,
521                                        msg->num_elements);
522         return 0;
523 }
524
525 /*
526   delete all elements matching an attribute name/value
527
528   return 0 on success, -1 on failure
529 */
530 static int msg_delete_element(struct ldb_module *module,
531                               struct ldb_message *msg,
532                               const char *name,
533                               const struct ldb_val *val)
534 {
535         struct ldb_context *ldb = ldb_module_get_ctx(module);
536         unsigned int i;
537         int found, ret;
538         struct ldb_message_element *el;
539         const struct ldb_schema_attribute *a;
540
541         found = find_element(msg, name);
542         if (found == -1) {
543                 return -1;
544         }
545
546         el = &msg->elements[found];
547
548         a = ldb_schema_attribute_by_name(ldb, el->name);
549
550         for (i=0;i<el->num_values;i++) {
551                 if (a->syntax->comparison_fn(ldb, ldb,
552                                              &el->values[i], val) == 0) {
553                         if (el->num_values == 1) {
554                                 return msg_delete_attribute(module, ldb, msg, name);
555                         }
556
557                         ret = ltdb_index_del_value(module, ldb_dn_get_linearized(msg->dn), el, i);
558                         if (ret != LDB_SUCCESS) {
559                                 return -1;
560                         }
561
562                         if (i<el->num_values-1) {
563                                 memmove(&el->values[i], &el->values[i+1],
564                                         sizeof(el->values[i])*
565                                                 (el->num_values-(i+1)));
566                         }
567                         el->num_values--;
568
569                         /* per definition we find in a canonicalised message an
570                            attribute value only once. So we are finished here */
571                         return 0;
572                 }
573         }
574
575         /* Not found */
576         return -1;
577 }
578
579
580 /*
581   modify a record - internal interface
582
583   yuck - this is O(n^2). Luckily n is usually small so we probably
584   get away with it, but if we ever have really large attribute lists
585   then we'll need to look at this again
586 */
587 int ltdb_modify_internal(struct ldb_module *module,
588                          const struct ldb_message *msg)
589 {
590         struct ldb_context *ldb = ldb_module_get_ctx(module);
591         void *data = ldb_module_get_private(module);
592         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
593         TDB_DATA tdb_key, tdb_data;
594         struct ldb_message *msg2;
595         unsigned i, j;
596         int ret = LDB_SUCCESS, idx;
597
598         tdb_key = ltdb_key(module, msg->dn);
599         if (!tdb_key.dptr) {
600                 return LDB_ERR_OTHER;
601         }
602
603         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
604         if (!tdb_data.dptr) {
605                 talloc_free(tdb_key.dptr);
606                 return ltdb_err_map(tdb_error(ltdb->tdb));
607         }
608
609         msg2 = talloc(tdb_key.dptr, struct ldb_message);
610         if (msg2 == NULL) {
611                 free(tdb_data.dptr);
612                 ret = LDB_ERR_OTHER;
613                 goto done;
614         }
615
616         ret = ltdb_unpack_data(module, &tdb_data, msg2);
617         free(tdb_data.dptr);
618         if (ret == -1) {
619                 ret = LDB_ERR_OTHER;
620                 goto done;
621         }
622
623         if (!msg2->dn) {
624                 msg2->dn = msg->dn;
625         }
626
627         for (i=0; i<msg->num_elements; i++) {
628                 struct ldb_message_element *el = &msg->elements[i], *el2;
629                 struct ldb_val *vals;
630                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
631                 const char *dn;
632
633                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
634                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
635                                                ldb_dn_get_linearized(msg->dn));
636                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
637                         goto done;
638                 }
639
640                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
641                 case LDB_FLAG_MOD_ADD:
642                         if (el->num_values == 0) {
643                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
644                                                        el->name, ldb_dn_get_linearized(msg->dn));
645                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
646                                 goto done;
647                         }
648
649                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
650                                 if (el->num_values > 1) {
651                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
652                                                                el->name, ldb_dn_get_linearized(msg->dn));
653                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
654                                         goto done;
655                                 }
656                         }
657
658                         /* Checks if element already exists */
659                         idx = find_element(msg2, el->name);
660                         if (idx == -1) {
661                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
662                                         ret = LDB_ERR_OTHER;
663                                         goto done;
664                                 }
665                                 ret = ltdb_index_add_element(module, msg->dn, el);
666                                 if (ret != LDB_SUCCESS) {
667                                         goto done;
668                                 }
669                         } else {
670                                 /* We cannot add another value on a existing one
671                                    if the attribute is single-valued */
672                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
673                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
674                                                                el->name, ldb_dn_get_linearized(msg->dn));
675                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
676                                         goto done;
677                                 }
678
679                                 el2 = &(msg2->elements[idx]);
680
681                                 /* Check that values don't exist yet on multi-
682                                    valued attributes or aren't provided twice */
683                                 for (j=0; j<el->num_values; j++) {
684                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
685                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
686                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
687                                                 goto done;
688                                         }
689                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
690                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
691                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
692                                                 goto done;
693                                         }
694                                 }
695
696                                 /* Now combine existing and new values to a new
697                                    attribute record */
698                                 vals = talloc_realloc(msg2->elements,
699                                                       el2->values, struct ldb_val,
700                                                       el2->num_values + el->num_values);
701                                 if (vals == NULL) {
702                                         ldb_oom(ldb);
703                                         ret = LDB_ERR_OTHER;
704                                         goto done;
705                                 }
706
707                                 for (j=0; j<el->num_values; j++) {
708                                         vals[el2->num_values + j] =
709                                                 ldb_val_dup(vals, &el->values[j]);
710                                 }
711
712                                 el2->values = vals;
713                                 el2->num_values += el->num_values;
714
715                                 ret = ltdb_index_add_element(module, msg->dn, el);
716                                 if (ret != LDB_SUCCESS) {
717                                         goto done;
718                                 }
719                         }
720
721                         break;
722
723                 case LDB_FLAG_MOD_REPLACE:
724                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
725                                 if (el->num_values > 1) {
726                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
727                                                                el->name, ldb_dn_get_linearized(msg->dn));
728                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
729                                         goto done;
730                                 }
731                         }
732
733                         /* TODO: This is O(n^2) - replace with more efficient check */
734                         for (j=0; j<el->num_values; j++) {
735                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
736                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
737                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
738                                         goto done;
739                                 }
740                         }
741
742                         idx = find_element(msg2, el->name);
743                         if (idx != -1) {
744                                 el2 = &(msg2->elements[idx]);
745                                 if (ldb_msg_element_compare(el, el2) == 0) {
746                                         /* we are replacing with the same values */
747                                         continue;
748                                 }
749                         
750                                 /* Delete the attribute if it exists in the DB */
751                                 ret = msg_delete_attribute(module, ldb, msg2, el->name);
752                                 if (ret != LDB_SUCCESS) {
753                                         goto done;
754                                 }
755                         }
756
757                         /* Recreate it with the new values */
758                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
759                                 ret = LDB_ERR_OTHER;
760                                 goto done;
761                         }
762
763                         ret = ltdb_index_add_element(module, msg->dn, el);
764                         if (ret != LDB_SUCCESS) {
765                                 goto done;
766                         }
767
768                         break;
769
770                 case LDB_FLAG_MOD_DELETE:
771                         dn = ldb_dn_get_linearized(msg->dn);
772                         if (dn == NULL) {
773                                 ret = LDB_ERR_OTHER;
774                                 goto done;
775                         }
776
777                         if (msg->elements[i].num_values == 0) {
778                                 /* Delete the whole attribute */
779                                 if (msg_delete_attribute(module, ldb, msg2,
780                                                          msg->elements[i].name) != 0) {
781                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
782                                                                msg->elements[i].name, dn);
783                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
784                                         goto done;
785                                 }
786                         } else {
787                                 /* Delete specified values from an attribute */
788                                 for (j=0; j < msg->elements[i].num_values; j++) {
789                                         if (msg_delete_element(module,
790                                                                msg2,
791                                                                msg->elements[i].name,
792                                                                &msg->elements[i].values[j]) != 0) {
793                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
794                                                                        msg->elements[i].name, dn);
795                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
796                                                 goto done;
797                                         }
798                                 }
799                         }
800                         break;
801                 default:
802                         ldb_asprintf_errstring(ldb,
803                                 "Invalid ldb_modify flags on %s: 0x%x",
804                                 msg->elements[i].name,
805                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
806                         ret = LDB_ERR_PROTOCOL_ERROR;
807                         goto done;
808                 }
809         }
810
811         ret = ltdb_store(module, msg2, TDB_MODIFY);
812         if (ret != LDB_SUCCESS) {
813                 goto done;
814         }
815
816         ret = ltdb_modified(module, msg->dn);
817         if (ret != LDB_SUCCESS) {
818                 goto done;
819         }
820
821 done:
822         talloc_free(tdb_key.dptr);
823         return ret;
824 }
825
826 /*
827   modify a record
828 */
829 static int ltdb_modify(struct ltdb_context *ctx)
830 {
831         struct ldb_module *module = ctx->module;
832         struct ldb_request *req = ctx->req;
833         int ret = LDB_SUCCESS;
834
835         ret = ltdb_check_special_dn(module, req->op.mod.message);
836         if (ret != LDB_SUCCESS) {
837                 return ret;
838         }
839
840         ldb_request_set_state(req, LDB_ASYNC_PENDING);
841
842         if (ltdb_cache_load(module) != 0) {
843                 return LDB_ERR_OPERATIONS_ERROR;
844         }
845
846         ret = ltdb_modify_internal(module, req->op.mod.message);
847
848         return ret;
849 }
850
851 /*
852   rename a record
853 */
854 static int ltdb_rename(struct ltdb_context *ctx)
855 {
856         struct ldb_module *module = ctx->module;
857         struct ldb_request *req = ctx->req;
858         struct ldb_message *msg;
859         int ret = LDB_SUCCESS;
860
861         ldb_request_set_state(req, LDB_ASYNC_PENDING);
862
863         if (ltdb_cache_load(ctx->module) != 0) {
864                 return LDB_ERR_OPERATIONS_ERROR;
865         }
866
867         msg = talloc(ctx, struct ldb_message);
868         if (msg == NULL) {
869                 return LDB_ERR_OPERATIONS_ERROR;
870         }
871
872         /* in case any attribute of the message was indexed, we need
873            to fetch the old record */
874         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
875         if (ret != LDB_SUCCESS) {
876                 /* not finding the old record is an error */
877                 return ret;
878         }
879
880         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
881         if (msg->dn == NULL) {
882                 return LDB_ERR_OPERATIONS_ERROR;
883         }
884
885         /* Always delete first then add, to avoid conflicts with
886          * unique indexes. We rely on the transaction to make this
887          * atomic
888          */
889         ret = ltdb_delete_internal(module, req->op.rename.olddn);
890         if (ret != LDB_SUCCESS) {
891                 return ret;
892         }
893
894         ret = ltdb_add_internal(module, msg);
895
896         return ret;
897 }
898
899 static int ltdb_start_trans(struct ldb_module *module)
900 {
901         void *data = ldb_module_get_private(module);
902         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
903
904         if (tdb_transaction_start(ltdb->tdb) != 0) {
905                 return ltdb_err_map(tdb_error(ltdb->tdb));
906         }
907
908         ltdb->in_transaction++;
909
910         ltdb_index_transaction_start(module);
911
912         return LDB_SUCCESS;
913 }
914
915 static int ltdb_prepare_commit(struct ldb_module *module)
916 {
917         void *data = ldb_module_get_private(module);
918         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
919
920         if (ltdb->in_transaction != 1) {
921                 return LDB_SUCCESS;
922         }
923
924         if (ltdb_index_transaction_commit(module) != 0) {
925                 tdb_transaction_cancel(ltdb->tdb);
926                 ltdb->in_transaction--;
927                 return ltdb_err_map(tdb_error(ltdb->tdb));
928         }
929
930         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
931                 ltdb->in_transaction--;
932                 return ltdb_err_map(tdb_error(ltdb->tdb));
933         }
934
935         ltdb->prepared_commit = true;
936
937         return LDB_SUCCESS;
938 }
939
940 static int ltdb_end_trans(struct ldb_module *module)
941 {
942         void *data = ldb_module_get_private(module);
943         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
944
945         if (!ltdb->prepared_commit) {
946                 int ret = ltdb_prepare_commit(module);
947                 if (ret != LDB_SUCCESS) {
948                         return ret;
949                 }
950         }
951
952         ltdb->in_transaction--;
953         ltdb->prepared_commit = false;
954
955         if (tdb_transaction_commit(ltdb->tdb) != 0) {
956                 return ltdb_err_map(tdb_error(ltdb->tdb));
957         }
958
959         return LDB_SUCCESS;
960 }
961
962 static int ltdb_del_trans(struct ldb_module *module)
963 {
964         void *data = ldb_module_get_private(module);
965         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
966
967         ltdb->in_transaction--;
968
969         if (ltdb_index_transaction_cancel(module) != 0) {
970                 tdb_transaction_cancel(ltdb->tdb);
971                 return ltdb_err_map(tdb_error(ltdb->tdb));
972         }
973
974         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
975                 return ltdb_err_map(tdb_error(ltdb->tdb));
976         }
977
978         return LDB_SUCCESS;
979 }
980
981 /*
982   return sequenceNumber from @BASEINFO
983 */
984 static int ltdb_sequence_number(struct ltdb_context *ctx,
985                                 struct ldb_extended **ext)
986 {
987         struct ldb_context *ldb;
988         struct ldb_module *module = ctx->module;
989         struct ldb_request *req = ctx->req;
990         TALLOC_CTX *tmp_ctx;
991         struct ldb_seqnum_request *seq;
992         struct ldb_seqnum_result *res;
993         struct ldb_message *msg = NULL;
994         struct ldb_dn *dn;
995         const char *date;
996         int ret = LDB_SUCCESS;
997
998         ldb = ldb_module_get_ctx(module);
999
1000         seq = talloc_get_type(req->op.extended.data,
1001                                 struct ldb_seqnum_request);
1002         if (seq == NULL) {
1003                 return LDB_ERR_OPERATIONS_ERROR;
1004         }
1005
1006         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1007
1008         if (ltdb_lock_read(module) != 0) {
1009                 return LDB_ERR_OPERATIONS_ERROR;
1010         }
1011
1012         res = talloc_zero(req, struct ldb_seqnum_result);
1013         if (res == NULL) {
1014                 ret = LDB_ERR_OPERATIONS_ERROR;
1015                 goto done;
1016         }
1017         tmp_ctx = talloc_new(req);
1018         if (tmp_ctx == NULL) {
1019                 ret = LDB_ERR_OPERATIONS_ERROR;
1020                 goto done;
1021         }
1022
1023         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1024
1025         msg = talloc(tmp_ctx, struct ldb_message);
1026         if (msg == NULL) {
1027                 ret = LDB_ERR_OPERATIONS_ERROR;
1028                 goto done;
1029         }
1030
1031         ret = ltdb_search_dn1(module, dn, msg);
1032         if (ret != LDB_SUCCESS) {
1033                 goto done;
1034         }
1035
1036         switch (seq->type) {
1037         case LDB_SEQ_HIGHEST_SEQ:
1038                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1039                 break;
1040         case LDB_SEQ_NEXT:
1041                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1042                 res->seq_num++;
1043                 break;
1044         case LDB_SEQ_HIGHEST_TIMESTAMP:
1045                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1046                 if (date) {
1047                         res->seq_num = ldb_string_to_time(date);
1048                 } else {
1049                         res->seq_num = 0;
1050                         /* zero is as good as anything when we don't know */
1051                 }
1052                 break;
1053         }
1054
1055         *ext = talloc_zero(req, struct ldb_extended);
1056         if (*ext == NULL) {
1057                 ret = LDB_ERR_OPERATIONS_ERROR;
1058                 goto done;
1059         }
1060         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1061         (*ext)->data = talloc_steal(*ext, res);
1062
1063 done:
1064         talloc_free(tmp_ctx);
1065         ltdb_unlock_read(module);
1066         return ret;
1067 }
1068
1069 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1070 {
1071         struct ldb_context *ldb;
1072         struct ldb_request *req;
1073         struct ldb_reply *ares;
1074
1075         ldb = ldb_module_get_ctx(ctx->module);
1076         req = ctx->req;
1077
1078         /* if we already returned an error just return */
1079         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1080                 return;
1081         }
1082
1083         ares = talloc_zero(req, struct ldb_reply);
1084         if (!ares) {
1085                 ldb_oom(ldb);
1086                 req->callback(req, NULL);
1087                 return;
1088         }
1089         ares->type = LDB_REPLY_DONE;
1090         ares->error = error;
1091
1092         req->callback(req, ares);
1093 }
1094
1095 static void ltdb_timeout(struct tevent_context *ev,
1096                           struct tevent_timer *te,
1097                           struct timeval t,
1098                           void *private_data)
1099 {
1100         struct ltdb_context *ctx;
1101         ctx = talloc_get_type(private_data, struct ltdb_context);
1102
1103         if (!ctx->request_terminated) {
1104                 /* request is done now */
1105                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1106         }
1107
1108         if (!ctx->request_terminated) {
1109                 /* neutralize the spy */
1110                 ctx->spy->ctx = NULL;
1111         }
1112         talloc_free(ctx);
1113 }
1114
1115 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1116                                         struct ldb_extended *ext,
1117                                         int error)
1118 {
1119         struct ldb_context *ldb;
1120         struct ldb_request *req;
1121         struct ldb_reply *ares;
1122
1123         ldb = ldb_module_get_ctx(ctx->module);
1124         req = ctx->req;
1125
1126         /* if we already returned an error just return */
1127         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1128                 return;
1129         }
1130
1131         ares = talloc_zero(req, struct ldb_reply);
1132         if (!ares) {
1133                 ldb_oom(ldb);
1134                 req->callback(req, NULL);
1135                 return;
1136         }
1137         ares->type = LDB_REPLY_DONE;
1138         ares->response = ext;
1139         ares->error = error;
1140
1141         req->callback(req, ares);
1142 }
1143
1144 static void ltdb_handle_extended(struct ltdb_context *ctx)
1145 {
1146         struct ldb_extended *ext = NULL;
1147         int ret;
1148
1149         if (strcmp(ctx->req->op.extended.oid,
1150                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1151                 /* get sequence number */
1152                 ret = ltdb_sequence_number(ctx, &ext);
1153         } else {
1154                 /* not recognized */
1155                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1156         }
1157
1158         ltdb_request_extended_done(ctx, ext, ret);
1159 }
1160
1161 static void ltdb_callback(struct tevent_context *ev,
1162                           struct tevent_timer *te,
1163                           struct timeval t,
1164                           void *private_data)
1165 {
1166         struct ltdb_context *ctx;
1167         int ret;
1168
1169         ctx = talloc_get_type(private_data, struct ltdb_context);
1170
1171         if (ctx->request_terminated) {
1172                 goto done;
1173         }
1174
1175         switch (ctx->req->operation) {
1176         case LDB_SEARCH:
1177                 ret = ltdb_search(ctx);
1178                 break;
1179         case LDB_ADD:
1180                 ret = ltdb_add(ctx);
1181                 break;
1182         case LDB_MODIFY:
1183                 ret = ltdb_modify(ctx);
1184                 break;
1185         case LDB_DELETE:
1186                 ret = ltdb_delete(ctx);
1187                 break;
1188         case LDB_RENAME:
1189                 ret = ltdb_rename(ctx);
1190                 break;
1191         case LDB_EXTENDED:
1192                 ltdb_handle_extended(ctx);
1193                 goto done;
1194         default:
1195                 /* no other op supported */
1196                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1197         }
1198
1199         if (!ctx->request_terminated) {
1200                 /* request is done now */
1201                 ltdb_request_done(ctx, ret);
1202         }
1203
1204 done:
1205         if (!ctx->request_terminated) {
1206                 /* neutralize the spy */
1207                 ctx->spy->ctx = NULL;
1208         }
1209         talloc_free(ctx);
1210 }
1211
1212 static int ltdb_request_destructor(void *ptr)
1213 {
1214         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1215
1216         if (spy->ctx != NULL) {
1217                 spy->ctx->request_terminated = true;
1218         }
1219
1220         return 0;
1221 }
1222
1223 static int ltdb_handle_request(struct ldb_module *module,
1224                                struct ldb_request *req)
1225 {
1226         struct ldb_context *ldb;
1227         struct tevent_context *ev;
1228         struct ltdb_context *ac;
1229         struct tevent_timer *te;
1230         struct timeval tv;
1231
1232         if (check_critical_controls(req->controls)) {
1233                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1234         }
1235
1236         ldb = ldb_module_get_ctx(module);
1237
1238         if (req->starttime == 0 || req->timeout == 0) {
1239                 ldb_set_errstring(ldb, "Invalid timeout settings");
1240                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1241         }
1242
1243         ev = ldb_get_event_context(ldb);
1244
1245         ac = talloc_zero(ldb, struct ltdb_context);
1246         if (ac == NULL) {
1247                 ldb_oom(ldb);
1248                 return LDB_ERR_OPERATIONS_ERROR;
1249         }
1250
1251         ac->module = module;
1252         ac->req = req;
1253
1254         tv.tv_sec = 0;
1255         tv.tv_usec = 0;
1256         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1257         if (NULL == te) {
1258                 talloc_free(ac);
1259                 return LDB_ERR_OPERATIONS_ERROR;
1260         }
1261
1262         tv.tv_sec = req->starttime + req->timeout;
1263         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1264         if (NULL == ac->timeout_event) {
1265                 talloc_free(ac);
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268
1269         /* set a spy so that we do not try to use the request context
1270          * if it is freed before ltdb_callback fires */
1271         ac->spy = talloc(req, struct ltdb_req_spy);
1272         if (NULL == ac->spy) {
1273                 talloc_free(ac);
1274                 return LDB_ERR_OPERATIONS_ERROR;
1275         }
1276         ac->spy->ctx = ac;
1277
1278         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1279
1280         return LDB_SUCCESS;
1281 }
1282
1283 static const struct ldb_module_ops ltdb_ops = {
1284         .name              = "tdb",
1285         .search            = ltdb_handle_request,
1286         .add               = ltdb_handle_request,
1287         .modify            = ltdb_handle_request,
1288         .del               = ltdb_handle_request,
1289         .rename            = ltdb_handle_request,
1290         .extended          = ltdb_handle_request,
1291         .start_transaction = ltdb_start_trans,
1292         .end_transaction   = ltdb_end_trans,
1293         .prepare_commit    = ltdb_prepare_commit,
1294         .del_transaction   = ltdb_del_trans,
1295 };
1296
1297 /*
1298   connect to the database
1299 */
1300 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1301                         unsigned int flags, const char *options[],
1302                         struct ldb_module **_module)
1303 {
1304         struct ldb_module *module;
1305         const char *path;
1306         int tdb_flags, open_flags;
1307         struct ltdb_private *ltdb;
1308
1309         /* parse the url */
1310         if (strchr(url, ':')) {
1311                 if (strncmp(url, "tdb://", 6) != 0) {
1312                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1313                                   "Invalid tdb URL '%s'", url);
1314                         return -1;
1315                 }
1316                 path = url+6;
1317         } else {
1318                 path = url;
1319         }
1320
1321         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1322
1323         /* check for the 'nosync' option */
1324         if (flags & LDB_FLG_NOSYNC) {
1325                 tdb_flags |= TDB_NOSYNC;
1326         }
1327
1328         /* and nommap option */
1329         if (flags & LDB_FLG_NOMMAP) {
1330                 tdb_flags |= TDB_NOMMAP;
1331         }
1332
1333         if (flags & LDB_FLG_RDONLY) {
1334                 open_flags = O_RDONLY;
1335         } else {
1336                 open_flags = O_CREAT | O_RDWR;
1337         }
1338
1339         ltdb = talloc_zero(ldb, struct ltdb_private);
1340         if (!ltdb) {
1341                 ldb_oom(ldb);
1342                 return -1;
1343         }
1344
1345         /* note that we use quite a large default hash size */
1346         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1347                                    tdb_flags, open_flags,
1348                                    ldb_get_create_perms(ldb), ldb);
1349         if (!ltdb->tdb) {
1350                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1351                           "Unable to open tdb '%s'", path);
1352                 talloc_free(ltdb);
1353                 return -1;
1354         }
1355
1356         ltdb->sequence_number = 0;
1357
1358         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1359         if (!module) {
1360                 talloc_free(ltdb);
1361                 return -1;
1362         }
1363         ldb_module_set_private(module, ltdb);
1364         talloc_steal(module, ltdb);
1365
1366         if (ltdb_cache_load(module) != 0) {
1367                 talloc_free(module);
1368                 talloc_free(ltdb);
1369                 return -1;
1370         }
1371
1372         *_module = module;
1373         return 0;
1374 }
1375
1376 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1377         .name = "tdb",
1378         .connect_fn = ltdb_connect
1379 };