Merge branch 'master' of ssh://git.samba.org/data/git/samba
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         if (ltdb->in_transaction == 0) {
92                 return tdb_lockall_read(ltdb->tdb);
93         }
94         return 0;
95 }
96
97 /*
98   unlock the database after a ltdb_lock_read()
99 */
100 int ltdb_unlock_read(struct ldb_module *module)
101 {
102         void *data = ldb_module_get_private(module);
103         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
104         if (ltdb->in_transaction == 0) {
105                 return tdb_unlockall_read(ltdb->tdb);
106         }
107         return 0;
108 }
109
110
111 /*
112   form a TDB_DATA for a record key
113   caller frees
114
115   note that the key for a record can depend on whether the
116   dn refers to a case sensitive index record or not
117 */
118 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
119 {
120         struct ldb_context *ldb = ldb_module_get_ctx(module);
121         TDB_DATA key;
122         char *key_str = NULL;
123         const char *dn_folded = NULL;
124
125         /*
126           most DNs are case insensitive. The exception is index DNs for
127           case sensitive attributes
128
129           there are 3 cases dealt with in this code:
130
131           1) if the dn doesn't start with @ then uppercase the attribute
132              names and the attributes values of case insensitive attributes
133           2) if the dn starts with @ then leave it alone -
134              the indexing code handles the rest
135         */
136
137         dn_folded = ldb_dn_get_casefold(dn);
138         if (!dn_folded) {
139                 goto failed;
140         }
141
142         key_str = talloc_strdup(ldb, "DN=");
143         if (!key_str) {
144                 goto failed;
145         }
146
147         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
148         if (!key_str) {
149                 goto failed;
150         }
151
152         key.dptr = (uint8_t *)key_str;
153         key.dsize = strlen(key_str) + 1;
154
155         return key;
156
157 failed:
158         errno = ENOMEM;
159         key.dptr = NULL;
160         key.dsize = 0;
161         return key;
162 }
163
164 /*
165   check special dn's have valid attributes
166   currently only @ATTRIBUTES is checked
167 */
168 static int ltdb_check_special_dn(struct ldb_module *module,
169                           const struct ldb_message *msg)
170 {
171         struct ldb_context *ldb = ldb_module_get_ctx(module);
172         int i, j;
173
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return LDB_SUCCESS;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
183
184                 for (j = 0; j < msg->elements[i].num_values; j++) {
185                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
186                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
187                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
188                         }
189                 }
190         }
191
192         return LDB_SUCCESS;
193 }
194
195
196 /*
197   we've made a modification to a dn - possibly reindex and
198   update sequence number
199 */
200 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
201 {
202         int ret = LDB_SUCCESS;
203         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
204
205         /* only allow modifies inside a transaction, otherwise the
206          * ldb is unsafe */
207         if (ltdb->in_transaction == 0) {
208                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
209                 return LDB_ERR_OPERATIONS_ERROR;
210         }
211
212         if (ldb_dn_is_special(dn) &&
213             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
214              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
215                 ret = ltdb_reindex(module);
216         }
217
218         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
219         if (ret == LDB_SUCCESS &&
220             !(ldb_dn_is_special(dn) &&
221               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
222                 ret = ltdb_increase_sequence_number(module);
223         }
224
225         /* If the modify was to @OPTIONS, reload the cache */
226         if (ldb_dn_is_special(dn) &&
227             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
228                 ret = ltdb_cache_reload(module);
229         }
230
231         return ret;
232 }
233
234 /*
235   store a record into the db
236 */
237 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
238 {
239         void *data = ldb_module_get_private(module);
240         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
241         TDB_DATA tdb_key, tdb_data;
242         int ret = LDB_SUCCESS;
243
244         tdb_key = ltdb_key(module, msg->dn);
245         if (tdb_key.dptr == NULL) {
246                 return LDB_ERR_OTHER;
247         }
248
249         ret = ltdb_pack_data(module, msg, &tdb_data);
250         if (ret == -1) {
251                 talloc_free(tdb_key.dptr);
252                 return LDB_ERR_OTHER;
253         }
254
255         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
256         if (ret == -1) {
257                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
258                 goto done;
259         }
260
261 done:
262         talloc_free(tdb_key.dptr);
263         talloc_free(tdb_data.dptr);
264
265         return ret;
266 }
267
268
269 static int ltdb_add_internal(struct ldb_module *module,
270                              const struct ldb_message *msg)
271 {
272         struct ldb_context *ldb = ldb_module_get_ctx(module);
273         int ret = LDB_SUCCESS, i;
274
275         ret = ltdb_check_special_dn(module, msg);
276         if (ret != LDB_SUCCESS) {
277                 return ret;
278         }
279
280         if (ltdb_cache_load(module) != 0) {
281                 return LDB_ERR_OPERATIONS_ERROR;
282         }
283
284         for (i=0;i<msg->num_elements;i++) {
285                 struct ldb_message_element *el = &msg->elements[i];
286                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
287
288                 if (el->num_values == 0) {
289                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
290                                                el->name, ldb_dn_get_linearized(msg->dn));
291                         return LDB_ERR_CONSTRAINT_VIOLATION;
292                 }
293                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
294                         if (el->num_values > 1) {
295                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
296                                                        el->name, ldb_dn_get_linearized(msg->dn));
297                                 return LDB_ERR_CONSTRAINT_VIOLATION;
298                         }
299                 }
300         }
301
302         ret = ltdb_store(module, msg, TDB_INSERT);
303         if (ret != LDB_SUCCESS) {
304                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
305                         ldb_asprintf_errstring(ldb,
306                                                "Entry %s already exists",
307                                                ldb_dn_get_linearized(msg->dn));
308                 }
309                 return ret;
310         }
311
312         ret = ltdb_index_add_new(module, msg);
313         if (ret != LDB_SUCCESS) {
314                 return ret;
315         }
316
317         ret = ltdb_modified(module, msg->dn);
318
319         return ret;
320 }
321
322 /*
323   add a record to the database
324 */
325 static int ltdb_add(struct ltdb_context *ctx)
326 {
327         struct ldb_module *module = ctx->module;
328         struct ldb_request *req = ctx->req;
329         int ret = LDB_SUCCESS;
330
331         ldb_request_set_state(req, LDB_ASYNC_PENDING);
332
333         if (ltdb_cache_load(module) != 0) {
334                 return LDB_ERR_OPERATIONS_ERROR;
335         }
336
337         ret = ltdb_add_internal(module, req->op.add.message);
338
339         return ret;
340 }
341
342 /*
343   delete a record from the database, not updating indexes (used for deleting
344   index records)
345 */
346 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
347 {
348         void *data = ldb_module_get_private(module);
349         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
350         TDB_DATA tdb_key;
351         int ret;
352
353         tdb_key = ltdb_key(module, dn);
354         if (!tdb_key.dptr) {
355                 return LDB_ERR_OTHER;
356         }
357
358         ret = tdb_delete(ltdb->tdb, tdb_key);
359         talloc_free(tdb_key.dptr);
360
361         if (ret != 0) {
362                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
363         }
364
365         return ret;
366 }
367
368 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
369 {
370         struct ldb_message *msg;
371         int ret = LDB_SUCCESS;
372
373         msg = talloc(module, struct ldb_message);
374         if (msg == NULL) {
375                 return LDB_ERR_OPERATIONS_ERROR;
376         }
377
378         /* in case any attribute of the message was indexed, we need
379            to fetch the old record */
380         ret = ltdb_search_dn1(module, dn, msg);
381         if (ret != LDB_SUCCESS) {
382                 /* not finding the old record is an error */
383                 goto done;
384         }
385
386         ret = ltdb_delete_noindex(module, dn);
387         if (ret != LDB_SUCCESS) {
388                 goto done;
389         }
390
391         /* remove any indexed attributes */
392         ret = ltdb_index_delete(module, msg);
393         if (ret != LDB_SUCCESS) {
394                 goto done;
395         }
396
397         ret = ltdb_modified(module, dn);
398         if (ret != LDB_SUCCESS) {
399                 goto done;
400         }
401
402 done:
403         talloc_free(msg);
404         return ret;
405 }
406
407 /*
408   delete a record from the database
409 */
410 static int ltdb_delete(struct ltdb_context *ctx)
411 {
412         struct ldb_module *module = ctx->module;
413         struct ldb_request *req = ctx->req;
414         int ret = LDB_SUCCESS;
415
416         ldb_request_set_state(req, LDB_ASYNC_PENDING);
417
418         if (ltdb_cache_load(module) != 0) {
419                 return LDB_ERR_OPERATIONS_ERROR;
420         }
421
422         ret = ltdb_delete_internal(module, req->op.del.dn);
423
424         return ret;
425 }
426
427 /*
428   find an element by attribute name. At the moment this does a linear search,
429   it should be re-coded to use a binary search once all places that modify
430   records guarantee sorted order
431
432   return the index of the first matching element if found, otherwise -1
433 */
434 static int find_element(const struct ldb_message *msg, const char *name)
435 {
436         unsigned int i;
437         for (i=0;i<msg->num_elements;i++) {
438                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
439                         return i;
440                 }
441         }
442         return -1;
443 }
444
445
446 /*
447   add an element to an existing record. Assumes a elements array that we
448   can call re-alloc on, and assumed that we can re-use the data pointers from
449   the passed in additional values. Use with care!
450
451   returns 0 on success, -1 on failure (and sets errno)
452 */
453 static int ltdb_msg_add_element(struct ldb_context *ldb,
454                                 struct ldb_message *msg,
455                                 struct ldb_message_element *el)
456 {
457         struct ldb_message_element *e2;
458         unsigned int i;
459
460         if (el->num_values == 0) {
461                 /* nothing to do here - we don't add empty elements */
462                 return 0;
463         }
464
465         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
466                               msg->num_elements+1);
467         if (!e2) {
468                 errno = ENOMEM;
469                 return -1;
470         }
471
472         msg->elements = e2;
473
474         e2 = &msg->elements[msg->num_elements];
475
476         e2->name = el->name;
477         e2->flags = el->flags;
478         e2->values = talloc_array(msg->elements,
479                                   struct ldb_val, el->num_values);
480         if (!e2->values) {
481                 errno = ENOMEM;
482                 return -1;
483         }
484         for (i=0;i<el->num_values;i++) {
485                 e2->values[i] = el->values[i];
486         }
487         e2->num_values = el->num_values;
488
489         ++msg->num_elements;
490
491         return 0;
492 }
493
494 /*
495   delete all elements having a specified attribute name
496 */
497 static int msg_delete_attribute(struct ldb_module *module,
498                                 struct ldb_context *ldb,
499                                 struct ldb_message *msg, const char *name)
500 {
501         const char *dn;
502         unsigned int i;
503         int ret;
504         struct ldb_message_element *el;
505
506         dn = ldb_dn_get_linearized(msg->dn);
507         if (dn == NULL) {
508                 return -1;
509         }
510
511         el = ldb_msg_find_element(msg, name);
512         if (el == NULL) {
513                 return -1;
514         }
515         i = el - msg->elements;
516
517         ret = ltdb_index_del_element(module, dn, el);
518         if (ret != LDB_SUCCESS) {
519                 return ret;
520         }
521
522         talloc_free(el->values);
523         if (msg->num_elements > (i+1)) {
524                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
525         }
526         msg->num_elements--;
527         msg->elements = talloc_realloc(msg, msg->elements,
528                                        struct ldb_message_element,
529                                        msg->num_elements);
530         return 0;
531 }
532
533 /*
534   delete all elements matching an attribute name/value
535
536   return 0 on success, -1 on failure
537 */
538 static int msg_delete_element(struct ldb_module *module,
539                               struct ldb_message *msg,
540                               const char *name,
541                               const struct ldb_val *val)
542 {
543         struct ldb_context *ldb = ldb_module_get_ctx(module);
544         unsigned int i;
545         int found, ret;
546         struct ldb_message_element *el;
547         const struct ldb_schema_attribute *a;
548
549         found = find_element(msg, name);
550         if (found == -1) {
551                 return -1;
552         }
553
554         el = &msg->elements[found];
555
556         a = ldb_schema_attribute_by_name(ldb, el->name);
557
558         for (i=0;i<el->num_values;i++) {
559                 if (a->syntax->comparison_fn(ldb, ldb,
560                                              &el->values[i], val) == 0) {
561                         if (el->num_values == 1) {
562                                 return msg_delete_attribute(module, ldb, msg, name);
563                         }
564
565                         ret = ltdb_index_del_value(module, ldb_dn_get_linearized(msg->dn), el, i);
566                         if (ret != LDB_SUCCESS) {
567                                 return -1;
568                         }
569
570                         if (i<el->num_values-1) {
571                                 memmove(&el->values[i], &el->values[i+1],
572                                         sizeof(el->values[i])*
573                                                 (el->num_values-(i+1)));
574                         }
575                         el->num_values--;
576
577                         /* per definition we find in a canonicalised message an
578                            attribute value only once. So we are finished here */
579                         return 0;
580                 }
581         }
582
583         /* Not found */
584         return -1;
585 }
586
587
588 /*
589   modify a record - internal interface
590
591   yuck - this is O(n^2). Luckily n is usually small so we probably
592   get away with it, but if we ever have really large attribute lists
593   then we'll need to look at this again
594 */
595 int ltdb_modify_internal(struct ldb_module *module,
596                          const struct ldb_message *msg)
597 {
598         struct ldb_context *ldb = ldb_module_get_ctx(module);
599         void *data = ldb_module_get_private(module);
600         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
601         TDB_DATA tdb_key, tdb_data;
602         struct ldb_message *msg2;
603         unsigned i, j;
604         int ret = LDB_SUCCESS, idx;
605
606         tdb_key = ltdb_key(module, msg->dn);
607         if (!tdb_key.dptr) {
608                 return LDB_ERR_OTHER;
609         }
610
611         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
612         if (!tdb_data.dptr) {
613                 talloc_free(tdb_key.dptr);
614                 return ltdb_err_map(tdb_error(ltdb->tdb));
615         }
616
617         msg2 = talloc(tdb_key.dptr, struct ldb_message);
618         if (msg2 == NULL) {
619                 free(tdb_data.dptr);
620                 ret = LDB_ERR_OTHER;
621                 goto done;
622         }
623
624         ret = ltdb_unpack_data(module, &tdb_data, msg2);
625         free(tdb_data.dptr);
626         if (ret == -1) {
627                 ret = LDB_ERR_OTHER;
628                 goto done;
629         }
630
631         if (!msg2->dn) {
632                 msg2->dn = msg->dn;
633         }
634
635         for (i=0; i<msg->num_elements; i++) {
636                 struct ldb_message_element *el = &msg->elements[i], *el2;
637                 struct ldb_val *vals;
638                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
639                 const char *dn;
640
641                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
642                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
643                                                ldb_dn_get_linearized(msg->dn));
644                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
645                         goto done;
646                 }
647
648                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
649                 case LDB_FLAG_MOD_ADD:
650                         if (el->num_values == 0) {
651                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
652                                                        el->name, ldb_dn_get_linearized(msg->dn));
653                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
654                                 goto done;
655                         }
656
657                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
658                                 if (el->num_values > 1) {
659                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
660                                                                el->name, ldb_dn_get_linearized(msg->dn));
661                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
662                                         goto done;
663                                 }
664                         }
665
666                         /* Checks if element already exists */
667                         idx = find_element(msg2, el->name);
668                         if (idx == -1) {
669                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
670                                         ret = LDB_ERR_OTHER;
671                                         goto done;
672                                 }
673                                 ret = ltdb_index_add_element(module, msg->dn, el);
674                                 if (ret != LDB_SUCCESS) {
675                                         goto done;
676                                 }
677                         } else {
678                                 /* We cannot add another value on a existing one
679                                    if the attribute is single-valued */
680                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
681                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
682                                                                el->name, ldb_dn_get_linearized(msg->dn));
683                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
684                                         goto done;
685                                 }
686
687                                 el2 = &(msg2->elements[idx]);
688
689                                 /* Check that values don't exist yet on multi-
690                                    valued attributes or aren't provided twice */
691                                 for (j=0; j<el->num_values; j++) {
692                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
693                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
694                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
695                                                 goto done;
696                                         }
697                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
698                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
699                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
700                                                 goto done;
701                                         }
702                                 }
703
704                                 /* Now combine existing and new values to a new
705                                    attribute record */
706                                 vals = talloc_realloc(msg2->elements,
707                                                       el2->values, struct ldb_val,
708                                                       el2->num_values + el->num_values);
709                                 if (vals == NULL) {
710                                         ldb_oom(ldb);
711                                         ret = LDB_ERR_OTHER;
712                                         goto done;
713                                 }
714
715                                 for (j=0; j<el->num_values; j++) {
716                                         vals[el2->num_values + j] =
717                                                 ldb_val_dup(vals, &el->values[j]);
718                                 }
719
720                                 el2->values = vals;
721                                 el2->num_values += el->num_values;
722
723                                 ret = ltdb_index_add_element(module, msg->dn, el);
724                                 if (ret != LDB_SUCCESS) {
725                                         goto done;
726                                 }
727                         }
728
729                         break;
730
731                 case LDB_FLAG_MOD_REPLACE:
732                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
733                                 if (el->num_values > 1) {
734                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
735                                                                el->name, ldb_dn_get_linearized(msg->dn));
736                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
737                                         goto done;
738                                 }
739                         }
740
741                         /* TODO: This is O(n^2) - replace with more efficient check */
742                         for (j=0; j<el->num_values; j++) {
743                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
744                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
745                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
746                                         goto done;
747                                 }
748                         }
749
750                         idx = find_element(msg2, el->name);
751                         if (idx != -1) {
752                                 el2 = &(msg2->elements[idx]);
753                                 if (ldb_msg_element_compare(el, el2) == 0) {
754                                         /* we are replacing with the same values */
755                                         continue;
756                                 }
757                         
758                                 /* Delete the attribute if it exists in the DB */
759                                 ret = msg_delete_attribute(module, ldb, msg2, el->name);
760                                 if (ret != LDB_SUCCESS) {
761                                         goto done;
762                                 }
763                         }
764
765                         /* Recreate it with the new values */
766                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
767                                 ret = LDB_ERR_OTHER;
768                                 goto done;
769                         }
770
771                         ret = ltdb_index_add_element(module, msg->dn, el);
772                         if (ret != LDB_SUCCESS) {
773                                 goto done;
774                         }
775
776                         break;
777
778                 case LDB_FLAG_MOD_DELETE:
779                         dn = ldb_dn_get_linearized(msg->dn);
780                         if (dn == NULL) {
781                                 ret = LDB_ERR_OTHER;
782                                 goto done;
783                         }
784
785                         if (msg->elements[i].num_values == 0) {
786                                 /* Delete the whole attribute */
787                                 if (msg_delete_attribute(module, ldb, msg2,
788                                                          msg->elements[i].name) != 0) {
789                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
790                                                                msg->elements[i].name, dn);
791                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
792                                         goto done;
793                                 }
794                         } else {
795                                 /* Delete specified values from an attribute */
796                                 for (j=0; j < msg->elements[i].num_values; j++) {
797                                         if (msg_delete_element(module,
798                                                                msg2,
799                                                                msg->elements[i].name,
800                                                                &msg->elements[i].values[j]) != 0) {
801                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
802                                                                        msg->elements[i].name, dn);
803                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
804                                                 goto done;
805                                         }
806                                 }
807                         }
808                         break;
809                 default:
810                         ldb_asprintf_errstring(ldb,
811                                 "Invalid ldb_modify flags on %s: 0x%x",
812                                 msg->elements[i].name,
813                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
814                         ret = LDB_ERR_PROTOCOL_ERROR;
815                         goto done;
816                 }
817         }
818
819         ret = ltdb_store(module, msg2, TDB_MODIFY);
820         if (ret != LDB_SUCCESS) {
821                 goto done;
822         }
823
824         ret = ltdb_modified(module, msg->dn);
825         if (ret != LDB_SUCCESS) {
826                 goto done;
827         }
828
829 done:
830         talloc_free(tdb_key.dptr);
831         return ret;
832 }
833
834 /*
835   modify a record
836 */
837 static int ltdb_modify(struct ltdb_context *ctx)
838 {
839         struct ldb_module *module = ctx->module;
840         struct ldb_request *req = ctx->req;
841         int ret = LDB_SUCCESS;
842
843         ret = ltdb_check_special_dn(module, req->op.mod.message);
844         if (ret != LDB_SUCCESS) {
845                 return ret;
846         }
847
848         ldb_request_set_state(req, LDB_ASYNC_PENDING);
849
850         if (ltdb_cache_load(module) != 0) {
851                 return LDB_ERR_OPERATIONS_ERROR;
852         }
853
854         ret = ltdb_modify_internal(module, req->op.mod.message);
855
856         return ret;
857 }
858
859 /*
860   rename a record
861 */
862 static int ltdb_rename(struct ltdb_context *ctx)
863 {
864         struct ldb_module *module = ctx->module;
865         struct ldb_request *req = ctx->req;
866         struct ldb_message *msg;
867         int ret = LDB_SUCCESS;
868
869         ldb_request_set_state(req, LDB_ASYNC_PENDING);
870
871         if (ltdb_cache_load(ctx->module) != 0) {
872                 return LDB_ERR_OPERATIONS_ERROR;
873         }
874
875         msg = talloc(ctx, struct ldb_message);
876         if (msg == NULL) {
877                 return LDB_ERR_OPERATIONS_ERROR;
878         }
879
880         /* in case any attribute of the message was indexed, we need
881            to fetch the old record */
882         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
883         if (ret != LDB_SUCCESS) {
884                 /* not finding the old record is an error */
885                 return ret;
886         }
887
888         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
889         if (msg->dn == NULL) {
890                 return LDB_ERR_OPERATIONS_ERROR;
891         }
892
893         /* Always delete first then add, to avoid conflicts with
894          * unique indexes. We rely on the transaction to make this
895          * atomic
896          */
897         ret = ltdb_delete_internal(module, req->op.rename.olddn);
898         if (ret != LDB_SUCCESS) {
899                 return ret;
900         }
901
902         ret = ltdb_add_internal(module, msg);
903
904         return ret;
905 }
906
907 static int ltdb_start_trans(struct ldb_module *module)
908 {
909         void *data = ldb_module_get_private(module);
910         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
911
912         if (tdb_transaction_start(ltdb->tdb) != 0) {
913                 return ltdb_err_map(tdb_error(ltdb->tdb));
914         }
915
916         ltdb->in_transaction++;
917
918         ltdb_index_transaction_start(module);
919
920         return LDB_SUCCESS;
921 }
922
923 static int ltdb_prepare_commit(struct ldb_module *module)
924 {
925         void *data = ldb_module_get_private(module);
926         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
927
928         if (ltdb->in_transaction != 1) {
929                 return LDB_SUCCESS;
930         }
931
932         if (ltdb_index_transaction_commit(module) != 0) {
933                 tdb_transaction_cancel(ltdb->tdb);
934                 ltdb->in_transaction--;
935                 return ltdb_err_map(tdb_error(ltdb->tdb));
936         }
937
938         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
939                 ltdb->in_transaction--;
940                 return ltdb_err_map(tdb_error(ltdb->tdb));
941         }
942
943         ltdb->prepared_commit = true;
944
945         return LDB_SUCCESS;
946 }
947
948 static int ltdb_end_trans(struct ldb_module *module)
949 {
950         void *data = ldb_module_get_private(module);
951         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
952
953         if (!ltdb->prepared_commit) {
954                 int ret = ltdb_prepare_commit(module);
955                 if (ret != LDB_SUCCESS) {
956                         return ret;
957                 }
958         }
959
960         ltdb->in_transaction--;
961         ltdb->prepared_commit = false;
962
963         if (tdb_transaction_commit(ltdb->tdb) != 0) {
964                 return ltdb_err_map(tdb_error(ltdb->tdb));
965         }
966
967         return LDB_SUCCESS;
968 }
969
970 static int ltdb_del_trans(struct ldb_module *module)
971 {
972         void *data = ldb_module_get_private(module);
973         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
974
975         ltdb->in_transaction--;
976
977         if (ltdb_index_transaction_cancel(module) != 0) {
978                 tdb_transaction_cancel(ltdb->tdb);
979                 return ltdb_err_map(tdb_error(ltdb->tdb));
980         }
981
982         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
983                 return ltdb_err_map(tdb_error(ltdb->tdb));
984         }
985
986         return LDB_SUCCESS;
987 }
988
989 /*
990   return sequenceNumber from @BASEINFO
991 */
992 static int ltdb_sequence_number(struct ltdb_context *ctx,
993                                 struct ldb_extended **ext)
994 {
995         struct ldb_context *ldb;
996         struct ldb_module *module = ctx->module;
997         struct ldb_request *req = ctx->req;
998         TALLOC_CTX *tmp_ctx;
999         struct ldb_seqnum_request *seq;
1000         struct ldb_seqnum_result *res;
1001         struct ldb_message *msg = NULL;
1002         struct ldb_dn *dn;
1003         const char *date;
1004         int ret = LDB_SUCCESS;
1005
1006         ldb = ldb_module_get_ctx(module);
1007
1008         seq = talloc_get_type(req->op.extended.data,
1009                                 struct ldb_seqnum_request);
1010         if (seq == NULL) {
1011                 return LDB_ERR_OPERATIONS_ERROR;
1012         }
1013
1014         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1015
1016         if (ltdb_lock_read(module) != 0) {
1017                 return LDB_ERR_OPERATIONS_ERROR;
1018         }
1019
1020         res = talloc_zero(req, struct ldb_seqnum_result);
1021         if (res == NULL) {
1022                 ret = LDB_ERR_OPERATIONS_ERROR;
1023                 goto done;
1024         }
1025         tmp_ctx = talloc_new(req);
1026         if (tmp_ctx == NULL) {
1027                 ret = LDB_ERR_OPERATIONS_ERROR;
1028                 goto done;
1029         }
1030
1031         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1032
1033         msg = talloc(tmp_ctx, struct ldb_message);
1034         if (msg == NULL) {
1035                 ret = LDB_ERR_OPERATIONS_ERROR;
1036                 goto done;
1037         }
1038
1039         ret = ltdb_search_dn1(module, dn, msg);
1040         if (ret != LDB_SUCCESS) {
1041                 goto done;
1042         }
1043
1044         switch (seq->type) {
1045         case LDB_SEQ_HIGHEST_SEQ:
1046                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1047                 break;
1048         case LDB_SEQ_NEXT:
1049                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1050                 res->seq_num++;
1051                 break;
1052         case LDB_SEQ_HIGHEST_TIMESTAMP:
1053                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1054                 if (date) {
1055                         res->seq_num = ldb_string_to_time(date);
1056                 } else {
1057                         res->seq_num = 0;
1058                         /* zero is as good as anything when we don't know */
1059                 }
1060                 break;
1061         }
1062
1063         *ext = talloc_zero(req, struct ldb_extended);
1064         if (*ext == NULL) {
1065                 ret = LDB_ERR_OPERATIONS_ERROR;
1066                 goto done;
1067         }
1068         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1069         (*ext)->data = talloc_steal(*ext, res);
1070
1071 done:
1072         talloc_free(tmp_ctx);
1073         ltdb_unlock_read(module);
1074         return ret;
1075 }
1076
1077 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1078 {
1079         struct ldb_context *ldb;
1080         struct ldb_request *req;
1081         struct ldb_reply *ares;
1082
1083         ldb = ldb_module_get_ctx(ctx->module);
1084         req = ctx->req;
1085
1086         /* if we already returned an error just return */
1087         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1088                 return;
1089         }
1090
1091         ares = talloc_zero(req, struct ldb_reply);
1092         if (!ares) {
1093                 ldb_oom(ldb);
1094                 req->callback(req, NULL);
1095                 return;
1096         }
1097         ares->type = LDB_REPLY_DONE;
1098         ares->error = error;
1099
1100         req->callback(req, ares);
1101 }
1102
1103 static void ltdb_timeout(struct tevent_context *ev,
1104                           struct tevent_timer *te,
1105                           struct timeval t,
1106                           void *private_data)
1107 {
1108         struct ltdb_context *ctx;
1109         ctx = talloc_get_type(private_data, struct ltdb_context);
1110
1111         if (!ctx->request_terminated) {
1112                 /* request is done now */
1113                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1114         }
1115
1116         if (!ctx->request_terminated) {
1117                 /* neutralize the spy */
1118                 ctx->spy->ctx = NULL;
1119         }
1120         talloc_free(ctx);
1121 }
1122
1123 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1124                                         struct ldb_extended *ext,
1125                                         int error)
1126 {
1127         struct ldb_context *ldb;
1128         struct ldb_request *req;
1129         struct ldb_reply *ares;
1130
1131         ldb = ldb_module_get_ctx(ctx->module);
1132         req = ctx->req;
1133
1134         /* if we already returned an error just return */
1135         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1136                 return;
1137         }
1138
1139         ares = talloc_zero(req, struct ldb_reply);
1140         if (!ares) {
1141                 ldb_oom(ldb);
1142                 req->callback(req, NULL);
1143                 return;
1144         }
1145         ares->type = LDB_REPLY_DONE;
1146         ares->response = ext;
1147         ares->error = error;
1148
1149         req->callback(req, ares);
1150 }
1151
1152 static void ltdb_handle_extended(struct ltdb_context *ctx)
1153 {
1154         struct ldb_extended *ext = NULL;
1155         int ret;
1156
1157         if (strcmp(ctx->req->op.extended.oid,
1158                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1159                 /* get sequence number */
1160                 ret = ltdb_sequence_number(ctx, &ext);
1161         } else {
1162                 /* not recognized */
1163                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1164         }
1165
1166         ltdb_request_extended_done(ctx, ext, ret);
1167 }
1168
1169 static void ltdb_callback(struct tevent_context *ev,
1170                           struct tevent_timer *te,
1171                           struct timeval t,
1172                           void *private_data)
1173 {
1174         struct ltdb_context *ctx;
1175         int ret;
1176
1177         ctx = talloc_get_type(private_data, struct ltdb_context);
1178
1179         if (ctx->request_terminated) {
1180                 goto done;
1181         }
1182
1183         switch (ctx->req->operation) {
1184         case LDB_SEARCH:
1185                 ret = ltdb_search(ctx);
1186                 break;
1187         case LDB_ADD:
1188                 ret = ltdb_add(ctx);
1189                 break;
1190         case LDB_MODIFY:
1191                 ret = ltdb_modify(ctx);
1192                 break;
1193         case LDB_DELETE:
1194                 ret = ltdb_delete(ctx);
1195                 break;
1196         case LDB_RENAME:
1197                 ret = ltdb_rename(ctx);
1198                 break;
1199         case LDB_EXTENDED:
1200                 ltdb_handle_extended(ctx);
1201                 goto done;
1202         default:
1203                 /* no other op supported */
1204                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1205         }
1206
1207         if (!ctx->request_terminated) {
1208                 /* request is done now */
1209                 ltdb_request_done(ctx, ret);
1210         }
1211
1212 done:
1213         if (!ctx->request_terminated) {
1214                 /* neutralize the spy */
1215                 ctx->spy->ctx = NULL;
1216         }
1217         talloc_free(ctx);
1218 }
1219
1220 static int ltdb_request_destructor(void *ptr)
1221 {
1222         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1223
1224         if (spy->ctx != NULL) {
1225                 spy->ctx->request_terminated = true;
1226         }
1227
1228         return 0;
1229 }
1230
1231 static int ltdb_handle_request(struct ldb_module *module,
1232                                struct ldb_request *req)
1233 {
1234         struct ldb_context *ldb;
1235         struct tevent_context *ev;
1236         struct ltdb_context *ac;
1237         struct tevent_timer *te;
1238         struct timeval tv;
1239
1240         if (check_critical_controls(req->controls)) {
1241                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1242         }
1243
1244         ldb = ldb_module_get_ctx(module);
1245
1246         if (req->starttime == 0 || req->timeout == 0) {
1247                 ldb_set_errstring(ldb, "Invalid timeout settings");
1248                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1249         }
1250
1251         ev = ldb_get_event_context(ldb);
1252
1253         ac = talloc_zero(ldb, struct ltdb_context);
1254         if (ac == NULL) {
1255                 ldb_oom(ldb);
1256                 return LDB_ERR_OPERATIONS_ERROR;
1257         }
1258
1259         ac->module = module;
1260         ac->req = req;
1261
1262         tv.tv_sec = 0;
1263         tv.tv_usec = 0;
1264         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1265         if (NULL == te) {
1266                 talloc_free(ac);
1267                 return LDB_ERR_OPERATIONS_ERROR;
1268         }
1269
1270         tv.tv_sec = req->starttime + req->timeout;
1271         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1272         if (NULL == ac->timeout_event) {
1273                 talloc_free(ac);
1274                 return LDB_ERR_OPERATIONS_ERROR;
1275         }
1276
1277         /* set a spy so that we do not try to use the request context
1278          * if it is freed before ltdb_callback fires */
1279         ac->spy = talloc(req, struct ltdb_req_spy);
1280         if (NULL == ac->spy) {
1281                 talloc_free(ac);
1282                 return LDB_ERR_OPERATIONS_ERROR;
1283         }
1284         ac->spy->ctx = ac;
1285
1286         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1287
1288         return LDB_SUCCESS;
1289 }
1290
1291 static const struct ldb_module_ops ltdb_ops = {
1292         .name              = "tdb",
1293         .search            = ltdb_handle_request,
1294         .add               = ltdb_handle_request,
1295         .modify            = ltdb_handle_request,
1296         .del               = ltdb_handle_request,
1297         .rename            = ltdb_handle_request,
1298         .extended          = ltdb_handle_request,
1299         .start_transaction = ltdb_start_trans,
1300         .end_transaction   = ltdb_end_trans,
1301         .prepare_commit    = ltdb_prepare_commit,
1302         .del_transaction   = ltdb_del_trans,
1303 };
1304
1305 /*
1306   connect to the database
1307 */
1308 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1309                         unsigned int flags, const char *options[],
1310                         struct ldb_module **_module)
1311 {
1312         struct ldb_module *module;
1313         const char *path;
1314         int tdb_flags, open_flags;
1315         struct ltdb_private *ltdb;
1316
1317         /* parse the url */
1318         if (strchr(url, ':')) {
1319                 if (strncmp(url, "tdb://", 6) != 0) {
1320                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1321                                   "Invalid tdb URL '%s'", url);
1322                         return LDB_ERR_OPERATIONS_ERROR;
1323                 }
1324                 path = url+6;
1325         } else {
1326                 path = url;
1327         }
1328
1329         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1330
1331         /* check for the 'nosync' option */
1332         if (flags & LDB_FLG_NOSYNC) {
1333                 tdb_flags |= TDB_NOSYNC;
1334         }
1335
1336         /* and nommap option */
1337         if (flags & LDB_FLG_NOMMAP) {
1338                 tdb_flags |= TDB_NOMMAP;
1339         }
1340
1341         if (flags & LDB_FLG_RDONLY) {
1342                 open_flags = O_RDONLY;
1343         } else {
1344                 open_flags = O_CREAT | O_RDWR;
1345         }
1346
1347         ltdb = talloc_zero(ldb, struct ltdb_private);
1348         if (!ltdb) {
1349                 ldb_oom(ldb);
1350                 return LDB_ERR_OPERATIONS_ERROR;
1351         }
1352
1353         /* note that we use quite a large default hash size */
1354         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1355                                    tdb_flags, open_flags,
1356                                    ldb_get_create_perms(ldb), ldb);
1357         if (!ltdb->tdb) {
1358                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1359                           "Unable to open tdb '%s'", path);
1360                 talloc_free(ltdb);
1361                 return LDB_ERR_OPERATIONS_ERROR;
1362         }
1363
1364         ltdb->sequence_number = 0;
1365
1366         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1367         if (!module) {
1368                 talloc_free(ltdb);
1369                 return LDB_ERR_OPERATIONS_ERROR;
1370         }
1371         ldb_module_set_private(module, ltdb);
1372         talloc_steal(module, ltdb);
1373
1374         if (ltdb_cache_load(module) != 0) {
1375                 talloc_free(module);
1376                 talloc_free(ltdb);
1377                 return LDB_ERR_OPERATIONS_ERROR;
1378         }
1379
1380         *_module = module;
1381         return LDB_SUCCESS;
1382 }
1383
1384 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1385         .name = "tdb",
1386         .connect_fn = ltdb_connect
1387 };