s4-ldb: show the OID of any unhandled critical controls
[samba.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         int ret = 0;
92
93         if (ltdb->in_transaction == 0 &&
94             ltdb->read_lock_count == 0) {
95                 ret = tdb_lockall_read(ltdb->tdb);
96         }
97         if (ret == 0) {
98                 ltdb->read_lock_count++;
99         }
100         return ret;
101 }
102
103 /*
104   unlock the database after a ltdb_lock_read()
105 */
106 int ltdb_unlock_read(struct ldb_module *module)
107 {
108         void *data = ldb_module_get_private(module);
109         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
110         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
111                 return tdb_unlockall_read(ltdb->tdb);
112         }
113         ltdb->read_lock_count--;
114         return 0;
115 }
116
117
118 /*
119   form a TDB_DATA for a record key
120   caller frees
121
122   note that the key for a record can depend on whether the
123   dn refers to a case sensitive index record or not
124 */
125 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
126 {
127         struct ldb_context *ldb = ldb_module_get_ctx(module);
128         TDB_DATA key;
129         char *key_str = NULL;
130         const char *dn_folded = NULL;
131
132         /*
133           most DNs are case insensitive. The exception is index DNs for
134           case sensitive attributes
135
136           there are 3 cases dealt with in this code:
137
138           1) if the dn doesn't start with @ then uppercase the attribute
139              names and the attributes values of case insensitive attributes
140           2) if the dn starts with @ then leave it alone -
141              the indexing code handles the rest
142         */
143
144         dn_folded = ldb_dn_get_casefold(dn);
145         if (!dn_folded) {
146                 goto failed;
147         }
148
149         key_str = talloc_strdup(ldb, "DN=");
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
155         if (!key_str) {
156                 goto failed;
157         }
158
159         key.dptr = (uint8_t *)key_str;
160         key.dsize = strlen(key_str) + 1;
161
162         return key;
163
164 failed:
165         errno = ENOMEM;
166         key.dptr = NULL;
167         key.dsize = 0;
168         return key;
169 }
170
171 /*
172   check special dn's have valid attributes
173   currently only @ATTRIBUTES is checked
174 */
175 static int ltdb_check_special_dn(struct ldb_module *module,
176                           const struct ldb_message *msg)
177 {
178         struct ldb_context *ldb = ldb_module_get_ctx(module);
179         int i, j;
180
181         if (! ldb_dn_is_special(msg->dn) ||
182             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
183                 return LDB_SUCCESS;
184         }
185
186         /* we have @ATTRIBUTES, let's check attributes are fine */
187         /* should we check that we deny multivalued attributes ? */
188         for (i = 0; i < msg->num_elements; i++) {
189                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
190
191                 for (j = 0; j < msg->elements[i].num_values; j++) {
192                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
193                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
194                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
195                         }
196                 }
197         }
198
199         return LDB_SUCCESS;
200 }
201
202
203 /*
204   we've made a modification to a dn - possibly reindex and
205   update sequence number
206 */
207 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
208 {
209         int ret = LDB_SUCCESS;
210         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
211
212         /* only allow modifies inside a transaction, otherwise the
213          * ldb is unsafe */
214         if (ltdb->in_transaction == 0) {
215                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
216                 return LDB_ERR_OPERATIONS_ERROR;
217         }
218
219         if (ldb_dn_is_special(dn) &&
220             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
221              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
222                 ret = ltdb_reindex(module);
223         }
224
225         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
226         if (ret == LDB_SUCCESS &&
227             !(ldb_dn_is_special(dn) &&
228               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
229                 ret = ltdb_increase_sequence_number(module);
230         }
231
232         /* If the modify was to @OPTIONS, reload the cache */
233         if (ldb_dn_is_special(dn) &&
234             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
235                 ret = ltdb_cache_reload(module);
236         }
237
238         return ret;
239 }
240
241 /*
242   store a record into the db
243 */
244 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
245 {
246         void *data = ldb_module_get_private(module);
247         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
248         TDB_DATA tdb_key, tdb_data;
249         int ret = LDB_SUCCESS;
250
251         tdb_key = ltdb_key(module, msg->dn);
252         if (tdb_key.dptr == NULL) {
253                 return LDB_ERR_OTHER;
254         }
255
256         ret = ltdb_pack_data(module, msg, &tdb_data);
257         if (ret == -1) {
258                 talloc_free(tdb_key.dptr);
259                 return LDB_ERR_OTHER;
260         }
261
262         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
263         if (ret == -1) {
264                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
265                 goto done;
266         }
267
268 done:
269         talloc_free(tdb_key.dptr);
270         talloc_free(tdb_data.dptr);
271
272         return ret;
273 }
274
275
276 static int ltdb_add_internal(struct ldb_module *module,
277                              const struct ldb_message *msg)
278 {
279         struct ldb_context *ldb = ldb_module_get_ctx(module);
280         int ret = LDB_SUCCESS, i;
281
282         ret = ltdb_check_special_dn(module, msg);
283         if (ret != LDB_SUCCESS) {
284                 return ret;
285         }
286
287         if (ltdb_cache_load(module) != 0) {
288                 return LDB_ERR_OPERATIONS_ERROR;
289         }
290
291         for (i=0;i<msg->num_elements;i++) {
292                 struct ldb_message_element *el = &msg->elements[i];
293                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
294
295                 if (el->num_values == 0) {
296                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
297                                                el->name, ldb_dn_get_linearized(msg->dn));
298                         return LDB_ERR_CONSTRAINT_VIOLATION;
299                 }
300                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
301                         if (el->num_values > 1) {
302                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
303                                                        el->name, ldb_dn_get_linearized(msg->dn));
304                                 return LDB_ERR_CONSTRAINT_VIOLATION;
305                         }
306                 }
307         }
308
309         ret = ltdb_store(module, msg, TDB_INSERT);
310         if (ret != LDB_SUCCESS) {
311                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
312                         ldb_asprintf_errstring(ldb,
313                                                "Entry %s already exists",
314                                                ldb_dn_get_linearized(msg->dn));
315                 }
316                 return ret;
317         }
318
319         ret = ltdb_index_add_new(module, msg);
320         if (ret != LDB_SUCCESS) {
321                 return ret;
322         }
323
324         ret = ltdb_modified(module, msg->dn);
325
326         return ret;
327 }
328
329 /*
330   add a record to the database
331 */
332 static int ltdb_add(struct ltdb_context *ctx)
333 {
334         struct ldb_module *module = ctx->module;
335         struct ldb_request *req = ctx->req;
336         int ret = LDB_SUCCESS;
337
338         ldb_request_set_state(req, LDB_ASYNC_PENDING);
339
340         if (ltdb_cache_load(module) != 0) {
341                 return LDB_ERR_OPERATIONS_ERROR;
342         }
343
344         ret = ltdb_add_internal(module, req->op.add.message);
345
346         return ret;
347 }
348
349 /*
350   delete a record from the database, not updating indexes (used for deleting
351   index records)
352 */
353 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
354 {
355         void *data = ldb_module_get_private(module);
356         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
357         TDB_DATA tdb_key;
358         int ret;
359
360         tdb_key = ltdb_key(module, dn);
361         if (!tdb_key.dptr) {
362                 return LDB_ERR_OTHER;
363         }
364
365         ret = tdb_delete(ltdb->tdb, tdb_key);
366         talloc_free(tdb_key.dptr);
367
368         if (ret != 0) {
369                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
370         }
371
372         return ret;
373 }
374
375 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
376 {
377         struct ldb_message *msg;
378         int ret = LDB_SUCCESS;
379
380         msg = talloc(module, struct ldb_message);
381         if (msg == NULL) {
382                 return LDB_ERR_OPERATIONS_ERROR;
383         }
384
385         /* in case any attribute of the message was indexed, we need
386            to fetch the old record */
387         ret = ltdb_search_dn1(module, dn, msg);
388         if (ret != LDB_SUCCESS) {
389                 /* not finding the old record is an error */
390                 goto done;
391         }
392
393         ret = ltdb_delete_noindex(module, dn);
394         if (ret != LDB_SUCCESS) {
395                 goto done;
396         }
397
398         /* remove any indexed attributes */
399         ret = ltdb_index_delete(module, msg);
400         if (ret != LDB_SUCCESS) {
401                 goto done;
402         }
403
404         ret = ltdb_modified(module, dn);
405         if (ret != LDB_SUCCESS) {
406                 goto done;
407         }
408
409 done:
410         talloc_free(msg);
411         return ret;
412 }
413
414 /*
415   delete a record from the database
416 */
417 static int ltdb_delete(struct ltdb_context *ctx)
418 {
419         struct ldb_module *module = ctx->module;
420         struct ldb_request *req = ctx->req;
421         int ret = LDB_SUCCESS;
422
423         ldb_request_set_state(req, LDB_ASYNC_PENDING);
424
425         if (ltdb_cache_load(module) != 0) {
426                 return LDB_ERR_OPERATIONS_ERROR;
427         }
428
429         ret = ltdb_delete_internal(module, req->op.del.dn);
430
431         return ret;
432 }
433
434 /*
435   find an element by attribute name. At the moment this does a linear search,
436   it should be re-coded to use a binary search once all places that modify
437   records guarantee sorted order
438
439   return the index of the first matching element if found, otherwise -1
440 */
441 static int find_element(const struct ldb_message *msg, const char *name)
442 {
443         unsigned int i;
444         for (i=0;i<msg->num_elements;i++) {
445                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
446                         return i;
447                 }
448         }
449         return -1;
450 }
451
452
453 /*
454   add an element to an existing record. Assumes a elements array that we
455   can call re-alloc on, and assumed that we can re-use the data pointers from
456   the passed in additional values. Use with care!
457
458   returns 0 on success, -1 on failure (and sets errno)
459 */
460 static int ltdb_msg_add_element(struct ldb_context *ldb,
461                                 struct ldb_message *msg,
462                                 struct ldb_message_element *el)
463 {
464         struct ldb_message_element *e2;
465         unsigned int i;
466
467         if (el->num_values == 0) {
468                 /* nothing to do here - we don't add empty elements */
469                 return 0;
470         }
471
472         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
473                               msg->num_elements+1);
474         if (!e2) {
475                 errno = ENOMEM;
476                 return -1;
477         }
478
479         msg->elements = e2;
480
481         e2 = &msg->elements[msg->num_elements];
482
483         e2->name = el->name;
484         e2->flags = el->flags;
485         e2->values = talloc_array(msg->elements,
486                                   struct ldb_val, el->num_values);
487         if (!e2->values) {
488                 errno = ENOMEM;
489                 return -1;
490         }
491         for (i=0;i<el->num_values;i++) {
492                 e2->values[i] = el->values[i];
493         }
494         e2->num_values = el->num_values;
495
496         ++msg->num_elements;
497
498         return 0;
499 }
500
501 /*
502   delete all elements having a specified attribute name
503 */
504 static int msg_delete_attribute(struct ldb_module *module,
505                                 struct ldb_context *ldb,
506                                 struct ldb_message *msg, const char *name)
507 {
508         unsigned int i;
509         int ret;
510         struct ldb_message_element *el;
511
512         el = ldb_msg_find_element(msg, name);
513         if (el == NULL) {
514                 return -1;
515         }
516         i = el - msg->elements;
517
518         ret = ltdb_index_del_element(module, msg->dn, el);
519         if (ret != LDB_SUCCESS) {
520                 return ret;
521         }
522
523         talloc_free(el->values);
524         if (msg->num_elements > (i+1)) {
525                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
526         }
527         msg->num_elements--;
528         msg->elements = talloc_realloc(msg, msg->elements,
529                                        struct ldb_message_element,
530                                        msg->num_elements);
531         return 0;
532 }
533
534 /*
535   delete all elements matching an attribute name/value
536
537   return 0 on success, -1 on failure
538 */
539 static int msg_delete_element(struct ldb_module *module,
540                               struct ldb_message *msg,
541                               const char *name,
542                               const struct ldb_val *val)
543 {
544         struct ldb_context *ldb = ldb_module_get_ctx(module);
545         unsigned int i;
546         int found, ret;
547         struct ldb_message_element *el;
548         const struct ldb_schema_attribute *a;
549
550         found = find_element(msg, name);
551         if (found == -1) {
552                 return -1;
553         }
554
555         el = &msg->elements[found];
556
557         a = ldb_schema_attribute_by_name(ldb, el->name);
558
559         for (i=0;i<el->num_values;i++) {
560                 if (a->syntax->comparison_fn(ldb, ldb,
561                                              &el->values[i], val) == 0) {
562                         if (el->num_values == 1) {
563                                 return msg_delete_attribute(module, ldb, msg, name);
564                         }
565
566                         ret = ltdb_index_del_value(module, msg->dn, el, i);
567                         if (ret != LDB_SUCCESS) {
568                                 return -1;
569                         }
570
571                         if (i<el->num_values-1) {
572                                 memmove(&el->values[i], &el->values[i+1],
573                                         sizeof(el->values[i])*
574                                                 (el->num_values-(i+1)));
575                         }
576                         el->num_values--;
577
578                         /* per definition we find in a canonicalised message an
579                            attribute value only once. So we are finished here */
580                         return 0;
581                 }
582         }
583
584         /* Not found */
585         return -1;
586 }
587
588
589 /*
590   modify a record - internal interface
591
592   yuck - this is O(n^2). Luckily n is usually small so we probably
593   get away with it, but if we ever have really large attribute lists
594   then we'll need to look at this again
595 */
596 int ltdb_modify_internal(struct ldb_module *module,
597                          const struct ldb_message *msg)
598 {
599         struct ldb_context *ldb = ldb_module_get_ctx(module);
600         void *data = ldb_module_get_private(module);
601         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
602         TDB_DATA tdb_key, tdb_data;
603         struct ldb_message *msg2;
604         unsigned i, j;
605         int ret = LDB_SUCCESS, idx;
606
607         tdb_key = ltdb_key(module, msg->dn);
608         if (!tdb_key.dptr) {
609                 return LDB_ERR_OTHER;
610         }
611
612         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
613         if (!tdb_data.dptr) {
614                 talloc_free(tdb_key.dptr);
615                 return ltdb_err_map(tdb_error(ltdb->tdb));
616         }
617
618         msg2 = talloc(tdb_key.dptr, struct ldb_message);
619         if (msg2 == NULL) {
620                 free(tdb_data.dptr);
621                 ret = LDB_ERR_OTHER;
622                 goto done;
623         }
624
625         ret = ltdb_unpack_data(module, &tdb_data, msg2);
626         free(tdb_data.dptr);
627         if (ret == -1) {
628                 ret = LDB_ERR_OTHER;
629                 goto done;
630         }
631
632         if (!msg2->dn) {
633                 msg2->dn = msg->dn;
634         }
635
636         for (i=0; i<msg->num_elements; i++) {
637                 struct ldb_message_element *el = &msg->elements[i], *el2;
638                 struct ldb_val *vals;
639                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
640                 const char *dn;
641
642                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
643                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
644                                                ldb_dn_get_linearized(msg2->dn));
645                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
646                         goto done;
647                 }
648
649                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
650                 case LDB_FLAG_MOD_ADD:
651                         if (el->num_values == 0) {
652                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
653                                                        el->name, ldb_dn_get_linearized(msg2->dn));
654                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
655                                 goto done;
656                         }
657
658                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
659                                 if (el->num_values > 1) {
660                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
661                                                                el->name, ldb_dn_get_linearized(msg2->dn));
662                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
663                                         goto done;
664                                 }
665                         }
666
667                         /* Checks if element already exists */
668                         idx = find_element(msg2, el->name);
669                         if (idx == -1) {
670                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
671                                         ret = LDB_ERR_OTHER;
672                                         goto done;
673                                 }
674                                 ret = ltdb_index_add_element(module, msg2->dn, el);
675                                 if (ret != LDB_SUCCESS) {
676                                         goto done;
677                                 }
678                         } else {
679                                 /* We cannot add another value on a existing one
680                                    if the attribute is single-valued */
681                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
682                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
683                                                                el->name, ldb_dn_get_linearized(msg2->dn));
684                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
685                                         goto done;
686                                 }
687
688                                 el2 = &(msg2->elements[idx]);
689
690                                 /* Check that values don't exist yet on multi-
691                                    valued attributes or aren't provided twice */
692                                 for (j=0; j<el->num_values; j++) {
693                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
694                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
695                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
696                                                 goto done;
697                                         }
698                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
699                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
700                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
701                                                 goto done;
702                                         }
703                                 }
704
705                                 /* Now combine existing and new values to a new
706                                    attribute record */
707                                 vals = talloc_realloc(msg2->elements,
708                                                       el2->values, struct ldb_val,
709                                                       el2->num_values + el->num_values);
710                                 if (vals == NULL) {
711                                         ldb_oom(ldb);
712                                         ret = LDB_ERR_OTHER;
713                                         goto done;
714                                 }
715
716                                 for (j=0; j<el->num_values; j++) {
717                                         vals[el2->num_values + j] =
718                                                 ldb_val_dup(vals, &el->values[j]);
719                                 }
720
721                                 el2->values = vals;
722                                 el2->num_values += el->num_values;
723
724                                 ret = ltdb_index_add_element(module, msg2->dn, el);
725                                 if (ret != LDB_SUCCESS) {
726                                         goto done;
727                                 }
728                         }
729
730                         break;
731
732                 case LDB_FLAG_MOD_REPLACE:
733                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
734                                 if (el->num_values > 1) {
735                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
736                                                                el->name, ldb_dn_get_linearized(msg2->dn));
737                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
738                                         goto done;
739                                 }
740                         }
741
742                         /* TODO: This is O(n^2) - replace with more efficient check */
743                         for (j=0; j<el->num_values; j++) {
744                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
745                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
746                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
747                                         goto done;
748                                 }
749                         }
750
751                         idx = find_element(msg2, el->name);
752                         if (idx != -1) {
753                                 el2 = &(msg2->elements[idx]);
754                                 if (ldb_msg_element_compare(el, el2) == 0) {
755                                         /* we are replacing with the same values */
756                                         continue;
757                                 }
758                         
759                                 /* Delete the attribute if it exists in the DB */
760                                 if (msg_delete_attribute(module, ldb, msg2, el->name) != 0) {
761                                         ret = LDB_ERR_OTHER;
762                                         goto done;
763                                 }
764                         }
765
766                         /* Recreate it with the new values */
767                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
768                                 ret = LDB_ERR_OTHER;
769                                 goto done;
770                         }
771
772                         ret = ltdb_index_add_element(module, msg2->dn, el);
773                         if (ret != LDB_SUCCESS) {
774                                 goto done;
775                         }
776
777                         break;
778
779                 case LDB_FLAG_MOD_DELETE:
780                         dn = ldb_dn_get_linearized(msg2->dn);
781                         if (dn == NULL) {
782                                 ret = LDB_ERR_OTHER;
783                                 goto done;
784                         }
785
786                         if (msg->elements[i].num_values == 0) {
787                                 /* Delete the whole attribute */
788                                 if (msg_delete_attribute(module, ldb, msg2,
789                                                          msg->elements[i].name) != 0) {
790                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
791                                                                msg->elements[i].name, dn);
792                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
793                                         goto done;
794                                 }
795                         } else {
796                                 /* Delete specified values from an attribute */
797                                 for (j=0; j < msg->elements[i].num_values; j++) {
798                                         if (msg_delete_element(module,
799                                                                msg2,
800                                                                msg->elements[i].name,
801                                                                &msg->elements[i].values[j]) != 0) {
802                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
803                                                                        msg->elements[i].name, dn);
804                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
805                                                 goto done;
806                                         }
807                                 }
808                         }
809                         break;
810                 default:
811                         ldb_asprintf_errstring(ldb,
812                                 "Invalid ldb_modify flags on %s: 0x%x",
813                                 msg->elements[i].name,
814                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
815                         ret = LDB_ERR_PROTOCOL_ERROR;
816                         goto done;
817                 }
818         }
819
820         ret = ltdb_store(module, msg2, TDB_MODIFY);
821         if (ret != LDB_SUCCESS) {
822                 goto done;
823         }
824
825         ret = ltdb_modified(module, msg2->dn);
826         if (ret != LDB_SUCCESS) {
827                 goto done;
828         }
829
830 done:
831         talloc_free(tdb_key.dptr);
832         return ret;
833 }
834
835 /*
836   modify a record
837 */
838 static int ltdb_modify(struct ltdb_context *ctx)
839 {
840         struct ldb_module *module = ctx->module;
841         struct ldb_request *req = ctx->req;
842         int ret = LDB_SUCCESS;
843
844         ret = ltdb_check_special_dn(module, req->op.mod.message);
845         if (ret != LDB_SUCCESS) {
846                 return ret;
847         }
848
849         ldb_request_set_state(req, LDB_ASYNC_PENDING);
850
851         if (ltdb_cache_load(module) != 0) {
852                 return LDB_ERR_OPERATIONS_ERROR;
853         }
854
855         ret = ltdb_modify_internal(module, req->op.mod.message);
856
857         return ret;
858 }
859
860 /*
861   rename a record
862 */
863 static int ltdb_rename(struct ltdb_context *ctx)
864 {
865         struct ldb_module *module = ctx->module;
866         struct ldb_request *req = ctx->req;
867         struct ldb_message *msg;
868         int ret = LDB_SUCCESS;
869
870         ldb_request_set_state(req, LDB_ASYNC_PENDING);
871
872         if (ltdb_cache_load(ctx->module) != 0) {
873                 return LDB_ERR_OPERATIONS_ERROR;
874         }
875
876         msg = talloc(ctx, struct ldb_message);
877         if (msg == NULL) {
878                 return LDB_ERR_OPERATIONS_ERROR;
879         }
880
881         /* in case any attribute of the message was indexed, we need
882            to fetch the old record */
883         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
884         if (ret != LDB_SUCCESS) {
885                 /* not finding the old record is an error */
886                 return ret;
887         }
888
889         /* Always delete first then add, to avoid conflicts with
890          * unique indexes. We rely on the transaction to make this
891          * atomic
892          */
893         ret = ltdb_delete_internal(module, msg->dn);
894         if (ret != LDB_SUCCESS) {
895                 return ret;
896         }
897
898         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
899         if (msg->dn == NULL) {
900                 return LDB_ERR_OPERATIONS_ERROR;
901         }
902
903         ret = ltdb_add_internal(module, msg);
904
905         return ret;
906 }
907
908 static int ltdb_start_trans(struct ldb_module *module)
909 {
910         void *data = ldb_module_get_private(module);
911         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
912
913         if (tdb_transaction_start(ltdb->tdb) != 0) {
914                 return ltdb_err_map(tdb_error(ltdb->tdb));
915         }
916
917         ltdb->in_transaction++;
918
919         ltdb_index_transaction_start(module);
920
921         return LDB_SUCCESS;
922 }
923
924 static int ltdb_prepare_commit(struct ldb_module *module)
925 {
926         void *data = ldb_module_get_private(module);
927         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
928
929         if (ltdb->in_transaction != 1) {
930                 return LDB_SUCCESS;
931         }
932
933         if (ltdb_index_transaction_commit(module) != 0) {
934                 tdb_transaction_cancel(ltdb->tdb);
935                 ltdb->in_transaction--;
936                 return ltdb_err_map(tdb_error(ltdb->tdb));
937         }
938
939         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
940                 ltdb->in_transaction--;
941                 return ltdb_err_map(tdb_error(ltdb->tdb));
942         }
943
944         ltdb->prepared_commit = true;
945
946         return LDB_SUCCESS;
947 }
948
949 static int ltdb_end_trans(struct ldb_module *module)
950 {
951         void *data = ldb_module_get_private(module);
952         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
953
954         if (!ltdb->prepared_commit) {
955                 int ret = ltdb_prepare_commit(module);
956                 if (ret != LDB_SUCCESS) {
957                         return ret;
958                 }
959         }
960
961         ltdb->in_transaction--;
962         ltdb->prepared_commit = false;
963
964         if (tdb_transaction_commit(ltdb->tdb) != 0) {
965                 return ltdb_err_map(tdb_error(ltdb->tdb));
966         }
967
968         return LDB_SUCCESS;
969 }
970
971 static int ltdb_del_trans(struct ldb_module *module)
972 {
973         void *data = ldb_module_get_private(module);
974         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
975
976         ltdb->in_transaction--;
977
978         if (ltdb_index_transaction_cancel(module) != 0) {
979                 tdb_transaction_cancel(ltdb->tdb);
980                 return ltdb_err_map(tdb_error(ltdb->tdb));
981         }
982
983         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
984                 return ltdb_err_map(tdb_error(ltdb->tdb));
985         }
986
987         return LDB_SUCCESS;
988 }
989
990 /*
991   return sequenceNumber from @BASEINFO
992 */
993 static int ltdb_sequence_number(struct ltdb_context *ctx,
994                                 struct ldb_extended **ext)
995 {
996         struct ldb_context *ldb;
997         struct ldb_module *module = ctx->module;
998         struct ldb_request *req = ctx->req;
999         TALLOC_CTX *tmp_ctx;
1000         struct ldb_seqnum_request *seq;
1001         struct ldb_seqnum_result *res;
1002         struct ldb_message *msg = NULL;
1003         struct ldb_dn *dn;
1004         const char *date;
1005         int ret = LDB_SUCCESS;
1006
1007         ldb = ldb_module_get_ctx(module);
1008
1009         seq = talloc_get_type(req->op.extended.data,
1010                                 struct ldb_seqnum_request);
1011         if (seq == NULL) {
1012                 return LDB_ERR_OPERATIONS_ERROR;
1013         }
1014
1015         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1016
1017         if (ltdb_lock_read(module) != 0) {
1018                 return LDB_ERR_OPERATIONS_ERROR;
1019         }
1020
1021         res = talloc_zero(req, struct ldb_seqnum_result);
1022         if (res == NULL) {
1023                 ret = LDB_ERR_OPERATIONS_ERROR;
1024                 goto done;
1025         }
1026         tmp_ctx = talloc_new(req);
1027         if (tmp_ctx == NULL) {
1028                 ret = LDB_ERR_OPERATIONS_ERROR;
1029                 goto done;
1030         }
1031
1032         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1033
1034         msg = talloc(tmp_ctx, struct ldb_message);
1035         if (msg == NULL) {
1036                 ret = LDB_ERR_OPERATIONS_ERROR;
1037                 goto done;
1038         }
1039
1040         ret = ltdb_search_dn1(module, dn, msg);
1041         if (ret != LDB_SUCCESS) {
1042                 goto done;
1043         }
1044
1045         switch (seq->type) {
1046         case LDB_SEQ_HIGHEST_SEQ:
1047                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1048                 break;
1049         case LDB_SEQ_NEXT:
1050                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1051                 res->seq_num++;
1052                 break;
1053         case LDB_SEQ_HIGHEST_TIMESTAMP:
1054                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1055                 if (date) {
1056                         res->seq_num = ldb_string_to_time(date);
1057                 } else {
1058                         res->seq_num = 0;
1059                         /* zero is as good as anything when we don't know */
1060                 }
1061                 break;
1062         }
1063
1064         *ext = talloc_zero(req, struct ldb_extended);
1065         if (*ext == NULL) {
1066                 ret = LDB_ERR_OPERATIONS_ERROR;
1067                 goto done;
1068         }
1069         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1070         (*ext)->data = talloc_steal(*ext, res);
1071
1072 done:
1073         talloc_free(tmp_ctx);
1074         ltdb_unlock_read(module);
1075         return ret;
1076 }
1077
1078 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1079 {
1080         struct ldb_context *ldb;
1081         struct ldb_request *req;
1082         struct ldb_reply *ares;
1083
1084         ldb = ldb_module_get_ctx(ctx->module);
1085         req = ctx->req;
1086
1087         /* if we already returned an error just return */
1088         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1089                 return;
1090         }
1091
1092         ares = talloc_zero(req, struct ldb_reply);
1093         if (!ares) {
1094                 ldb_oom(ldb);
1095                 req->callback(req, NULL);
1096                 return;
1097         }
1098         ares->type = LDB_REPLY_DONE;
1099         ares->error = error;
1100
1101         req->callback(req, ares);
1102 }
1103
1104 static void ltdb_timeout(struct tevent_context *ev,
1105                           struct tevent_timer *te,
1106                           struct timeval t,
1107                           void *private_data)
1108 {
1109         struct ltdb_context *ctx;
1110         ctx = talloc_get_type(private_data, struct ltdb_context);
1111
1112         if (!ctx->request_terminated) {
1113                 /* request is done now */
1114                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1115         }
1116
1117         if (!ctx->request_terminated) {
1118                 /* neutralize the spy */
1119                 ctx->spy->ctx = NULL;
1120         }
1121         talloc_free(ctx);
1122 }
1123
1124 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1125                                         struct ldb_extended *ext,
1126                                         int error)
1127 {
1128         struct ldb_context *ldb;
1129         struct ldb_request *req;
1130         struct ldb_reply *ares;
1131
1132         ldb = ldb_module_get_ctx(ctx->module);
1133         req = ctx->req;
1134
1135         /* if we already returned an error just return */
1136         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1137                 return;
1138         }
1139
1140         ares = talloc_zero(req, struct ldb_reply);
1141         if (!ares) {
1142                 ldb_oom(ldb);
1143                 req->callback(req, NULL);
1144                 return;
1145         }
1146         ares->type = LDB_REPLY_DONE;
1147         ares->response = ext;
1148         ares->error = error;
1149
1150         req->callback(req, ares);
1151 }
1152
1153 static void ltdb_handle_extended(struct ltdb_context *ctx)
1154 {
1155         struct ldb_extended *ext = NULL;
1156         int ret;
1157
1158         if (strcmp(ctx->req->op.extended.oid,
1159                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1160                 /* get sequence number */
1161                 ret = ltdb_sequence_number(ctx, &ext);
1162         } else {
1163                 /* not recognized */
1164                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1165         }
1166
1167         ltdb_request_extended_done(ctx, ext, ret);
1168 }
1169
1170 static void ltdb_callback(struct tevent_context *ev,
1171                           struct tevent_timer *te,
1172                           struct timeval t,
1173                           void *private_data)
1174 {
1175         struct ltdb_context *ctx;
1176         int ret;
1177
1178         ctx = talloc_get_type(private_data, struct ltdb_context);
1179
1180         if (ctx->request_terminated) {
1181                 goto done;
1182         }
1183
1184         switch (ctx->req->operation) {
1185         case LDB_SEARCH:
1186                 ret = ltdb_search(ctx);
1187                 break;
1188         case LDB_ADD:
1189                 ret = ltdb_add(ctx);
1190                 break;
1191         case LDB_MODIFY:
1192                 ret = ltdb_modify(ctx);
1193                 break;
1194         case LDB_DELETE:
1195                 ret = ltdb_delete(ctx);
1196                 break;
1197         case LDB_RENAME:
1198                 ret = ltdb_rename(ctx);
1199                 break;
1200         case LDB_EXTENDED:
1201                 ltdb_handle_extended(ctx);
1202                 goto done;
1203         default:
1204                 /* no other op supported */
1205                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1206         }
1207
1208         if (!ctx->request_terminated) {
1209                 /* request is done now */
1210                 ltdb_request_done(ctx, ret);
1211         }
1212
1213 done:
1214         if (!ctx->request_terminated) {
1215                 /* neutralize the spy */
1216                 ctx->spy->ctx = NULL;
1217         }
1218         talloc_free(ctx);
1219 }
1220
1221 static int ltdb_request_destructor(void *ptr)
1222 {
1223         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1224
1225         if (spy->ctx != NULL) {
1226                 spy->ctx->request_terminated = true;
1227         }
1228
1229         return 0;
1230 }
1231
1232 static int ltdb_handle_request(struct ldb_module *module,
1233                                struct ldb_request *req)
1234 {
1235         struct ldb_context *ldb;
1236         struct tevent_context *ev;
1237         struct ltdb_context *ac;
1238         struct tevent_timer *te;
1239         struct timeval tv;
1240         int i;
1241
1242         ldb = ldb_module_get_ctx(module);
1243
1244         for (i = 0; req->controls && req->controls[i]; i++) {
1245                 if (req->controls[i]->critical) {
1246                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1247                                                req->controls[i]->oid);
1248                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1249                 }
1250         }
1251
1252         if (req->starttime == 0 || req->timeout == 0) {
1253                 ldb_set_errstring(ldb, "Invalid timeout settings");
1254                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1255         }
1256
1257         ev = ldb_get_event_context(ldb);
1258
1259         ac = talloc_zero(ldb, struct ltdb_context);
1260         if (ac == NULL) {
1261                 ldb_oom(ldb);
1262                 return LDB_ERR_OPERATIONS_ERROR;
1263         }
1264
1265         ac->module = module;
1266         ac->req = req;
1267
1268         tv.tv_sec = 0;
1269         tv.tv_usec = 0;
1270         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1271         if (NULL == te) {
1272                 talloc_free(ac);
1273                 return LDB_ERR_OPERATIONS_ERROR;
1274         }
1275
1276         tv.tv_sec = req->starttime + req->timeout;
1277         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1278         if (NULL == ac->timeout_event) {
1279                 talloc_free(ac);
1280                 return LDB_ERR_OPERATIONS_ERROR;
1281         }
1282
1283         /* set a spy so that we do not try to use the request context
1284          * if it is freed before ltdb_callback fires */
1285         ac->spy = talloc(req, struct ltdb_req_spy);
1286         if (NULL == ac->spy) {
1287                 talloc_free(ac);
1288                 return LDB_ERR_OPERATIONS_ERROR;
1289         }
1290         ac->spy->ctx = ac;
1291
1292         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1293
1294         return LDB_SUCCESS;
1295 }
1296
1297 static const struct ldb_module_ops ltdb_ops = {
1298         .name              = "tdb",
1299         .search            = ltdb_handle_request,
1300         .add               = ltdb_handle_request,
1301         .modify            = ltdb_handle_request,
1302         .del               = ltdb_handle_request,
1303         .rename            = ltdb_handle_request,
1304         .extended          = ltdb_handle_request,
1305         .start_transaction = ltdb_start_trans,
1306         .end_transaction   = ltdb_end_trans,
1307         .prepare_commit    = ltdb_prepare_commit,
1308         .del_transaction   = ltdb_del_trans,
1309 };
1310
1311 /*
1312   connect to the database
1313 */
1314 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1315                         unsigned int flags, const char *options[],
1316                         struct ldb_module **_module)
1317 {
1318         struct ldb_module *module;
1319         const char *path;
1320         int tdb_flags, open_flags;
1321         struct ltdb_private *ltdb;
1322
1323         /* parse the url */
1324         if (strchr(url, ':')) {
1325                 if (strncmp(url, "tdb://", 6) != 0) {
1326                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1327                                   "Invalid tdb URL '%s'", url);
1328                         return LDB_ERR_OPERATIONS_ERROR;
1329                 }
1330                 path = url+6;
1331         } else {
1332                 path = url;
1333         }
1334
1335         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1336
1337         /* check for the 'nosync' option */
1338         if (flags & LDB_FLG_NOSYNC) {
1339                 tdb_flags |= TDB_NOSYNC;
1340         }
1341
1342         /* and nommap option */
1343         if (flags & LDB_FLG_NOMMAP) {
1344                 tdb_flags |= TDB_NOMMAP;
1345         }
1346
1347         if (flags & LDB_FLG_RDONLY) {
1348                 open_flags = O_RDONLY;
1349         } else {
1350                 open_flags = O_CREAT | O_RDWR;
1351         }
1352
1353         ltdb = talloc_zero(ldb, struct ltdb_private);
1354         if (!ltdb) {
1355                 ldb_oom(ldb);
1356                 return LDB_ERR_OPERATIONS_ERROR;
1357         }
1358
1359         /* note that we use quite a large default hash size */
1360         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1361                                    tdb_flags, open_flags,
1362                                    ldb_get_create_perms(ldb), ldb);
1363         if (!ltdb->tdb) {
1364                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1365                           "Unable to open tdb '%s'", path);
1366                 talloc_free(ltdb);
1367                 return LDB_ERR_OPERATIONS_ERROR;
1368         }
1369
1370         ltdb->sequence_number = 0;
1371
1372         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1373         if (!module) {
1374                 talloc_free(ltdb);
1375                 return LDB_ERR_OPERATIONS_ERROR;
1376         }
1377         ldb_module_set_private(module, ltdb);
1378         talloc_steal(module, ltdb);
1379
1380         if (ltdb_cache_load(module) != 0) {
1381                 talloc_free(module);
1382                 talloc_free(ltdb);
1383                 return LDB_ERR_OPERATIONS_ERROR;
1384         }
1385
1386         *_module = module;
1387         return LDB_SUCCESS;
1388 }
1389
1390 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1391         .name = "tdb",
1392         .connect_fn = ltdb_connect
1393 };