s4:ldb_tdb - Rework/Various
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 static int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         if (ltdb->in_transaction == 0) {
92                 return tdb_lockall_read(ltdb->tdb);
93         }
94         return 0;
95 }
96
97 /*
98   unlock the database after a ltdb_lock_read()
99 */
100 int ltdb_unlock_read(struct ldb_module *module)
101 {
102         void *data = ldb_module_get_private(module);
103         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
104         if (ltdb->in_transaction == 0) {
105                 return tdb_unlockall_read(ltdb->tdb);
106         }
107         return 0;
108 }
109
110
111 /*
112   form a TDB_DATA for a record key
113   caller frees
114
115   note that the key for a record can depend on whether the
116   dn refers to a case sensitive index record or not
117 */
118 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
119 {
120         struct ldb_context *ldb = ldb_module_get_ctx(module);
121         TDB_DATA key;
122         char *key_str = NULL;
123         const char *dn_folded = NULL;
124
125         /*
126           most DNs are case insensitive. The exception is index DNs for
127           case sensitive attributes
128
129           there are 3 cases dealt with in this code:
130
131           1) if the dn doesn't start with @ then uppercase the attribute
132              names and the attributes values of case insensitive attributes
133           2) if the dn starts with @ then leave it alone -
134              the indexing code handles the rest
135         */
136
137         dn_folded = ldb_dn_get_casefold(dn);
138         if (!dn_folded) {
139                 goto failed;
140         }
141
142         key_str = talloc_strdup(ldb, "DN=");
143         if (!key_str) {
144                 goto failed;
145         }
146
147         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
148         if (!key_str) {
149                 goto failed;
150         }
151
152         key.dptr = (uint8_t *)key_str;
153         key.dsize = strlen(key_str) + 1;
154
155         return key;
156
157 failed:
158         errno = ENOMEM;
159         key.dptr = NULL;
160         key.dsize = 0;
161         return key;
162 }
163
164 /*
165   check special dn's have valid attributes
166   currently only @ATTRIBUTES is checked
167 */
168 static int ltdb_check_special_dn(struct ldb_module *module,
169                           const struct ldb_message *msg)
170 {
171         struct ldb_context *ldb = ldb_module_get_ctx(module);
172         int i, j;
173
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return LDB_SUCCESS;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 for (j = 0; j < msg->elements[i].num_values; j++) {
183                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
184                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
185                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
186                         }
187                 }
188         }
189
190         return LDB_SUCCESS;
191 }
192
193
194 /*
195   we've made a modification to a dn - possibly reindex and
196   update sequence number
197 */
198 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
199 {
200         int ret = LDB_SUCCESS;
201
202         if (ldb_dn_is_special(dn) &&
203             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
204              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
205                 ret = ltdb_reindex(module);
206         }
207
208         if (ret == LDB_SUCCESS &&
209             !(ldb_dn_is_special(dn) &&
210               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
211                 ret = ltdb_increase_sequence_number(module);
212         }
213
214         return ret;
215 }
216
217 /*
218   store a record into the db
219 */
220 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
221 {
222         void *data = ldb_module_get_private(module);
223         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
224         TDB_DATA tdb_key, tdb_data;
225         int ret = LDB_SUCCESS;
226
227         tdb_key = ltdb_key(module, msg->dn);
228         if (!tdb_key.dptr) {
229                 return LDB_ERR_OTHER;
230         }
231
232         ret = ltdb_pack_data(module, msg, &tdb_data);
233         if (ret == -1) {
234                 talloc_free(tdb_key.dptr);
235                 return LDB_ERR_OTHER;
236         }
237
238         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
239         if (ret == -1) {
240                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
241                 goto done;
242         }
243
244         ret = ltdb_index_add(module, msg);
245         if (ret != LDB_SUCCESS) {
246                 tdb_delete(ltdb->tdb, tdb_key);
247         }
248
249 done:
250         talloc_free(tdb_key.dptr);
251         talloc_free(tdb_data.dptr);
252
253         return ret;
254 }
255
256
257 static int ltdb_add_internal(struct ldb_module *module,
258                              const struct ldb_message *msg)
259 {
260         struct ldb_context *ldb = ldb_module_get_ctx(module);
261         int ret = LDB_SUCCESS, i;
262
263         ret = ltdb_check_special_dn(module, msg);
264         if (ret != LDB_SUCCESS) {
265                 goto done;
266         }
267
268         if (ltdb_cache_load(module) != 0) {
269                 ret = LDB_ERR_OPERATIONS_ERROR;
270                 goto done;
271         }
272
273         for (i=0;i<msg->num_elements;i++) {
274                 struct ldb_message_element *el = &msg->elements[i];
275                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
276
277                 if (el->num_values == 0) {
278                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
279                                                el->name, ldb_dn_get_linearized(msg->dn));
280                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
281                         goto done;
282                 }
283                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
284                         if (el->num_values > 1) {
285                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
286                                                        el->name, ldb_dn_get_linearized(msg->dn));
287                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
288                                 goto done;
289                         }
290                 }
291         }
292
293         ret = ltdb_store(module, msg, TDB_INSERT);
294         if (ret != LDB_SUCCESS) {
295                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
296                         ldb_asprintf_errstring(ldb,
297                                                "Entry %s already exists",
298                                                ldb_dn_get_linearized(msg->dn));
299                 }
300                 goto done;
301         }
302
303         ret = ltdb_index_one(module, msg, 1);
304         if (ret != LDB_SUCCESS) {
305                 goto done;
306         }
307
308         ret = ltdb_modified(module, msg->dn);
309         if (ret != LDB_SUCCESS) {
310                 goto done;
311         }
312
313 done:
314         return ret;
315 }
316
317 /*
318   add a record to the database
319 */
320 static int ltdb_add(struct ltdb_context *ctx)
321 {
322         struct ldb_module *module = ctx->module;
323         struct ldb_request *req = ctx->req;
324         int ret = LDB_SUCCESS;
325
326         ldb_request_set_state(req, LDB_ASYNC_PENDING);
327
328         if (ltdb_cache_load(module) != 0) {
329                 return LDB_ERR_OPERATIONS_ERROR;
330         }
331
332         ret = ltdb_add_internal(module, req->op.add.message);
333
334         return ret;
335 }
336
337 /*
338   delete a record from the database, not updating indexes (used for deleting
339   index records)
340 */
341 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
342 {
343         void *data = ldb_module_get_private(module);
344         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
345         TDB_DATA tdb_key;
346         int ret;
347
348         tdb_key = ltdb_key(module, dn);
349         if (!tdb_key.dptr) {
350                 return LDB_ERR_OTHER;
351         }
352
353         ret = tdb_delete(ltdb->tdb, tdb_key);
354         talloc_free(tdb_key.dptr);
355
356         if (ret != 0) {
357                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
358         }
359
360         return ret;
361 }
362
363 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
364 {
365         struct ldb_message *msg;
366         int ret = LDB_SUCCESS;
367
368         msg = talloc(module, struct ldb_message);
369         if (msg == NULL) {
370                 return LDB_ERR_OPERATIONS_ERROR;
371         }
372
373         /* in case any attribute of the message was indexed, we need
374            to fetch the old record */
375         ret = ltdb_search_dn1(module, dn, msg);
376         if (ret != LDB_SUCCESS) {
377                 /* not finding the old record is an error */
378                 goto done;
379         }
380
381         ret = ltdb_delete_noindex(module, dn);
382         if (ret != LDB_SUCCESS) {
383                 goto done;
384         }
385
386         /* remove one level attribute */
387         ret = ltdb_index_one(module, msg, 0);
388         if (ret != LDB_SUCCESS) {
389                 goto done;
390         }
391
392         /* remove any indexed attributes */
393         ret = ltdb_index_del(module, msg);
394         if (ret != LDB_SUCCESS) {
395                 goto done;
396         }
397
398         ret = ltdb_modified(module, dn);
399         if (ret != LDB_SUCCESS) {
400                 goto done;
401         }
402
403 done:
404         talloc_free(msg);
405         return ret;
406 }
407
408 /*
409   delete a record from the database
410 */
411 static int ltdb_delete(struct ltdb_context *ctx)
412 {
413         struct ldb_module *module = ctx->module;
414         struct ldb_request *req = ctx->req;
415         int ret = LDB_SUCCESS;
416
417         ldb_request_set_state(req, LDB_ASYNC_PENDING);
418
419         if (ltdb_cache_load(module) != 0) {
420                 return LDB_ERR_OPERATIONS_ERROR;
421         }
422
423         ret = ltdb_delete_internal(module, req->op.del.dn);
424
425         return ret;
426 }
427
428 /*
429   find an element by attribute name. At the moment this does a linear search,
430   it should be re-coded to use a binary search once all places that modify
431   records guarantee sorted order
432
433   return the index of the first matching element if found, otherwise -1
434 */
435 static int find_element(const struct ldb_message *msg, const char *name)
436 {
437         unsigned int i;
438         for (i=0;i<msg->num_elements;i++) {
439                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
440                         return i;
441                 }
442         }
443         return -1;
444 }
445
446
447 /*
448   add an element to an existing record. Assumes a elements array that we
449   can call re-alloc on, and assumed that we can re-use the data pointers from
450   the passed in additional values. Use with care!
451
452   returns 0 on success, -1 on failure (and sets errno)
453 */
454 static int msg_add_element(struct ldb_context *ldb,
455                            struct ldb_message *msg,
456                            struct ldb_message_element *el)
457 {
458         struct ldb_message_element *e2;
459         unsigned int i;
460
461         if (el->num_values == 0) {
462                 /* nothing to do here - we don't add empty elements */
463                 return 0;
464         }
465
466         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
467                               msg->num_elements+1);
468         if (!e2) {
469                 errno = ENOMEM;
470                 return -1;
471         }
472
473         msg->elements = e2;
474
475         e2 = &msg->elements[msg->num_elements];
476
477         e2->name = el->name;
478         e2->flags = el->flags;
479         e2->values = talloc_array(msg->elements,
480                                   struct ldb_val, el->num_values);
481         if (!e2->values) {
482                 errno = ENOMEM;
483                 return -1;
484         }
485         for (i=0;i<el->num_values;i++) {
486                 e2->values[i] = el->values[i];
487         }
488         e2->num_values = el->num_values;
489
490         ++msg->num_elements;
491
492         return 0;
493 }
494
495 /*
496   delete all elements having a specified attribute name
497 */
498 static int msg_delete_attribute(struct ldb_module *module,
499                                 struct ldb_context *ldb,
500                                 struct ldb_message *msg, const char *name)
501 {
502         const char *dn;
503         unsigned int i, j;
504
505         dn = ldb_dn_get_linearized(msg->dn);
506         if (dn == NULL) {
507                 return -1;
508         }
509
510         for (i=0;i<msg->num_elements;i++) {
511                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
512                         for (j=0;j<msg->elements[i].num_values;j++) {
513                                 ltdb_index_del_value(module, dn,
514                                                      &msg->elements[i], j);
515                         }
516                         talloc_free(msg->elements[i].values);
517                         if (msg->num_elements > (i+1)) {
518                                 memmove(&msg->elements[i],
519                                         &msg->elements[i+1],
520                                         sizeof(struct ldb_message_element)*
521                                         (msg->num_elements - (i+1)));
522                         }
523                         msg->num_elements--;
524                         i--;
525                         msg->elements = talloc_realloc(msg, msg->elements,
526                                                        struct ldb_message_element,
527                                                        msg->num_elements);
528
529                         /* per definition we find in a canonicalised message an
530                            attribute only once. So we are finished here. */
531                         return 0;
532                 }
533         }
534
535         /* Not found */
536         return -1;
537 }
538
539 /*
540   delete all elements matching an attribute name/value
541
542   return 0 on success, -1 on failure
543 */
544 static int msg_delete_element(struct ldb_module *module,
545                               struct ldb_message *msg,
546                               const char *name,
547                               const struct ldb_val *val)
548 {
549         struct ldb_context *ldb = ldb_module_get_ctx(module);
550         unsigned int i;
551         int found;
552         struct ldb_message_element *el;
553         const struct ldb_schema_attribute *a;
554
555         found = find_element(msg, name);
556         if (found == -1) {
557                 return -1;
558         }
559
560         el = &msg->elements[found];
561
562         a = ldb_schema_attribute_by_name(ldb, el->name);
563
564         for (i=0;i<el->num_values;i++) {
565                 if (a->syntax->comparison_fn(ldb, ldb,
566                                                 &el->values[i], val) == 0) {
567                         if (i<el->num_values-1) {
568                                 memmove(&el->values[i], &el->values[i+1],
569                                         sizeof(el->values[i])*
570                                                 (el->num_values-(i+1)));
571                         }
572                         el->num_values--;
573                         if (el->num_values == 0) {
574                                 return msg_delete_attribute(module, ldb,
575                                                             msg, name);
576                         }
577
578                         /* per definition we find in a canonicalised message an
579                            attribute value only once. So we are finished here */
580                         return 0;
581                 }
582         }
583
584         /* Not found */
585         return -1;
586 }
587
588
589 /*
590   modify a record - internal interface
591
592   yuck - this is O(n^2). Luckily n is usually small so we probably
593   get away with it, but if we ever have really large attribute lists
594   then we'll need to look at this again
595 */
596 int ltdb_modify_internal(struct ldb_module *module,
597                          const struct ldb_message *msg)
598 {
599         struct ldb_context *ldb = ldb_module_get_ctx(module);
600         void *data = ldb_module_get_private(module);
601         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
602         TDB_DATA tdb_key, tdb_data;
603         struct ldb_message *msg2;
604         unsigned i, j;
605         int ret = LDB_SUCCESS, idx;
606
607         tdb_key = ltdb_key(module, msg->dn);
608         if (!tdb_key.dptr) {
609                 return LDB_ERR_OTHER;
610         }
611
612         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
613         if (!tdb_data.dptr) {
614                 talloc_free(tdb_key.dptr);
615                 return ltdb_err_map(tdb_error(ltdb->tdb));
616         }
617
618         msg2 = talloc(tdb_key.dptr, struct ldb_message);
619         if (msg2 == NULL) {
620                 free(tdb_data.dptr);
621                 ret = LDB_ERR_OTHER;
622                 goto done;
623         }
624
625         ret = ltdb_unpack_data(module, &tdb_data, msg2);
626         free(tdb_data.dptr);
627         if (ret == -1) {
628                 ret = LDB_ERR_OTHER;
629                 goto done;
630         }
631
632         if (!msg2->dn) {
633                 msg2->dn = msg->dn;
634         }
635
636         for (i=0; i<msg->num_elements; i++) {
637                 struct ldb_message_element *el = &msg->elements[i], *el2;
638                 struct ldb_val *vals;
639                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
640                 const char *dn;
641
642                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
643                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
644                                                ldb_dn_get_linearized(msg->dn));
645                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
646                         goto done;
647                 }
648
649                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
650                 case LDB_FLAG_MOD_ADD:
651                         if (el->num_values == 0) {
652                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
653                                                        el->name, ldb_dn_get_linearized(msg->dn));
654                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
655                                 goto done;
656                         }
657
658                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
659                                 if (el->num_values > 1) {
660                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
661                                                                el->name, ldb_dn_get_linearized(msg->dn));
662                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
663                                         goto done;
664                                 }
665                         }
666
667                         /* Checks if element already exists */
668                         idx = find_element(msg2, el->name);
669                         if (idx == -1) {
670                                 if (msg_add_element(ldb, msg2, el) != 0) {
671                                         ret = LDB_ERR_OTHER;
672                                         goto done;
673                                 }
674                         } else {
675                                 /* We cannot add another value on a existing one
676                                    if the attribute is single-valued */
677                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
678                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
679                                                                el->name, ldb_dn_get_linearized(msg->dn));
680                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
681                                         goto done;
682                                 }
683
684                                 el2 = &(msg2->elements[idx]);
685
686                                 /* Check that values don't exist yet on multi-
687                                    valued attributes or aren't provided twice */
688                                 for (j=0; j<el->num_values; j++) {
689                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
690                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
691                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
692                                                 goto done;
693                                         }
694                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
695                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
696                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
697                                                 goto done;
698                                         }
699                                 }
700
701                                 /* Now combine existing and new values to a new
702                                    attribute record */
703                                 vals = talloc_realloc(msg2->elements,
704                                         el2->values, struct ldb_val,
705                                         el2->num_values + el->num_values);
706                                 if (vals == NULL) {
707                                         ldb_oom(ldb);
708                                         ret = LDB_ERR_OTHER;
709                                         goto done;
710                                 }
711
712                                 for (j=0; j<el->num_values; j++) {
713                                         vals[el2->num_values + j] =
714                                                 ldb_val_dup(vals, &el->values[j]);
715                                 }
716
717                                 el2->values = vals;
718                                 el2->num_values += el->num_values;
719                         }
720
721                         break;
722
723                 case LDB_FLAG_MOD_REPLACE:
724                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
725                                 if (el->num_values > 1) {
726                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
727                                                                el->name, ldb_dn_get_linearized(msg->dn));
728                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
729                                         goto done;
730                                 }
731                         }
732
733                         for (j=0; j<el->num_values; j++) {
734                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
735                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
736                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
737                                         goto done;
738                                 }
739                         }
740
741                         /* Delete the attribute if it exists in the DB */
742                         msg_delete_attribute(module, ldb, msg2, el->name);
743
744                         /* Recreate it with the new values */
745                         if (msg_add_element(ldb, msg2, el) != 0) {
746                                 ret = LDB_ERR_OTHER;
747                                 goto done;
748                         }
749
750                         break;
751
752                 case LDB_FLAG_MOD_DELETE:
753                         dn = ldb_dn_get_linearized(msg->dn);
754                         if (dn == NULL) {
755                                 ret = LDB_ERR_OTHER;
756                                 goto done;
757                         }
758
759                         if (msg->elements[i].num_values == 0) {
760                                 /* Delete the whole attribute */
761                                 if (msg_delete_attribute(module, ldb, msg2,
762                                                          msg->elements[i].name) != 0) {
763                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
764                                                                msg->elements[i].name, dn);
765                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
766                                         goto done;
767                                 }
768                         } else {
769                                 /* Delete specified values from an attribute */
770                                 for (j=0; j < msg->elements[i].num_values; j++) {
771                                         if (msg_delete_element(module,
772                                                                msg2,
773                                                                msg->elements[i].name,
774                                                                &msg->elements[i].values[j]) != 0) {
775                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
776                                                                        msg->elements[i].name, dn);
777                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
778                                                 goto done;
779                                         }
780
781                                         ret = ltdb_index_del_value(module, dn,
782                                                 &msg->elements[i], j);
783                                         if (ret != LDB_SUCCESS) {
784                                                 goto done;
785                                         }
786                                 }
787                         }
788
789                         break;
790                 default:
791                         ldb_asprintf_errstring(ldb,
792                                 "Invalid ldb_modify flags on %s: 0x%x",
793                                 msg->elements[i].name,
794                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
795                         ret = LDB_ERR_PROTOCOL_ERROR;
796                         goto done;
797                 }
798         }
799
800         ret = ltdb_store(module, msg2, TDB_MODIFY);
801         if (ret != LDB_SUCCESS) {
802                 goto done;
803         }
804
805         ret = ltdb_modified(module, msg->dn);
806         if (ret != LDB_SUCCESS) {
807                 goto done;
808         }
809
810 done:
811         talloc_free(tdb_key.dptr);
812         return ret;
813 }
814
815 /*
816   modify a record
817 */
818 static int ltdb_modify(struct ltdb_context *ctx)
819 {
820         struct ldb_module *module = ctx->module;
821         struct ldb_request *req = ctx->req;
822         int ret = LDB_SUCCESS;
823
824         ret = ltdb_check_special_dn(module, req->op.mod.message);
825         if (ret != LDB_SUCCESS) {
826                 return ret;
827         }
828
829         ldb_request_set_state(req, LDB_ASYNC_PENDING);
830
831         if (ltdb_cache_load(module) != 0) {
832                 return LDB_ERR_OPERATIONS_ERROR;
833         }
834
835         ret = ltdb_modify_internal(module, req->op.mod.message);
836
837         return ret;
838 }
839
840 /*
841   rename a record
842 */
843 static int ltdb_rename(struct ltdb_context *ctx)
844 {
845         struct ldb_module *module = ctx->module;
846         struct ldb_request *req = ctx->req;
847         struct ldb_message *msg;
848         int ret = LDB_SUCCESS;
849
850         ldb_request_set_state(req, LDB_ASYNC_PENDING);
851
852         if (ltdb_cache_load(ctx->module) != 0) {
853                 return LDB_ERR_OPERATIONS_ERROR;
854         }
855
856         msg = talloc(ctx, struct ldb_message);
857         if (msg == NULL) {
858                 ret = LDB_ERR_OPERATIONS_ERROR;
859                 goto done;
860         }
861
862         /* in case any attribute of the message was indexed, we need
863            to fetch the old record */
864         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
865         if (ret != LDB_SUCCESS) {
866                 /* not finding the old record is an error */
867                 goto done;
868         }
869
870         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
871         if (msg->dn == NULL) {
872                 ret = LDB_ERR_OPERATIONS_ERROR;
873                 goto done;
874         }
875
876         /* Always delete first then add, to avoid conflicts with
877          * unique indexes. We rely on the transaction to make this
878          * atomic
879          */
880         ret = ltdb_delete_internal(module, req->op.rename.olddn);
881         if (ret != LDB_SUCCESS) {
882                 goto done;
883         }
884
885         ret = ltdb_add_internal(module, msg);
886
887 done:
888         return ret;
889 }
890
891 static int ltdb_start_trans(struct ldb_module *module)
892 {
893         void *data = ldb_module_get_private(module);
894         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
895
896         if (tdb_transaction_start(ltdb->tdb) != 0) {
897                 return ltdb_err_map(tdb_error(ltdb->tdb));
898         }
899
900         ltdb->in_transaction++;
901
902         ltdb_index_transaction_start(module);
903
904         return LDB_SUCCESS;
905 }
906
907 static int ltdb_prepare_commit(struct ldb_module *module)
908 {
909         void *data = ldb_module_get_private(module);
910         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
911
912         if (ltdb->in_transaction != 1) {
913                 return LDB_SUCCESS;
914         }
915
916         if (ltdb_index_transaction_commit(module) != 0) {
917                 tdb_transaction_cancel(ltdb->tdb);
918                 ltdb->in_transaction--;
919                 return ltdb_err_map(tdb_error(ltdb->tdb));
920         }
921
922         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
923                 ltdb->in_transaction--;
924                 return ltdb_err_map(tdb_error(ltdb->tdb));
925         }
926
927         ltdb->prepared_commit = true;
928
929         return LDB_SUCCESS;
930 }
931
932 static int ltdb_end_trans(struct ldb_module *module)
933 {
934         void *data = ldb_module_get_private(module);
935         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
936
937         if (!ltdb->prepared_commit) {
938                 int ret = ltdb_prepare_commit(module);
939                 if (ret != LDB_SUCCESS) {
940                         return ret;
941                 }
942         }
943
944         ltdb->in_transaction--;
945         ltdb->prepared_commit = false;
946
947         if (tdb_transaction_commit(ltdb->tdb) != 0) {
948                 return ltdb_err_map(tdb_error(ltdb->tdb));
949         }
950
951         return LDB_SUCCESS;
952 }
953
954 static int ltdb_del_trans(struct ldb_module *module)
955 {
956         void *data = ldb_module_get_private(module);
957         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
958
959         ltdb->in_transaction--;
960
961         if (ltdb_index_transaction_cancel(module) != 0) {
962                 tdb_transaction_cancel(ltdb->tdb);
963                 return ltdb_err_map(tdb_error(ltdb->tdb));
964         }
965
966         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
967                 return ltdb_err_map(tdb_error(ltdb->tdb));
968         }
969
970         return LDB_SUCCESS;
971 }
972
973 /*
974   return sequenceNumber from @BASEINFO
975 */
976 static int ltdb_sequence_number(struct ltdb_context *ctx,
977                                 struct ldb_extended **ext)
978 {
979         struct ldb_context *ldb;
980         struct ldb_module *module = ctx->module;
981         struct ldb_request *req = ctx->req;
982         TALLOC_CTX *tmp_ctx;
983         struct ldb_seqnum_request *seq;
984         struct ldb_seqnum_result *res;
985         struct ldb_message *msg = NULL;
986         struct ldb_dn *dn;
987         const char *date;
988         int ret = LDB_SUCCESS;
989
990         ldb = ldb_module_get_ctx(module);
991
992         seq = talloc_get_type(req->op.extended.data,
993                                 struct ldb_seqnum_request);
994         if (seq == NULL) {
995                 return LDB_ERR_OPERATIONS_ERROR;
996         }
997
998         ldb_request_set_state(req, LDB_ASYNC_PENDING);
999
1000         if (ltdb_lock_read(module) != 0) {
1001                 return LDB_ERR_OPERATIONS_ERROR;
1002         }
1003
1004         res = talloc_zero(req, struct ldb_seqnum_result);
1005         if (res == NULL) {
1006                 ret = LDB_ERR_OPERATIONS_ERROR;
1007                 goto done;
1008         }
1009         tmp_ctx = talloc_new(req);
1010         if (tmp_ctx == NULL) {
1011                 ret = LDB_ERR_OPERATIONS_ERROR;
1012                 goto done;
1013         }
1014
1015         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1016
1017         msg = talloc(tmp_ctx, struct ldb_message);
1018         if (msg == NULL) {
1019                 ret = LDB_ERR_OPERATIONS_ERROR;
1020                 goto done;
1021         }
1022
1023         ret = ltdb_search_dn1(module, dn, msg);
1024         if (ret != LDB_SUCCESS) {
1025                 goto done;
1026         }
1027
1028         switch (seq->type) {
1029         case LDB_SEQ_HIGHEST_SEQ:
1030                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1031                 break;
1032         case LDB_SEQ_NEXT:
1033                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1034                 res->seq_num++;
1035                 break;
1036         case LDB_SEQ_HIGHEST_TIMESTAMP:
1037                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1038                 if (date) {
1039                         res->seq_num = ldb_string_to_time(date);
1040                 } else {
1041                         res->seq_num = 0;
1042                         /* zero is as good as anything when we don't know */
1043                 }
1044                 break;
1045         }
1046
1047         *ext = talloc_zero(req, struct ldb_extended);
1048         if (*ext == NULL) {
1049                 ret = LDB_ERR_OPERATIONS_ERROR;
1050                 goto done;
1051         }
1052         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1053         (*ext)->data = talloc_steal(*ext, res);
1054
1055 done:
1056         talloc_free(tmp_ctx);
1057         ltdb_unlock_read(module);
1058         return ret;
1059 }
1060
1061 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1062 {
1063         struct ldb_context *ldb;
1064         struct ldb_request *req;
1065         struct ldb_reply *ares;
1066
1067         ldb = ldb_module_get_ctx(ctx->module);
1068         req = ctx->req;
1069
1070         /* if we already returned an error just return */
1071         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1072                 return;
1073         }
1074
1075         ares = talloc_zero(req, struct ldb_reply);
1076         if (!ares) {
1077                 ldb_oom(ldb);
1078                 req->callback(req, NULL);
1079                 return;
1080         }
1081         ares->type = LDB_REPLY_DONE;
1082         ares->error = error;
1083
1084         req->callback(req, ares);
1085 }
1086
1087 static void ltdb_timeout(struct tevent_context *ev,
1088                           struct tevent_timer *te,
1089                           struct timeval t,
1090                           void *private_data)
1091 {
1092         struct ltdb_context *ctx;
1093         ctx = talloc_get_type(private_data, struct ltdb_context);
1094
1095         if (!ctx->request_terminated) {
1096                 /* request is done now */
1097                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1098         }
1099
1100         if (!ctx->request_terminated) {
1101                 /* neutralize the spy */
1102                 ctx->spy->ctx = NULL;
1103         }
1104         talloc_free(ctx);
1105 }
1106
1107 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1108                                         struct ldb_extended *ext,
1109                                         int error)
1110 {
1111         struct ldb_context *ldb;
1112         struct ldb_request *req;
1113         struct ldb_reply *ares;
1114
1115         ldb = ldb_module_get_ctx(ctx->module);
1116         req = ctx->req;
1117
1118         /* if we already returned an error just return */
1119         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1120                 return;
1121         }
1122
1123         ares = talloc_zero(req, struct ldb_reply);
1124         if (!ares) {
1125                 ldb_oom(ldb);
1126                 req->callback(req, NULL);
1127                 return;
1128         }
1129         ares->type = LDB_REPLY_DONE;
1130         ares->response = ext;
1131         ares->error = error;
1132
1133         req->callback(req, ares);
1134 }
1135
1136 static void ltdb_handle_extended(struct ltdb_context *ctx)
1137 {
1138         struct ldb_extended *ext = NULL;
1139         int ret;
1140
1141         if (strcmp(ctx->req->op.extended.oid,
1142                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1143                 /* get sequence number */
1144                 ret = ltdb_sequence_number(ctx, &ext);
1145         } else {
1146                 /* not recognized */
1147                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1148         }
1149
1150         ltdb_request_extended_done(ctx, ext, ret);
1151 }
1152
1153 static void ltdb_callback(struct tevent_context *ev,
1154                           struct tevent_timer *te,
1155                           struct timeval t,
1156                           void *private_data)
1157 {
1158         struct ltdb_context *ctx;
1159         int ret;
1160
1161         ctx = talloc_get_type(private_data, struct ltdb_context);
1162
1163         if (ctx->request_terminated) {
1164                 goto done;
1165         }
1166
1167         switch (ctx->req->operation) {
1168         case LDB_SEARCH:
1169                 ret = ltdb_search(ctx);
1170                 break;
1171         case LDB_ADD:
1172                 ret = ltdb_add(ctx);
1173                 break;
1174         case LDB_MODIFY:
1175                 ret = ltdb_modify(ctx);
1176                 break;
1177         case LDB_DELETE:
1178                 ret = ltdb_delete(ctx);
1179                 break;
1180         case LDB_RENAME:
1181                 ret = ltdb_rename(ctx);
1182                 break;
1183         case LDB_EXTENDED:
1184                 ltdb_handle_extended(ctx);
1185                 goto done;
1186         default:
1187                 /* no other op supported */
1188                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1189         }
1190
1191         if (!ctx->request_terminated) {
1192                 /* request is done now */
1193                 ltdb_request_done(ctx, ret);
1194         }
1195
1196 done:
1197         if (!ctx->request_terminated) {
1198                 /* neutralize the spy */
1199                 ctx->spy->ctx = NULL;
1200         }
1201         talloc_free(ctx);
1202 }
1203
1204 static int ltdb_request_destructor(void *ptr)
1205 {
1206         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1207
1208         if (spy->ctx != NULL) {
1209                 spy->ctx->request_terminated = true;
1210         }
1211
1212         return 0;
1213 }
1214
1215 static int ltdb_handle_request(struct ldb_module *module,
1216                                struct ldb_request *req)
1217 {
1218         struct ldb_context *ldb;
1219         struct tevent_context *ev;
1220         struct ltdb_context *ac;
1221         struct tevent_timer *te;
1222         struct timeval tv;
1223
1224         if (check_critical_controls(req->controls)) {
1225                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1226         }
1227
1228         ldb = ldb_module_get_ctx(module);
1229
1230         if (req->starttime == 0 || req->timeout == 0) {
1231                 ldb_set_errstring(ldb, "Invalid timeout settings");
1232                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1233         }
1234
1235         ev = ldb_get_event_context(ldb);
1236
1237         ac = talloc_zero(ldb, struct ltdb_context);
1238         if (ac == NULL) {
1239                 ldb_oom(ldb);
1240                 return LDB_ERR_OPERATIONS_ERROR;
1241         }
1242
1243         ac->module = module;
1244         ac->req = req;
1245
1246         tv.tv_sec = 0;
1247         tv.tv_usec = 0;
1248         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1249         if (NULL == te) {
1250                 talloc_free(ac);
1251                 return LDB_ERR_OPERATIONS_ERROR;
1252         }
1253
1254         tv.tv_sec = req->starttime + req->timeout;
1255         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1256         if (NULL == ac->timeout_event) {
1257                 talloc_free(ac);
1258                 return LDB_ERR_OPERATIONS_ERROR;
1259         }
1260
1261         /* set a spy so that we do not try to use the request context
1262          * if it is freed before ltdb_callback fires */
1263         ac->spy = talloc(req, struct ltdb_req_spy);
1264         if (NULL == ac->spy) {
1265                 talloc_free(ac);
1266                 return LDB_ERR_OPERATIONS_ERROR;
1267         }
1268         ac->spy->ctx = ac;
1269
1270         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1271
1272         return LDB_SUCCESS;
1273 }
1274
1275 static const struct ldb_module_ops ltdb_ops = {
1276         .name              = "tdb",
1277         .search            = ltdb_handle_request,
1278         .add               = ltdb_handle_request,
1279         .modify            = ltdb_handle_request,
1280         .del               = ltdb_handle_request,
1281         .rename            = ltdb_handle_request,
1282         .extended          = ltdb_handle_request,
1283         .start_transaction = ltdb_start_trans,
1284         .end_transaction   = ltdb_end_trans,
1285         .prepare_commit    = ltdb_prepare_commit,
1286         .del_transaction   = ltdb_del_trans,
1287 };
1288
1289 /*
1290   connect to the database
1291 */
1292 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1293                         unsigned int flags, const char *options[],
1294                         struct ldb_module **_module)
1295 {
1296         struct ldb_module *module;
1297         const char *path;
1298         int tdb_flags, open_flags;
1299         struct ltdb_private *ltdb;
1300
1301         /* parse the url */
1302         if (strchr(url, ':')) {
1303                 if (strncmp(url, "tdb://", 6) != 0) {
1304                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1305                                   "Invalid tdb URL '%s'", url);
1306                         return -1;
1307                 }
1308                 path = url+6;
1309         } else {
1310                 path = url;
1311         }
1312
1313         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1314
1315         /* check for the 'nosync' option */
1316         if (flags & LDB_FLG_NOSYNC) {
1317                 tdb_flags |= TDB_NOSYNC;
1318         }
1319
1320         /* and nommap option */
1321         if (flags & LDB_FLG_NOMMAP) {
1322                 tdb_flags |= TDB_NOMMAP;
1323         }
1324
1325         if (flags & LDB_FLG_RDONLY) {
1326                 open_flags = O_RDONLY;
1327         } else {
1328                 open_flags = O_CREAT | O_RDWR;
1329         }
1330
1331         ltdb = talloc_zero(ldb, struct ltdb_private);
1332         if (!ltdb) {
1333                 ldb_oom(ldb);
1334                 return -1;
1335         }
1336
1337         /* note that we use quite a large default hash size */
1338         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1339                                    tdb_flags, open_flags,
1340                                    ldb_get_create_perms(ldb), ldb);
1341         if (!ltdb->tdb) {
1342                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1343                           "Unable to open tdb '%s'", path);
1344                 talloc_free(ltdb);
1345                 return -1;
1346         }
1347
1348         ltdb->sequence_number = 0;
1349
1350         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1351         if (!module) {
1352                 talloc_free(ltdb);
1353                 return -1;
1354         }
1355         ldb_module_set_private(module, ltdb);
1356
1357         if (ltdb_cache_load(module) != 0) {
1358                 talloc_free(module);
1359                 talloc_free(ltdb);
1360                 return -1;
1361         }
1362
1363         *_module = module;
1364         return 0;
1365 }
1366
1367 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1368         .name = "tdb",
1369         .connect_fn = ltdb_connect
1370 };