Allow (and ignore) distinguishedName on special records
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 static int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         if (ltdb->in_transaction == 0) {
92                 return tdb_lockall_read(ltdb->tdb);
93         }
94         return 0;
95 }
96
97 /*
98   unlock the database after a ltdb_lock_read()
99 */
100 int ltdb_unlock_read(struct ldb_module *module)
101 {
102         void *data = ldb_module_get_private(module);
103         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
104         if (ltdb->in_transaction == 0) {
105                 return tdb_unlockall_read(ltdb->tdb);
106         }
107         return 0;
108 }
109
110
111 /*
112   form a TDB_DATA for a record key
113   caller frees
114
115   note that the key for a record can depend on whether the
116   dn refers to a case sensitive index record or not
117 */
118 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
119 {
120         struct ldb_context *ldb = ldb_module_get_ctx(module);
121         TDB_DATA key;
122         char *key_str = NULL;
123         const char *dn_folded = NULL;
124
125         /*
126           most DNs are case insensitive. The exception is index DNs for
127           case sensitive attributes
128
129           there are 3 cases dealt with in this code:
130
131           1) if the dn doesn't start with @ then uppercase the attribute
132              names and the attributes values of case insensitive attributes
133           2) if the dn starts with @ then leave it alone -
134              the indexing code handles the rest
135         */
136
137         dn_folded = ldb_dn_get_casefold(dn);
138         if (!dn_folded) {
139                 goto failed;
140         }
141
142         key_str = talloc_strdup(ldb, "DN=");
143         if (!key_str) {
144                 goto failed;
145         }
146
147         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
148         if (!key_str) {
149                 goto failed;
150         }
151
152         key.dptr = (uint8_t *)key_str;
153         key.dsize = strlen(key_str) + 1;
154
155         return key;
156
157 failed:
158         errno = ENOMEM;
159         key.dptr = NULL;
160         key.dsize = 0;
161         return key;
162 }
163
164 /*
165   check special dn's have valid attributes
166   currently only @ATTRIBUTES is checked
167 */
168 static int ltdb_check_special_dn(struct ldb_module *module,
169                           const struct ldb_message *msg)
170 {
171         struct ldb_context *ldb = ldb_module_get_ctx(module);
172         int i, j;
173
174         if (! ldb_dn_is_special(msg->dn) ||
175             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
176                 return LDB_SUCCESS;
177         }
178
179         /* we have @ATTRIBUTES, let's check attributes are fine */
180         /* should we check that we deny multivalued attributes ? */
181         for (i = 0; i < msg->num_elements; i++) {
182                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
183
184                 for (j = 0; j < msg->elements[i].num_values; j++) {
185                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
186                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
187                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
188                         }
189                 }
190         }
191
192         return LDB_SUCCESS;
193 }
194
195
196 /*
197   we've made a modification to a dn - possibly reindex and
198   update sequence number
199 */
200 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
201 {
202         int ret = LDB_SUCCESS;
203
204         if (ldb_dn_is_special(dn) &&
205             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
206              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
207                 ret = ltdb_reindex(module);
208         }
209
210         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
211         if (ret == LDB_SUCCESS &&
212             !(ldb_dn_is_special(dn) &&
213               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
214                 ret = ltdb_increase_sequence_number(module);
215         }
216
217         /* If the modify was to @OPTIONS, reload the cache */
218         if (ldb_dn_is_special(dn) &&
219             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
220                 ret = ltdb_cache_reload(module);
221         }
222
223         return ret;
224 }
225
226 /*
227   store a record into the db
228 */
229 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
230 {
231         void *data = ldb_module_get_private(module);
232         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
233         TDB_DATA tdb_key, tdb_data;
234         int ret = LDB_SUCCESS;
235
236         tdb_key = ltdb_key(module, msg->dn);
237         if (!tdb_key.dptr) {
238                 return LDB_ERR_OTHER;
239         }
240
241         ret = ltdb_pack_data(module, msg, &tdb_data);
242         if (ret == -1) {
243                 talloc_free(tdb_key.dptr);
244                 return LDB_ERR_OTHER;
245         }
246
247         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
248         if (ret == -1) {
249                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
250                 goto done;
251         }
252
253         ret = ltdb_index_add(module, msg);
254         if (ret != LDB_SUCCESS) {
255                 tdb_delete(ltdb->tdb, tdb_key);
256         }
257
258 done:
259         talloc_free(tdb_key.dptr);
260         talloc_free(tdb_data.dptr);
261
262         return ret;
263 }
264
265
266 static int ltdb_add_internal(struct ldb_module *module,
267                              const struct ldb_message *msg)
268 {
269         struct ldb_context *ldb = ldb_module_get_ctx(module);
270         int ret = LDB_SUCCESS, i;
271
272         ret = ltdb_check_special_dn(module, msg);
273         if (ret != LDB_SUCCESS) {
274                 goto done;
275         }
276
277         if (ltdb_cache_load(module) != 0) {
278                 ret = LDB_ERR_OPERATIONS_ERROR;
279                 goto done;
280         }
281
282         for (i=0;i<msg->num_elements;i++) {
283                 struct ldb_message_element *el = &msg->elements[i];
284                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
285
286                 if (el->num_values == 0) {
287                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
288                                                el->name, ldb_dn_get_linearized(msg->dn));
289                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
290                         goto done;
291                 }
292                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
293                         if (el->num_values > 1) {
294                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
295                                                        el->name, ldb_dn_get_linearized(msg->dn));
296                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
297                                 goto done;
298                         }
299                 }
300         }
301
302         ret = ltdb_store(module, msg, TDB_INSERT);
303         if (ret != LDB_SUCCESS) {
304                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
305                         ldb_asprintf_errstring(ldb,
306                                                "Entry %s already exists",
307                                                ldb_dn_get_linearized(msg->dn));
308                 }
309                 goto done;
310         }
311
312         ret = ltdb_index_one(module, msg, 1);
313         if (ret != LDB_SUCCESS) {
314                 goto done;
315         }
316
317         ret = ltdb_modified(module, msg->dn);
318         if (ret != LDB_SUCCESS) {
319                 goto done;
320         }
321
322 done:
323         return ret;
324 }
325
326 /*
327   add a record to the database
328 */
329 static int ltdb_add(struct ltdb_context *ctx)
330 {
331         struct ldb_module *module = ctx->module;
332         struct ldb_request *req = ctx->req;
333         int ret = LDB_SUCCESS;
334
335         ldb_request_set_state(req, LDB_ASYNC_PENDING);
336
337         if (ltdb_cache_load(module) != 0) {
338                 return LDB_ERR_OPERATIONS_ERROR;
339         }
340
341         ret = ltdb_add_internal(module, req->op.add.message);
342
343         return ret;
344 }
345
346 /*
347   delete a record from the database, not updating indexes (used for deleting
348   index records)
349 */
350 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
351 {
352         void *data = ldb_module_get_private(module);
353         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
354         TDB_DATA tdb_key;
355         int ret;
356
357         tdb_key = ltdb_key(module, dn);
358         if (!tdb_key.dptr) {
359                 return LDB_ERR_OTHER;
360         }
361
362         ret = tdb_delete(ltdb->tdb, tdb_key);
363         talloc_free(tdb_key.dptr);
364
365         if (ret != 0) {
366                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
367         }
368
369         return ret;
370 }
371
372 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
373 {
374         struct ldb_message *msg;
375         int ret = LDB_SUCCESS;
376
377         msg = talloc(module, struct ldb_message);
378         if (msg == NULL) {
379                 return LDB_ERR_OPERATIONS_ERROR;
380         }
381
382         /* in case any attribute of the message was indexed, we need
383            to fetch the old record */
384         ret = ltdb_search_dn1(module, dn, msg);
385         if (ret != LDB_SUCCESS) {
386                 /* not finding the old record is an error */
387                 goto done;
388         }
389
390         ret = ltdb_delete_noindex(module, dn);
391         if (ret != LDB_SUCCESS) {
392                 goto done;
393         }
394
395         /* remove one level attribute */
396         ret = ltdb_index_one(module, msg, 0);
397         if (ret != LDB_SUCCESS) {
398                 goto done;
399         }
400
401         /* remove any indexed attributes */
402         ret = ltdb_index_del(module, msg);
403         if (ret != LDB_SUCCESS) {
404                 goto done;
405         }
406
407         ret = ltdb_modified(module, dn);
408         if (ret != LDB_SUCCESS) {
409                 goto done;
410         }
411
412 done:
413         talloc_free(msg);
414         return ret;
415 }
416
417 /*
418   delete a record from the database
419 */
420 static int ltdb_delete(struct ltdb_context *ctx)
421 {
422         struct ldb_module *module = ctx->module;
423         struct ldb_request *req = ctx->req;
424         int ret = LDB_SUCCESS;
425
426         ldb_request_set_state(req, LDB_ASYNC_PENDING);
427
428         if (ltdb_cache_load(module) != 0) {
429                 return LDB_ERR_OPERATIONS_ERROR;
430         }
431
432         ret = ltdb_delete_internal(module, req->op.del.dn);
433
434         return ret;
435 }
436
437 /*
438   find an element by attribute name. At the moment this does a linear search,
439   it should be re-coded to use a binary search once all places that modify
440   records guarantee sorted order
441
442   return the index of the first matching element if found, otherwise -1
443 */
444 static int find_element(const struct ldb_message *msg, const char *name)
445 {
446         unsigned int i;
447         for (i=0;i<msg->num_elements;i++) {
448                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
449                         return i;
450                 }
451         }
452         return -1;
453 }
454
455
456 /*
457   add an element to an existing record. Assumes a elements array that we
458   can call re-alloc on, and assumed that we can re-use the data pointers from
459   the passed in additional values. Use with care!
460
461   returns 0 on success, -1 on failure (and sets errno)
462 */
463 static int msg_add_element(struct ldb_context *ldb,
464                            struct ldb_message *msg,
465                            struct ldb_message_element *el)
466 {
467         struct ldb_message_element *e2;
468         unsigned int i;
469
470         if (el->num_values == 0) {
471                 /* nothing to do here - we don't add empty elements */
472                 return 0;
473         }
474
475         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
476                               msg->num_elements+1);
477         if (!e2) {
478                 errno = ENOMEM;
479                 return -1;
480         }
481
482         msg->elements = e2;
483
484         e2 = &msg->elements[msg->num_elements];
485
486         e2->name = el->name;
487         e2->flags = el->flags;
488         e2->values = talloc_array(msg->elements,
489                                   struct ldb_val, el->num_values);
490         if (!e2->values) {
491                 errno = ENOMEM;
492                 return -1;
493         }
494         for (i=0;i<el->num_values;i++) {
495                 e2->values[i] = el->values[i];
496         }
497         e2->num_values = el->num_values;
498
499         ++msg->num_elements;
500
501         return 0;
502 }
503
504 /*
505   delete all elements having a specified attribute name
506 */
507 static int msg_delete_attribute(struct ldb_module *module,
508                                 struct ldb_context *ldb,
509                                 struct ldb_message *msg, const char *name)
510 {
511         const char *dn;
512         unsigned int i, j;
513
514         dn = ldb_dn_get_linearized(msg->dn);
515         if (dn == NULL) {
516                 return -1;
517         }
518
519         for (i=0;i<msg->num_elements;i++) {
520                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
521                         for (j=0;j<msg->elements[i].num_values;j++) {
522                                 ltdb_index_del_value(module, dn,
523                                                      &msg->elements[i], j);
524                         }
525                         talloc_free(msg->elements[i].values);
526                         if (msg->num_elements > (i+1)) {
527                                 memmove(&msg->elements[i],
528                                         &msg->elements[i+1],
529                                         sizeof(struct ldb_message_element)*
530                                         (msg->num_elements - (i+1)));
531                         }
532                         msg->num_elements--;
533                         i--;
534                         msg->elements = talloc_realloc(msg, msg->elements,
535                                                        struct ldb_message_element,
536                                                        msg->num_elements);
537
538                         /* per definition we find in a canonicalised message an
539                            attribute only once. So we are finished here. */
540                         return 0;
541                 }
542         }
543
544         /* Not found */
545         return -1;
546 }
547
548 /*
549   delete all elements matching an attribute name/value
550
551   return 0 on success, -1 on failure
552 */
553 static int msg_delete_element(struct ldb_module *module,
554                               struct ldb_message *msg,
555                               const char *name,
556                               const struct ldb_val *val)
557 {
558         struct ldb_context *ldb = ldb_module_get_ctx(module);
559         unsigned int i;
560         int found;
561         struct ldb_message_element *el;
562         const struct ldb_schema_attribute *a;
563
564         found = find_element(msg, name);
565         if (found == -1) {
566                 return -1;
567         }
568
569         el = &msg->elements[found];
570
571         a = ldb_schema_attribute_by_name(ldb, el->name);
572
573         for (i=0;i<el->num_values;i++) {
574                 if (a->syntax->comparison_fn(ldb, ldb,
575                                                 &el->values[i], val) == 0) {
576                         if (i<el->num_values-1) {
577                                 memmove(&el->values[i], &el->values[i+1],
578                                         sizeof(el->values[i])*
579                                                 (el->num_values-(i+1)));
580                         }
581                         el->num_values--;
582                         if (el->num_values == 0) {
583                                 return msg_delete_attribute(module, ldb,
584                                                             msg, name);
585                         }
586
587                         /* per definition we find in a canonicalised message an
588                            attribute value only once. So we are finished here */
589                         return 0;
590                 }
591         }
592
593         /* Not found */
594         return -1;
595 }
596
597
598 /*
599   modify a record - internal interface
600
601   yuck - this is O(n^2). Luckily n is usually small so we probably
602   get away with it, but if we ever have really large attribute lists
603   then we'll need to look at this again
604 */
605 int ltdb_modify_internal(struct ldb_module *module,
606                          const struct ldb_message *msg)
607 {
608         struct ldb_context *ldb = ldb_module_get_ctx(module);
609         void *data = ldb_module_get_private(module);
610         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
611         TDB_DATA tdb_key, tdb_data;
612         struct ldb_message *msg2;
613         unsigned i, j;
614         int ret = LDB_SUCCESS, idx;
615
616         tdb_key = ltdb_key(module, msg->dn);
617         if (!tdb_key.dptr) {
618                 return LDB_ERR_OTHER;
619         }
620
621         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
622         if (!tdb_data.dptr) {
623                 talloc_free(tdb_key.dptr);
624                 return ltdb_err_map(tdb_error(ltdb->tdb));
625         }
626
627         msg2 = talloc(tdb_key.dptr, struct ldb_message);
628         if (msg2 == NULL) {
629                 free(tdb_data.dptr);
630                 ret = LDB_ERR_OTHER;
631                 goto done;
632         }
633
634         ret = ltdb_unpack_data(module, &tdb_data, msg2);
635         free(tdb_data.dptr);
636         if (ret == -1) {
637                 ret = LDB_ERR_OTHER;
638                 goto done;
639         }
640
641         if (!msg2->dn) {
642                 msg2->dn = msg->dn;
643         }
644
645         for (i=0; i<msg->num_elements; i++) {
646                 struct ldb_message_element *el = &msg->elements[i], *el2;
647                 struct ldb_val *vals;
648                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
649                 const char *dn;
650
651                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
652                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
653                                                ldb_dn_get_linearized(msg->dn));
654                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
655                         goto done;
656                 }
657
658                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
659                 case LDB_FLAG_MOD_ADD:
660                         if (el->num_values == 0) {
661                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
662                                                        el->name, ldb_dn_get_linearized(msg->dn));
663                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
664                                 goto done;
665                         }
666
667                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
668                                 if (el->num_values > 1) {
669                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
670                                                                el->name, ldb_dn_get_linearized(msg->dn));
671                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
672                                         goto done;
673                                 }
674                         }
675
676                         /* Checks if element already exists */
677                         idx = find_element(msg2, el->name);
678                         if (idx == -1) {
679                                 if (msg_add_element(ldb, msg2, el) != 0) {
680                                         ret = LDB_ERR_OTHER;
681                                         goto done;
682                                 }
683                         } else {
684                                 /* We cannot add another value on a existing one
685                                    if the attribute is single-valued */
686                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
687                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
688                                                                el->name, ldb_dn_get_linearized(msg->dn));
689                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
690                                         goto done;
691                                 }
692
693                                 el2 = &(msg2->elements[idx]);
694
695                                 /* Check that values don't exist yet on multi-
696                                    valued attributes or aren't provided twice */
697                                 for (j=0; j<el->num_values; j++) {
698                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
699                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
700                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
701                                                 goto done;
702                                         }
703                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
704                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
705                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
706                                                 goto done;
707                                         }
708                                 }
709
710                                 /* Now combine existing and new values to a new
711                                    attribute record */
712                                 vals = talloc_realloc(msg2->elements,
713                                         el2->values, struct ldb_val,
714                                         el2->num_values + el->num_values);
715                                 if (vals == NULL) {
716                                         ldb_oom(ldb);
717                                         ret = LDB_ERR_OTHER;
718                                         goto done;
719                                 }
720
721                                 for (j=0; j<el->num_values; j++) {
722                                         vals[el2->num_values + j] =
723                                                 ldb_val_dup(vals, &el->values[j]);
724                                 }
725
726                                 el2->values = vals;
727                                 el2->num_values += el->num_values;
728                         }
729
730                         break;
731
732                 case LDB_FLAG_MOD_REPLACE:
733                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
734                                 if (el->num_values > 1) {
735                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
736                                                                el->name, ldb_dn_get_linearized(msg->dn));
737                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
738                                         goto done;
739                                 }
740                         }
741
742                         for (j=0; j<el->num_values; j++) {
743                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
744                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
745                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
746                                         goto done;
747                                 }
748                         }
749
750                         /* Delete the attribute if it exists in the DB */
751                         msg_delete_attribute(module, ldb, msg2, el->name);
752
753                         /* Recreate it with the new values */
754                         if (msg_add_element(ldb, msg2, el) != 0) {
755                                 ret = LDB_ERR_OTHER;
756                                 goto done;
757                         }
758
759                         break;
760
761                 case LDB_FLAG_MOD_DELETE:
762                         dn = ldb_dn_get_linearized(msg->dn);
763                         if (dn == NULL) {
764                                 ret = LDB_ERR_OTHER;
765                                 goto done;
766                         }
767
768                         if (msg->elements[i].num_values == 0) {
769                                 /* Delete the whole attribute */
770                                 if (msg_delete_attribute(module, ldb, msg2,
771                                                          msg->elements[i].name) != 0) {
772                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
773                                                                msg->elements[i].name, dn);
774                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
775                                         goto done;
776                                 }
777                         } else {
778                                 /* Delete specified values from an attribute */
779                                 for (j=0; j < msg->elements[i].num_values; j++) {
780                                         if (msg_delete_element(module,
781                                                                msg2,
782                                                                msg->elements[i].name,
783                                                                &msg->elements[i].values[j]) != 0) {
784                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
785                                                                        msg->elements[i].name, dn);
786                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
787                                                 goto done;
788                                         }
789
790                                         ret = ltdb_index_del_value(module, dn,
791                                                 &msg->elements[i], j);
792                                         if (ret != LDB_SUCCESS) {
793                                                 goto done;
794                                         }
795                                 }
796                         }
797
798                         break;
799                 default:
800                         ldb_asprintf_errstring(ldb,
801                                 "Invalid ldb_modify flags on %s: 0x%x",
802                                 msg->elements[i].name,
803                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
804                         ret = LDB_ERR_PROTOCOL_ERROR;
805                         goto done;
806                 }
807         }
808
809         ret = ltdb_store(module, msg2, TDB_MODIFY);
810         if (ret != LDB_SUCCESS) {
811                 goto done;
812         }
813
814         ret = ltdb_modified(module, msg->dn);
815         if (ret != LDB_SUCCESS) {
816                 goto done;
817         }
818
819 done:
820         talloc_free(tdb_key.dptr);
821         return ret;
822 }
823
824 /*
825   modify a record
826 */
827 static int ltdb_modify(struct ltdb_context *ctx)
828 {
829         struct ldb_module *module = ctx->module;
830         struct ldb_request *req = ctx->req;
831         int ret = LDB_SUCCESS;
832
833         ret = ltdb_check_special_dn(module, req->op.mod.message);
834         if (ret != LDB_SUCCESS) {
835                 return ret;
836         }
837
838         ldb_request_set_state(req, LDB_ASYNC_PENDING);
839
840         if (ltdb_cache_load(module) != 0) {
841                 return LDB_ERR_OPERATIONS_ERROR;
842         }
843
844         ret = ltdb_modify_internal(module, req->op.mod.message);
845
846         return ret;
847 }
848
849 /*
850   rename a record
851 */
852 static int ltdb_rename(struct ltdb_context *ctx)
853 {
854         struct ldb_module *module = ctx->module;
855         struct ldb_request *req = ctx->req;
856         struct ldb_message *msg;
857         int ret = LDB_SUCCESS;
858
859         ldb_request_set_state(req, LDB_ASYNC_PENDING);
860
861         if (ltdb_cache_load(ctx->module) != 0) {
862                 return LDB_ERR_OPERATIONS_ERROR;
863         }
864
865         msg = talloc(ctx, struct ldb_message);
866         if (msg == NULL) {
867                 ret = LDB_ERR_OPERATIONS_ERROR;
868                 goto done;
869         }
870
871         /* in case any attribute of the message was indexed, we need
872            to fetch the old record */
873         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
874         if (ret != LDB_SUCCESS) {
875                 /* not finding the old record is an error */
876                 goto done;
877         }
878
879         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
880         if (msg->dn == NULL) {
881                 ret = LDB_ERR_OPERATIONS_ERROR;
882                 goto done;
883         }
884
885         /* Always delete first then add, to avoid conflicts with
886          * unique indexes. We rely on the transaction to make this
887          * atomic
888          */
889         ret = ltdb_delete_internal(module, req->op.rename.olddn);
890         if (ret != LDB_SUCCESS) {
891                 goto done;
892         }
893
894         ret = ltdb_add_internal(module, msg);
895
896 done:
897         return ret;
898 }
899
900 static int ltdb_start_trans(struct ldb_module *module)
901 {
902         void *data = ldb_module_get_private(module);
903         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
904
905         if (tdb_transaction_start(ltdb->tdb) != 0) {
906                 return ltdb_err_map(tdb_error(ltdb->tdb));
907         }
908
909         ltdb->in_transaction++;
910
911         ltdb_index_transaction_start(module);
912
913         return LDB_SUCCESS;
914 }
915
916 static int ltdb_prepare_commit(struct ldb_module *module)
917 {
918         void *data = ldb_module_get_private(module);
919         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
920
921         if (ltdb->in_transaction != 1) {
922                 return LDB_SUCCESS;
923         }
924
925         if (ltdb_index_transaction_commit(module) != 0) {
926                 tdb_transaction_cancel(ltdb->tdb);
927                 ltdb->in_transaction--;
928                 return ltdb_err_map(tdb_error(ltdb->tdb));
929         }
930
931         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
932                 ltdb->in_transaction--;
933                 return ltdb_err_map(tdb_error(ltdb->tdb));
934         }
935
936         ltdb->prepared_commit = true;
937
938         return LDB_SUCCESS;
939 }
940
941 static int ltdb_end_trans(struct ldb_module *module)
942 {
943         void *data = ldb_module_get_private(module);
944         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
945
946         if (!ltdb->prepared_commit) {
947                 int ret = ltdb_prepare_commit(module);
948                 if (ret != LDB_SUCCESS) {
949                         return ret;
950                 }
951         }
952
953         ltdb->in_transaction--;
954         ltdb->prepared_commit = false;
955
956         if (tdb_transaction_commit(ltdb->tdb) != 0) {
957                 return ltdb_err_map(tdb_error(ltdb->tdb));
958         }
959
960         return LDB_SUCCESS;
961 }
962
963 static int ltdb_del_trans(struct ldb_module *module)
964 {
965         void *data = ldb_module_get_private(module);
966         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
967
968         ltdb->in_transaction--;
969
970         if (ltdb_index_transaction_cancel(module) != 0) {
971                 tdb_transaction_cancel(ltdb->tdb);
972                 return ltdb_err_map(tdb_error(ltdb->tdb));
973         }
974
975         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
976                 return ltdb_err_map(tdb_error(ltdb->tdb));
977         }
978
979         return LDB_SUCCESS;
980 }
981
982 /*
983   return sequenceNumber from @BASEINFO
984 */
985 static int ltdb_sequence_number(struct ltdb_context *ctx,
986                                 struct ldb_extended **ext)
987 {
988         struct ldb_context *ldb;
989         struct ldb_module *module = ctx->module;
990         struct ldb_request *req = ctx->req;
991         TALLOC_CTX *tmp_ctx;
992         struct ldb_seqnum_request *seq;
993         struct ldb_seqnum_result *res;
994         struct ldb_message *msg = NULL;
995         struct ldb_dn *dn;
996         const char *date;
997         int ret = LDB_SUCCESS;
998
999         ldb = ldb_module_get_ctx(module);
1000
1001         seq = talloc_get_type(req->op.extended.data,
1002                                 struct ldb_seqnum_request);
1003         if (seq == NULL) {
1004                 return LDB_ERR_OPERATIONS_ERROR;
1005         }
1006
1007         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1008
1009         if (ltdb_lock_read(module) != 0) {
1010                 return LDB_ERR_OPERATIONS_ERROR;
1011         }
1012
1013         res = talloc_zero(req, struct ldb_seqnum_result);
1014         if (res == NULL) {
1015                 ret = LDB_ERR_OPERATIONS_ERROR;
1016                 goto done;
1017         }
1018         tmp_ctx = talloc_new(req);
1019         if (tmp_ctx == NULL) {
1020                 ret = LDB_ERR_OPERATIONS_ERROR;
1021                 goto done;
1022         }
1023
1024         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1025
1026         msg = talloc(tmp_ctx, struct ldb_message);
1027         if (msg == NULL) {
1028                 ret = LDB_ERR_OPERATIONS_ERROR;
1029                 goto done;
1030         }
1031
1032         ret = ltdb_search_dn1(module, dn, msg);
1033         if (ret != LDB_SUCCESS) {
1034                 goto done;
1035         }
1036
1037         switch (seq->type) {
1038         case LDB_SEQ_HIGHEST_SEQ:
1039                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1040                 break;
1041         case LDB_SEQ_NEXT:
1042                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1043                 res->seq_num++;
1044                 break;
1045         case LDB_SEQ_HIGHEST_TIMESTAMP:
1046                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1047                 if (date) {
1048                         res->seq_num = ldb_string_to_time(date);
1049                 } else {
1050                         res->seq_num = 0;
1051                         /* zero is as good as anything when we don't know */
1052                 }
1053                 break;
1054         }
1055
1056         *ext = talloc_zero(req, struct ldb_extended);
1057         if (*ext == NULL) {
1058                 ret = LDB_ERR_OPERATIONS_ERROR;
1059                 goto done;
1060         }
1061         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1062         (*ext)->data = talloc_steal(*ext, res);
1063
1064 done:
1065         talloc_free(tmp_ctx);
1066         ltdb_unlock_read(module);
1067         return ret;
1068 }
1069
1070 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1071 {
1072         struct ldb_context *ldb;
1073         struct ldb_request *req;
1074         struct ldb_reply *ares;
1075
1076         ldb = ldb_module_get_ctx(ctx->module);
1077         req = ctx->req;
1078
1079         /* if we already returned an error just return */
1080         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1081                 return;
1082         }
1083
1084         ares = talloc_zero(req, struct ldb_reply);
1085         if (!ares) {
1086                 ldb_oom(ldb);
1087                 req->callback(req, NULL);
1088                 return;
1089         }
1090         ares->type = LDB_REPLY_DONE;
1091         ares->error = error;
1092
1093         req->callback(req, ares);
1094 }
1095
1096 static void ltdb_timeout(struct tevent_context *ev,
1097                           struct tevent_timer *te,
1098                           struct timeval t,
1099                           void *private_data)
1100 {
1101         struct ltdb_context *ctx;
1102         ctx = talloc_get_type(private_data, struct ltdb_context);
1103
1104         if (!ctx->request_terminated) {
1105                 /* request is done now */
1106                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1107         }
1108
1109         if (!ctx->request_terminated) {
1110                 /* neutralize the spy */
1111                 ctx->spy->ctx = NULL;
1112         }
1113         talloc_free(ctx);
1114 }
1115
1116 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1117                                         struct ldb_extended *ext,
1118                                         int error)
1119 {
1120         struct ldb_context *ldb;
1121         struct ldb_request *req;
1122         struct ldb_reply *ares;
1123
1124         ldb = ldb_module_get_ctx(ctx->module);
1125         req = ctx->req;
1126
1127         /* if we already returned an error just return */
1128         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1129                 return;
1130         }
1131
1132         ares = talloc_zero(req, struct ldb_reply);
1133         if (!ares) {
1134                 ldb_oom(ldb);
1135                 req->callback(req, NULL);
1136                 return;
1137         }
1138         ares->type = LDB_REPLY_DONE;
1139         ares->response = ext;
1140         ares->error = error;
1141
1142         req->callback(req, ares);
1143 }
1144
1145 static void ltdb_handle_extended(struct ltdb_context *ctx)
1146 {
1147         struct ldb_extended *ext = NULL;
1148         int ret;
1149
1150         if (strcmp(ctx->req->op.extended.oid,
1151                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1152                 /* get sequence number */
1153                 ret = ltdb_sequence_number(ctx, &ext);
1154         } else {
1155                 /* not recognized */
1156                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1157         }
1158
1159         ltdb_request_extended_done(ctx, ext, ret);
1160 }
1161
1162 static void ltdb_callback(struct tevent_context *ev,
1163                           struct tevent_timer *te,
1164                           struct timeval t,
1165                           void *private_data)
1166 {
1167         struct ltdb_context *ctx;
1168         int ret;
1169
1170         ctx = talloc_get_type(private_data, struct ltdb_context);
1171
1172         if (ctx->request_terminated) {
1173                 goto done;
1174         }
1175
1176         switch (ctx->req->operation) {
1177         case LDB_SEARCH:
1178                 ret = ltdb_search(ctx);
1179                 break;
1180         case LDB_ADD:
1181                 ret = ltdb_add(ctx);
1182                 break;
1183         case LDB_MODIFY:
1184                 ret = ltdb_modify(ctx);
1185                 break;
1186         case LDB_DELETE:
1187                 ret = ltdb_delete(ctx);
1188                 break;
1189         case LDB_RENAME:
1190                 ret = ltdb_rename(ctx);
1191                 break;
1192         case LDB_EXTENDED:
1193                 ltdb_handle_extended(ctx);
1194                 goto done;
1195         default:
1196                 /* no other op supported */
1197                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1198         }
1199
1200         if (!ctx->request_terminated) {
1201                 /* request is done now */
1202                 ltdb_request_done(ctx, ret);
1203         }
1204
1205 done:
1206         if (!ctx->request_terminated) {
1207                 /* neutralize the spy */
1208                 ctx->spy->ctx = NULL;
1209         }
1210         talloc_free(ctx);
1211 }
1212
1213 static int ltdb_request_destructor(void *ptr)
1214 {
1215         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1216
1217         if (spy->ctx != NULL) {
1218                 spy->ctx->request_terminated = true;
1219         }
1220
1221         return 0;
1222 }
1223
1224 static int ltdb_handle_request(struct ldb_module *module,
1225                                struct ldb_request *req)
1226 {
1227         struct ldb_context *ldb;
1228         struct tevent_context *ev;
1229         struct ltdb_context *ac;
1230         struct tevent_timer *te;
1231         struct timeval tv;
1232
1233         if (check_critical_controls(req->controls)) {
1234                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1235         }
1236
1237         ldb = ldb_module_get_ctx(module);
1238
1239         if (req->starttime == 0 || req->timeout == 0) {
1240                 ldb_set_errstring(ldb, "Invalid timeout settings");
1241                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1242         }
1243
1244         ev = ldb_get_event_context(ldb);
1245
1246         ac = talloc_zero(ldb, struct ltdb_context);
1247         if (ac == NULL) {
1248                 ldb_oom(ldb);
1249                 return LDB_ERR_OPERATIONS_ERROR;
1250         }
1251
1252         ac->module = module;
1253         ac->req = req;
1254
1255         tv.tv_sec = 0;
1256         tv.tv_usec = 0;
1257         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1258         if (NULL == te) {
1259                 talloc_free(ac);
1260                 return LDB_ERR_OPERATIONS_ERROR;
1261         }
1262
1263         tv.tv_sec = req->starttime + req->timeout;
1264         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1265         if (NULL == ac->timeout_event) {
1266                 talloc_free(ac);
1267                 return LDB_ERR_OPERATIONS_ERROR;
1268         }
1269
1270         /* set a spy so that we do not try to use the request context
1271          * if it is freed before ltdb_callback fires */
1272         ac->spy = talloc(req, struct ltdb_req_spy);
1273         if (NULL == ac->spy) {
1274                 talloc_free(ac);
1275                 return LDB_ERR_OPERATIONS_ERROR;
1276         }
1277         ac->spy->ctx = ac;
1278
1279         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1280
1281         return LDB_SUCCESS;
1282 }
1283
1284 static const struct ldb_module_ops ltdb_ops = {
1285         .name              = "tdb",
1286         .search            = ltdb_handle_request,
1287         .add               = ltdb_handle_request,
1288         .modify            = ltdb_handle_request,
1289         .del               = ltdb_handle_request,
1290         .rename            = ltdb_handle_request,
1291         .extended          = ltdb_handle_request,
1292         .start_transaction = ltdb_start_trans,
1293         .end_transaction   = ltdb_end_trans,
1294         .prepare_commit    = ltdb_prepare_commit,
1295         .del_transaction   = ltdb_del_trans,
1296 };
1297
1298 /*
1299   connect to the database
1300 */
1301 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1302                         unsigned int flags, const char *options[],
1303                         struct ldb_module **_module)
1304 {
1305         struct ldb_module *module;
1306         const char *path;
1307         int tdb_flags, open_flags;
1308         struct ltdb_private *ltdb;
1309
1310         /* parse the url */
1311         if (strchr(url, ':')) {
1312                 if (strncmp(url, "tdb://", 6) != 0) {
1313                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1314                                   "Invalid tdb URL '%s'", url);
1315                         return -1;
1316                 }
1317                 path = url+6;
1318         } else {
1319                 path = url;
1320         }
1321
1322         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1323
1324         /* check for the 'nosync' option */
1325         if (flags & LDB_FLG_NOSYNC) {
1326                 tdb_flags |= TDB_NOSYNC;
1327         }
1328
1329         /* and nommap option */
1330         if (flags & LDB_FLG_NOMMAP) {
1331                 tdb_flags |= TDB_NOMMAP;
1332         }
1333
1334         if (flags & LDB_FLG_RDONLY) {
1335                 open_flags = O_RDONLY;
1336         } else {
1337                 open_flags = O_CREAT | O_RDWR;
1338         }
1339
1340         ltdb = talloc_zero(ldb, struct ltdb_private);
1341         if (!ltdb) {
1342                 ldb_oom(ldb);
1343                 return -1;
1344         }
1345
1346         /* note that we use quite a large default hash size */
1347         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1348                                    tdb_flags, open_flags,
1349                                    ldb_get_create_perms(ldb), ldb);
1350         if (!ltdb->tdb) {
1351                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1352                           "Unable to open tdb '%s'", path);
1353                 talloc_free(ltdb);
1354                 return -1;
1355         }
1356
1357         ltdb->sequence_number = 0;
1358
1359         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1360         if (!module) {
1361                 talloc_free(ltdb);
1362                 return -1;
1363         }
1364         ldb_module_set_private(module, ltdb);
1365
1366         if (ltdb_cache_load(module) != 0) {
1367                 talloc_free(module);
1368                 talloc_free(ltdb);
1369                 return -1;
1370         }
1371
1372         *_module = module;
1373         return 0;
1374 }
1375
1376 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1377         .name = "tdb",
1378         .connect_fn = ltdb_connect
1379 };