Merge branch 'master' of git://git.samba.org/samba
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         int ret = 0;
92
93         if (ltdb->in_transaction == 0 &&
94             ltdb->read_lock_count == 0) {
95                 ret = tdb_lockall_read(ltdb->tdb);
96         }
97         if (ret == 0) {
98                 ltdb->read_lock_count++;
99         }
100         return ret;
101 }
102
103 /*
104   unlock the database after a ltdb_lock_read()
105 */
106 int ltdb_unlock_read(struct ldb_module *module)
107 {
108         void *data = ldb_module_get_private(module);
109         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
110         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
111                 return tdb_unlockall_read(ltdb->tdb);
112         }
113         ltdb->read_lock_count--;
114         return 0;
115 }
116
117
118 /*
119   form a TDB_DATA for a record key
120   caller frees
121
122   note that the key for a record can depend on whether the
123   dn refers to a case sensitive index record or not
124 */
125 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
126 {
127         struct ldb_context *ldb = ldb_module_get_ctx(module);
128         TDB_DATA key;
129         char *key_str = NULL;
130         const char *dn_folded = NULL;
131
132         /*
133           most DNs are case insensitive. The exception is index DNs for
134           case sensitive attributes
135
136           there are 3 cases dealt with in this code:
137
138           1) if the dn doesn't start with @ then uppercase the attribute
139              names and the attributes values of case insensitive attributes
140           2) if the dn starts with @ then leave it alone -
141              the indexing code handles the rest
142         */
143
144         dn_folded = ldb_dn_get_casefold(dn);
145         if (!dn_folded) {
146                 goto failed;
147         }
148
149         key_str = talloc_strdup(ldb, "DN=");
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
155         if (!key_str) {
156                 goto failed;
157         }
158
159         key.dptr = (uint8_t *)key_str;
160         key.dsize = strlen(key_str) + 1;
161
162         return key;
163
164 failed:
165         errno = ENOMEM;
166         key.dptr = NULL;
167         key.dsize = 0;
168         return key;
169 }
170
171 /*
172   check special dn's have valid attributes
173   currently only @ATTRIBUTES is checked
174 */
175 static int ltdb_check_special_dn(struct ldb_module *module,
176                           const struct ldb_message *msg)
177 {
178         struct ldb_context *ldb = ldb_module_get_ctx(module);
179         int i, j;
180
181         if (! ldb_dn_is_special(msg->dn) ||
182             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
183                 return LDB_SUCCESS;
184         }
185
186         /* we have @ATTRIBUTES, let's check attributes are fine */
187         /* should we check that we deny multivalued attributes ? */
188         for (i = 0; i < msg->num_elements; i++) {
189                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
190
191                 for (j = 0; j < msg->elements[i].num_values; j++) {
192                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
193                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
194                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
195                         }
196                 }
197         }
198
199         return LDB_SUCCESS;
200 }
201
202
203 /*
204   we've made a modification to a dn - possibly reindex and
205   update sequence number
206 */
207 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
208 {
209         int ret = LDB_SUCCESS;
210         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
211
212         /* only allow modifies inside a transaction, otherwise the
213          * ldb is unsafe */
214         if (ltdb->in_transaction == 0) {
215                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
216                 return LDB_ERR_OPERATIONS_ERROR;
217         }
218
219         if (ldb_dn_is_special(dn) &&
220             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
221              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
222                 ret = ltdb_reindex(module);
223         }
224
225         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
226         if (ret == LDB_SUCCESS &&
227             !(ldb_dn_is_special(dn) &&
228               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
229                 ret = ltdb_increase_sequence_number(module);
230         }
231
232         /* If the modify was to @OPTIONS, reload the cache */
233         if (ldb_dn_is_special(dn) &&
234             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
235                 ret = ltdb_cache_reload(module);
236         }
237
238         return ret;
239 }
240
241 /*
242   store a record into the db
243 */
244 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
245 {
246         void *data = ldb_module_get_private(module);
247         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
248         TDB_DATA tdb_key, tdb_data;
249         int ret = LDB_SUCCESS;
250
251         tdb_key = ltdb_key(module, msg->dn);
252         if (tdb_key.dptr == NULL) {
253                 return LDB_ERR_OTHER;
254         }
255
256         ret = ltdb_pack_data(module, msg, &tdb_data);
257         if (ret == -1) {
258                 talloc_free(tdb_key.dptr);
259                 return LDB_ERR_OTHER;
260         }
261
262         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
263         if (ret == -1) {
264                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
265                 goto done;
266         }
267
268 done:
269         talloc_free(tdb_key.dptr);
270         talloc_free(tdb_data.dptr);
271
272         return ret;
273 }
274
275
276 static int ltdb_add_internal(struct ldb_module *module,
277                              const struct ldb_message *msg)
278 {
279         struct ldb_context *ldb = ldb_module_get_ctx(module);
280         int ret = LDB_SUCCESS, i;
281
282         ret = ltdb_check_special_dn(module, msg);
283         if (ret != LDB_SUCCESS) {
284                 return ret;
285         }
286
287         if (ltdb_cache_load(module) != 0) {
288                 return LDB_ERR_OPERATIONS_ERROR;
289         }
290
291         for (i=0;i<msg->num_elements;i++) {
292                 struct ldb_message_element *el = &msg->elements[i];
293                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
294
295                 if (el->num_values == 0) {
296                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
297                                                el->name, ldb_dn_get_linearized(msg->dn));
298                         return LDB_ERR_CONSTRAINT_VIOLATION;
299                 }
300                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
301                         if (el->num_values > 1) {
302                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
303                                                        el->name, ldb_dn_get_linearized(msg->dn));
304                                 return LDB_ERR_CONSTRAINT_VIOLATION;
305                         }
306                 }
307         }
308
309         ret = ltdb_store(module, msg, TDB_INSERT);
310         if (ret != LDB_SUCCESS) {
311                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
312                         ldb_asprintf_errstring(ldb,
313                                                "Entry %s already exists",
314                                                ldb_dn_get_linearized(msg->dn));
315                 }
316                 return ret;
317         }
318
319         ret = ltdb_index_add_new(module, msg);
320         if (ret != LDB_SUCCESS) {
321                 return ret;
322         }
323
324         ret = ltdb_modified(module, msg->dn);
325
326         return ret;
327 }
328
329 /*
330   add a record to the database
331 */
332 static int ltdb_add(struct ltdb_context *ctx)
333 {
334         struct ldb_module *module = ctx->module;
335         struct ldb_request *req = ctx->req;
336         int ret = LDB_SUCCESS;
337
338         ldb_request_set_state(req, LDB_ASYNC_PENDING);
339
340         if (ltdb_cache_load(module) != 0) {
341                 return LDB_ERR_OPERATIONS_ERROR;
342         }
343
344         ret = ltdb_add_internal(module, req->op.add.message);
345
346         return ret;
347 }
348
349 /*
350   delete a record from the database, not updating indexes (used for deleting
351   index records)
352 */
353 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
354 {
355         void *data = ldb_module_get_private(module);
356         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
357         TDB_DATA tdb_key;
358         int ret;
359
360         tdb_key = ltdb_key(module, dn);
361         if (!tdb_key.dptr) {
362                 return LDB_ERR_OTHER;
363         }
364
365         ret = tdb_delete(ltdb->tdb, tdb_key);
366         talloc_free(tdb_key.dptr);
367
368         if (ret != 0) {
369                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
370         }
371
372         return ret;
373 }
374
375 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         struct ldb_message *msg;
378         int ret = LDB_SUCCESS;
379
380         msg = talloc(module, struct ldb_message);
381         if (msg == NULL) {
382                 return LDB_ERR_OPERATIONS_ERROR;
383         }
384
385         /* in case any attribute of the message was indexed, we need
386            to fetch the old record */
387         ret = ltdb_search_dn1(module, dn, msg);
388         if (ret != LDB_SUCCESS) {
389                 /* not finding the old record is an error */
390                 goto done;
391         }
392
393         ret = ltdb_delete_noindex(module, dn);
394         if (ret != LDB_SUCCESS) {
395                 goto done;
396         }
397
398         /* remove any indexed attributes */
399         ret = ltdb_index_delete(module, msg);
400         if (ret != LDB_SUCCESS) {
401                 goto done;
402         }
403
404         ret = ltdb_modified(module, dn);
405         if (ret != LDB_SUCCESS) {
406                 goto done;
407         }
408
409 done:
410         talloc_free(msg);
411         return ret;
412 }
413
414 /*
415   delete a record from the database
416 */
417 static int ltdb_delete(struct ltdb_context *ctx)
418 {
419         struct ldb_module *module = ctx->module;
420         struct ldb_request *req = ctx->req;
421         int ret = LDB_SUCCESS;
422
423         ldb_request_set_state(req, LDB_ASYNC_PENDING);
424
425         if (ltdb_cache_load(module) != 0) {
426                 return LDB_ERR_OPERATIONS_ERROR;
427         }
428
429         ret = ltdb_delete_internal(module, req->op.del.dn);
430
431         return ret;
432 }
433
434 /*
435   find an element by attribute name. At the moment this does a linear search,
436   it should be re-coded to use a binary search once all places that modify
437   records guarantee sorted order
438
439   return the index of the first matching element if found, otherwise -1
440 */
441 static int find_element(const struct ldb_message *msg, const char *name)
442 {
443         unsigned int i;
444         for (i=0;i<msg->num_elements;i++) {
445                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
446                         return i;
447                 }
448         }
449         return -1;
450 }
451
452
453 /*
454   add an element to an existing record. Assumes a elements array that we
455   can call re-alloc on, and assumed that we can re-use the data pointers from
456   the passed in additional values. Use with care!
457
458   returns 0 on success, -1 on failure (and sets errno)
459 */
460 static int ltdb_msg_add_element(struct ldb_context *ldb,
461                                 struct ldb_message *msg,
462                                 struct ldb_message_element *el)
463 {
464         struct ldb_message_element *e2;
465         unsigned int i;
466
467         if (el->num_values == 0) {
468                 /* nothing to do here - we don't add empty elements */
469                 return 0;
470         }
471
472         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
473                               msg->num_elements+1);
474         if (!e2) {
475                 errno = ENOMEM;
476                 return -1;
477         }
478
479         msg->elements = e2;
480
481         e2 = &msg->elements[msg->num_elements];
482
483         e2->name = el->name;
484         e2->flags = el->flags;
485         e2->values = talloc_array(msg->elements,
486                                   struct ldb_val, el->num_values);
487         if (!e2->values) {
488                 errno = ENOMEM;
489                 return -1;
490         }
491         for (i=0;i<el->num_values;i++) {
492                 e2->values[i] = el->values[i];
493         }
494         e2->num_values = el->num_values;
495
496         ++msg->num_elements;
497
498         return 0;
499 }
500
501 /*
502   delete all elements having a specified attribute name
503 */
504 static int msg_delete_attribute(struct ldb_module *module,
505                                 struct ldb_context *ldb,
506                                 struct ldb_message *msg, const char *name)
507 {
508         unsigned int i;
509         int ret;
510         struct ldb_message_element *el;
511
512         el = ldb_msg_find_element(msg, name);
513         if (el == NULL) {
514                 return -1;
515         }
516         i = el - msg->elements;
517
518         ret = ltdb_index_del_element(module, msg->dn, el);
519         if (ret != LDB_SUCCESS) {
520                 return ret;
521         }
522
523         talloc_free(el->values);
524         if (msg->num_elements > (i+1)) {
525                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
526         }
527         msg->num_elements--;
528         msg->elements = talloc_realloc(msg, msg->elements,
529                                        struct ldb_message_element,
530                                        msg->num_elements);
531         return 0;
532 }
533
534 /*
535   delete all elements matching an attribute name/value
536
537   return 0 on success, -1 on failure
538 */
539 static int msg_delete_element(struct ldb_module *module,
540                               struct ldb_message *msg,
541                               const char *name,
542                               const struct ldb_val *val)
543 {
544         struct ldb_context *ldb = ldb_module_get_ctx(module);
545         unsigned int i;
546         int found, ret;
547         struct ldb_message_element *el;
548         const struct ldb_schema_attribute *a;
549
550         found = find_element(msg, name);
551         if (found == -1) {
552                 return -1;
553         }
554
555         el = &msg->elements[found];
556
557         a = ldb_schema_attribute_by_name(ldb, el->name);
558
559         for (i=0;i<el->num_values;i++) {
560                 if (a->syntax->comparison_fn(ldb, ldb,
561                                              &el->values[i], val) == 0) {
562                         if (el->num_values == 1) {
563                                 return msg_delete_attribute(module, ldb, msg, name);
564                         }
565
566                         ret = ltdb_index_del_value(module, msg->dn, el, i);
567                         if (ret != LDB_SUCCESS) {
568                                 return -1;
569                         }
570
571                         if (i<el->num_values-1) {
572                                 memmove(&el->values[i], &el->values[i+1],
573                                         sizeof(el->values[i])*
574                                                 (el->num_values-(i+1)));
575                         }
576                         el->num_values--;
577
578                         /* per definition we find in a canonicalised message an
579                            attribute value only once. So we are finished here */
580                         return 0;
581                 }
582         }
583
584         /* Not found */
585         return -1;
586 }
587
588
589 /*
590   modify a record - internal interface
591
592   yuck - this is O(n^2). Luckily n is usually small so we probably
593   get away with it, but if we ever have really large attribute lists
594   then we'll need to look at this again
595
596   'req' is optional, and is used to specify controls if supplied
597 */
598 int ltdb_modify_internal(struct ldb_module *module,
599                          const struct ldb_message *msg,
600                          struct ldb_request *req)
601 {
602         struct ldb_context *ldb = ldb_module_get_ctx(module);
603         void *data = ldb_module_get_private(module);
604         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
605         TDB_DATA tdb_key, tdb_data;
606         struct ldb_message *msg2;
607         unsigned i, j;
608         int ret = LDB_SUCCESS, idx;
609
610         tdb_key = ltdb_key(module, msg->dn);
611         if (!tdb_key.dptr) {
612                 return LDB_ERR_OTHER;
613         }
614
615         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
616         if (!tdb_data.dptr) {
617                 talloc_free(tdb_key.dptr);
618                 return ltdb_err_map(tdb_error(ltdb->tdb));
619         }
620
621         msg2 = talloc(tdb_key.dptr, struct ldb_message);
622         if (msg2 == NULL) {
623                 free(tdb_data.dptr);
624                 ret = LDB_ERR_OTHER;
625                 goto done;
626         }
627
628         ret = ltdb_unpack_data(module, &tdb_data, msg2);
629         free(tdb_data.dptr);
630         if (ret == -1) {
631                 ret = LDB_ERR_OTHER;
632                 goto done;
633         }
634
635         if (!msg2->dn) {
636                 msg2->dn = msg->dn;
637         }
638
639         for (i=0; i<msg->num_elements; i++) {
640                 struct ldb_message_element *el = &msg->elements[i], *el2;
641                 struct ldb_val *vals;
642                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
643                 const char *dn;
644
645                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
646                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
647                                                ldb_dn_get_linearized(msg2->dn));
648                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
649                         goto done;
650                 }
651
652                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
653                 case LDB_FLAG_MOD_ADD:
654                         if (el->num_values == 0) {
655                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
656                                                        el->name, ldb_dn_get_linearized(msg2->dn));
657                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
658                                 goto done;
659                         }
660
661                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
662                                 if (el->num_values > 1) {
663                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
664                                                                el->name, ldb_dn_get_linearized(msg2->dn));
665                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
666                                         goto done;
667                                 }
668                         }
669
670                         /* Checks if element already exists */
671                         idx = find_element(msg2, el->name);
672                         if (idx == -1) {
673                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
674                                         ret = LDB_ERR_OTHER;
675                                         goto done;
676                                 }
677                                 ret = ltdb_index_add_element(module, msg2->dn, el);
678                                 if (ret != LDB_SUCCESS) {
679                                         goto done;
680                                 }
681                         } else {
682                                 /* We cannot add another value on a existing one
683                                    if the attribute is single-valued */
684                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
685                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
686                                                                el->name, ldb_dn_get_linearized(msg2->dn));
687                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
688                                         goto done;
689                                 }
690
691                                 el2 = &(msg2->elements[idx]);
692
693                                 /* Check that values don't exist yet on multi-
694                                    valued attributes or aren't provided twice */
695                                 for (j=0; j<el->num_values; j++) {
696                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
697                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
698                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
699                                                 goto done;
700                                         }
701                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
702                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
703                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
704                                                 goto done;
705                                         }
706                                 }
707
708                                 /* Now combine existing and new values to a new
709                                    attribute record */
710                                 vals = talloc_realloc(msg2->elements,
711                                                       el2->values, struct ldb_val,
712                                                       el2->num_values + el->num_values);
713                                 if (vals == NULL) {
714                                         ldb_oom(ldb);
715                                         ret = LDB_ERR_OTHER;
716                                         goto done;
717                                 }
718
719                                 for (j=0; j<el->num_values; j++) {
720                                         vals[el2->num_values + j] =
721                                                 ldb_val_dup(vals, &el->values[j]);
722                                 }
723
724                                 el2->values = vals;
725                                 el2->num_values += el->num_values;
726
727                                 ret = ltdb_index_add_element(module, msg2->dn, el);
728                                 if (ret != LDB_SUCCESS) {
729                                         goto done;
730                                 }
731                         }
732
733                         break;
734
735                 case LDB_FLAG_MOD_REPLACE:
736                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
737                                 /* the RELAX control overrides this
738                                    check for replace. This is needed as
739                                    DRS replication can produce multiple
740                                    values here for a single valued
741                                    attribute when the values are deleted
742                                    links
743                                 */
744                                 if (el->num_values > 1 &&
745                                     (!req || !ldb_request_get_control(req, LDB_CONTROL_RELAX_OID))) {
746                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
747                                                                el->name, ldb_dn_get_linearized(msg2->dn));
748                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
749                                         goto done;
750                                 }
751                         }
752
753                         /* TODO: This is O(n^2) - replace with more efficient check */
754                         for (j=0; j<el->num_values; j++) {
755                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
756                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
757                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
758                                         goto done;
759                                 }
760                         }
761
762                         idx = find_element(msg2, el->name);
763                         if (idx != -1) {
764                                 el2 = &(msg2->elements[idx]);
765                                 if (ldb_msg_element_compare(el, el2) == 0) {
766                                         /* we are replacing with the same values */
767                                         continue;
768                                 }
769                         
770                                 /* Delete the attribute if it exists in the DB */
771                                 if (msg_delete_attribute(module, ldb, msg2, el->name) != 0) {
772                                         ret = LDB_ERR_OTHER;
773                                         goto done;
774                                 }
775                         }
776
777                         /* Recreate it with the new values */
778                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
779                                 ret = LDB_ERR_OTHER;
780                                 goto done;
781                         }
782
783                         ret = ltdb_index_add_element(module, msg2->dn, el);
784                         if (ret != LDB_SUCCESS) {
785                                 goto done;
786                         }
787
788                         break;
789
790                 case LDB_FLAG_MOD_DELETE:
791                         dn = ldb_dn_get_linearized(msg2->dn);
792                         if (dn == NULL) {
793                                 ret = LDB_ERR_OTHER;
794                                 goto done;
795                         }
796
797                         if (msg->elements[i].num_values == 0) {
798                                 /* Delete the whole attribute */
799                                 if (msg_delete_attribute(module, ldb, msg2,
800                                                          msg->elements[i].name) != 0) {
801                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
802                                                                msg->elements[i].name, dn);
803                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
804                                         goto done;
805                                 }
806                         } else {
807                                 /* Delete specified values from an attribute */
808                                 for (j=0; j < msg->elements[i].num_values; j++) {
809                                         if (msg_delete_element(module,
810                                                                msg2,
811                                                                msg->elements[i].name,
812                                                                &msg->elements[i].values[j]) != 0) {
813                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
814                                                                        msg->elements[i].name, dn);
815                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
816                                                 goto done;
817                                         }
818                                 }
819                         }
820                         break;
821                 default:
822                         ldb_asprintf_errstring(ldb,
823                                 "Invalid ldb_modify flags on %s: 0x%x",
824                                 msg->elements[i].name,
825                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
826                         ret = LDB_ERR_PROTOCOL_ERROR;
827                         goto done;
828                 }
829         }
830
831         ret = ltdb_store(module, msg2, TDB_MODIFY);
832         if (ret != LDB_SUCCESS) {
833                 goto done;
834         }
835
836         ret = ltdb_modified(module, msg2->dn);
837         if (ret != LDB_SUCCESS) {
838                 goto done;
839         }
840
841 done:
842         talloc_free(tdb_key.dptr);
843         return ret;
844 }
845
846 /*
847   modify a record
848 */
849 static int ltdb_modify(struct ltdb_context *ctx)
850 {
851         struct ldb_module *module = ctx->module;
852         struct ldb_request *req = ctx->req;
853         int ret = LDB_SUCCESS;
854
855         ret = ltdb_check_special_dn(module, req->op.mod.message);
856         if (ret != LDB_SUCCESS) {
857                 return ret;
858         }
859
860         ldb_request_set_state(req, LDB_ASYNC_PENDING);
861
862         if (ltdb_cache_load(module) != 0) {
863                 return LDB_ERR_OPERATIONS_ERROR;
864         }
865
866         ret = ltdb_modify_internal(module, req->op.mod.message, req);
867
868         return ret;
869 }
870
871 /*
872   rename a record
873 */
874 static int ltdb_rename(struct ltdb_context *ctx)
875 {
876         struct ldb_module *module = ctx->module;
877         struct ldb_request *req = ctx->req;
878         struct ldb_message *msg;
879         int ret = LDB_SUCCESS;
880
881         ldb_request_set_state(req, LDB_ASYNC_PENDING);
882
883         if (ltdb_cache_load(ctx->module) != 0) {
884                 return LDB_ERR_OPERATIONS_ERROR;
885         }
886
887         msg = talloc(ctx, struct ldb_message);
888         if (msg == NULL) {
889                 return LDB_ERR_OPERATIONS_ERROR;
890         }
891
892         /* in case any attribute of the message was indexed, we need
893            to fetch the old record */
894         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
895         if (ret != LDB_SUCCESS) {
896                 /* not finding the old record is an error */
897                 return ret;
898         }
899
900         /* Always delete first then add, to avoid conflicts with
901          * unique indexes. We rely on the transaction to make this
902          * atomic
903          */
904         ret = ltdb_delete_internal(module, msg->dn);
905         if (ret != LDB_SUCCESS) {
906                 return ret;
907         }
908
909         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
910         if (msg->dn == NULL) {
911                 return LDB_ERR_OPERATIONS_ERROR;
912         }
913
914         ret = ltdb_add_internal(module, msg);
915
916         return ret;
917 }
918
919 static int ltdb_start_trans(struct ldb_module *module)
920 {
921         void *data = ldb_module_get_private(module);
922         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
923
924         if (tdb_transaction_start(ltdb->tdb) != 0) {
925                 return ltdb_err_map(tdb_error(ltdb->tdb));
926         }
927
928         ltdb->in_transaction++;
929
930         ltdb_index_transaction_start(module);
931
932         return LDB_SUCCESS;
933 }
934
935 static int ltdb_prepare_commit(struct ldb_module *module)
936 {
937         void *data = ldb_module_get_private(module);
938         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
939
940         if (ltdb->in_transaction != 1) {
941                 return LDB_SUCCESS;
942         }
943
944         if (ltdb_index_transaction_commit(module) != 0) {
945                 tdb_transaction_cancel(ltdb->tdb);
946                 ltdb->in_transaction--;
947                 return ltdb_err_map(tdb_error(ltdb->tdb));
948         }
949
950         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
951                 ltdb->in_transaction--;
952                 return ltdb_err_map(tdb_error(ltdb->tdb));
953         }
954
955         ltdb->prepared_commit = true;
956
957         return LDB_SUCCESS;
958 }
959
960 static int ltdb_end_trans(struct ldb_module *module)
961 {
962         void *data = ldb_module_get_private(module);
963         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
964
965         if (!ltdb->prepared_commit) {
966                 int ret = ltdb_prepare_commit(module);
967                 if (ret != LDB_SUCCESS) {
968                         return ret;
969                 }
970         }
971
972         ltdb->in_transaction--;
973         ltdb->prepared_commit = false;
974
975         if (tdb_transaction_commit(ltdb->tdb) != 0) {
976                 return ltdb_err_map(tdb_error(ltdb->tdb));
977         }
978
979         return LDB_SUCCESS;
980 }
981
982 static int ltdb_del_trans(struct ldb_module *module)
983 {
984         void *data = ldb_module_get_private(module);
985         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
986
987         ltdb->in_transaction--;
988
989         if (ltdb_index_transaction_cancel(module) != 0) {
990                 tdb_transaction_cancel(ltdb->tdb);
991                 return ltdb_err_map(tdb_error(ltdb->tdb));
992         }
993
994         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
995                 return ltdb_err_map(tdb_error(ltdb->tdb));
996         }
997
998         return LDB_SUCCESS;
999 }
1000
1001 /*
1002   return sequenceNumber from @BASEINFO
1003 */
1004 static int ltdb_sequence_number(struct ltdb_context *ctx,
1005                                 struct ldb_extended **ext)
1006 {
1007         struct ldb_context *ldb;
1008         struct ldb_module *module = ctx->module;
1009         struct ldb_request *req = ctx->req;
1010         TALLOC_CTX *tmp_ctx;
1011         struct ldb_seqnum_request *seq;
1012         struct ldb_seqnum_result *res;
1013         struct ldb_message *msg = NULL;
1014         struct ldb_dn *dn;
1015         const char *date;
1016         int ret = LDB_SUCCESS;
1017
1018         ldb = ldb_module_get_ctx(module);
1019
1020         seq = talloc_get_type(req->op.extended.data,
1021                                 struct ldb_seqnum_request);
1022         if (seq == NULL) {
1023                 return LDB_ERR_OPERATIONS_ERROR;
1024         }
1025
1026         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1027
1028         if (ltdb_lock_read(module) != 0) {
1029                 return LDB_ERR_OPERATIONS_ERROR;
1030         }
1031
1032         res = talloc_zero(req, struct ldb_seqnum_result);
1033         if (res == NULL) {
1034                 ret = LDB_ERR_OPERATIONS_ERROR;
1035                 goto done;
1036         }
1037         tmp_ctx = talloc_new(req);
1038         if (tmp_ctx == NULL) {
1039                 ret = LDB_ERR_OPERATIONS_ERROR;
1040                 goto done;
1041         }
1042
1043         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1044
1045         msg = talloc(tmp_ctx, struct ldb_message);
1046         if (msg == NULL) {
1047                 ret = LDB_ERR_OPERATIONS_ERROR;
1048                 goto done;
1049         }
1050
1051         ret = ltdb_search_dn1(module, dn, msg);
1052         if (ret != LDB_SUCCESS) {
1053                 goto done;
1054         }
1055
1056         switch (seq->type) {
1057         case LDB_SEQ_HIGHEST_SEQ:
1058                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1059                 break;
1060         case LDB_SEQ_NEXT:
1061                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1062                 res->seq_num++;
1063                 break;
1064         case LDB_SEQ_HIGHEST_TIMESTAMP:
1065                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1066                 if (date) {
1067                         res->seq_num = ldb_string_to_time(date);
1068                 } else {
1069                         res->seq_num = 0;
1070                         /* zero is as good as anything when we don't know */
1071                 }
1072                 break;
1073         }
1074
1075         *ext = talloc_zero(req, struct ldb_extended);
1076         if (*ext == NULL) {
1077                 ret = LDB_ERR_OPERATIONS_ERROR;
1078                 goto done;
1079         }
1080         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1081         (*ext)->data = talloc_steal(*ext, res);
1082
1083 done:
1084         talloc_free(tmp_ctx);
1085         ltdb_unlock_read(module);
1086         return ret;
1087 }
1088
1089 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1090 {
1091         struct ldb_context *ldb;
1092         struct ldb_request *req;
1093         struct ldb_reply *ares;
1094
1095         ldb = ldb_module_get_ctx(ctx->module);
1096         req = ctx->req;
1097
1098         /* if we already returned an error just return */
1099         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1100                 return;
1101         }
1102
1103         ares = talloc_zero(req, struct ldb_reply);
1104         if (!ares) {
1105                 ldb_oom(ldb);
1106                 req->callback(req, NULL);
1107                 return;
1108         }
1109         ares->type = LDB_REPLY_DONE;
1110         ares->error = error;
1111
1112         req->callback(req, ares);
1113 }
1114
1115 static void ltdb_timeout(struct tevent_context *ev,
1116                           struct tevent_timer *te,
1117                           struct timeval t,
1118                           void *private_data)
1119 {
1120         struct ltdb_context *ctx;
1121         ctx = talloc_get_type(private_data, struct ltdb_context);
1122
1123         if (!ctx->request_terminated) {
1124                 /* request is done now */
1125                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1126         }
1127
1128         if (!ctx->request_terminated) {
1129                 /* neutralize the spy */
1130                 ctx->spy->ctx = NULL;
1131         }
1132         talloc_free(ctx);
1133 }
1134
1135 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1136                                         struct ldb_extended *ext,
1137                                         int error)
1138 {
1139         struct ldb_context *ldb;
1140         struct ldb_request *req;
1141         struct ldb_reply *ares;
1142
1143         ldb = ldb_module_get_ctx(ctx->module);
1144         req = ctx->req;
1145
1146         /* if we already returned an error just return */
1147         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1148                 return;
1149         }
1150
1151         ares = talloc_zero(req, struct ldb_reply);
1152         if (!ares) {
1153                 ldb_oom(ldb);
1154                 req->callback(req, NULL);
1155                 return;
1156         }
1157         ares->type = LDB_REPLY_DONE;
1158         ares->response = ext;
1159         ares->error = error;
1160
1161         req->callback(req, ares);
1162 }
1163
1164 static void ltdb_handle_extended(struct ltdb_context *ctx)
1165 {
1166         struct ldb_extended *ext = NULL;
1167         int ret;
1168
1169         if (strcmp(ctx->req->op.extended.oid,
1170                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1171                 /* get sequence number */
1172                 ret = ltdb_sequence_number(ctx, &ext);
1173         } else {
1174                 /* not recognized */
1175                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1176         }
1177
1178         ltdb_request_extended_done(ctx, ext, ret);
1179 }
1180
1181 static void ltdb_callback(struct tevent_context *ev,
1182                           struct tevent_timer *te,
1183                           struct timeval t,
1184                           void *private_data)
1185 {
1186         struct ltdb_context *ctx;
1187         int ret;
1188
1189         ctx = talloc_get_type(private_data, struct ltdb_context);
1190
1191         if (ctx->request_terminated) {
1192                 goto done;
1193         }
1194
1195         switch (ctx->req->operation) {
1196         case LDB_SEARCH:
1197                 ret = ltdb_search(ctx);
1198                 break;
1199         case LDB_ADD:
1200                 ret = ltdb_add(ctx);
1201                 break;
1202         case LDB_MODIFY:
1203                 ret = ltdb_modify(ctx);
1204                 break;
1205         case LDB_DELETE:
1206                 ret = ltdb_delete(ctx);
1207                 break;
1208         case LDB_RENAME:
1209                 ret = ltdb_rename(ctx);
1210                 break;
1211         case LDB_EXTENDED:
1212                 ltdb_handle_extended(ctx);
1213                 goto done;
1214         default:
1215                 /* no other op supported */
1216                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1217         }
1218
1219         if (!ctx->request_terminated) {
1220                 /* request is done now */
1221                 ltdb_request_done(ctx, ret);
1222         }
1223
1224 done:
1225         if (!ctx->request_terminated) {
1226                 /* neutralize the spy */
1227                 ctx->spy->ctx = NULL;
1228         }
1229         talloc_free(ctx);
1230 }
1231
1232 static int ltdb_request_destructor(void *ptr)
1233 {
1234         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1235
1236         if (spy->ctx != NULL) {
1237                 spy->ctx->request_terminated = true;
1238         }
1239
1240         return 0;
1241 }
1242
1243 static int ltdb_handle_request(struct ldb_module *module,
1244                                struct ldb_request *req)
1245 {
1246         struct ldb_context *ldb;
1247         struct tevent_context *ev;
1248         struct ltdb_context *ac;
1249         struct tevent_timer *te;
1250         struct timeval tv;
1251         int i;
1252
1253         ldb = ldb_module_get_ctx(module);
1254
1255         for (i = 0; req->controls && req->controls[i]; i++) {
1256                 if (req->controls[i]->critical) {
1257                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1258                                                req->controls[i]->oid);
1259                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1260                 }
1261         }
1262
1263         if (req->starttime == 0 || req->timeout == 0) {
1264                 ldb_set_errstring(ldb, "Invalid timeout settings");
1265                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1266         }
1267
1268         ev = ldb_get_event_context(ldb);
1269
1270         ac = talloc_zero(ldb, struct ltdb_context);
1271         if (ac == NULL) {
1272                 ldb_oom(ldb);
1273                 return LDB_ERR_OPERATIONS_ERROR;
1274         }
1275
1276         ac->module = module;
1277         ac->req = req;
1278
1279         tv.tv_sec = 0;
1280         tv.tv_usec = 0;
1281         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1282         if (NULL == te) {
1283                 talloc_free(ac);
1284                 return LDB_ERR_OPERATIONS_ERROR;
1285         }
1286
1287         tv.tv_sec = req->starttime + req->timeout;
1288         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1289         if (NULL == ac->timeout_event) {
1290                 talloc_free(ac);
1291                 return LDB_ERR_OPERATIONS_ERROR;
1292         }
1293
1294         /* set a spy so that we do not try to use the request context
1295          * if it is freed before ltdb_callback fires */
1296         ac->spy = talloc(req, struct ltdb_req_spy);
1297         if (NULL == ac->spy) {
1298                 talloc_free(ac);
1299                 return LDB_ERR_OPERATIONS_ERROR;
1300         }
1301         ac->spy->ctx = ac;
1302
1303         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1304
1305         return LDB_SUCCESS;
1306 }
1307
1308 static const struct ldb_module_ops ltdb_ops = {
1309         .name              = "tdb",
1310         .search            = ltdb_handle_request,
1311         .add               = ltdb_handle_request,
1312         .modify            = ltdb_handle_request,
1313         .del               = ltdb_handle_request,
1314         .rename            = ltdb_handle_request,
1315         .extended          = ltdb_handle_request,
1316         .start_transaction = ltdb_start_trans,
1317         .end_transaction   = ltdb_end_trans,
1318         .prepare_commit    = ltdb_prepare_commit,
1319         .del_transaction   = ltdb_del_trans,
1320 };
1321
1322 /*
1323   connect to the database
1324 */
1325 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1326                         unsigned int flags, const char *options[],
1327                         struct ldb_module **_module)
1328 {
1329         struct ldb_module *module;
1330         const char *path;
1331         int tdb_flags, open_flags;
1332         struct ltdb_private *ltdb;
1333
1334         /* parse the url */
1335         if (strchr(url, ':')) {
1336                 if (strncmp(url, "tdb://", 6) != 0) {
1337                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1338                                   "Invalid tdb URL '%s'", url);
1339                         return LDB_ERR_OPERATIONS_ERROR;
1340                 }
1341                 path = url+6;
1342         } else {
1343                 path = url;
1344         }
1345
1346         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1347
1348         /* check for the 'nosync' option */
1349         if (flags & LDB_FLG_NOSYNC) {
1350                 tdb_flags |= TDB_NOSYNC;
1351         }
1352
1353         /* and nommap option */
1354         if (flags & LDB_FLG_NOMMAP) {
1355                 tdb_flags |= TDB_NOMMAP;
1356         }
1357
1358         if (flags & LDB_FLG_RDONLY) {
1359                 open_flags = O_RDONLY;
1360         } else {
1361                 open_flags = O_CREAT | O_RDWR;
1362         }
1363
1364         ltdb = talloc_zero(ldb, struct ltdb_private);
1365         if (!ltdb) {
1366                 ldb_oom(ldb);
1367                 return LDB_ERR_OPERATIONS_ERROR;
1368         }
1369
1370         /* note that we use quite a large default hash size */
1371         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1372                                    tdb_flags, open_flags,
1373                                    ldb_get_create_perms(ldb), ldb);
1374         if (!ltdb->tdb) {
1375                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1376                           "Unable to open tdb '%s'", path);
1377                 talloc_free(ltdb);
1378                 return LDB_ERR_OPERATIONS_ERROR;
1379         }
1380
1381         ltdb->sequence_number = 0;
1382
1383         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1384         if (!module) {
1385                 talloc_free(ltdb);
1386                 return LDB_ERR_OPERATIONS_ERROR;
1387         }
1388         ldb_module_set_private(module, ltdb);
1389         talloc_steal(module, ltdb);
1390
1391         if (ltdb_cache_load(module) != 0) {
1392                 talloc_free(module);
1393                 talloc_free(ltdb);
1394                 return LDB_ERR_OPERATIONS_ERROR;
1395         }
1396
1397         *_module = module;
1398         return LDB_SUCCESS;
1399 }
1400
1401 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1402         .name = "tdb",
1403         .connect_fn = ltdb_connect
1404 };