b8b4d399ef348b4148b4737d476ebe52b60f14e2
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_tdb.h"
53
54
55 /*
56   map a tdb error code to a ldb error code
57 */
58 int ltdb_err_map(enum TDB_ERROR tdb_code)
59 {
60         switch (tdb_code) {
61         case TDB_SUCCESS:
62                 return LDB_SUCCESS;
63         case TDB_ERR_CORRUPT:
64         case TDB_ERR_OOM:
65         case TDB_ERR_EINVAL:
66                 return LDB_ERR_OPERATIONS_ERROR;
67         case TDB_ERR_IO:
68                 return LDB_ERR_PROTOCOL_ERROR;
69         case TDB_ERR_LOCK:
70         case TDB_ERR_NOLOCK:
71                 return LDB_ERR_BUSY;
72         case TDB_ERR_LOCK_TIMEOUT:
73                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
74         case TDB_ERR_EXISTS:
75                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
76         case TDB_ERR_NOEXIST:
77                 return LDB_ERR_NO_SUCH_OBJECT;
78         case TDB_ERR_RDONLY:
79                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
80         }
81         return LDB_ERR_OTHER;
82 }
83
84 /*
85   lock the database for read - use by ltdb_search and ltdb_sequence_number
86 */
87 int ltdb_lock_read(struct ldb_module *module)
88 {
89         void *data = ldb_module_get_private(module);
90         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
91         int ret = 0;
92
93         if (ltdb->in_transaction == 0 &&
94             ltdb->read_lock_count == 0) {
95                 ret = tdb_lockall_read(ltdb->tdb);
96         }
97         if (ret == 0) {
98                 ltdb->read_lock_count++;
99         }
100         return ret;
101 }
102
103 /*
104   unlock the database after a ltdb_lock_read()
105 */
106 int ltdb_unlock_read(struct ldb_module *module)
107 {
108         void *data = ldb_module_get_private(module);
109         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
110         if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
111                 return tdb_unlockall_read(ltdb->tdb);
112         }
113         ltdb->read_lock_count--;
114         return 0;
115 }
116
117
118 /*
119   form a TDB_DATA for a record key
120   caller frees
121
122   note that the key for a record can depend on whether the
123   dn refers to a case sensitive index record or not
124 */
125 struct TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn)
126 {
127         struct ldb_context *ldb = ldb_module_get_ctx(module);
128         TDB_DATA key;
129         char *key_str = NULL;
130         const char *dn_folded = NULL;
131
132         /*
133           most DNs are case insensitive. The exception is index DNs for
134           case sensitive attributes
135
136           there are 3 cases dealt with in this code:
137
138           1) if the dn doesn't start with @ then uppercase the attribute
139              names and the attributes values of case insensitive attributes
140           2) if the dn starts with @ then leave it alone -
141              the indexing code handles the rest
142         */
143
144         dn_folded = ldb_dn_get_casefold(dn);
145         if (!dn_folded) {
146                 goto failed;
147         }
148
149         key_str = talloc_strdup(ldb, "DN=");
150         if (!key_str) {
151                 goto failed;
152         }
153
154         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
155         if (!key_str) {
156                 goto failed;
157         }
158
159         key.dptr = (uint8_t *)key_str;
160         key.dsize = strlen(key_str) + 1;
161
162         return key;
163
164 failed:
165         errno = ENOMEM;
166         key.dptr = NULL;
167         key.dsize = 0;
168         return key;
169 }
170
171 /*
172   check special dn's have valid attributes
173   currently only @ATTRIBUTES is checked
174 */
175 static int ltdb_check_special_dn(struct ldb_module *module,
176                           const struct ldb_message *msg)
177 {
178         struct ldb_context *ldb = ldb_module_get_ctx(module);
179         int i, j;
180
181         if (! ldb_dn_is_special(msg->dn) ||
182             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
183                 return LDB_SUCCESS;
184         }
185
186         /* we have @ATTRIBUTES, let's check attributes are fine */
187         /* should we check that we deny multivalued attributes ? */
188         for (i = 0; i < msg->num_elements; i++) {
189                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
190
191                 for (j = 0; j < msg->elements[i].num_values; j++) {
192                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
193                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
194                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
195                         }
196                 }
197         }
198
199         return LDB_SUCCESS;
200 }
201
202
203 /*
204   we've made a modification to a dn - possibly reindex and
205   update sequence number
206 */
207 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
208 {
209         int ret = LDB_SUCCESS;
210         struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
211
212         /* only allow modifies inside a transaction, otherwise the
213          * ldb is unsafe */
214         if (ltdb->in_transaction == 0) {
215                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
216                 return LDB_ERR_OPERATIONS_ERROR;
217         }
218
219         if (ldb_dn_is_special(dn) &&
220             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
221              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
222                 ret = ltdb_reindex(module);
223         }
224
225         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
226         if (ret == LDB_SUCCESS &&
227             !(ldb_dn_is_special(dn) &&
228               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
229                 ret = ltdb_increase_sequence_number(module);
230         }
231
232         /* If the modify was to @OPTIONS, reload the cache */
233         if (ret == LDB_SUCCESS &&
234             ldb_dn_is_special(dn) &&
235             (ldb_dn_check_special(dn, LTDB_OPTIONS)) ) {
236                 ret = ltdb_cache_reload(module);
237         }
238
239         return ret;
240 }
241
242 /*
243   store a record into the db
244 */
245 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs)
246 {
247         void *data = ldb_module_get_private(module);
248         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
249         TDB_DATA tdb_key, tdb_data;
250         int ret = LDB_SUCCESS;
251
252         tdb_key = ltdb_key(module, msg->dn);
253         if (tdb_key.dptr == NULL) {
254                 return LDB_ERR_OTHER;
255         }
256
257         ret = ltdb_pack_data(module, msg, &tdb_data);
258         if (ret == -1) {
259                 talloc_free(tdb_key.dptr);
260                 return LDB_ERR_OTHER;
261         }
262
263         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
264         if (ret == -1) {
265                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
266                 goto done;
267         }
268
269 done:
270         talloc_free(tdb_key.dptr);
271         talloc_free(tdb_data.dptr);
272
273         return ret;
274 }
275
276
277 static int ltdb_add_internal(struct ldb_module *module,
278                              const struct ldb_message *msg)
279 {
280         struct ldb_context *ldb = ldb_module_get_ctx(module);
281         int ret = LDB_SUCCESS, i;
282
283         ret = ltdb_check_special_dn(module, msg);
284         if (ret != LDB_SUCCESS) {
285                 return ret;
286         }
287
288         if (ltdb_cache_load(module) != 0) {
289                 return LDB_ERR_OPERATIONS_ERROR;
290         }
291
292         for (i=0;i<msg->num_elements;i++) {
293                 struct ldb_message_element *el = &msg->elements[i];
294                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
295
296                 if (el->num_values == 0) {
297                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
298                                                el->name, ldb_dn_get_linearized(msg->dn));
299                         return LDB_ERR_CONSTRAINT_VIOLATION;
300                 }
301                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
302                         if (el->num_values > 1) {
303                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
304                                                        el->name, ldb_dn_get_linearized(msg->dn));
305                                 return LDB_ERR_CONSTRAINT_VIOLATION;
306                         }
307                 }
308         }
309
310         ret = ltdb_store(module, msg, TDB_INSERT);
311         if (ret != LDB_SUCCESS) {
312                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
313                         ldb_asprintf_errstring(ldb,
314                                                "Entry %s already exists",
315                                                ldb_dn_get_linearized(msg->dn));
316                 }
317                 return ret;
318         }
319
320         ret = ltdb_index_add_new(module, msg);
321         if (ret != LDB_SUCCESS) {
322                 return ret;
323         }
324
325         ret = ltdb_modified(module, msg->dn);
326
327         return ret;
328 }
329
330 /*
331   add a record to the database
332 */
333 static int ltdb_add(struct ltdb_context *ctx)
334 {
335         struct ldb_module *module = ctx->module;
336         struct ldb_request *req = ctx->req;
337         int ret = LDB_SUCCESS;
338
339         ldb_request_set_state(req, LDB_ASYNC_PENDING);
340
341         if (ltdb_cache_load(module) != 0) {
342                 return LDB_ERR_OPERATIONS_ERROR;
343         }
344
345         ret = ltdb_add_internal(module, req->op.add.message);
346
347         return ret;
348 }
349
350 /*
351   delete a record from the database, not updating indexes (used for deleting
352   index records)
353 */
354 int ltdb_delete_noindex(struct ldb_module *module, struct ldb_dn *dn)
355 {
356         void *data = ldb_module_get_private(module);
357         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
358         TDB_DATA tdb_key;
359         int ret;
360
361         tdb_key = ltdb_key(module, dn);
362         if (!tdb_key.dptr) {
363                 return LDB_ERR_OTHER;
364         }
365
366         ret = tdb_delete(ltdb->tdb, tdb_key);
367         talloc_free(tdb_key.dptr);
368
369         if (ret != 0) {
370                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
371         }
372
373         return ret;
374 }
375
376 static int ltdb_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
377 {
378         struct ldb_message *msg;
379         int ret = LDB_SUCCESS;
380
381         msg = talloc(module, struct ldb_message);
382         if (msg == NULL) {
383                 return LDB_ERR_OPERATIONS_ERROR;
384         }
385
386         /* in case any attribute of the message was indexed, we need
387            to fetch the old record */
388         ret = ltdb_search_dn1(module, dn, msg);
389         if (ret != LDB_SUCCESS) {
390                 /* not finding the old record is an error */
391                 goto done;
392         }
393
394         ret = ltdb_delete_noindex(module, dn);
395         if (ret != LDB_SUCCESS) {
396                 goto done;
397         }
398
399         /* remove any indexed attributes */
400         ret = ltdb_index_delete(module, msg);
401         if (ret != LDB_SUCCESS) {
402                 goto done;
403         }
404
405         ret = ltdb_modified(module, dn);
406         if (ret != LDB_SUCCESS) {
407                 goto done;
408         }
409
410 done:
411         talloc_free(msg);
412         return ret;
413 }
414
415 /*
416   delete a record from the database
417 */
418 static int ltdb_delete(struct ltdb_context *ctx)
419 {
420         struct ldb_module *module = ctx->module;
421         struct ldb_request *req = ctx->req;
422         int ret = LDB_SUCCESS;
423
424         ldb_request_set_state(req, LDB_ASYNC_PENDING);
425
426         if (ltdb_cache_load(module) != 0) {
427                 return LDB_ERR_OPERATIONS_ERROR;
428         }
429
430         ret = ltdb_delete_internal(module, req->op.del.dn);
431
432         return ret;
433 }
434
435 /*
436   find an element by attribute name. At the moment this does a linear search,
437   it should be re-coded to use a binary search once all places that modify
438   records guarantee sorted order
439
440   return the index of the first matching element if found, otherwise -1
441 */
442 static int find_element(const struct ldb_message *msg, const char *name)
443 {
444         unsigned int i;
445         for (i=0;i<msg->num_elements;i++) {
446                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
447                         return i;
448                 }
449         }
450         return -1;
451 }
452
453
454 /*
455   add an element to an existing record. Assumes a elements array that we
456   can call re-alloc on, and assumed that we can re-use the data pointers from
457   the passed in additional values. Use with care!
458
459   returns 0 on success, -1 on failure (and sets errno)
460 */
461 static int ltdb_msg_add_element(struct ldb_context *ldb,
462                                 struct ldb_message *msg,
463                                 struct ldb_message_element *el)
464 {
465         struct ldb_message_element *e2;
466         unsigned int i;
467
468         if (el->num_values == 0) {
469                 /* nothing to do here - we don't add empty elements */
470                 return 0;
471         }
472
473         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
474                               msg->num_elements+1);
475         if (!e2) {
476                 errno = ENOMEM;
477                 return -1;
478         }
479
480         msg->elements = e2;
481
482         e2 = &msg->elements[msg->num_elements];
483
484         e2->name = el->name;
485         e2->flags = el->flags;
486         e2->values = talloc_array(msg->elements,
487                                   struct ldb_val, el->num_values);
488         if (!e2->values) {
489                 errno = ENOMEM;
490                 return -1;
491         }
492         for (i=0;i<el->num_values;i++) {
493                 e2->values[i] = el->values[i];
494         }
495         e2->num_values = el->num_values;
496
497         ++msg->num_elements;
498
499         return 0;
500 }
501
502 /*
503   delete all elements having a specified attribute name
504 */
505 static int msg_delete_attribute(struct ldb_module *module,
506                                 struct ldb_context *ldb,
507                                 struct ldb_message *msg, const char *name)
508 {
509         unsigned int i;
510         int ret;
511         struct ldb_message_element *el;
512
513         el = ldb_msg_find_element(msg, name);
514         if (el == NULL) {
515                 return -1;
516         }
517         i = el - msg->elements;
518
519         ret = ltdb_index_del_element(module, msg->dn, el);
520         if (ret != LDB_SUCCESS) {
521                 return ret;
522         }
523
524         talloc_free(el->values);
525         if (msg->num_elements > (i+1)) {
526                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
527         }
528         msg->num_elements--;
529         msg->elements = talloc_realloc(msg, msg->elements,
530                                        struct ldb_message_element,
531                                        msg->num_elements);
532         return 0;
533 }
534
535 /*
536   delete all elements matching an attribute name/value
537
538   return 0 on success, -1 on failure
539 */
540 static int msg_delete_element(struct ldb_module *module,
541                               struct ldb_message *msg,
542                               const char *name,
543                               const struct ldb_val *val)
544 {
545         struct ldb_context *ldb = ldb_module_get_ctx(module);
546         unsigned int i;
547         int found, ret;
548         struct ldb_message_element *el;
549         const struct ldb_schema_attribute *a;
550
551         found = find_element(msg, name);
552         if (found == -1) {
553                 return -1;
554         }
555
556         el = &msg->elements[found];
557
558         a = ldb_schema_attribute_by_name(ldb, el->name);
559
560         for (i=0;i<el->num_values;i++) {
561                 if (a->syntax->comparison_fn(ldb, ldb,
562                                              &el->values[i], val) == 0) {
563                         if (el->num_values == 1) {
564                                 return msg_delete_attribute(module, ldb, msg, name);
565                         }
566
567                         ret = ltdb_index_del_value(module, msg->dn, el, i);
568                         if (ret != LDB_SUCCESS) {
569                                 return -1;
570                         }
571
572                         if (i<el->num_values-1) {
573                                 memmove(&el->values[i], &el->values[i+1],
574                                         sizeof(el->values[i])*
575                                                 (el->num_values-(i+1)));
576                         }
577                         el->num_values--;
578
579                         /* per definition we find in a canonicalised message an
580                            attribute value only once. So we are finished here */
581                         return 0;
582                 }
583         }
584
585         /* Not found */
586         return -1;
587 }
588
589
590 /*
591   modify a record - internal interface
592
593   yuck - this is O(n^2). Luckily n is usually small so we probably
594   get away with it, but if we ever have really large attribute lists
595   then we'll need to look at this again
596
597   'req' is optional, and is used to specify controls if supplied
598 */
599 int ltdb_modify_internal(struct ldb_module *module,
600                          const struct ldb_message *msg,
601                          struct ldb_request *req)
602 {
603         struct ldb_context *ldb = ldb_module_get_ctx(module);
604         void *data = ldb_module_get_private(module);
605         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
606         TDB_DATA tdb_key, tdb_data;
607         struct ldb_message *msg2;
608         unsigned i, j;
609         int ret = LDB_SUCCESS, idx;
610
611         tdb_key = ltdb_key(module, msg->dn);
612         if (!tdb_key.dptr) {
613                 return LDB_ERR_OTHER;
614         }
615
616         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
617         if (!tdb_data.dptr) {
618                 talloc_free(tdb_key.dptr);
619                 return ltdb_err_map(tdb_error(ltdb->tdb));
620         }
621
622         msg2 = talloc(tdb_key.dptr, struct ldb_message);
623         if (msg2 == NULL) {
624                 free(tdb_data.dptr);
625                 ret = LDB_ERR_OTHER;
626                 goto done;
627         }
628
629         ret = ltdb_unpack_data(module, &tdb_data, msg2);
630         free(tdb_data.dptr);
631         if (ret == -1) {
632                 ret = LDB_ERR_OTHER;
633                 goto done;
634         }
635
636         if (!msg2->dn) {
637                 msg2->dn = msg->dn;
638         }
639
640         for (i=0; i<msg->num_elements; i++) {
641                 struct ldb_message_element *el = &msg->elements[i], *el2;
642                 struct ldb_val *vals;
643                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
644                 const char *dn;
645
646                 if (ldb_attr_cmp(el->name, "distinguishedName") == 0) {
647                         ldb_asprintf_errstring(ldb, "it is not permitted to perform a modify on 'distinguishedName' (use rename instead): %s",
648                                                ldb_dn_get_linearized(msg2->dn));
649                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
650                         goto done;
651                 }
652
653                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
654                 case LDB_FLAG_MOD_ADD:
655                         if (el->num_values == 0) {
656                                 ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illigal)",
657                                                        el->name, ldb_dn_get_linearized(msg2->dn));
658                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
659                                 goto done;
660                         }
661
662                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
663                                 if (el->num_values > 1) {
664                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
665                                                                el->name, ldb_dn_get_linearized(msg2->dn));
666                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
667                                         goto done;
668                                 }
669                         }
670
671                         /* Checks if element already exists */
672                         idx = find_element(msg2, el->name);
673                         if (idx == -1) {
674                                 if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
675                                         ret = LDB_ERR_OTHER;
676                                         goto done;
677                                 }
678                                 ret = ltdb_index_add_element(module, msg2->dn, el);
679                                 if (ret != LDB_SUCCESS) {
680                                         goto done;
681                                 }
682                         } else {
683                                 /* We cannot add another value on a existing one
684                                    if the attribute is single-valued */
685                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
686                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
687                                                                el->name, ldb_dn_get_linearized(msg2->dn));
688                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
689                                         goto done;
690                                 }
691
692                                 el2 = &(msg2->elements[idx]);
693
694                                 /* Check that values don't exist yet on multi-
695                                    valued attributes or aren't provided twice */
696                                 for (j=0; j<el->num_values; j++) {
697                                         if (ldb_msg_find_val(el2, &el->values[j]) != NULL) {
698                                                 ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
699                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
700                                                 goto done;
701                                         }
702                                         if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
703                                                 ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
704                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
705                                                 goto done;
706                                         }
707                                 }
708
709                                 /* Now combine existing and new values to a new
710                                    attribute record */
711                                 vals = talloc_realloc(msg2->elements,
712                                                       el2->values, struct ldb_val,
713                                                       el2->num_values + el->num_values);
714                                 if (vals == NULL) {
715                                         ldb_oom(ldb);
716                                         ret = LDB_ERR_OTHER;
717                                         goto done;
718                                 }
719
720                                 for (j=0; j<el->num_values; j++) {
721                                         vals[el2->num_values + j] =
722                                                 ldb_val_dup(vals, &el->values[j]);
723                                 }
724
725                                 el2->values = vals;
726                                 el2->num_values += el->num_values;
727
728                                 ret = ltdb_index_add_element(module, msg2->dn, el);
729                                 if (ret != LDB_SUCCESS) {
730                                         goto done;
731                                 }
732                         }
733
734                         break;
735
736                 case LDB_FLAG_MOD_REPLACE:
737                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
738                                 /* the RELAX control overrides this
739                                    check for replace. This is needed as
740                                    DRS replication can produce multiple
741                                    values here for a single valued
742                                    attribute when the values are deleted
743                                    links
744                                 */
745                                 if (el->num_values > 1 &&
746                                     (!req || !ldb_request_get_control(req, LDB_CONTROL_RELAX_OID))) {
747                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
748                                                                el->name, ldb_dn_get_linearized(msg2->dn));
749                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
750                                         goto done;
751                                 }
752                         }
753
754                         /* TODO: This is O(n^2) - replace with more efficient check */
755                         for (j=0; j<el->num_values; j++) {
756                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
757                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
758                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
759                                         goto done;
760                                 }
761                         }
762
763                         idx = find_element(msg2, el->name);
764                         if (idx != -1) {
765                                 el2 = &(msg2->elements[idx]);
766                                 if (ldb_msg_element_compare(el, el2) == 0) {
767                                         /* we are replacing with the same values */
768                                         continue;
769                                 }
770                         
771                                 /* Delete the attribute if it exists in the DB */
772                                 if (msg_delete_attribute(module, ldb, msg2, el->name) != 0) {
773                                         ret = LDB_ERR_OTHER;
774                                         goto done;
775                                 }
776                         }
777
778                         /* Recreate it with the new values */
779                         if (ltdb_msg_add_element(ldb, msg2, el) != 0) {
780                                 ret = LDB_ERR_OTHER;
781                                 goto done;
782                         }
783
784                         ret = ltdb_index_add_element(module, msg2->dn, el);
785                         if (ret != LDB_SUCCESS) {
786                                 goto done;
787                         }
788
789                         break;
790
791                 case LDB_FLAG_MOD_DELETE:
792                         dn = ldb_dn_get_linearized(msg2->dn);
793                         if (dn == NULL) {
794                                 ret = LDB_ERR_OTHER;
795                                 goto done;
796                         }
797
798                         if (msg->elements[i].num_values == 0) {
799                                 /* Delete the whole attribute */
800                                 if (msg_delete_attribute(module, ldb, msg2,
801                                                          msg->elements[i].name) != 0) {
802                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s",
803                                                                msg->elements[i].name, dn);
804                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
805                                         goto done;
806                                 }
807                         } else {
808                                 /* Delete specified values from an attribute */
809                                 for (j=0; j < msg->elements[i].num_values; j++) {
810                                         if (msg_delete_element(module,
811                                                                msg2,
812                                                                msg->elements[i].name,
813                                                                &msg->elements[i].values[j]) != 0) {
814                                                 ldb_asprintf_errstring(ldb, "No matching attribute value when deleting attribute: %s on %s",
815                                                                        msg->elements[i].name, dn);
816                                                 ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
817                                                 goto done;
818                                         }
819                                 }
820                         }
821                         break;
822                 default:
823                         ldb_asprintf_errstring(ldb,
824                                 "Invalid ldb_modify flags on %s: 0x%x",
825                                 msg->elements[i].name,
826                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
827                         ret = LDB_ERR_PROTOCOL_ERROR;
828                         goto done;
829                 }
830         }
831
832         ret = ltdb_store(module, msg2, TDB_MODIFY);
833         if (ret != LDB_SUCCESS) {
834                 goto done;
835         }
836
837         ret = ltdb_modified(module, msg2->dn);
838         if (ret != LDB_SUCCESS) {
839                 goto done;
840         }
841
842 done:
843         talloc_free(tdb_key.dptr);
844         return ret;
845 }
846
847 /*
848   modify a record
849 */
850 static int ltdb_modify(struct ltdb_context *ctx)
851 {
852         struct ldb_module *module = ctx->module;
853         struct ldb_request *req = ctx->req;
854         int ret = LDB_SUCCESS;
855
856         ret = ltdb_check_special_dn(module, req->op.mod.message);
857         if (ret != LDB_SUCCESS) {
858                 return ret;
859         }
860
861         ldb_request_set_state(req, LDB_ASYNC_PENDING);
862
863         if (ltdb_cache_load(module) != 0) {
864                 return LDB_ERR_OPERATIONS_ERROR;
865         }
866
867         ret = ltdb_modify_internal(module, req->op.mod.message, req);
868
869         return ret;
870 }
871
872 /*
873   rename a record
874 */
875 static int ltdb_rename(struct ltdb_context *ctx)
876 {
877         struct ldb_module *module = ctx->module;
878         struct ldb_request *req = ctx->req;
879         struct ldb_message *msg;
880         int ret = LDB_SUCCESS;
881
882         ldb_request_set_state(req, LDB_ASYNC_PENDING);
883
884         if (ltdb_cache_load(ctx->module) != 0) {
885                 return LDB_ERR_OPERATIONS_ERROR;
886         }
887
888         msg = talloc(ctx, struct ldb_message);
889         if (msg == NULL) {
890                 return LDB_ERR_OPERATIONS_ERROR;
891         }
892
893         /* in case any attribute of the message was indexed, we need
894            to fetch the old record */
895         ret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
896         if (ret != LDB_SUCCESS) {
897                 /* not finding the old record is an error */
898                 return ret;
899         }
900
901         /* Always delete first then add, to avoid conflicts with
902          * unique indexes. We rely on the transaction to make this
903          * atomic
904          */
905         ret = ltdb_delete_internal(module, msg->dn);
906         if (ret != LDB_SUCCESS) {
907                 return ret;
908         }
909
910         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
911         if (msg->dn == NULL) {
912                 return LDB_ERR_OPERATIONS_ERROR;
913         }
914
915         ret = ltdb_add_internal(module, msg);
916
917         return ret;
918 }
919
920 static int ltdb_start_trans(struct ldb_module *module)
921 {
922         void *data = ldb_module_get_private(module);
923         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
924
925         if (tdb_transaction_start(ltdb->tdb) != 0) {
926                 return ltdb_err_map(tdb_error(ltdb->tdb));
927         }
928
929         ltdb->in_transaction++;
930
931         ltdb_index_transaction_start(module);
932
933         return LDB_SUCCESS;
934 }
935
936 static int ltdb_prepare_commit(struct ldb_module *module)
937 {
938         void *data = ldb_module_get_private(module);
939         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
940
941         if (ltdb->in_transaction != 1) {
942                 return LDB_SUCCESS;
943         }
944
945         if (ltdb_index_transaction_commit(module) != 0) {
946                 tdb_transaction_cancel(ltdb->tdb);
947                 ltdb->in_transaction--;
948                 return ltdb_err_map(tdb_error(ltdb->tdb));
949         }
950
951         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
952                 ltdb->in_transaction--;
953                 return ltdb_err_map(tdb_error(ltdb->tdb));
954         }
955
956         ltdb->prepared_commit = true;
957
958         return LDB_SUCCESS;
959 }
960
961 static int ltdb_end_trans(struct ldb_module *module)
962 {
963         void *data = ldb_module_get_private(module);
964         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
965
966         if (!ltdb->prepared_commit) {
967                 int ret = ltdb_prepare_commit(module);
968                 if (ret != LDB_SUCCESS) {
969                         return ret;
970                 }
971         }
972
973         ltdb->in_transaction--;
974         ltdb->prepared_commit = false;
975
976         if (tdb_transaction_commit(ltdb->tdb) != 0) {
977                 return ltdb_err_map(tdb_error(ltdb->tdb));
978         }
979
980         return LDB_SUCCESS;
981 }
982
983 static int ltdb_del_trans(struct ldb_module *module)
984 {
985         void *data = ldb_module_get_private(module);
986         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
987
988         ltdb->in_transaction--;
989
990         if (ltdb_index_transaction_cancel(module) != 0) {
991                 tdb_transaction_cancel(ltdb->tdb);
992                 return ltdb_err_map(tdb_error(ltdb->tdb));
993         }
994
995         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
996                 return ltdb_err_map(tdb_error(ltdb->tdb));
997         }
998
999         return LDB_SUCCESS;
1000 }
1001
1002 /*
1003   return sequenceNumber from @BASEINFO
1004 */
1005 static int ltdb_sequence_number(struct ltdb_context *ctx,
1006                                 struct ldb_extended **ext)
1007 {
1008         struct ldb_context *ldb;
1009         struct ldb_module *module = ctx->module;
1010         struct ldb_request *req = ctx->req;
1011         TALLOC_CTX *tmp_ctx;
1012         struct ldb_seqnum_request *seq;
1013         struct ldb_seqnum_result *res;
1014         struct ldb_message *msg = NULL;
1015         struct ldb_dn *dn;
1016         const char *date;
1017         int ret = LDB_SUCCESS;
1018
1019         ldb = ldb_module_get_ctx(module);
1020
1021         seq = talloc_get_type(req->op.extended.data,
1022                                 struct ldb_seqnum_request);
1023         if (seq == NULL) {
1024                 return LDB_ERR_OPERATIONS_ERROR;
1025         }
1026
1027         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1028
1029         if (ltdb_lock_read(module) != 0) {
1030                 return LDB_ERR_OPERATIONS_ERROR;
1031         }
1032
1033         res = talloc_zero(req, struct ldb_seqnum_result);
1034         if (res == NULL) {
1035                 ret = LDB_ERR_OPERATIONS_ERROR;
1036                 goto done;
1037         }
1038         tmp_ctx = talloc_new(req);
1039         if (tmp_ctx == NULL) {
1040                 ret = LDB_ERR_OPERATIONS_ERROR;
1041                 goto done;
1042         }
1043
1044         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1045
1046         msg = talloc(tmp_ctx, struct ldb_message);
1047         if (msg == NULL) {
1048                 ret = LDB_ERR_OPERATIONS_ERROR;
1049                 goto done;
1050         }
1051
1052         ret = ltdb_search_dn1(module, dn, msg);
1053         if (ret != LDB_SUCCESS) {
1054                 goto done;
1055         }
1056
1057         switch (seq->type) {
1058         case LDB_SEQ_HIGHEST_SEQ:
1059                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1060                 break;
1061         case LDB_SEQ_NEXT:
1062                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1063                 res->seq_num++;
1064                 break;
1065         case LDB_SEQ_HIGHEST_TIMESTAMP:
1066                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1067                 if (date) {
1068                         res->seq_num = ldb_string_to_time(date);
1069                 } else {
1070                         res->seq_num = 0;
1071                         /* zero is as good as anything when we don't know */
1072                 }
1073                 break;
1074         }
1075
1076         *ext = talloc_zero(req, struct ldb_extended);
1077         if (*ext == NULL) {
1078                 ret = LDB_ERR_OPERATIONS_ERROR;
1079                 goto done;
1080         }
1081         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1082         (*ext)->data = talloc_steal(*ext, res);
1083
1084 done:
1085         talloc_free(tmp_ctx);
1086         ltdb_unlock_read(module);
1087         return ret;
1088 }
1089
1090 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1091 {
1092         struct ldb_context *ldb;
1093         struct ldb_request *req;
1094         struct ldb_reply *ares;
1095
1096         ldb = ldb_module_get_ctx(ctx->module);
1097         req = ctx->req;
1098
1099         /* if we already returned an error just return */
1100         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1101                 return;
1102         }
1103
1104         ares = talloc_zero(req, struct ldb_reply);
1105         if (!ares) {
1106                 ldb_oom(ldb);
1107                 req->callback(req, NULL);
1108                 return;
1109         }
1110         ares->type = LDB_REPLY_DONE;
1111         ares->error = error;
1112
1113         req->callback(req, ares);
1114 }
1115
1116 static void ltdb_timeout(struct tevent_context *ev,
1117                           struct tevent_timer *te,
1118                           struct timeval t,
1119                           void *private_data)
1120 {
1121         struct ltdb_context *ctx;
1122         ctx = talloc_get_type(private_data, struct ltdb_context);
1123
1124         if (!ctx->request_terminated) {
1125                 /* request is done now */
1126                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1127         }
1128
1129         if (!ctx->request_terminated) {
1130                 /* neutralize the spy */
1131                 ctx->spy->ctx = NULL;
1132         }
1133         talloc_free(ctx);
1134 }
1135
1136 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1137                                         struct ldb_extended *ext,
1138                                         int error)
1139 {
1140         struct ldb_context *ldb;
1141         struct ldb_request *req;
1142         struct ldb_reply *ares;
1143
1144         ldb = ldb_module_get_ctx(ctx->module);
1145         req = ctx->req;
1146
1147         /* if we already returned an error just return */
1148         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1149                 return;
1150         }
1151
1152         ares = talloc_zero(req, struct ldb_reply);
1153         if (!ares) {
1154                 ldb_oom(ldb);
1155                 req->callback(req, NULL);
1156                 return;
1157         }
1158         ares->type = LDB_REPLY_DONE;
1159         ares->response = ext;
1160         ares->error = error;
1161
1162         req->callback(req, ares);
1163 }
1164
1165 static void ltdb_handle_extended(struct ltdb_context *ctx)
1166 {
1167         struct ldb_extended *ext = NULL;
1168         int ret;
1169
1170         if (strcmp(ctx->req->op.extended.oid,
1171                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1172                 /* get sequence number */
1173                 ret = ltdb_sequence_number(ctx, &ext);
1174         } else {
1175                 /* not recognized */
1176                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1177         }
1178
1179         ltdb_request_extended_done(ctx, ext, ret);
1180 }
1181
1182 static void ltdb_callback(struct tevent_context *ev,
1183                           struct tevent_timer *te,
1184                           struct timeval t,
1185                           void *private_data)
1186 {
1187         struct ltdb_context *ctx;
1188         int ret;
1189
1190         ctx = talloc_get_type(private_data, struct ltdb_context);
1191
1192         if (ctx->request_terminated) {
1193                 goto done;
1194         }
1195
1196         switch (ctx->req->operation) {
1197         case LDB_SEARCH:
1198                 ret = ltdb_search(ctx);
1199                 break;
1200         case LDB_ADD:
1201                 ret = ltdb_add(ctx);
1202                 break;
1203         case LDB_MODIFY:
1204                 ret = ltdb_modify(ctx);
1205                 break;
1206         case LDB_DELETE:
1207                 ret = ltdb_delete(ctx);
1208                 break;
1209         case LDB_RENAME:
1210                 ret = ltdb_rename(ctx);
1211                 break;
1212         case LDB_EXTENDED:
1213                 ltdb_handle_extended(ctx);
1214                 goto done;
1215         default:
1216                 /* no other op supported */
1217                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1218         }
1219
1220         if (!ctx->request_terminated) {
1221                 /* request is done now */
1222                 ltdb_request_done(ctx, ret);
1223         }
1224
1225 done:
1226         if (!ctx->request_terminated) {
1227                 /* neutralize the spy */
1228                 ctx->spy->ctx = NULL;
1229         }
1230         talloc_free(ctx);
1231 }
1232
1233 static int ltdb_request_destructor(void *ptr)
1234 {
1235         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1236
1237         if (spy->ctx != NULL) {
1238                 spy->ctx->request_terminated = true;
1239         }
1240
1241         return 0;
1242 }
1243
1244 static int ltdb_handle_request(struct ldb_module *module,
1245                                struct ldb_request *req)
1246 {
1247         struct ldb_context *ldb;
1248         struct tevent_context *ev;
1249         struct ltdb_context *ac;
1250         struct tevent_timer *te;
1251         struct timeval tv;
1252         int i;
1253
1254         ldb = ldb_module_get_ctx(module);
1255
1256         for (i = 0; req->controls && req->controls[i]; i++) {
1257                 if (req->controls[i]->critical) {
1258                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1259                                                req->controls[i]->oid);
1260                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1261                 }
1262         }
1263
1264         if (req->starttime == 0 || req->timeout == 0) {
1265                 ldb_set_errstring(ldb, "Invalid timeout settings");
1266                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1267         }
1268
1269         ev = ldb_get_event_context(ldb);
1270
1271         ac = talloc_zero(ldb, struct ltdb_context);
1272         if (ac == NULL) {
1273                 ldb_oom(ldb);
1274                 return LDB_ERR_OPERATIONS_ERROR;
1275         }
1276
1277         ac->module = module;
1278         ac->req = req;
1279
1280         tv.tv_sec = 0;
1281         tv.tv_usec = 0;
1282         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1283         if (NULL == te) {
1284                 talloc_free(ac);
1285                 return LDB_ERR_OPERATIONS_ERROR;
1286         }
1287
1288         tv.tv_sec = req->starttime + req->timeout;
1289         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1290         if (NULL == ac->timeout_event) {
1291                 talloc_free(ac);
1292                 return LDB_ERR_OPERATIONS_ERROR;
1293         }
1294
1295         /* set a spy so that we do not try to use the request context
1296          * if it is freed before ltdb_callback fires */
1297         ac->spy = talloc(req, struct ltdb_req_spy);
1298         if (NULL == ac->spy) {
1299                 talloc_free(ac);
1300                 return LDB_ERR_OPERATIONS_ERROR;
1301         }
1302         ac->spy->ctx = ac;
1303
1304         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1305
1306         return LDB_SUCCESS;
1307 }
1308
1309 static const struct ldb_module_ops ltdb_ops = {
1310         .name              = "tdb",
1311         .search            = ltdb_handle_request,
1312         .add               = ltdb_handle_request,
1313         .modify            = ltdb_handle_request,
1314         .del               = ltdb_handle_request,
1315         .rename            = ltdb_handle_request,
1316         .extended          = ltdb_handle_request,
1317         .start_transaction = ltdb_start_trans,
1318         .end_transaction   = ltdb_end_trans,
1319         .prepare_commit    = ltdb_prepare_commit,
1320         .del_transaction   = ltdb_del_trans,
1321 };
1322
1323 /*
1324   connect to the database
1325 */
1326 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1327                         unsigned int flags, const char *options[],
1328                         struct ldb_module **_module)
1329 {
1330         struct ldb_module *module;
1331         const char *path;
1332         int tdb_flags, open_flags;
1333         struct ltdb_private *ltdb;
1334
1335         /* parse the url */
1336         if (strchr(url, ':')) {
1337                 if (strncmp(url, "tdb://", 6) != 0) {
1338                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1339                                   "Invalid tdb URL '%s'", url);
1340                         return LDB_ERR_OPERATIONS_ERROR;
1341                 }
1342                 path = url+6;
1343         } else {
1344                 path = url;
1345         }
1346
1347         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1348
1349         /* check for the 'nosync' option */
1350         if (flags & LDB_FLG_NOSYNC) {
1351                 tdb_flags |= TDB_NOSYNC;
1352         }
1353
1354         /* and nommap option */
1355         if (flags & LDB_FLG_NOMMAP) {
1356                 tdb_flags |= TDB_NOMMAP;
1357         }
1358
1359         if (flags & LDB_FLG_RDONLY) {
1360                 open_flags = O_RDONLY;
1361         } else {
1362                 open_flags = O_CREAT | O_RDWR;
1363         }
1364
1365         ltdb = talloc_zero(ldb, struct ltdb_private);
1366         if (!ltdb) {
1367                 ldb_oom(ldb);
1368                 return LDB_ERR_OPERATIONS_ERROR;
1369         }
1370
1371         /* note that we use quite a large default hash size */
1372         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1373                                    tdb_flags, open_flags,
1374                                    ldb_get_create_perms(ldb), ldb);
1375         if (!ltdb->tdb) {
1376                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1377                           "Unable to open tdb '%s'", path);
1378                 talloc_free(ltdb);
1379                 return LDB_ERR_OPERATIONS_ERROR;
1380         }
1381
1382         ltdb->sequence_number = 0;
1383
1384         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1385         if (!module) {
1386                 talloc_free(ltdb);
1387                 return LDB_ERR_OPERATIONS_ERROR;
1388         }
1389         ldb_module_set_private(module, ltdb);
1390         talloc_steal(module, ltdb);
1391
1392         if (ltdb_cache_load(module) != 0) {
1393                 talloc_free(module);
1394                 talloc_free(ltdb);
1395                 return LDB_ERR_OPERATIONS_ERROR;
1396         }
1397
1398         *_module = module;
1399         return LDB_SUCCESS;
1400 }
1401
1402 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1403         .name = "tdb",
1404         .connect_fn = ltdb_connect
1405 };