ldb_tdb: Warn when reindexing is done
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53 #include "ldb_private.h"
54 #include <tdb.h>
55
56 /*
57   prevent memory errors on callbacks
58 */
59 struct ltdb_req_spy {
60         struct ltdb_context *ctx;
61 };
62
63 /*
64   map a tdb error code to a ldb error code
65 */
66 int ltdb_err_map(enum TDB_ERROR tdb_code)
67 {
68         switch (tdb_code) {
69         case TDB_SUCCESS:
70                 return LDB_SUCCESS;
71         case TDB_ERR_CORRUPT:
72         case TDB_ERR_OOM:
73         case TDB_ERR_EINVAL:
74                 return LDB_ERR_OPERATIONS_ERROR;
75         case TDB_ERR_IO:
76                 return LDB_ERR_PROTOCOL_ERROR;
77         case TDB_ERR_LOCK:
78         case TDB_ERR_NOLOCK:
79                 return LDB_ERR_BUSY;
80         case TDB_ERR_LOCK_TIMEOUT:
81                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
82         case TDB_ERR_EXISTS:
83                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
84         case TDB_ERR_NOEXIST:
85                 return LDB_ERR_NO_SUCH_OBJECT;
86         case TDB_ERR_RDONLY:
87                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
88         default:
89                 break;
90         }
91         return LDB_ERR_OTHER;
92 }
93
94 /*
95   lock the database for read - use by ltdb_search and ltdb_sequence_number
96 */
97 int ltdb_lock_read(struct ldb_module *module)
98 {
99         void *data = ldb_module_get_private(module);
100         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
101         int ret = 0;
102
103         if (ltdb->in_transaction == 0 &&
104             ltdb->read_lock_count == 0) {
105                 ret = tdb_lockall_read(ltdb->tdb);
106         }
107         if (ret == 0) {
108                 ltdb->read_lock_count++;
109         }
110         return ret;
111 }
112
113 /*
114   unlock the database after a ltdb_lock_read()
115 */
116 int ltdb_unlock_read(struct ldb_module *module)
117 {
118         void *data = ldb_module_get_private(module);
119         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
120         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
121                 tdb_unlockall_read(ltdb->tdb);
122                 return 0;
123         }
124         ltdb->read_lock_count--;
125         return 0;
126 }
127
128
129 /*
130   form a TDB_DATA for a record key
131   caller frees
132
133   note that the key for a record can depend on whether the
134   dn refers to a case sensitive index record or not
135 */
136 TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
137 {
138         struct ldb_context *ldb = ldb_module_get_ctx(module);
139         TDB_DATA key;
140         char *key_str = NULL;
141         const char *dn_folded = NULL;
142
143         /*
144           most DNs are case insensitive. The exception is index DNs for
145           case sensitive attributes
146
147           there are 3 cases dealt with in this code:
148
149           1) if the dn doesn't start with @ then uppercase the attribute
150              names and the attributes values of case insensitive attributes
151           2) if the dn starts with @ then leave it alone -
152              the indexing code handles the rest
153         */
154
155         dn_folded = ldb_dn_get_casefold(dn);
156         if (!dn_folded) {
157                 goto failed;
158         }
159
160         key_str = talloc_strdup(ldb, "DN=");
161         if (!key_str) {
162                 goto failed;
163         }
164
165         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
166         if (!key_str) {
167                 goto failed;
168         }
169
170         key.dptr = (uint8_t *)key_str;
171         key.dsize = strlen(key_str) + 1;
172
173         return key;
174
175 failed:
176         errno = ENOMEM;
177         key.dptr = NULL;
178         key.dsize = 0;
179         return key;
180 }
181
182 /*
183   check special dn's have valid attributes
184   currently only @ATTRIBUTES is checked
185 */
186 static int ltdb_check_special_dn(struct ldb_module *module,
187                                  const struct ldb_message *msg)
188 {
189         struct ldb_context *ldb = ldb_module_get_ctx(module);
190         unsigned int i, j;
191
192         if (! ldb_dn_is_special(msg->dn) ||
193             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
194                 return LDB_SUCCESS;
195         }
196
197         /* we have @ATTRIBUTES, let's check attributes are fine */
198         /* should we check that we deny multivalued attributes ? */
199         for (i = 0; i < msg->num_elements; i++) {
200                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
201
202                 for (j = 0; j < msg->elements[i].num_values; j++) {
203                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
204                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
205                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
206                         }
207                 }
208         }
209
210         return LDB_SUCCESS;
211 }
212
213
214 /*
215   we've made a modification to a dn - possibly reindex and
216   update sequence number
217 */
218 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
219 {
220         int ret = LDB_SUCCESS;
221         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
222
223         /* only allow modifies inside a transaction, otherwise the
224          * ldb is unsafe */
225         if (ltdb->in_transaction == 0) {
226                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
227                 return LDB_ERR_OPERATIONS_ERROR;
228         }
229
230         if (ldb_dn_is_special(dn) &&
231             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
232              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) )
233         {
234                 if (ltdb->warn_reindex) {
235                         ldb_debug(ldb_module_get_ctx(module),
236                                 LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
237                                 tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
238                 }
239                 ret = ltdb_reindex(module);
240         }
241
242         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
243         if (ret == LDB_SUCCESS &&
244             !(ldb_dn_is_special(dn) &&
245               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
246                 ret = ltdb_increase_sequence_number(module);
247         }
248
249         /* If the modify was to @OPTIONS, reload the cache */
250         if (ret == LDB_SUCCESS &&
251             ldb_dn_is_special(dn) &&
252             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
253                 ret = ltdb_cache_reload(module);
254         }
255
256         return ret;
257 }
258
259 /*
260   store a record into the db
261 */
262 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
263 {
264         void *data = ldb_module_get_private(module);
265         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
266         TDB_DATA tdb_key, tdb_data;
267         int ret = LDB_SUCCESS;
268
269         tdb_key = ltdb_key(module, msg->dn);
270         if (tdb_key.dptr == NULL) {
271                 return LDB_ERR_OTHER;
272         }
273
274         ret = ldb_pack_data(ldb_module_get_ctx(module),
275                             msg, (struct ldb_val *)&tdb_data);
276         if (ret == -1) {
277                 talloc_free(tdb_key.dptr);
278                 return LDB_ERR_OTHER;
279         }
280
281         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
282         if (ret != 0) {
283                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
284                 goto done;
285         }
286
287 done:
288         talloc_free(tdb_key.dptr);
289         talloc_free(tdb_data.dptr);
290
291         return ret;
292 }
293
294
295 /*
296   check if a attribute is a single valued, for a given element
297  */
298 static bool ldb_tdb_single_valued(const struct ldb_schema_attribute *a,
299                                   struct ldb_message_element *el)
300 {
301         if (!a) return false;
302         if (el != NULL) {
303                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
304                         /* override from a ldb module, for example
305                            used for the description field, which is
306                            marked multi-valued in the schema but which
307                            should not actually accept multiple
308                            values */
309                         return true;
310                 }
311                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
312                         /* override from a ldb module, for example used for
313                            deleted linked attribute entries */
314                         return false;
315                 }
316         }
317         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
318                 return true;
319         }
320         return false;
321 }
322
323 static int ltdb_add_internal(struct ldb_module *module,
324                              const struct ldb_message *msg,
325                              bool check_single_value)
326 {
327         struct ldb_context *ldb = ldb_module_get_ctx(module);
328         int ret = LDB_SUCCESS;
329         unsigned int i, j;
330
331         for (i=0;i<msg->num_elements;i++) {
332                 struct ldb_message_element *el = &msg->elements[i];
333                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
334
335                 if (el->num_values == 0) {
336                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
337                                                el->name, ldb_dn_get_linearized(msg->dn));
338                         return LDB_ERR_CONSTRAINT_VIOLATION;
339                 }
340                 if (check_single_value &&
341                                 el->num_values > 1 &&
342                                 ldb_tdb_single_valued(a, el)) {
343                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
344                                                el->name, ldb_dn_get_linearized(msg->dn));
345                         return LDB_ERR_CONSTRAINT_VIOLATION;
346                 }
347
348                 /* Do not check "@ATTRIBUTES" for duplicated values */
349                 if (ldb_dn_is_special(msg->dn) &&
350                     ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
351                         continue;
352                 }
353
354                 /* TODO: This is O(n^2) - replace with more efficient check */
355                 for (j=0; j<el->num_values; j++) {
356                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
357                                 ldb_asprintf_errstring(ldb,
358                                                        "attribute '%s': value #%u on '%s' provided more than once",
359                                                        el->name, j, ldb_dn_get_linearized(msg->dn));
360                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
361                         }
362                 }
363         }
364
365         ret = ltdb_store(module, msg, TDB_INSERT);
366         if (ret != LDB_SUCCESS) {
367                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
368                         ldb_asprintf_errstring(ldb,
369                                                "Entry %s already exists",
370                                                ldb_dn_get_linearized(msg->dn));
371                 }
372                 return ret;
373         }
374
375         ret = ltdb_index_add_new(module, msg);
376         if (ret != LDB_SUCCESS) {
377                 return ret;
378         }
379
380         ret = ltdb_modified(module, msg->dn);
381
382         return ret;
383 }
384
385 /*
386   add a record to the database
387 */
388 static int ltdb_add(struct ltdb_context *ctx)
389 {
390         struct ldb_module *module = ctx->module;
391         struct ldb_request *req = ctx->req;
392         int ret = LDB_SUCCESS;
393
394         ret = ltdb_check_special_dn(module, req->op.add.message);
395         if (ret != LDB_SUCCESS) {
396                 return ret;
397         }
398
399         ldb_request_set_state(req, LDB_ASYNC_PENDING);
400
401         if (ltdb_cache_load(module) != 0) {
402                 return LDB_ERR_OPERATIONS_ERROR;
403         }
404
405         ret = ltdb_add_internal(module, req->op.add.message, true);
406
407         return ret;
408 }
409
410 /*
411   delete a record from the database, not updating indexes (used for deleting
412   index records)
413 */
414 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
415 {
416         void *data = ldb_module_get_private(module);
417         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
418         TDB_DATA tdb_key;
419         int ret;
420
421         tdb_key = ltdb_key(module, dn);
422         if (!tdb_key.dptr) {
423                 return LDB_ERR_OTHER;
424         }
425
426         ret = tdb_delete(ltdb->tdb, tdb_key);
427         talloc_free(tdb_key.dptr);
428
429         if (ret != 0) {
430                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
431         }
432
433         return ret;
434 }
435
436 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
437 {
438         struct ldb_message *msg;
439         int ret = LDB_SUCCESS;
440
441         msg = ldb_msg_new(module);
442         if (msg == NULL) {
443                 return LDB_ERR_OPERATIONS_ERROR;
444         }
445
446         /* in case any attribute of the message was indexed, we need
447            to fetch the old record */
448         ret = ltdb_search_dn1(module, dn, msg);
449         if (ret != LDB_SUCCESS) {
450                 /* not finding the old record is an error */
451                 goto done;
452         }
453
454         ret = ltdb_delete_noindex(module, dn);
455         if (ret != LDB_SUCCESS) {
456                 goto done;
457         }
458
459         /* remove any indexed attributes */
460         ret = ltdb_index_delete(module, msg);
461         if (ret != LDB_SUCCESS) {
462                 goto done;
463         }
464
465         ret = ltdb_modified(module, dn);
466         if (ret != LDB_SUCCESS) {
467                 goto done;
468         }
469
470 done:
471         talloc_free(msg);
472         return ret;
473 }
474
475 /*
476   delete a record from the database
477 */
478 static int ltdb_delete(struct ltdb_context *ctx)
479 {
480         struct ldb_module *module = ctx->module;
481         struct ldb_request *req = ctx->req;
482         int ret = LDB_SUCCESS;
483
484         ldb_request_set_state(req, LDB_ASYNC_PENDING);
485
486         if (ltdb_cache_load(module) != 0) {
487                 return LDB_ERR_OPERATIONS_ERROR;
488         }
489
490         ret = ltdb_delete_internal(module, req->op.del.dn);
491
492         return ret;
493 }
494
495 /*
496   find an element by attribute name. At the moment this does a linear search,
497   it should be re-coded to use a binary search once all places that modify
498   records guarantee sorted order
499
500   return the index of the first matching element if found, otherwise -1
501 */
502 static int find_element(const struct ldb_message *msg, const char *name)
503 {
504         unsigned int i;
505         for (i=0;i<msg->num_elements;i++) {
506                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
507                         return i;
508                 }
509         }
510         return -1;
511 }
512
513
514 /*
515   add an element to an existing record. Assumes a elements array that we
516   can call re-alloc on, and assumed that we can re-use the data pointers from
517   the passed in additional values. Use with care!
518
519   returns 0 on success, -1 on failure (and sets errno)
520 */
521 static int ltdb_msg_add_element(struct ldb_context *ldb,
522                                 struct ldb_message *msg,
523                                 struct ldb_message_element *el)
524 {
525         struct ldb_message_element *e2;
526         unsigned int i;
527
528         if (el->num_values == 0) {
529                 /* nothing to do here - we don't add empty elements */
530                 return 0;
531         }
532
533         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
534                               msg->num_elements+1);
535         if (!e2) {
536                 errno = ENOMEM;
537                 return -1;
538         }
539
540         msg->elements = e2;
541
542         e2 = &msg->elements[msg->num_elements];
543
544         e2->name = el->name;
545         e2->flags = el->flags;
546         e2->values = talloc_array(msg->elements,
547                                   struct ldb_val, el->num_values);
548         if (!e2->values) {
549                 errno = ENOMEM;
550                 return -1;
551         }
552         for (i=0;i<el->num_values;i++) {
553                 e2->values[i] = el->values[i];
554         }
555         e2->num_values = el->num_values;
556
557         ++msg->num_elements;
558
559         return 0;
560 }
561
562 /*
563   delete all elements having a specified attribute name
564 */
565 static int msg_delete_attribute(struct ldb_module *module,
566                                 struct ldb_context *ldb,
567                                 struct ldb_message *msg, const char *name)
568 {
569         unsigned int i;
570         int ret;
571         struct ldb_message_element *el;
572
573         el = ldb_msg_find_element(msg, name);
574         if (el == NULL) {
575                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
576         }
577         i = el - msg->elements;
578
579         ret = ltdb_index_del_element(module, msg->dn, el);
580         if (ret != LDB_SUCCESS) {
581                 return ret;
582         }
583
584         talloc_free(el->values);
585         if (msg->num_elements > (i+1)) {
586                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
587         }
588         msg->num_elements--;
589         msg->elements = talloc_realloc(msg, msg->elements,
590                                        struct ldb_message_element,
591                                        msg->num_elements);
592         return LDB_SUCCESS;
593 }
594
595 /*
596   delete all elements matching an attribute name/value
597
598   return LDB Error on failure
599 */
600 static int msg_delete_element(struct ldb_module *module,
601                               struct ldb_message *msg,
602                               const char *name,
603                               const struct ldb_val *val)
604 {
605         struct ldb_context *ldb = ldb_module_get_ctx(module);
606         unsigned int i;
607         int found, ret;
608         struct ldb_message_element *el;
609         const struct ldb_schema_attribute *a;
610
611         found = find_element(msg, name);
612         if (found == -1) {
613                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
614         }
615
616         i = (unsigned int) found;
617         el = &(msg->elements[i]);
618
619         a = ldb_schema_attribute_by_name(ldb, el->name);
620
621         for (i=0;i<el->num_values;i++) {
622                 bool matched;
623                 if (a->syntax->operator_fn) {
624                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
625                                                      &el->values[i], val, &matched);
626                         if (ret != LDB_SUCCESS) return ret;
627                 } else {
628                         matched = (a->syntax->comparison_fn(ldb, ldb,
629                                                             &el->values[i], val) == 0);
630                 }
631                 if (matched) {
632                         if (el->num_values == 1) {
633                                 return msg_delete_attribute(module, ldb, msg, name);
634                         }
635
636                         ret = ltdb_index_del_value(module, msg->dn, el, i);
637                         if (ret != LDB_SUCCESS) {
638                                 return ret;
639                         }
640
641                         if (i<el->num_values-1) {
642                                 memmove(&el->values[i], &el->values[i+1],
643                                         sizeof(el->values[i])*
644                                                 (el->num_values-(i+1)));
645                         }
646                         el->num_values--;
647
648                         /* per definition we find in a canonicalised message an
649                            attribute value only once. So we are finished here */
650                         return LDB_SUCCESS;
651                 }
652         }
653
654         /* Not found */
655         return LDB_ERR_NO_SUCH_ATTRIBUTE;
656 }
657
658
659 /*
660   modify a record - internal interface
661
662   yuck - this is O(n^2). Luckily n is usually small so we probably
663   get away with it, but if we ever have really large attribute lists
664   then we'll need to look at this again
665
666   'req' is optional, and is used to specify controls if supplied
667 */
668 int ltdb_modify_internal(struct ldb_module *module,
669                          const struct ldb_message *msg,
670                          struct ldb_request *req)
671 {
672         struct ldb_context *ldb = ldb_module_get_ctx(module);
673         void *data = ldb_module_get_private(module);
674         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
675         TDB_DATA tdb_key, tdb_data;
676         struct ldb_message *msg2;
677         unsigned int i, j, k;
678         int ret = LDB_SUCCESS, idx;
679         struct ldb_control *control_permissive = NULL;
680
681         if (req) {
682                 control_permissive = ldb_request_get_control(req,
683                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
684         }
685
686         tdb_key = ltdb_key(module, msg->dn);
687         if (!tdb_key.dptr) {
688                 return LDB_ERR_OTHER;
689         }
690
691         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
692         if (!tdb_data.dptr) {
693                 talloc_free(tdb_key.dptr);
694                 return ltdb_err_map(tdb_error(ltdb->tdb));
695         }
696
697         msg2 = ldb_msg_new(tdb_key.dptr);
698         if (msg2 == NULL) {
699                 free(tdb_data.dptr);
700                 ret = LDB_ERR_OTHER;
701                 goto done;
702         }
703
704         ret = ldb_unpack_data(ldb_module_get_ctx(module), (struct ldb_val *)&tdb_data, msg2);
705         free(tdb_data.dptr);
706         if (ret == -1) {
707                 ret = LDB_ERR_OTHER;
708                 goto done;
709         }
710
711         if (!msg2->dn) {
712                 msg2->dn = msg->dn;
713         }
714
715         for (i=0; i<msg->num_elements; i++) {
716                 struct ldb_message_element *el = &msg->elements[i], *el2;
717                 struct ldb_val *vals;
718                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
719                 const char *dn;
720
721                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
722                 case LDB_FLAG_MOD_ADD:
723
724                         if (el->num_values == 0) {
725                                 ldb_asprintf_errstring(ldb,
726                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
727                                                        el->name, ldb_dn_get_linearized(msg2->dn));
728                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
729                                 goto done;
730                         }
731
732                         /* make a copy of the array so that a permissive
733                          * control can remove duplicates without changing the
734                          * original values, but do not copy data as we do not
735                          * need to keep it around once the operation is
736                          * finished */
737                         if (control_permissive) {
738                                 el = talloc(msg2, struct ldb_message_element);
739                                 if (!el) {
740                                         ret = LDB_ERR_OTHER;
741                                         goto done;
742                                 }
743                                 *el = msg->elements[i];
744                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
745                                 if (el->values == NULL) {
746                                         ret = LDB_ERR_OTHER;
747                                         goto done;
748                                 }
749                                 for (j = 0; j < el->num_values; j++) {
750                                         el->values[j] = msg->elements[i].values[j];
751                                 }
752                         }
753
754                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
755                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
756                                                        el->name, ldb_dn_get_linearized(msg2->dn));
757                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
758                                 goto done;
759                         }
760
761                         /* Checks if element already exists */
762                         idx = find_element(msg2, el->name);
763                         if (idx == -1) {
764                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
765                                         ret = LDB_ERR_OTHER;
766                                         goto done;
767                                 }
768                                 ret = ltdb_index_add_element(module, msg2->dn,
769                                                              el);
770                                 if (ret != LDB_SUCCESS) {
771                                         goto done;
772                                 }
773                         } else {
774                                 j = (unsigned int) idx;
775                                 el2 = &(msg2->elements[j]);
776
777                                 /* We cannot add another value on a existing one
778                                    if the attribute is single-valued */
779                                 if (ldb_tdb_single_valued(a, el)) {
780                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
781                                                                el->name, ldb_dn_get_linearized(msg2->dn));
782                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
783                                         goto done;
784                                 }
785
786                                 /* Check that values don't exist yet on multi-
787                                    valued attributes or aren't provided twice */
788                                 /* TODO: This is O(n^2) - replace with more efficient check */
789                                 for (j = 0; j < el->num_values; j++) {
790                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
791                                                 if (control_permissive) {
792                                                         /* remove this one as if it was never added */
793                                                         el->num_values--;
794                                                         for (k = j; k < el->num_values; k++) {
795                                                                 el->values[k] = el->values[k + 1];
796                                                         }
797                                                         j--; /* rewind */
798
799                                                         continue;
800                                                 }
801
802                                                 ldb_asprintf_errstring(ldb,
803                                                                        "attribute '%s': value #%u on '%s' already exists",
804                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
805                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
806                                                 goto done;
807                                         }
808                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
809                                                 ldb_asprintf_errstring(ldb,
810                                                                        "attribute '%s': value #%u on '%s' provided more than once",
811                                                                        el->name, j, ldb_dn_get_linearized(msg2->dn));
812                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
813                                                 goto done;
814                                         }
815                                 }
816
817                                 /* Now combine existing and new values to a new
818                                    attribute record */
819                                 vals = talloc_realloc(msg2->elements,
820                                                       el2->values, struct ldb_val,
821                                                       el2->num_values + el->num_values);
822                                 if (vals == NULL) {
823                                         ldb_oom(ldb);
824                                         ret = LDB_ERR_OTHER;
825                                         goto done;
826                                 }
827
828                                 for (j=0; j<el->num_values; j++) {
829                                         vals[el2->num_values + j] =
830                                                 ldb_val_dup(vals, &el->values[j]);
831                                 }
832
833                                 el2->values = vals;
834                                 el2->num_values += el->num_values;
835
836                                 ret = ltdb_index_add_element(module, msg2->dn, el);
837                                 if (ret != LDB_SUCCESS) {
838                                         goto done;
839                                 }
840                         }
841
842                         break;
843
844                 case LDB_FLAG_MOD_REPLACE:
845
846                         if (el->num_values > 1 && ldb_tdb_single_valued(a, el)) {
847                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
848                                                        el->name, ldb_dn_get_linearized(msg2->dn));
849                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
850                                 goto done;
851                         }
852
853                         /* TODO: This is O(n^2) - replace with more efficient check */
854                         for (j=0; j<el->num_values; j++) {
855                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
856                                         ldb_asprintf_errstring(ldb,
857                                                                "attribute '%s': value #%u on '%s' provided more than once",
858                                                                el->name, j, ldb_dn_get_linearized(msg2->dn));
859                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
860                                         goto done;
861                                 }
862                         }
863
864                         /* Checks if element already exists */
865                         idx = find_element(msg2, el->name);
866                         if (idx != -1) {
867                                 j = (unsigned int) idx;
868                                 el2 = &(msg2->elements[j]);
869
870                                 /* we consider two elements to be
871                                  * equal only if the order
872                                  * matches. This allows dbcheck to
873                                  * fix the ordering on attributes
874                                  * where order matters, such as
875                                  * objectClass
876                                  */
877                                 if (ldb_msg_element_equal_ordered(el, el2)) {
878                                         continue;
879                                 }
880
881                                 /* Delete the attribute if it exists in the DB */
882                                 if (msg_delete_attribute(module, ldb, msg2,
883                                                          el->name) != 0) {
884                                         ret = LDB_ERR_OTHER;
885                                         goto done;
886                                 }
887                         }
888
889                         /* Recreate it with the new values */
890                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
891                                 ret = LDB_ERR_OTHER;
892                                 goto done;
893                         }
894
895                         ret = ltdb_index_add_element(module, msg2->dn, el);
896                         if (ret != LDB_SUCCESS) {
897                                 goto done;
898                         }
899
900                         break;
901
902                 case LDB_FLAG_MOD_DELETE:
903                         dn = ldb_dn_get_linearized(msg2->dn);
904                         if (dn == NULL) {
905                                 ret = LDB_ERR_OTHER;
906                                 goto done;
907                         }
908
909                         if (msg->elements[i].num_values == 0) {
910                                 /* Delete the whole attribute */
911                                 ret = msg_delete_attribute(module, ldb, msg2,
912                                                            msg->elements[i].name);
913                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
914                                     control_permissive) {
915                                         ret = LDB_SUCCESS;
916                                 } else {
917                                         ldb_asprintf_errstring(ldb,
918                                                                "attribute '%s': no such attribute for delete on '%s'",
919                                                                msg->elements[i].name, dn);
920                                 }
921                                 if (ret != LDB_SUCCESS) {
922                                         goto done;
923                                 }
924                         } else {
925                                 /* Delete specified values from an attribute */
926                                 for (j=0; j < msg->elements[i].num_values; j++) {
927                                         ret = msg_delete_element(module,
928                                                                  msg2,
929                                                                  msg->elements[i].name,
930                                                                  &msg->elements[i].values[j]);
931                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
932                                             control_permissive) {
933                                                 ret = LDB_SUCCESS;
934                                         } else {
935                                                 ldb_asprintf_errstring(ldb,
936                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
937                                                                        msg->elements[i].name, dn);
938                                         }
939                                         if (ret != LDB_SUCCESS) {
940                                                 goto done;
941                                         }
942                                 }
943                         }
944                         break;
945                 default:
946                         ldb_asprintf_errstring(ldb,
947                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
948                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
949                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
950                         ret = LDB_ERR_PROTOCOL_ERROR;
951                         goto done;
952                 }
953         }
954
955         ret = ltdb_store(module, msg2, TDB_MODIFY);
956         if (ret != LDB_SUCCESS) {
957                 goto done;
958         }
959
960         ret = ltdb_modified(module, msg2->dn);
961         if (ret != LDB_SUCCESS) {
962                 goto done;
963         }
964
965 done:
966         talloc_free(tdb_key.dptr);
967         return ret;
968 }
969
970 /*
971   modify a record
972 */
973 static int ltdb_modify(struct ltdb_context *ctx)
974 {
975         struct ldb_module *module = ctx->module;
976         struct ldb_request *req = ctx->req;
977         int ret = LDB_SUCCESS;
978
979         ret = ltdb_check_special_dn(module, req->op.mod.message);
980         if (ret != LDB_SUCCESS) {
981                 return ret;
982         }
983
984         ldb_request_set_state(req, LDB_ASYNC_PENDING);
985
986         if (ltdb_cache_load(module) != 0) {
987                 return LDB_ERR_OPERATIONS_ERROR;
988         }
989
990         ret = ltdb_modify_internal(module, req->op.mod.message, req);
991
992         return ret;
993 }
994
995 /*
996   rename a record
997 */
998 static int ltdb_rename(struct ltdb_context *ctx)
999 {
1000         struct ldb_module *module = ctx->module;
1001         void *data = ldb_module_get_private(module);
1002         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1003         struct ldb_request *req = ctx->req;
1004         struct ldb_message *msg;
1005         int ret = LDB_SUCCESS;
1006         TDB_DATA tdb_key, tdb_key_old;
1007
1008         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1009
1010         if (ltdb_cache_load(ctx->module) != 0) {
1011                 return LDB_ERR_OPERATIONS_ERROR;
1012         }
1013
1014         msg = ldb_msg_new(ctx);
1015         if (msg == NULL) {
1016                 return LDB_ERR_OPERATIONS_ERROR;
1017         }
1018
1019         /* we need to fetch the old record to re-add under the new name */
1020         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
1021         if (ret != LDB_SUCCESS) {
1022                 /* not finding the old record is an error */
1023                 return ret;
1024         }
1025
1026         /* We need to, before changing the DB, check if the new DN
1027          * exists, so we can return this error to the caller with an
1028          * unmodified DB */
1029         tdb_key = ltdb_key(module, req->op.rename.newdn);
1030         if (!tdb_key.dptr) {
1031                 talloc_free(msg);
1032                 return LDB_ERR_OPERATIONS_ERROR;
1033         }
1034
1035         tdb_key_old = ltdb_key(module, req->op.rename.olddn);
1036         if (!tdb_key_old.dptr) {
1037                 talloc_free(msg);
1038                 talloc_free(tdb_key.dptr);
1039                 return LDB_ERR_OPERATIONS_ERROR;
1040         }
1041
1042         /* Only declare a conflict if the new DN already exists, and it isn't a case change on the old DN */
1043         if (tdb_key_old.dsize != tdb_key.dsize || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1044                 if (tdb_exists(ltdb->tdb, tdb_key)) {
1045                         talloc_free(tdb_key_old.dptr);
1046                         talloc_free(tdb_key.dptr);
1047                         ldb_asprintf_errstring(ldb_module_get_ctx(module),
1048                                                "Entry %s already exists",
1049                                                ldb_dn_get_linearized(msg->dn));
1050                         /* finding the new record already in the DB is an error */
1051                         talloc_free(msg);
1052                         return LDB_ERR_ENTRY_ALREADY_EXISTS;
1053                 }
1054         }
1055         talloc_free(tdb_key_old.dptr);
1056         talloc_free(tdb_key.dptr);
1057
1058         /* Always delete first then add, to avoid conflicts with
1059          * unique indexes. We rely on the transaction to make this
1060          * atomic
1061          */
1062         ret = ltdb_delete_internal(module, msg->dn);
1063         if (ret != LDB_SUCCESS) {
1064                 talloc_free(msg);
1065                 return ret;
1066         }
1067
1068         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1069         if (msg->dn == NULL) {
1070                 talloc_free(msg);
1071                 return LDB_ERR_OPERATIONS_ERROR;
1072         }
1073
1074         /* We don't check single value as we can have more than 1 with
1075          * deleted attributes. We could go through all elements but that's
1076          * maybe not the most efficient way
1077          */
1078         ret = ltdb_add_internal(module, msg, false);
1079
1080         talloc_free(msg);
1081
1082         return ret;
1083 }
1084
1085 static int ltdb_start_trans(struct ldb_module *module)
1086 {
1087         void *data = ldb_module_get_private(module);
1088         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1089
1090         if (tdb_transaction_start(ltdb->tdb) != 0) {
1091                 return ltdb_err_map(tdb_error(ltdb->tdb));
1092         }
1093
1094         ltdb->in_transaction++;
1095
1096         ltdb_index_transaction_start(module);
1097
1098         return LDB_SUCCESS;
1099 }
1100
1101 static int ltdb_prepare_commit(struct ldb_module *module)
1102 {
1103         void *data = ldb_module_get_private(module);
1104         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1105
1106         if (ltdb->in_transaction != 1) {
1107                 return LDB_SUCCESS;
1108         }
1109
1110         if (ltdb_index_transaction_commit(module) != 0) {
1111                 tdb_transaction_cancel(ltdb->tdb);
1112                 ltdb->in_transaction--;
1113                 return ltdb_err_map(tdb_error(ltdb->tdb));
1114         }
1115
1116         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
1117                 ltdb->in_transaction--;
1118                 return ltdb_err_map(tdb_error(ltdb->tdb));
1119         }
1120
1121         ltdb->prepared_commit = true;
1122
1123         return LDB_SUCCESS;
1124 }
1125
1126 static int ltdb_end_trans(struct ldb_module *module)
1127 {
1128         void *data = ldb_module_get_private(module);
1129         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1130
1131         if (!ltdb->prepared_commit) {
1132                 int ret = ltdb_prepare_commit(module);
1133                 if (ret != LDB_SUCCESS) {
1134                         return ret;
1135                 }
1136         }
1137
1138         ltdb->in_transaction--;
1139         ltdb->prepared_commit = false;
1140
1141         if (tdb_transaction_commit(ltdb->tdb) != 0) {
1142                 return ltdb_err_map(tdb_error(ltdb->tdb));
1143         }
1144
1145         return LDB_SUCCESS;
1146 }
1147
1148 static int ltdb_del_trans(struct ldb_module *module)
1149 {
1150         void *data = ldb_module_get_private(module);
1151         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
1152
1153         ltdb->in_transaction--;
1154
1155         if (ltdb_index_transaction_cancel(module) != 0) {
1156                 tdb_transaction_cancel(ltdb->tdb);
1157                 return ltdb_err_map(tdb_error(ltdb->tdb));
1158         }
1159
1160         tdb_transaction_cancel(ltdb->tdb);
1161         return LDB_SUCCESS;
1162 }
1163
1164 /*
1165   return sequenceNumber from @BASEINFO
1166 */
1167 static int ltdb_sequence_number(struct ltdb_context *ctx,
1168                                 struct ldb_extended **ext)
1169 {
1170         struct ldb_context *ldb;
1171         struct ldb_module *module = ctx->module;
1172         struct ldb_request *req = ctx->req;
1173         TALLOC_CTX *tmp_ctx = NULL;
1174         struct ldb_seqnum_request *seq;
1175         struct ldb_seqnum_result *res;
1176         struct ldb_message *msg = NULL;
1177         struct ldb_dn *dn;
1178         const char *date;
1179         int ret = LDB_SUCCESS;
1180
1181         ldb = ldb_module_get_ctx(module);
1182
1183         seq = talloc_get_type(req->op.extended.data,
1184                                 struct ldb_seqnum_request);
1185         if (seq == NULL) {
1186                 return LDB_ERR_OPERATIONS_ERROR;
1187         }
1188
1189         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1190
1191         if (ltdb_lock_read(module) != 0) {
1192                 return LDB_ERR_OPERATIONS_ERROR;
1193         }
1194
1195         res = talloc_zero(req, struct ldb_seqnum_result);
1196         if (res == NULL) {
1197                 ret = LDB_ERR_OPERATIONS_ERROR;
1198                 goto done;
1199         }
1200
1201         tmp_ctx = talloc_new(req);
1202         if (tmp_ctx == NULL) {
1203                 ret = LDB_ERR_OPERATIONS_ERROR;
1204                 goto done;
1205         }
1206
1207         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1208         if (dn == NULL) {
1209                 ret = LDB_ERR_OPERATIONS_ERROR;
1210                 goto done;
1211         }
1212
1213         msg = ldb_msg_new(tmp_ctx);
1214         if (msg == NULL) {
1215                 ret = LDB_ERR_OPERATIONS_ERROR;
1216                 goto done;
1217         }
1218
1219         ret = ltdb_search_dn1(module, dn, msg);
1220         if (ret != LDB_SUCCESS) {
1221                 goto done;
1222         }
1223
1224         switch (seq->type) {
1225         case LDB_SEQ_HIGHEST_SEQ:
1226                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1227                 break;
1228         case LDB_SEQ_NEXT:
1229                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1230                 res->seq_num++;
1231                 break;
1232         case LDB_SEQ_HIGHEST_TIMESTAMP:
1233                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1234                 if (date) {
1235                         res->seq_num = ldb_string_to_time(date);
1236                 } else {
1237                         res->seq_num = 0;
1238                         /* zero is as good as anything when we don't know */
1239                 }
1240                 break;
1241         }
1242
1243         *ext = talloc_zero(req, struct ldb_extended);
1244         if (*ext == NULL) {
1245                 ret = LDB_ERR_OPERATIONS_ERROR;
1246                 goto done;
1247         }
1248         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1249         (*ext)->data = talloc_steal(*ext, res);
1250
1251 done:
1252         talloc_free(tmp_ctx);
1253         ltdb_unlock_read(module);
1254         return ret;
1255 }
1256
1257 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1258 {
1259         struct ldb_context *ldb;
1260         struct ldb_request *req;
1261         struct ldb_reply *ares;
1262
1263         ldb = ldb_module_get_ctx(ctx->module);
1264         req = ctx->req;
1265
1266         /* if we already returned an error just return */
1267         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1268                 return;
1269         }
1270
1271         ares = talloc_zero(req, struct ldb_reply);
1272         if (!ares) {
1273                 ldb_oom(ldb);
1274                 req->callback(req, NULL);
1275                 return;
1276         }
1277         ares->type = LDB_REPLY_DONE;
1278         ares->error = error;
1279
1280         req->callback(req, ares);
1281 }
1282
1283 static void ltdb_timeout(struct tevent_context *ev,
1284                           struct tevent_timer *te,
1285                           struct timeval t,
1286                           void *private_data)
1287 {
1288         struct ltdb_context *ctx;
1289         ctx = talloc_get_type(private_data, struct ltdb_context);
1290
1291         if (!ctx->request_terminated) {
1292                 /* request is done now */
1293                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1294         }
1295
1296         if (ctx->spy) {
1297                 /* neutralize the spy */
1298                 ctx->spy->ctx = NULL;
1299                 ctx->spy = NULL;
1300         }
1301         talloc_free(ctx);
1302 }
1303
1304 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1305                                         struct ldb_extended *ext,
1306                                         int error)
1307 {
1308         struct ldb_context *ldb;
1309         struct ldb_request *req;
1310         struct ldb_reply *ares;
1311
1312         ldb = ldb_module_get_ctx(ctx->module);
1313         req = ctx->req;
1314
1315         /* if we already returned an error just return */
1316         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1317                 return;
1318         }
1319
1320         ares = talloc_zero(req, struct ldb_reply);
1321         if (!ares) {
1322                 ldb_oom(ldb);
1323                 req->callback(req, NULL);
1324                 return;
1325         }
1326         ares->type = LDB_REPLY_DONE;
1327         ares->response = ext;
1328         ares->error = error;
1329
1330         req->callback(req, ares);
1331 }
1332
1333 static void ltdb_handle_extended(struct ltdb_context *ctx)
1334 {
1335         struct ldb_extended *ext = NULL;
1336         int ret;
1337
1338         if (strcmp(ctx->req->op.extended.oid,
1339                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1340                 /* get sequence number */
1341                 ret = ltdb_sequence_number(ctx, &ext);
1342         } else {
1343                 /* not recognized */
1344                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1345         }
1346
1347         ltdb_request_extended_done(ctx, ext, ret);
1348 }
1349
1350 static void ltdb_callback(struct tevent_context *ev,
1351                           struct tevent_timer *te,
1352                           struct timeval t,
1353                           void *private_data)
1354 {
1355         struct ltdb_context *ctx;
1356         int ret;
1357
1358         ctx = talloc_get_type(private_data, struct ltdb_context);
1359
1360         if (ctx->request_terminated) {
1361                 goto done;
1362         }
1363
1364         switch (ctx->req->operation) {
1365         case LDB_SEARCH:
1366                 ret = ltdb_search(ctx);
1367                 break;
1368         case LDB_ADD:
1369                 ret = ltdb_add(ctx);
1370                 break;
1371         case LDB_MODIFY:
1372                 ret = ltdb_modify(ctx);
1373                 break;
1374         case LDB_DELETE:
1375                 ret = ltdb_delete(ctx);
1376                 break;
1377         case LDB_RENAME:
1378                 ret = ltdb_rename(ctx);
1379                 break;
1380         case LDB_EXTENDED:
1381                 ltdb_handle_extended(ctx);
1382                 goto done;
1383         default:
1384                 /* no other op supported */
1385                 ret = LDB_ERR_PROTOCOL_ERROR;
1386         }
1387
1388         if (!ctx->request_terminated) {
1389                 /* request is done now */
1390                 ltdb_request_done(ctx, ret);
1391         }
1392
1393 done:
1394         if (ctx->spy) {
1395                 /* neutralize the spy */
1396                 ctx->spy->ctx = NULL;
1397                 ctx->spy = NULL;
1398         }
1399         talloc_free(ctx);
1400 }
1401
1402 static int ltdb_request_destructor(void *ptr)
1403 {
1404         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1405
1406         if (spy->ctx != NULL) {
1407                 spy->ctx->spy = NULL;
1408                 spy->ctx->request_terminated = true;
1409                 spy->ctx = NULL;
1410         }
1411
1412         return 0;
1413 }
1414
1415 static int ltdb_handle_request(struct ldb_module *module,
1416                                struct ldb_request *req)
1417 {
1418         struct ldb_control *control_permissive;
1419         struct ldb_context *ldb;
1420         struct tevent_context *ev;
1421         struct ltdb_context *ac;
1422         struct tevent_timer *te;
1423         struct timeval tv;
1424         unsigned int i;
1425
1426         ldb = ldb_module_get_ctx(module);
1427
1428         control_permissive = ldb_request_get_control(req,
1429                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1430
1431         for (i = 0; req->controls && req->controls[i]; i++) {
1432                 if (req->controls[i]->critical &&
1433                     req->controls[i] != control_permissive) {
1434                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1435                                                req->controls[i]->oid);
1436                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1437                 }
1438         }
1439
1440         if (req->starttime == 0 || req->timeout == 0) {
1441                 ldb_set_errstring(ldb, "Invalid timeout settings");
1442                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1443         }
1444
1445         ev = ldb_get_event_context(ldb);
1446
1447         ac = talloc_zero(ldb, struct ltdb_context);
1448         if (ac == NULL) {
1449                 ldb_oom(ldb);
1450                 return LDB_ERR_OPERATIONS_ERROR;
1451         }
1452
1453         ac->module = module;
1454         ac->req = req;
1455
1456         tv.tv_sec = 0;
1457         tv.tv_usec = 0;
1458         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1459         if (NULL == te) {
1460                 talloc_free(ac);
1461                 return LDB_ERR_OPERATIONS_ERROR;
1462         }
1463
1464         tv.tv_sec = req->starttime + req->timeout;
1465         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1466         if (NULL == ac->timeout_event) {
1467                 talloc_free(ac);
1468                 return LDB_ERR_OPERATIONS_ERROR;
1469         }
1470
1471         /* set a spy so that we do not try to use the request context
1472          * if it is freed before ltdb_callback fires */
1473         ac->spy = talloc(req, struct ltdb_req_spy);
1474         if (NULL == ac->spy) {
1475                 talloc_free(ac);
1476                 return LDB_ERR_OPERATIONS_ERROR;
1477         }
1478         ac->spy->ctx = ac;
1479
1480         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1481
1482         return LDB_SUCCESS;
1483 }
1484
1485 static int ltdb_init_rootdse(struct ldb_module *module)
1486 {
1487         /* ignore errors on this - we expect it for non-sam databases */
1488         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1489
1490         /* there can be no module beyond the backend, just return */
1491         return LDB_SUCCESS;
1492 }
1493
1494 static const struct ldb_module_ops ltdb_ops = {
1495         .name              = "tdb",
1496         .init_context      = ltdb_init_rootdse,
1497         .search            = ltdb_handle_request,
1498         .add               = ltdb_handle_request,
1499         .modify            = ltdb_handle_request,
1500         .del               = ltdb_handle_request,
1501         .rename            = ltdb_handle_request,
1502         .extended          = ltdb_handle_request,
1503         .start_transaction = ltdb_start_trans,
1504         .end_transaction   = ltdb_end_trans,
1505         .prepare_commit    = ltdb_prepare_commit,
1506         .del_transaction   = ltdb_del_trans,
1507 };
1508
1509 /*
1510   connect to the database
1511 */
1512 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1513                         unsigned int flags, const char *options[],
1514                         struct ldb_module **_module)
1515 {
1516         struct ldb_module *module;
1517         const char *path;
1518         int tdb_flags, open_flags;
1519         struct ltdb_private *ltdb;
1520
1521         /* parse the url */
1522         if (strchr(url, ':')) {
1523                 if (strncmp(url, "tdb://", 6) != 0) {
1524                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1525                                   "Invalid tdb URL '%s'", url);
1526                         return LDB_ERR_OPERATIONS_ERROR;
1527                 }
1528                 path = url+6;
1529         } else {
1530                 path = url;
1531         }
1532
1533         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1534
1535         /* check for the 'nosync' option */
1536         if (flags & LDB_FLG_NOSYNC) {
1537                 tdb_flags |= TDB_NOSYNC;
1538         }
1539
1540         /* and nommap option */
1541         if (flags & LDB_FLG_NOMMAP) {
1542                 tdb_flags |= TDB_NOMMAP;
1543         }
1544
1545         if (flags & LDB_FLG_RDONLY) {
1546                 open_flags = O_RDONLY;
1547         } else {
1548                 open_flags = O_CREAT | O_RDWR;
1549         }
1550
1551         ltdb = talloc_zero(ldb, struct ltdb_private);
1552         if (!ltdb) {
1553                 ldb_oom(ldb);
1554                 return LDB_ERR_OPERATIONS_ERROR;
1555         }
1556
1557         /* note that we use quite a large default hash size */
1558         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1559                                    tdb_flags, open_flags,
1560                                    ldb_get_create_perms(ldb), ldb);
1561         if (!ltdb->tdb) {
1562                 ldb_asprintf_errstring(ldb,
1563                                        "Unable to open tdb '%s'", path);
1564                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1565                           "Unable to open tdb '%s'", path);
1566                 talloc_free(ltdb);
1567                 return LDB_ERR_OPERATIONS_ERROR;
1568         }
1569
1570         if (getenv("LDB_WARN_UNINDEXED")) {
1571                 ltdb->warn_unindexed = true;
1572         }
1573
1574         if (getenv("LDB_WARN_REINDEX")) {
1575                 ltdb->warn_reindex = true;
1576         }
1577
1578         ltdb->sequence_number = 0;
1579
1580         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1581         if (!module) {
1582                 ldb_oom(ldb);
1583                 talloc_free(ltdb);
1584                 return LDB_ERR_OPERATIONS_ERROR;
1585         }
1586         ldb_module_set_private(module, ltdb);
1587         talloc_steal(module, ltdb);
1588
1589         if (ltdb_cache_load(module) != 0) {
1590                 ldb_asprintf_errstring(ldb,
1591                                        "Unable to load ltdb cache records of tdb '%s'", path);
1592                 talloc_free(module);
1593                 return LDB_ERR_OPERATIONS_ERROR;
1594         }
1595
1596         *_module = module;
1597         return LDB_SUCCESS;
1598 }
1599
1600 int ldb_tdb_init(const char *version)
1601 {
1602         LDB_MODULE_CHECK_VERSION(version);
1603         return ldb_register_backend("tdb", ltdb_connect, false);
1604 }