ldb_tdb: provide ldb_key_dn() and ldb_key_msg()
[vlendec/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int tdb_ret = 0;
102         int ret;
103
104         if (ltdb->in_transaction == 0 &&
105             ltdb->read_lock_count == 0) {
106                 tdb_ret = tdb_lockall_read(ltdb->tdb);
107         }
108         if (tdb_ret == 0) {
109                 ltdb->read_lock_count++;
110                 return LDB_SUCCESS;
111         }
112         ret = ltdb_err_map(tdb_error(ltdb->tdb));
113         if (ret == LDB_SUCCESS) {
114                 ret = LDB_ERR_OPERATIONS_ERROR;
115         }
116         ldb_debug_set(ldb_module_get_ctx(module),
117                       LDB_DEBUG_FATAL,
118                       "Failure during ltdb_lock_read(): %s -> %s",
119                       tdb_errorstr(ltdb->tdb),
120                       ldb_strerror(ret));
121         return ret;
122 }
123
124 /*
125   unlock the database after a ltdb_lock_read()
126 */
127 int ltdb_unlock_read(struct ldb_module *module)
128 {
129         void *data = ldb_module_get_private(module);
130         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
131         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
132                 tdb_unlockall_read(ltdb->tdb);
133                 ltdb->read_lock_count--;
134                 return 0;
135         }
136         ltdb->read_lock_count--;
137         return 0;
138 }
139
140
141 /* 
142  * Determine if this key could hold a record.  We allow the new GUID
143  * index, the old DN index and a possible future ID=
144  */
145 bool ltdb_key_is_record(TDB_DATA key)
146 {
147         if (key.dsize < 4) {
148                 return false;
149         }
150
151         if (memcmp(key.dptr, "DN=", 3) == 0) {
152                 return true;
153         }
154         
155         if (memcmp(key.dptr, "ID=", 3) == 0) {
156                 return true;
157         }
158
159         if (key.dsize < 6) {
160                 return false;
161         }
162
163         if (memcmp(key.dptr, "GUID=", 5) == 0) {
164                 return true;
165         }
166         
167         return false;
168 }
169
170 /*
171   form a TDB_DATA for a record key
172   caller frees
173
174   note that the key for a record can depend on whether the
175   dn refers to a case sensitive index record or not
176 */
177 TDB_DATA ltdb_key_dn(struct ldb_module *module, struct ldb_dn *dn)
178 {
179         struct ldb_context *ldb = ldb_module_get_ctx(module);
180         TDB_DATA key;
181         char *key_str = NULL;
182         const char *dn_folded = NULL;
183
184         /*
185           most DNs are case insensitive. The exception is index DNs for
186           case sensitive attributes
187
188           there are 3 cases dealt with in this code:
189
190           1) if the dn doesn't start with @ then uppercase the attribute
191              names and the attributes values of case insensitive attributes
192           2) if the dn starts with @ then leave it alone -
193              the indexing code handles the rest
194         */
195
196         dn_folded = ldb_dn_get_casefold(dn);
197         if (!dn_folded) {
198                 goto failed;
199         }
200
201         key_str = talloc_strdup(ldb, "DN=");
202         if (!key_str) {
203                 goto failed;
204         }
205
206         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
207         if (!key_str) {
208                 goto failed;
209         }
210
211         key.dptr = (uint8_t *)key_str;
212         key.dsize = strlen(key_str) + 1;
213
214         return key;
215
216 failed:
217         errno = ENOMEM;
218         key.dptr = NULL;
219         key.dsize = 0;
220         return key;
221 }
222
223 /*
224   form a TDB_DATA for a record key
225   caller frees
226
227   note that the key for a record can depend on whether the
228   dn refers to a case sensitive index record or not
229 */
230 TDB_DATA ltdb_key_msg(struct ldb_module *module,
231                       const struct ldb_message *msg)
232 {
233         return ltdb_key_dn(module, msg->dn);
234 }
235
236 /*
237   check special dn's have valid attributes
238   currently only @ATTRIBUTES is checked
239 */
240 static int ltdb_check_special_dn(struct ldb_module *module,
241                                  const struct ldb_message *msg)
242 {
243         struct ldb_context *ldb = ldb_module_get_ctx(module);
244         unsigned int i, j;
245
246         if (! ldb_dn_is_special(msg->dn) ||
247             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
248                 return LDB_SUCCESS;
249         }
250
251         /* we have @ATTRIBUTES, let's check attributes are fine */
252         /* should we check that we deny multivalued attributes ? */
253         for (i = 0; i < msg->num_elements; i++) {
254                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
255
256                 for (j = 0; j < msg->elements[i].num_values; j++) {
257                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
258                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
259                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
260                         }
261                 }
262         }
263
264         return LDB_SUCCESS;
265 }
266
267
268 /*
269   we've made a modification to a dn - possibly reindex and
270   update sequence number
271 */
272 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
273 {
274         int ret = LDB_SUCCESS;
275         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
276
277         /* only allow modifies inside a transaction, otherwise the
278          * ldb is unsafe */
279         if (ltdb->in_transaction == 0) {
280                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
281                 return LDB_ERR_OPERATIONS_ERROR;
282         }
283
284         if (ldb_dn_is_special(dn) &&
285             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
286              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
287         {
288                 if (ltdb->warn_reindex) {
289                         ldb_debug(ldb_module_get_ctx(module),
290                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
291                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
292                 }
293                 ret = ltdb_reindex(module);
294         }
295
296         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
297         if (ret == LDB_SUCCESS &&
298             !(ldb_dn_is_special(dn) &&
299               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
300                 ret = ltdb_increase_sequence_number(module);
301         }
302
303         /* If the modify was to @OPTIONS, reload the cache */
304         if (ret == LDB_SUCCESS &&
305             ldb_dn_is_special(dn) &&
306             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
307                 ret = ltdb_cache_reload(module);
308         }
309
310         return ret;
311 }
312
313 /*
314   store a record into the db
315 */
316 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
317 {
318         void *data = ldb_module_get_private(module);
319         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
320         TDB_DATA tdb_key, tdb_data;
321         struct ldb_val ldb_data;
322         int ret = LDB_SUCCESS;
323
324         if (ltdb->read_only) {
325                 return LDB_ERR_UNWILLING_TO_PERFORM;
326         }
327
328         tdb_key = ltdb_key_msg(module, msg);
329         if (tdb_key.dptr == NULL) {
330                 return LDB_ERR_OTHER;
331         }
332
333         ret = ldb_pack_data(ldb_module_get_ctx(module),
334                             msg, &ldb_data);
335         if (ret == -1) {
336                 talloc_free(tdb_key.dptr);
337                 return LDB_ERR_OTHER;
338         }
339
340         tdb_data.dptr = ldb_data.data;
341         tdb_data.dsize = ldb_data.length;
342
343         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
344         if (ret != 0) {
345                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
346                 goto done;
347         }
348
349 done:
350         talloc_free(tdb_key.dptr);
351         talloc_free(ldb_data.data);
352
353         return ret;
354 }
355
356
357 /*
358   check if a attribute is a single valued, for a given element
359  */
360 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
361                                   struct ldb_message_element *el)
362 {
363         if (!a) return false;
364         if (el != NULL) {
365                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
366                         /* override from a ldb module, for example
367                            used for the description field, which is
368                            marked multi-valued in the schema but which
369                            should not actually accept multiple
370                            values */
371                         return true;
372                 }
373                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
374                         /* override from a ldb module, for example used for
375                            deleted linked attribute entries */
376                         return false;
377                 }
378         }
379         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
380                 return true;
381         }
382         return false;
383 }
384
385 static int ltdb_add_internal(struct ldb_module *module,
386                              const struct ldb_message *msg,
387                              bool check_single_value)
388 {
389         struct ldb_context *ldb = ldb_module_get_ctx(module);
390         int ret = LDB_SUCCESS;
391         unsigned int i;
392
393         for (i=0;i<msg->num_elements;i++) {
394                 struct ldb_message_element *el = &msg->elements[i];
395                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
396
397                 if (el->num_values == 0) {
398                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
399                                                el->name, ldb_dn_get_linearized(msg->dn));
400                         return LDB_ERR_CONSTRAINT_VIOLATION;
401                 }
402                 if (check_single_value &&
403                                 el->num_values > 1 &&
404                                 ldb_tdb_single_valued(a, el)) {
405                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
406                                                el->name, ldb_dn_get_linearized(msg->dn));
407                         return LDB_ERR_CONSTRAINT_VIOLATION;
408                 }
409
410                 /* Do not check "@ATTRIBUTES" for duplicated values */
411                 if (ldb_dn_is_special(msg->dn) &&
412                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
413                         continue;
414                 }
415
416                 if (check_single_value &&
417                     !(el->flags &
418                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
419                         struct ldb_val *duplicate = NULL;
420
421                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
422                                                          el, &duplicate, 0);
423                         if (ret != LDB_SUCCESS) {
424                                 return ret;
425                         }
426                         if (duplicate != NULL) {
427                                 ldb_asprintf_errstring(
428                                         ldb,
429                                         "attribute '%s': value '%.*s' on '%s' "
430                                         "provided more than once in ADD object",
431                                         el->name,
432                                         (int)duplicate->length,
433                                         duplicate->data,
434                                         ldb_dn_get_linearized(msg->dn));
435                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
436                         }
437                 }
438         }
439
440         ret = ltdb_store(module, msg, TDB_INSERT);
441         if (ret != LDB_SUCCESS) {
442                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
443                         ldb_asprintf_errstring(ldb,
444                                                "Entry %s already exists",
445                                                ldb_dn_get_linearized(msg->dn));
446                 }
447                 return ret;
448         }
449
450         ret = ltdb_index_add_new(module, msg);
451         if (ret != LDB_SUCCESS) {
452                 return ret;
453         }
454
455         ret = ltdb_modified(module, msg->dn);
456
457         return ret;
458 }
459
460 /*
461   add a record to the database
462 */
463 static int ltdb_add(struct ltdb_context *ctx)
464 {
465         struct ldb_module *module = ctx->module;
466         struct ldb_request *req = ctx->req;
467         int ret = LDB_SUCCESS;
468
469         ret = ltdb_check_special_dn(module, req->op.add.message);
470         if (ret != LDB_SUCCESS) {
471                 return ret;
472         }
473
474         ldb_request_set_state(req, LDB_ASYNC_PENDING);
475
476         if (ltdb_cache_load(module) != 0) {
477                 return LDB_ERR_OPERATIONS_ERROR;
478         }
479
480         ret = ltdb_add_internal(module, req->op.add.message, true);
481
482         return ret;
483 }
484
485 /*
486   delete a record from the database, not updating indexes (used for deleting
487   index records)
488 */
489 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
490 {
491         void *data = ldb_module_get_private(module);
492         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
493         TDB_DATA tdb_key;
494         int ret;
495
496         if (ltdb->read_only) {
497                 return LDB_ERR_UNWILLING_TO_PERFORM;
498         }
499
500         tdb_key = ltdb_key_dn(module, dn);
501         if (!tdb_key.dptr) {
502                 return LDB_ERR_OTHER;
503         }
504
505         ret = tdb_delete(ltdb->tdb, tdb_key);
506         talloc_free(tdb_key.dptr);
507
508         if (ret != 0) {
509                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
510         }
511
512         return ret;
513 }
514
515 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
516 {
517         struct ldb_message *msg;
518         int ret = LDB_SUCCESS;
519
520         msg = ldb_msg_new(module);
521         if (msg == NULL) {
522                 return LDB_ERR_OPERATIONS_ERROR;
523         }
524
525         /* in case any attribute of the message was indexed, we need
526            to fetch the old record */
527         ret = ltdb_search_dn1(module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
528         if (ret != LDB_SUCCESS) {
529                 /* not finding the old record is an error */
530                 goto done;
531         }
532
533         ret = ltdb_delete_noindex(module, dn);
534         if (ret != LDB_SUCCESS) {
535                 goto done;
536         }
537
538         /* remove any indexed attributes */
539         ret = ltdb_index_delete(module, msg);
540         if (ret != LDB_SUCCESS) {
541                 goto done;
542         }
543
544         ret = ltdb_modified(module, dn);
545         if (ret != LDB_SUCCESS) {
546                 goto done;
547         }
548
549 done:
550         talloc_free(msg);
551         return ret;
552 }
553
554 /*
555   delete a record from the database
556 */
557 static int ltdb_delete(struct ltdb_context *ctx)
558 {
559         struct ldb_module *module = ctx->module;
560         struct ldb_request *req = ctx->req;
561         int ret = LDB_SUCCESS;
562
563         ldb_request_set_state(req, LDB_ASYNC_PENDING);
564
565         if (ltdb_cache_load(module) != 0) {
566                 return LDB_ERR_OPERATIONS_ERROR;
567         }
568
569         ret = ltdb_delete_internal(module, req->op.del.dn);
570
571         return ret;
572 }
573
574 /*
575   find an element by attribute name. At the moment this does a linear search,
576   it should be re-coded to use a binary search once all places that modify
577   records guarantee sorted order
578
579   return the index of the first matching element if found, otherwise -1
580 */
581 static int find_element(const struct ldb_message *msg, const char *name)
582 {
583         unsigned int i;
584         for (i=0;i<msg->num_elements;i++) {
585                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
586                         return i;
587                 }
588         }
589         return -1;
590 }
591
592
593 /*
594   add an element to an existing record. Assumes a elements array that we
595   can call re-alloc on, and assumed that we can re-use the data pointers from
596   the passed in additional values. Use with care!
597
598   returns 0 on success, -1 on failure (and sets errno)
599 */
600 static int ltdb_msg_add_element(struct ldb_message *msg,
601                                 struct ldb_message_element *el)
602 {
603         struct ldb_message_element *e2;
604         unsigned int i;
605
606         if (el->num_values == 0) {
607                 /* nothing to do here - we don't add empty elements */
608                 return 0;
609         }
610
611         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
612                               msg->num_elements+1);
613         if (!e2) {
614                 errno = ENOMEM;
615                 return -1;
616         }
617
618         msg->elements = e2;
619
620         e2 = &msg->elements[msg->num_elements];
621
622         e2->name = el->name;
623         e2->flags = el->flags;
624         e2->values = talloc_array(msg->elements,
625                                   struct ldb_val, el->num_values);
626         if (!e2->values) {
627                 errno = ENOMEM;
628                 return -1;
629         }
630         for (i=0;i<el->num_values;i++) {
631                 e2->values[i] = el->values[i];
632         }
633         e2->num_values = el->num_values;
634
635         ++msg->num_elements;
636
637         return 0;
638 }
639
640 /*
641   delete all elements having a specified attribute name
642 */
643 static int msg_delete_attribute(struct ldb_module *module,
644                                 struct ldb_message *msg, const char *name)
645 {
646         unsigned int i;
647         int ret;
648         struct ldb_message_element *el;
649
650         el = ldb_msg_find_element(msg, name);
651         if (el == NULL) {
652                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
653         }
654         i = el - msg->elements;
655
656         ret = ltdb_index_del_element(module, msg->dn, el);
657         if (ret != LDB_SUCCESS) {
658                 return ret;
659         }
660
661         talloc_free(el->values);
662         if (msg->num_elements > (i+1)) {
663                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
664         }
665         msg->num_elements--;
666         msg->elements = talloc_realloc(msg, msg->elements,
667                                        struct ldb_message_element,
668                                        msg->num_elements);
669         return LDB_SUCCESS;
670 }
671
672 /*
673   delete all elements matching an attribute name/value
674
675   return LDB Error on failure
676 */
677 static int msg_delete_element(struct ldb_module *module,
678                               struct ldb_message *msg,
679                               const char *name,
680                               const struct ldb_val *val)
681 {
682         struct ldb_context *ldb = ldb_module_get_ctx(module);
683         unsigned int i;
684         int found, ret;
685         struct ldb_message_element *el;
686         const struct ldb_schema_attribute *a;
687
688         found = find_element(msg, name);
689         if (found == -1) {
690                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
691         }
692
693         i = (unsigned int) found;
694         el = &(msg->elements[i]);
695
696         a = ldb_schema_attribute_by_name(ldb, el->name);
697
698         for (i=0;i<el->num_values;i++) {
699                 bool matched;
700                 if (a->syntax->operator_fn) {
701                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
702                                                      &el->values[i], val, &matched);
703                         if (ret != LDB_SUCCESS) return ret;
704                 } else {
705                         matched = (a->syntax->comparison_fn(ldb, ldb,
706                                                             &el->values[i], val) == 0);
707                 }
708                 if (matched) {
709                         if (el->num_values == 1) {
710                                 return msg_delete_attribute(module, msg, name);
711                         }
712
713                         ret = ltdb_index_del_value(module, msg->dn, el, i);
714                         if (ret != LDB_SUCCESS) {
715                                 return ret;
716                         }
717
718                         if (i<el->num_values-1) {
719                                 memmove(&el->values[i], &el->values[i+1],
720                                         sizeof(el->values[i])*
721                                                 (el->num_values-(i+1)));
722                         }
723                         el->num_values--;
724
725                         /* per definition we find in a canonicalised message an
726                            attribute value only once. So we are finished here */
727                         return LDB_SUCCESS;
728                 }
729         }
730
731         /* Not found */
732         return LDB_ERR_NO_SUCH_ATTRIBUTE;
733 }
734
735
736 /*
737   modify a record - internal interface
738
739   yuck - this is O(n^2). Luckily n is usually small so we probably
740   get away with it, but if we ever have really large attribute lists
741   then we'll need to look at this again
742
743   'req' is optional, and is used to specify controls if supplied
744 */
745 int ltdb_modify_internal(struct ldb_module *module,
746                          const struct ldb_message *msg,
747                          struct ldb_request *req)
748 {
749         struct ldb_context *ldb = ldb_module_get_ctx(module);
750         struct ldb_message *msg2;
751         unsigned int i, j;
752         int ret = LDB_SUCCESS, idx;
753         struct ldb_control *control_permissive = NULL;
754         TALLOC_CTX *mem_ctx = talloc_new(req);
755
756         if (mem_ctx == NULL) {
757                 return ldb_module_oom(module);
758         }
759         
760         if (req) {
761                 control_permissive = ldb_request_get_control(req,
762                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
763         }
764
765         msg2 = ldb_msg_new(mem_ctx);
766         if (msg2 == NULL) {
767                 ret = LDB_ERR_OTHER;
768                 goto done;
769         }
770
771         ret = ltdb_search_dn1(module, msg->dn,
772                               msg2,
773                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
774         if (ret != LDB_SUCCESS) {
775                 goto done;
776         }
777
778         for (i=0; i<msg->num_elements; i++) {
779                 struct ldb_message_element *el = &msg->elements[i], *el2;
780                 struct ldb_val *vals;
781                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
782                 const char *dn;
783                 uint32_t options = 0;
784                 if (control_permissive != NULL) {
785                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
786                 }
787
788                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
789                 case LDB_FLAG_MOD_ADD:
790
791                         if (el->num_values == 0) {
792                                 ldb_asprintf_errstring(ldb,
793                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
794                                                        el->name, ldb_dn_get_linearized(msg2->dn));
795                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
796                                 goto done;
797                         }
798
799                         /* make a copy of the array so that a permissive
800                          * control can remove duplicates without changing the
801                          * original values, but do not copy data as we do not
802                          * need to keep it around once the operation is
803                          * finished */
804                         if (control_permissive) {
805                                 el = talloc(msg2, struct ldb_message_element);
806                                 if (!el) {
807                                         ret = LDB_ERR_OTHER;
808                                         goto done;
809                                 }
810                                 *el = msg->elements[i];
811                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
812                                 if (el->values == NULL) {
813                                         ret = LDB_ERR_OTHER;
814                                         goto done;
815                                 }
816                                 for (j = 0; j < el->num_values; j++) {
817                                         el->values[j] = msg->elements[i].values[j];
818                                 }
819                         }
820
821                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
822                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
823                                                        el->name, ldb_dn_get_linearized(msg2->dn));
824                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
825                                 goto done;
826                         }
827
828                         /* Checks if element already exists */
829                         idx = find_element(msg2, el->name);
830                         if (idx == -1) {
831                                 if (ltdb_msg_add_element(msg2, el) != 0) {
832                                         ret = LDB_ERR_OTHER;
833                                         goto done;
834                                 }
835                                 ret = ltdb_index_add_element(module, msg2->dn,
836                                                              el);
837                                 if (ret != LDB_SUCCESS) {
838                                         goto done;
839                                 }
840                         } else {
841                                 j = (unsigned int) idx;
842                                 el2 = &(msg2->elements[j]);
843
844                                 /* We cannot add another value on a existing one
845                                    if the attribute is single-valued */
846                                 if (ldb_tdb_single_valued(a, el)) {
847                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
848                                                                el->name, ldb_dn_get_linearized(msg2->dn));
849                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
850                                         goto done;
851                                 }
852
853                                 /* Check that values don't exist yet on multi-
854                                    valued attributes or aren't provided twice */
855                                 if (!(el->flags &
856                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
857                                         struct ldb_val *duplicate = NULL;
858                                         ret = ldb_msg_find_common_values(ldb,
859                                                                          msg2,
860                                                                          el,
861                                                                          el2,
862                                                                          options);
863
864                                         if (ret ==
865                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
866                                                 ldb_asprintf_errstring(ldb,
867                                                         "attribute '%s': value "
868                                                         "#%u on '%s' already "
869                                                         "exists", el->name, j,
870                                                         ldb_dn_get_linearized(msg2->dn));
871                                                 goto done;
872                                         } else if (ret != LDB_SUCCESS) {
873                                                 goto done;
874                                         }
875
876                                         ret = ldb_msg_find_duplicate_val(
877                                                 ldb, msg2, el, &duplicate, 0);
878                                         if (ret != LDB_SUCCESS) {
879                                                 goto done;
880                                         }
881                                         if (duplicate != NULL) {
882                                                 ldb_asprintf_errstring(
883                                                         ldb,
884                                                         "attribute '%s': value "
885                                                         "'%.*s' on '%s' "
886                                                         "provided more than "
887                                                         "once in ADD",
888                                                         el->name,
889                                                         (int)duplicate->length,
890                                                         duplicate->data,
891                                                         ldb_dn_get_linearized(msg->dn));
892                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
893                                                 goto done;
894                                         }
895                                 }
896
897                                 /* Now combine existing and new values to a new
898                                    attribute record */
899                                 vals = talloc_realloc(msg2->elements,
900                                                       el2->values, struct ldb_val,
901                                                       el2->num_values + el->num_values);
902                                 if (vals == NULL) {
903                                         ldb_oom(ldb);
904                                         ret = LDB_ERR_OTHER;
905                                         goto done;
906                                 }
907
908                                 for (j=0; j<el->num_values; j++) {
909                                         vals[el2->num_values + j] =
910                                                 ldb_val_dup(vals, &el->values[j]);
911                                 }
912
913                                 el2->values = vals;
914                                 el2->num_values += el->num_values;
915
916                                 ret = ltdb_index_add_element(module, msg2->dn, el);
917                                 if (ret != LDB_SUCCESS) {
918                                         goto done;
919                                 }
920                         }
921
922                         break;
923
924                 case LDB_FLAG_MOD_REPLACE:
925
926                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
927                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
928                                                        el->name, ldb_dn_get_linearized(msg2->dn));
929                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
930                                 goto done;
931                         }
932
933                         /*
934                          * We don't need to check this if we have been
935                          * pre-screened by the repl_meta_data module
936                          * in Samba, or someone else who can claim to
937                          * know what they are doing. 
938                          */
939                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
940                                 struct ldb_val *duplicate = NULL;
941
942                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
943                                                                  &duplicate, 0);
944                                 if (ret != LDB_SUCCESS) {
945                                         goto done;
946                                 }
947                                 if (duplicate != NULL) {
948                                         ldb_asprintf_errstring(
949                                                 ldb,
950                                                 "attribute '%s': value '%.*s' "
951                                                 "on '%s' provided more than "
952                                                 "once in REPLACE",
953                                                 el->name,
954                                                 (int)duplicate->length,
955                                                 duplicate->data,
956                                                 ldb_dn_get_linearized(msg2->dn));
957                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
958                                         goto done;
959                                 }
960                         }
961
962                         /* Checks if element already exists */
963                         idx = find_element(msg2, el->name);
964                         if (idx != -1) {
965                                 j = (unsigned int) idx;
966                                 el2 = &(msg2->elements[j]);
967
968                                 /* we consider two elements to be
969                                  * equal only if the order
970                                  * matches. This allows dbcheck to
971                                  * fix the ordering on attributes
972                                  * where order matters, such as
973                                  * objectClass
974                                  */
975                                 if (ldb_msg_element_equal_ordered(el, el2)) {
976                                         continue;
977                                 }
978
979                                 /* Delete the attribute if it exists in the DB */
980                                 if (msg_delete_attribute(module, msg2,
981                                                          el->name) != 0) {
982                                         ret = LDB_ERR_OTHER;
983                                         goto done;
984                                 }
985                         }
986
987                         /* Recreate it with the new values */
988                         if (ltdb_msg_add_element(msg2, el) != 0) {
989                                 ret = LDB_ERR_OTHER;
990                                 goto done;
991                         }
992
993                         ret = ltdb_index_add_element(module, msg2->dn, el);
994                         if (ret != LDB_SUCCESS) {
995                                 goto done;
996                         }
997
998                         break;
999
1000                 case LDB_FLAG_MOD_DELETE:
1001                         dn = ldb_dn_get_linearized(msg2->dn);
1002                         if (dn == NULL) {
1003                                 ret = LDB_ERR_OTHER;
1004                                 goto done;
1005                         }
1006
1007                         if (msg->elements[i].num_values == 0) {
1008                                 /* Delete the whole attribute */
1009                                 ret = msg_delete_attribute(module, msg2,
1010                                                            msg->elements[i].name);
1011                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1012                                     control_permissive) {
1013                                         ret = LDB_SUCCESS;
1014                                 } else {
1015                                         ldb_asprintf_errstring(ldb,
1016                                                                "attribute '%s': no such attribute for delete on '%s'",
1017                                                                msg->elements[i].name, dn);
1018                                 }
1019                                 if (ret != LDB_SUCCESS) {
1020                                         goto done;
1021                                 }
1022                         } else {
1023                                 /* Delete specified values from an attribute */
1024                                 for (j=0; j < msg->elements[i].num_values; j++) {
1025                                         ret = msg_delete_element(module,
1026                                                                  msg2,
1027                                                                  msg->elements[i].name,
1028                                                                  &msg->elements[i].values[j]);
1029                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1030                                             control_permissive) {
1031                                                 ret = LDB_SUCCESS;
1032                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1033                                                 ldb_asprintf_errstring(ldb,
1034                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1035                                                                        msg->elements[i].name, dn);
1036                                         }
1037                                         if (ret != LDB_SUCCESS) {
1038                                                 goto done;
1039                                         }
1040                                 }
1041                         }
1042                         break;
1043                 default:
1044                         ldb_asprintf_errstring(ldb,
1045                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1046                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1047                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1048                         ret = LDB_ERR_PROTOCOL_ERROR;
1049                         goto done;
1050                 }
1051         }
1052
1053         ret = ltdb_store(module, msg2, TDB_MODIFY);
1054         if (ret != LDB_SUCCESS) {
1055                 goto done;
1056         }
1057
1058         ret = ltdb_modified(module, msg2->dn);
1059         if (ret != LDB_SUCCESS) {
1060                 goto done;
1061         }
1062
1063 done:
1064         TALLOC_FREE(mem_ctx);
1065         return ret;
1066 }
1067
1068 /*
1069   modify a record
1070 */
1071 static int ltdb_modify(struct ltdb_context *ctx)
1072 {
1073         struct ldb_module *module = ctx->module;
1074         struct ldb_request *req = ctx->req;
1075         int ret = LDB_SUCCESS;
1076
1077         ret = ltdb_check_special_dn(module, req->op.mod.message);
1078         if (ret != LDB_SUCCESS) {
1079                 return ret;
1080         }
1081
1082         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1083
1084         if (ltdb_cache_load(module) != 0) {
1085                 return LDB_ERR_OPERATIONS_ERROR;
1086         }
1087
1088         ret = ltdb_modify_internal(module, req->op.mod.message, req);
1089
1090         return ret;
1091 }
1092
1093 /*
1094   rename a record
1095 */
1096 static int ltdb_rename(struct ltdb_context *ctx)
1097 {
1098         struct ldb_module *module = ctx->module;
1099         void *data = ldb_module_get_private(module);
1100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1101         struct ldb_request *req = ctx->req;
1102         struct ldb_message *msg;
1103         int ret = LDB_SUCCESS;
1104         TDB_DATA tdb_key, tdb_key_old;
1105
1106         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1107
1108         if (ltdb_cache_load(ctx->module) != 0) {
1109                 return LDB_ERR_OPERATIONS_ERROR;
1110         }
1111
1112         msg = ldb_msg_new(ctx);
1113         if (msg == NULL) {
1114                 return LDB_ERR_OPERATIONS_ERROR;
1115         }
1116
1117         /* we need to fetch the old record to re-add under the new name */
1118         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg,
1119                               LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1120         if (ret != LDB_SUCCESS) {
1121                 /* not finding the old record is an error */
1122                 return ret;
1123         }
1124
1125         /* We need to, before changing the DB, check if the new DN
1126          * exists, so we can return this error to the caller with an
1127          * unmodified DB */
1128         tdb_key = ltdb_key_dn(module, req->op.rename.newdn);
1129         if (!tdb_key.dptr) {
1130                 talloc_free(msg);
1131                 return LDB_ERR_OPERATIONS_ERROR;
1132         }
1133
1134         tdb_key_old = ltdb_key_dn(module, req->op.rename.olddn);
1135         if (!tdb_key_old.dptr) {
1136                 talloc_free(msg);
1137                 talloc_free(tdb_key.dptr);
1138                 return LDB_ERR_OPERATIONS_ERROR;
1139         }
1140
1141         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1142         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1143                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1144                         talloc_free(tdb_key_old.dptr);
1145                         talloc_free(tdb_key.dptr);
1146                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1147                                                "Entry %s already exists",
1148                                                ldb_dn_get_linearized(req->op.rename.newdn));
1149                         /* finding the new record already in the DB is an error */
1150                         talloc_free(msg);
1151                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1152                 }
1153         }
1154         talloc_free(tdb_key_old.dptr);
1155         talloc_free(tdb_key.dptr);
1156
1157         /* Always delete first then add, to avoid conflicts with
1158          * unique indexes. We rely on the transaction to make this
1159          * atomic
1160          */
1161         ret = ltdb_delete_internal(module, msg->dn);
1162         if (ret != LDB_SUCCESS) {
1163                 talloc_free(msg);
1164                 return ret;
1165         }
1166
1167         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1168         if (msg->dn == NULL) {
1169                 talloc_free(msg);
1170                 return LDB_ERR_OPERATIONS_ERROR;
1171         }
1172
1173         /* We don't check single value as we can have more than 1 with
1174          * deleted attributes. We could go through all elements but that's
1175          * maybe not the most efficient way
1176          */
1177         ret = ltdb_add_internal(module, msg, false);
1178
1179         talloc_free(msg);
1180
1181         return ret;
1182 }
1183
1184 static int ltdb_start_trans(struct ldb_module *module)
1185 {
1186         void *data = ldb_module_get_private(module);
1187         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1188
1189         /* Do not take out the transaction lock on a read-only DB */
1190         if (ltdb->read_only) {
1191                 return LDB_ERR_UNWILLING_TO_PERFORM;
1192         }
1193
1194         if (tdb_transaction_start(ltdb->tdb) != 0) {
1195                 return ltdb_err_map(tdb_error(ltdb->tdb));
1196         }
1197
1198         ltdb->in_transaction++;
1199
1200         ltdb_index_transaction_start(module);
1201
1202         return LDB_SUCCESS;
1203 }
1204
1205 static int ltdb_prepare_commit(struct ldb_module *module)
1206 {
1207         int ret;
1208         void *data = ldb_module_get_private(module);
1209         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1210
1211         if (ltdb->in_transaction != 1) {
1212                 return LDB_SUCCESS;
1213         }
1214
1215         ret = ltdb_index_transaction_commit(module);
1216         if (ret != LDB_SUCCESS) {
1217                 tdb_transaction_cancel(ltdb->tdb);
1218                 ltdb->in_transaction--;
1219                 return ret;
1220         }
1221
1222         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1223                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1224                 ltdb->in_transaction--;
1225                 ldb_debug_set(ldb_module_get_ctx(module),
1226                               LDB_DEBUG_FATAL,
1227                               "Failure during "
1228                               "tdb_transaction_prepare_commit(): %s -> %s",
1229                               tdb_errorstr(ltdb->tdb),
1230                               ldb_strerror(ret));
1231                 return ret;
1232         }
1233
1234         ltdb->prepared_commit = true;
1235
1236         return LDB_SUCCESS;
1237 }
1238
1239 static int ltdb_end_trans(struct ldb_module *module)
1240 {
1241         int ret;
1242         void *data = ldb_module_get_private(module);
1243         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1244
1245         if (!ltdb->prepared_commit) {
1246                 ret = ltdb_prepare_commit(module);
1247                 if (ret != LDB_SUCCESS) {
1248                         return ret;
1249                 }
1250         }
1251
1252         ltdb->in_transaction--;
1253         ltdb->prepared_commit = false;
1254
1255         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1256                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
1257                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1258                                        "Failure during tdb_transaction_commit(): %s -> %s",
1259                                        tdb_errorstr(ltdb->tdb),
1260                                        ldb_strerror(ret));
1261                 return ret;
1262         }
1263
1264         return LDB_SUCCESS;
1265 }
1266
1267 static int ltdb_del_trans(struct ldb_module *module)
1268 {
1269         void *data = ldb_module_get_private(module);
1270         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1271
1272         ltdb->in_transaction--;
1273
1274         if (ltdb_index_transaction_cancel(module) != 0) {
1275                 tdb_transaction_cancel(ltdb->tdb);
1276                 return ltdb_err_map(tdb_error(ltdb->tdb));
1277         }
1278
1279         tdb_transaction_cancel(ltdb->tdb);
1280         return LDB_SUCCESS;
1281 }
1282
1283 /*
1284   return sequenceNumber from @BASEINFO
1285 */
1286 static int ltdb_sequence_number(struct ltdb_context *ctx,
1287                                 struct ldb_extended **ext)
1288 {
1289         struct ldb_context *ldb;
1290         struct ldb_module *module = ctx->module;
1291         struct ldb_request *req = ctx->req;
1292         TALLOC_CTX *tmp_ctx = NULL;
1293         struct ldb_seqnum_request *seq;
1294         struct ldb_seqnum_result *res;
1295         struct ldb_message *msg = NULL;
1296         struct ldb_dn *dn;
1297         const char *date;
1298         int ret = LDB_SUCCESS;
1299
1300         ldb = ldb_module_get_ctx(module);
1301
1302         seq = talloc_get_type(req->op.extended.data,
1303                                 struct ldb_seqnum_request);
1304         if (seq == NULL) {
1305                 return LDB_ERR_OPERATIONS_ERROR;
1306         }
1307
1308         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1309
1310         if (ltdb_lock_read(module) != 0) {
1311                 return LDB_ERR_OPERATIONS_ERROR;
1312         }
1313
1314         res = talloc_zero(req, struct ldb_seqnum_result);
1315         if (res == NULL) {
1316                 ret = LDB_ERR_OPERATIONS_ERROR;
1317                 goto done;
1318         }
1319
1320         tmp_ctx = talloc_new(req);
1321         if (tmp_ctx == NULL) {
1322                 ret = LDB_ERR_OPERATIONS_ERROR;
1323                 goto done;
1324         }
1325
1326         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1327         if (dn == NULL) {
1328                 ret = LDB_ERR_OPERATIONS_ERROR;
1329                 goto done;
1330         }
1331
1332         msg = ldb_msg_new(tmp_ctx);
1333         if (msg == NULL) {
1334                 ret = LDB_ERR_OPERATIONS_ERROR;
1335                 goto done;
1336         }
1337
1338         ret = ltdb_search_dn1(module, dn, msg, 0);
1339         if (ret != LDB_SUCCESS) {
1340                 goto done;
1341         }
1342
1343         switch (seq->type) {
1344         case LDB_SEQ_HIGHEST_SEQ:
1345                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1346                 break;
1347         case LDB_SEQ_NEXT:
1348                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1349                 res->seq_num++;
1350                 break;
1351         case LDB_SEQ_HIGHEST_TIMESTAMP:
1352                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1353                 if (date) {
1354                         res->seq_num = ldb_string_to_time(date);
1355                 } else {
1356                         res->seq_num = 0;
1357                         /* zero is as good as anything when we don't know */
1358                 }
1359                 break;
1360         }
1361
1362         *ext = talloc_zero(req, struct ldb_extended);
1363         if (*ext == NULL) {
1364                 ret = LDB_ERR_OPERATIONS_ERROR;
1365                 goto done;
1366         }
1367         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1368         (*ext)->data = talloc_steal(*ext, res);
1369
1370 done:
1371         talloc_free(tmp_ctx);
1372         ltdb_unlock_read(module);
1373         return ret;
1374 }
1375
1376 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1377 {
1378         struct ldb_context *ldb;
1379         struct ldb_request *req;
1380         struct ldb_reply *ares;
1381
1382         ldb = ldb_module_get_ctx(ctx->module);
1383         req = ctx->req;
1384
1385         /* if we already returned an error just return */
1386         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1387                 return;
1388         }
1389
1390         ares = talloc_zero(req, struct ldb_reply);
1391         if (!ares) {
1392                 ldb_oom(ldb);
1393                 req->callback(req, NULL);
1394                 return;
1395         }
1396         ares->type = LDB_REPLY_DONE;
1397         ares->error = error;
1398
1399         req->callback(req, ares);
1400 }
1401
1402 static void ltdb_timeout(struct tevent_context *ev,
1403                           struct tevent_timer *te,
1404                           struct timeval t,
1405                           void *private_data)
1406 {
1407         struct ltdb_context *ctx;
1408         ctx = talloc_get_type(private_data, struct ltdb_context);
1409
1410         if (!ctx->request_terminated) {
1411                 /* request is done now */
1412                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1413         }
1414
1415         if (ctx->spy) {
1416                 /* neutralize the spy */
1417                 ctx->spy->ctx = NULL;
1418                 ctx->spy = NULL;
1419         }
1420         talloc_free(ctx);
1421 }
1422
1423 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1424                                         struct ldb_extended *ext,
1425                                         int error)
1426 {
1427         struct ldb_context *ldb;
1428         struct ldb_request *req;
1429         struct ldb_reply *ares;
1430
1431         ldb = ldb_module_get_ctx(ctx->module);
1432         req = ctx->req;
1433
1434         /* if we already returned an error just return */
1435         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1436                 return;
1437         }
1438
1439         ares = talloc_zero(req, struct ldb_reply);
1440         if (!ares) {
1441                 ldb_oom(ldb);
1442                 req->callback(req, NULL);
1443                 return;
1444         }
1445         ares->type = LDB_REPLY_DONE;
1446         ares->response = ext;
1447         ares->error = error;
1448
1449         req->callback(req, ares);
1450 }
1451
1452 static void ltdb_handle_extended(struct ltdb_context *ctx)
1453 {
1454         struct ldb_extended *ext = NULL;
1455         int ret;
1456
1457         if (strcmp(ctx->req->op.extended.oid,
1458                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1459                 /* get sequence number */
1460                 ret = ltdb_sequence_number(ctx, &ext);
1461         } else {
1462                 /* not recognized */
1463                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1464         }
1465
1466         ltdb_request_extended_done(ctx, ext, ret);
1467 }
1468
1469 static void ltdb_callback(struct tevent_context *ev,
1470                           struct tevent_timer *te,
1471                           struct timeval t,
1472                           void *private_data)
1473 {
1474         struct ltdb_context *ctx;
1475         int ret;
1476
1477         ctx = talloc_get_type(private_data, struct ltdb_context);
1478
1479         if (ctx->request_terminated) {
1480                 goto done;
1481         }
1482
1483         switch (ctx->req->operation) {
1484         case LDB_SEARCH:
1485                 ret = ltdb_search(ctx);
1486                 break;
1487         case LDB_ADD:
1488                 ret = ltdb_add(ctx);
1489                 break;
1490         case LDB_MODIFY:
1491                 ret = ltdb_modify(ctx);
1492                 break;
1493         case LDB_DELETE:
1494                 ret = ltdb_delete(ctx);
1495                 break;
1496         case LDB_RENAME:
1497                 ret = ltdb_rename(ctx);
1498                 break;
1499         case LDB_EXTENDED:
1500                 ltdb_handle_extended(ctx);
1501                 goto done;
1502         default:
1503                 /* no other op supported */
1504                 ret = LDB_ERR_PROTOCOL_ERROR;
1505         }
1506
1507         if (!ctx->request_terminated) {
1508                 /* request is done now */
1509                 ltdb_request_done(ctx, ret);
1510         }
1511
1512 done:
1513         if (ctx->spy) {
1514                 /* neutralize the spy */
1515                 ctx->spy->ctx = NULL;
1516                 ctx->spy = NULL;
1517         }
1518         talloc_free(ctx);
1519 }
1520
1521 static int ltdb_request_destructor(void *ptr)
1522 {
1523         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1524
1525         if (spy->ctx != NULL) {
1526                 spy->ctx->spy = NULL;
1527                 spy->ctx->request_terminated = true;
1528                 spy->ctx = NULL;
1529         }
1530
1531         return 0;
1532 }
1533
1534 static int ltdb_handle_request(struct ldb_module *module,
1535                                struct ldb_request *req)
1536 {
1537         struct ldb_control *control_permissive;
1538         struct ldb_context *ldb;
1539         struct tevent_context *ev;
1540         struct ltdb_context *ac;
1541         struct tevent_timer *te;
1542         struct timeval tv;
1543         unsigned int i;
1544
1545         ldb = ldb_module_get_ctx(module);
1546
1547         control_permissive = ldb_request_get_control(req,
1548                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1549
1550         for (i = 0; req->controls && req->controls[i]; i++) {
1551                 if (req->controls[i]->critical &&
1552                     req->controls[i] != control_permissive) {
1553                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1554                                                req->controls[i]->oid);
1555                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1556                 }
1557         }
1558
1559         if (req->starttime == 0 || req->timeout == 0) {
1560                 ldb_set_errstring(ldb, "Invalid timeout settings");
1561                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1562         }
1563
1564         ev = ldb_handle_get_event_context(req->handle);
1565
1566         ac = talloc_zero(ldb, struct ltdb_context);
1567         if (ac == NULL) {
1568                 ldb_oom(ldb);
1569                 return LDB_ERR_OPERATIONS_ERROR;
1570         }
1571
1572         ac->module = module;
1573         ac->req = req;
1574
1575         tv.tv_sec = 0;
1576         tv.tv_usec = 0;
1577         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1578         if (NULL == te) {
1579                 talloc_free(ac);
1580                 return LDB_ERR_OPERATIONS_ERROR;
1581         }
1582
1583         if (req->timeout > 0) {
1584                 tv.tv_sec = req->starttime + req->timeout;
1585                 tv.tv_usec = 0;
1586                 ac->timeout_event = tevent_add_timer(ev, ac, tv,
1587                                                      ltdb_timeout, ac);
1588                 if (NULL == ac->timeout_event) {
1589                         talloc_free(ac);
1590                         return LDB_ERR_OPERATIONS_ERROR;
1591                 }
1592         }
1593
1594         /* set a spy so that we do not try to use the request context
1595          * if it is freed before ltdb_callback fires */
1596         ac->spy = talloc(req, struct ltdb_req_spy);
1597         if (NULL == ac->spy) {
1598                 talloc_free(ac);
1599                 return LDB_ERR_OPERATIONS_ERROR;
1600         }
1601         ac->spy->ctx = ac;
1602
1603         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1604
1605         return LDB_SUCCESS;
1606 }
1607
1608 static int ltdb_init_rootdse(struct ldb_module *module)
1609 {
1610         /* ignore errors on this - we expect it for non-sam databases */
1611         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1612
1613         /* there can be no module beyond the backend, just return */
1614         return LDB_SUCCESS;
1615 }
1616
1617 static const struct ldb_module_ops ltdb_ops = {
1618         .name              = "tdb",
1619         .init_context      = ltdb_init_rootdse,
1620         .search            = ltdb_handle_request,
1621         .add               = ltdb_handle_request,
1622         .modify            = ltdb_handle_request,
1623         .del               = ltdb_handle_request,
1624         .rename            = ltdb_handle_request,
1625         .extended          = ltdb_handle_request,
1626         .start_transaction = ltdb_start_trans,
1627         .end_transaction   = ltdb_end_trans,
1628         .prepare_commit    = ltdb_prepare_commit,
1629         .del_transaction   = ltdb_del_trans,
1630         .read_lock         = ltdb_lock_read,
1631         .read_unlock       = ltdb_unlock_read,
1632 };
1633
1634 /*
1635   connect to the database
1636 */
1637 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1638                         unsigned int flags, const char *options[],
1639                         struct ldb_module **_module)
1640 {
1641         struct ldb_module *module;
1642         const char *path;
1643         int tdb_flags, open_flags;
1644         struct ltdb_private *ltdb;
1645
1646         /*
1647          * We hold locks, so we must use a private event context
1648          * on each returned handle
1649          */
1650
1651         ldb_set_require_private_event_context(ldb);
1652
1653         /* parse the url */
1654         if (strchr(url, ':')) {
1655                 if (strncmp(url, "tdb://", 6) != 0) {
1656                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1657                                   "Invalid tdb URL '%s'", url);
1658                         return LDB_ERR_OPERATIONS_ERROR;
1659                 }
1660                 path = url+6;
1661         } else {
1662                 path = url;
1663         }
1664
1665         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1666
1667         /* check for the 'nosync' option */
1668         if (flags & LDB_FLG_NOSYNC) {
1669                 tdb_flags |= TDB_NOSYNC;
1670         }
1671
1672         /* and nommap option */
1673         if (flags & LDB_FLG_NOMMAP) {
1674                 tdb_flags |= TDB_NOMMAP;
1675         }
1676
1677         ltdb = talloc_zero(ldb, struct ltdb_private);
1678         if (!ltdb) {
1679                 ldb_oom(ldb);
1680                 return LDB_ERR_OPERATIONS_ERROR;
1681         }
1682
1683         if (flags & LDB_FLG_RDONLY) {
1684                 /*
1685                  * This is weird, but because we can only have one tdb
1686                  * in this process, and the other one could be
1687                  * read-write, we can't use the tdb readonly.  Plus a
1688                  * read only tdb prohibits the all-record lock.
1689                  */
1690                 open_flags = O_RDWR;
1691
1692                 ltdb->read_only = true;
1693
1694         } else if (flags & LDB_FLG_DONT_CREATE_DB) {
1695                 /*
1696                  * This is used by ldbsearch to prevent creation of the database
1697                  * if the name is wrong
1698                  */
1699                 open_flags = O_RDWR;
1700         } else {
1701                 /*
1702                  * This is the normal case
1703                  */
1704                 open_flags = O_CREAT | O_RDWR;
1705         }
1706
1707         /* note that we use quite a large default hash size */
1708         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1709                                    tdb_flags, open_flags,
1710                                    ldb_get_create_perms(ldb), ldb);
1711         if (!ltdb->tdb) {
1712                 ldb_asprintf_errstring(ldb,
1713                                        "Unable to open tdb '%s': %s", path, strerror(errno));
1714                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1715                           "Unable to open tdb '%s': %s", path, strerror(errno));
1716                 talloc_free(ltdb);
1717                 if (errno == EACCES || errno == EPERM) {
1718                         return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
1719                 }
1720                 return LDB_ERR_OPERATIONS_ERROR;
1721         }
1722
1723         if (getenv("LDB_WARN_UNINDEXED")) {
1724                 ltdb->warn_unindexed = true;
1725         }
1726
1727         if (getenv("LDB_WARN_REINDEX")) {
1728                 ltdb->warn_reindex = true;
1729         }
1730
1731         ltdb->sequence_number = 0;
1732
1733         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1734         if (!module) {
1735                 ldb_oom(ldb);
1736                 talloc_free(ltdb);
1737                 return LDB_ERR_OPERATIONS_ERROR;
1738         }
1739         ldb_module_set_private(module, ltdb);
1740         talloc_steal(module, ltdb);
1741
1742         if (ltdb_cache_load(module) != 0) {
1743                 ldb_asprintf_errstring(ldb,
1744                                        "Unable to load ltdb cache records of tdb '%s'", path);
1745                 talloc_free(module);
1746                 return LDB_ERR_OPERATIONS_ERROR;
1747         }
1748
1749         *_module = module;
1750         return LDB_SUCCESS;
1751 }
1752
1753 int ldb_tdb_init(const char *version)
1754 {
1755         LDB_MODULE_CHECK_VERSION(version);
1756         return ldb_register_backend("tdb", ltdb_connect, false);
1757 }