s4-ldb: merged with master
[ira/wip.git] / source4 / lib / ldb / ldb_tdb / ldb_tdb.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell  2004
5    Copyright (C) Stefan Metzmacher  2004
6    Copyright (C) Simo Sorce       2006-2008
7
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_tdb
29  *
30  *  Component: ldb tdb backend
31  *
32  *  Description: core functions for tdb backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asyncronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  */
47
48 #include "ldb_tdb.h"
49
50
51 /*
52   map a tdb error code to a ldb error code
53 */
54 static int ltdb_err_map(enum TDB_ERROR tdb_code)
55 {
56         switch (tdb_code) {
57         case TDB_SUCCESS:
58                 return LDB_SUCCESS;
59         case TDB_ERR_CORRUPT:
60         case TDB_ERR_OOM:
61         case TDB_ERR_EINVAL:
62                 return LDB_ERR_OPERATIONS_ERROR;
63         case TDB_ERR_IO:
64                 return LDB_ERR_PROTOCOL_ERROR;
65         case TDB_ERR_LOCK:
66         case TDB_ERR_NOLOCK:
67                 return LDB_ERR_BUSY;
68         case TDB_ERR_LOCK_TIMEOUT:
69                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
70         case TDB_ERR_EXISTS:
71                 return LDB_ERR_ENTRY_ALREADY_EXISTS;
72         case TDB_ERR_NOEXIST:
73                 return LDB_ERR_NO_SUCH_OBJECT;
74         case TDB_ERR_RDONLY:
75                 return LDB_ERR_INSUFFICIENT_ACCESS_RIGHTS;
76         }
77         return LDB_ERR_OTHER;
78 }
79
80 /*
81   lock the database for read - use by ltdb_search and ltdb_sequence_number
82 */
83 int ltdb_lock_read(struct ldb_module *module)
84 {
85         void *data = ldb_module_get_private(module);
86         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
87         if (ltdb->in_transaction == 0) {
88                 return tdb_lockall_read(ltdb->tdb);
89         }
90         return 0;
91 }
92
93 /*
94   unlock the database after a ltdb_lock_read()
95 */
96 int ltdb_unlock_read(struct ldb_module *module)
97 {
98         void *data = ldb_module_get_private(module);
99         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
100         if (ltdb->in_transaction == 0) {
101                 return tdb_unlockall_read(ltdb->tdb);
102         }
103         return 0;
104 }
105
106 struct ldb_val ldb_dn_get_casefold_as_ldb_val(struct ldb_dn *dn) {
107         struct ldb_val val;
108         const char *casefold_dn = ldb_dn_get_casefold(dn);
109         val.data = (uint8_t *)((uintptr_t)casefold_dn);
110         val.length = strlen(casefold_dn);
111         return val;
112 }
113
114 struct ldb_val ldb_dn_alloc_casefold_as_ldb_val(TALLOC_CTX *mem_ctx, struct ldb_dn *dn) {
115         struct ldb_val val;
116         const char *casefold_dn = ldb_dn_alloc_casefold(mem_ctx, dn);
117         val.data = (uint8_t *)((uintptr_t)casefold_dn);
118         val.length = strlen(casefold_dn);
119         return val;
120 }
121
122 /*
123   form a TDB_DATA for a record key
124   caller frees
125
126   This version takes the casefolded string form of the DN as an ldb_val
127 */
128 struct TDB_DATA ltdb_key_from_casefold_dn(TALLOC_CTX *mem_ctx, 
129                                           struct ldb_val dn_folded)
130 {
131         TDB_DATA key;
132
133         key.dsize = dn_folded.length + 4;
134         key.dptr = talloc_size(mem_ctx, key.dsize);
135         if (!key.dptr) {
136                 goto failed;
137         }
138
139         memcpy(key.dptr, "DN=", 3);
140         memcpy(&key.dptr[3], dn_folded.data, key.dsize - 4);
141
142         key.dptr[key.dsize - 1] = '\0';
143
144         return key;
145
146 failed:
147         errno = ENOMEM;
148         key.dptr = NULL;
149         key.dsize = 0;
150         return key;
151 }
152
153
154 /*
155   form a TDB_DATA for a record key
156   caller frees
157
158   note that the key for a record can depend on whether the
159   dn refers to a case sensitive index record or not
160 */
161 struct TDB_DATA ltdb_key(TALLOC_CTX *mem_ctx, struct ldb_dn *dn)
162 {
163         TDB_DATA key;
164         struct ldb_val dn_folded;
165
166         /*
167           most DNs are case insensitive. The exception is index DNs for
168           case sensitive attributes
169
170           there are 3 cases dealt with in this code:
171
172           1) if the dn doesn't start with @ then uppercase the attribute
173              names and the attributes values of case insensitive attributes
174           2) if the dn starts with @ then leave it alone -
175              the indexing code handles the rest
176         */
177
178         dn_folded = ldb_dn_get_casefold_as_ldb_val(dn);
179         if (!dn_folded.data) {
180                 errno = EINVAL;
181                 key.dptr = NULL;
182                 key.dsize = 0;
183                 return key;
184         }
185
186         return ltdb_key_from_casefold_dn(mem_ctx, dn_folded);
187 }
188
189 /*
190   check special dn's have valid attributes
191   currently only @ATTRIBUTES is checked
192 */
193 static int ltdb_check_special_dn(struct ldb_module *module,
194                           const struct ldb_message *msg)
195 {
196         struct ldb_context *ldb = ldb_module_get_ctx(module);
197         int i, j;
198
199         if (! ldb_dn_is_special(msg->dn) ||
200             ! ldb_dn_check_special(msg->dn, LTDB_ATTRIBUTES)) {
201                 return 0;
202         }
203
204         /* we have @ATTRIBUTES, let's check attributes are fine */
205         /* should we check that we deny multivalued attributes ? */
206         for (i = 0; i < msg->num_elements; i++) {
207                 for (j = 0; j < msg->elements[i].num_values; j++) {
208                         if (ltdb_check_at_attributes_values(&msg->elements[i].values[j]) != 0) {
209                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
210                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
211                         }
212                 }
213         }
214
215         return 0;
216 }
217
218
219 /*
220   we've made a modification to a dn - possibly reindex and
221   update sequence number
222 */
223 static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
224 {
225         int ret = LDB_SUCCESS;
226
227         if (ldb_dn_is_special(dn) &&
228             (ldb_dn_check_special(dn, LTDB_INDEXLIST) ||
229              ldb_dn_check_special(dn, LTDB_ATTRIBUTES)) ) {
230                 ret = ltdb_reindex(module);
231         }
232
233         if (ret == LDB_SUCCESS &&
234             !(ldb_dn_is_special(dn) &&
235               ldb_dn_check_special(dn, LTDB_BASEINFO)) ) {
236                 ret = ltdb_increase_sequence_number(module);
237         }
238
239         return ret;
240 }
241
242 /*
243   store a record into the db
244 */
245 int ltdb_store(struct ldb_module *module, TALLOC_CTX *mem_ctx, 
246                const struct ldb_message *msg, int flgs)
247 {
248         void *data = ldb_module_get_private(module);
249         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
250         TDB_DATA tdb_key, tdb_data;
251         int ret;
252
253         tdb_key = ltdb_key(mem_ctx, msg->dn);
254         if (!tdb_key.dptr) {
255                 return LDB_ERR_OTHER;
256         }
257
258         ret = ltdb_pack_data(module, msg, &tdb_data);
259         if (ret == -1) {
260                 talloc_free(tdb_key.dptr);
261                 return LDB_ERR_OTHER;
262         }
263
264         ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
265         if (ret == -1) {
266                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
267                 goto done;
268         }
269
270         ret = ltdb_index_add(module, mem_ctx, msg);
271         if (ret != LDB_SUCCESS) {
272                 tdb_delete(ltdb->tdb, tdb_key);
273         }
274
275 done:
276         talloc_free(tdb_key.dptr);
277         talloc_free(tdb_data.dptr);
278
279         return ret;
280 }
281
282
283 static int ltdb_add_internal(struct ldb_module *module,
284                              TALLOC_CTX *mem_ctx, 
285                              const struct ldb_message *msg)
286 {
287         struct ldb_context *ldb = ldb_module_get_ctx(module);
288         int ret, i;
289
290         ret = ltdb_check_special_dn(module, msg);
291         if (ret != LDB_SUCCESS) {
292                 return ret;
293         }
294
295         if (ltdb_cache_load(module) != 0) {
296                 return LDB_ERR_OPERATIONS_ERROR;
297         }
298
299         for (i=0;i<msg->num_elements;i++) {
300                 struct ldb_message_element *el = &msg->elements[i];
301                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
302
303                 if (el->num_values == 0) {
304                         ldb_asprintf_errstring(ldb, "attribute %s on %s specified, but with 0 values (illegal)", 
305                                                el->name, ldb_dn_get_linearized(msg->dn));
306                         return LDB_ERR_CONSTRAINT_VIOLATION;
307                 }
308                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
309                         if (el->num_values > 1) {
310                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s speicified more than once", 
311                                                        el->name, ldb_dn_get_linearized(msg->dn));
312                                 return LDB_ERR_CONSTRAINT_VIOLATION;
313                         }
314                 }
315         }
316
317         ret = ltdb_store(module, mem_ctx, msg, TDB_INSERT);
318
319         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
320                 ldb_asprintf_errstring(ldb,
321                                         "Entry %s already exists",
322                                         ldb_dn_get_linearized(msg->dn));
323                 return ret;
324         }
325
326         if (ret == LDB_SUCCESS) {
327                 ret = ltdb_index_one(module, mem_ctx, msg, 1);
328                 if (ret != LDB_SUCCESS) {
329                         return ret;
330                 }
331
332                 ret = ltdb_modified(module, msg->dn);
333                 if (ret != LDB_SUCCESS) {
334                         return ret;
335                 }
336         }
337
338         return ret;
339 }
340
341 /*
342   add a record to the database
343 */
344 static int ltdb_add(struct ltdb_context *ctx)
345 {
346         struct ldb_module *module = ctx->module;
347         struct ldb_request *req = ctx->req;
348         int tret;
349
350         ldb_request_set_state(req, LDB_ASYNC_PENDING);
351
352         tret = ltdb_add_internal(module, req, req->op.add.message);
353         if (tret != LDB_SUCCESS) {
354                 return tret;
355         }
356
357         return LDB_SUCCESS;
358 }
359
360 /*
361   delete a record from the database, not updating indexes (used for deleting
362   index records)
363 */
364 int ltdb_delete_noindex(struct ldb_module *module, TALLOC_CTX *mem_ctx, struct ldb_dn *dn)
365 {
366         void *data = ldb_module_get_private(module);
367         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
368         TDB_DATA tdb_key;
369         int ret;
370
371         tdb_key = ltdb_key(mem_ctx, dn);
372         if (!tdb_key.dptr) {
373                 return LDB_ERR_OTHER;
374         }
375
376         ret = tdb_delete(ltdb->tdb, tdb_key);
377         talloc_free(tdb_key.dptr);
378
379         if (ret != 0) {
380                 ret = ltdb_err_map(tdb_error(ltdb->tdb));
381         }
382
383         return ret;
384 }
385
386 static int ltdb_delete_internal(struct ldb_module *module, TALLOC_CTX *mem_ctx, struct ldb_dn *dn)
387 {
388         struct ldb_message *msg;
389         int ret;
390
391         msg = talloc(mem_ctx, struct ldb_message);
392         if (msg == NULL) {
393                 return LDB_ERR_OPERATIONS_ERROR;
394         }
395
396         /* in case any attribute of the message was indexed, we need
397            to fetch the old record */
398         ret = ltdb_search_dn1(module, dn, msg);
399         if (ret != LDB_SUCCESS) {
400                 /* not finding the old record is an error */
401                 goto done;
402         }
403
404         ret = ltdb_delete_noindex(module, msg, dn);
405         if (ret != LDB_SUCCESS) {
406                 goto done;
407         }
408
409         /* remove one level attribute */
410         ret = ltdb_index_one(module, msg, msg, 0);
411         if (ret != LDB_SUCCESS) {
412                 goto done;
413         }
414
415         /* remove any indexed attributes */
416         ret = ltdb_index_del(module, msg, msg);
417         if (ret != LDB_SUCCESS) {
418                 goto done;
419         }
420
421         ret = ltdb_modified(module, dn);
422         if (ret != LDB_SUCCESS) {
423                 goto done;
424         }
425
426 done:
427         talloc_free(msg);
428         return ret;
429 }
430
431 /*
432   delete a record from the database
433 */
434 static int ltdb_delete(struct ltdb_context *ctx)
435 {
436         struct ldb_module *module = ctx->module;
437         struct ldb_request *req = ctx->req;
438         int tret;
439
440         ldb_request_set_state(req, LDB_ASYNC_PENDING);
441
442         if (ltdb_cache_load(module) != 0) {
443                 return LDB_ERR_OPERATIONS_ERROR;
444         }
445
446         tret = ltdb_delete_internal(module, req, req->op.del.dn);
447         if (tret != LDB_SUCCESS) {
448                 return tret;
449         }
450
451         return LDB_SUCCESS;
452 }
453
454 /*
455   find an element by attribute name. At the moment this does a linear search,
456   it should be re-coded to use a binary search once all places that modify
457   records guarantee sorted order
458
459   return the index of the first matching element if found, otherwise -1
460 */
461 static int find_element(const struct ldb_message *msg, const char *name)
462 {
463         unsigned int i;
464         for (i=0;i<msg->num_elements;i++) {
465                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
466                         return i;
467                 }
468         }
469         return -1;
470 }
471
472
473 /*
474   add an element to an existing record. Assumes a elements array that we
475   can call re-alloc on, and assumed that we can re-use the data pointers from
476   the passed in additional values. Use with care!
477
478   returns 0 on success, -1 on failure (and sets errno)
479 */
480 static int msg_add_element(struct ldb_context *ldb,
481                            struct ldb_message *msg,
482                            struct ldb_message_element *el)
483 {
484         struct ldb_message_element *e2;
485         unsigned int i;
486
487         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
488                               msg->num_elements+1);
489         if (!e2) {
490                 errno = ENOMEM;
491                 return -1;
492         }
493
494         msg->elements = e2;
495
496         e2 = &msg->elements[msg->num_elements];
497
498         e2->name = el->name;
499         e2->flags = el->flags;
500         e2->values = NULL;
501         if (el->num_values != 0) {
502                 e2->values = talloc_array(msg->elements,
503                                           struct ldb_val, el->num_values);
504                 if (!e2->values) {
505                         errno = ENOMEM;
506                         return -1;
507                 }
508         }
509         for (i=0;i<el->num_values;i++) {
510                 e2->values[i] = el->values[i];
511         }
512         e2->num_values = el->num_values;
513
514         msg->num_elements++;
515
516         return 0;
517 }
518
519 /*
520   delete all elements having a specified attribute name
521 */
522 static int msg_delete_attribute(struct ldb_module *module,
523                                 struct ldb_message *msg, const char *name)
524 {
525         unsigned int i, j;
526
527         for (i=0;i<msg->num_elements;i++) {
528                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
529                         for (j=0;j<msg->elements[i].num_values;j++) {
530                                 ltdb_index_del_value(module, msg, msg->dn,
531                                                      &msg->elements[i], j);
532                         }
533                         talloc_free(msg->elements[i].values);
534                         if (msg->num_elements > (i+1)) {
535                                 memmove(&msg->elements[i],
536                                         &msg->elements[i+1],
537                                         sizeof(struct ldb_message_element)*
538                                         (msg->num_elements - (i+1)));
539                         }
540                         msg->num_elements--;
541                         i--;
542                         msg->elements = talloc_realloc(msg, msg->elements,
543                                                         struct ldb_message_element,
544                                                         msg->num_elements);
545                 }
546         }
547
548         return 0;
549 }
550
551 /*
552   delete all elements matching an attribute name/value
553
554   return 0 on success, -1 on failure
555 */
556 static int msg_delete_element(struct ldb_module *module,
557                               struct ldb_message *msg,
558                               const char *name,
559                               const struct ldb_val *val)
560 {
561         struct ldb_context *ldb = ldb_module_get_ctx(module);
562         unsigned int i;
563         int found;
564         struct ldb_message_element *el;
565         const struct ldb_schema_attribute *a;
566
567         found = find_element(msg, name);
568         if (found == -1) {
569                 return -1;
570         }
571
572         el = &msg->elements[found];
573
574         a = ldb_schema_attribute_by_name(ldb, el->name);
575
576         for (i=0;i<el->num_values;i++) {
577                 if (a->syntax->comparison_fn(ldb, msg,
578                                                 &el->values[i], val) == 0) {
579                         if (i<el->num_values-1) {
580                                 memmove(&el->values[i], &el->values[i+1],
581                                         sizeof(el->values[i])*
582                                                 (el->num_values-(i+1)));
583                         }
584                         el->num_values--;
585                         if (el->num_values == 0) {
586                                 return msg_delete_attribute(module, 
587                                                             msg, name);
588                         }
589                         return 0;
590                 }
591         }
592
593         return -1;
594 }
595
596
597 /*
598   modify a record - internal interface
599
600   yuck - this is O(n^2). Luckily n is usually small so we probably
601   get away with it, but if we ever have really large attribute lists
602   then we'll need to look at this again
603 */
604 int ltdb_modify_internal(struct ldb_module *module,
605                          TALLOC_CTX *mem_ctx, 
606                          const struct ldb_message *msg)
607 {
608         struct ldb_context *ldb = ldb_module_get_ctx(module);
609         void *data = ldb_module_get_private(module);
610         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
611         TDB_DATA tdb_key, tdb_data;
612         struct ldb_message *msg2;
613         unsigned i, j;
614         int ret, idx;
615         TALLOC_CTX *tmp_ctx = talloc_new(mem_ctx);
616         if (!tmp_ctx) {
617                 return LDB_ERR_OPERATIONS_ERROR;
618         }
619         tdb_key = ltdb_key(tmp_ctx, msg->dn);
620         if (!tdb_key.dptr) {
621                 talloc_free(tmp_ctx);
622                 return LDB_ERR_OTHER;
623         }
624
625         tdb_data = tdb_fetch(ltdb->tdb, tdb_key);
626         talloc_free(tdb_key.dptr);
627
628         if (!tdb_data.dptr) {
629                 return ltdb_err_map(tdb_error(ltdb->tdb));
630         }
631
632         msg2 = talloc(tmp_ctx, struct ldb_message);
633         if (msg2 == NULL) {
634                 ldb_oom(ldb);
635                 ret = LDB_ERR_OPERATIONS_ERROR;
636                 goto failed;
637         }
638
639         ret = ltdb_unpack_data(module, &tdb_data, msg2);
640         if (ret == -1) {
641         }
642
643         if (!msg2->dn) {
644                 msg2->dn = msg->dn;
645         }
646
647         for (i=0;i<msg->num_elements;i++) {
648                 struct ldb_message_element *el = &msg->elements[i];
649                 struct ldb_message_element *el2;
650                 struct ldb_val *vals;
651                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
652                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
653
654                 case LDB_FLAG_MOD_ADD:
655                         
656                         /* add this element to the message. fail if it
657                            already exists */
658                         idx = find_element(msg2, el->name);
659
660                         if (el->num_values == 0) {
661                                 ldb_asprintf_errstring(ldb, "attribute %s on %s speicified, but with 0 values (illigal)", 
662                                                   el->name, ldb_dn_get_linearized(msg->dn));
663                                 return LDB_ERR_CONSTRAINT_VIOLATION;
664                         }
665                         if (idx == -1) {
666                                 if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
667                                         if (el->num_values > 1) {
668                                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s speicified more than once", 
669                                                                el->name, ldb_dn_get_linearized(msg->dn));
670                                                 return LDB_ERR_CONSTRAINT_VIOLATION;
671                                         }
672                                 }
673                                 if (msg_add_element(ldb, msg2, el) != 0) {
674                                         ret = LDB_ERR_OTHER;
675                                         goto failed;
676                                 }
677                                 continue;
678                         }
679
680                         /* If this is an add, then if it already
681                          * exists in the object, then we violoate the
682                          * single-value rule */
683                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
684                                 return LDB_ERR_CONSTRAINT_VIOLATION;
685                         }
686
687                         el2 = &msg2->elements[idx];
688
689                         /* An attribute with this name already exists,
690                          * add all values if they don't already exist
691                          * (check both the other elements to be added,
692                          * and those already in the db). */
693
694                         for (j=0;j<el->num_values;j++) {
695                                 if (ldb_msg_find_val(el2, &el->values[j])) {
696                                         ldb_asprintf_errstring(ldb, "%s: value #%d already exists", el->name, j);
697                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
698                                         goto failed;
699                                 }
700                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
701                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
702                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
703                                         goto failed;
704                                 }
705                         }
706
707                         vals = talloc_realloc(msg2->elements, el2->values, struct ldb_val,
708                                                 el2->num_values + el->num_values);
709
710                         if (vals == NULL) {
711                                 ldb_oom(ldb);
712                                 ret = LDB_ERR_OPERATIONS_ERROR;
713                                 goto failed;
714                         }
715
716                         for (j=0;j<el->num_values;j++) {
717                                 vals[el2->num_values + j] =
718                                         ldb_val_dup(vals, &el->values[j]);
719                         }
720
721                         el2->values = vals;
722                         el2->num_values += el->num_values;
723
724                         break;
725
726                 case LDB_FLAG_MOD_REPLACE:
727                         if (a && a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
728                                 if (el->num_values > 1) {
729                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s speicified more than once", 
730                                                                el->name, ldb_dn_get_linearized(msg->dn));
731                                         return LDB_ERR_CONSTRAINT_VIOLATION;
732                                 }
733                         }
734                         /* replace all elements of this attribute name with the elements
735                            listed. The attribute not existing is not an error */
736                         msg_delete_attribute(module, msg2, el->name);
737
738                         for (j=0;j<el->num_values;j++) {
739                                 if (ldb_msg_find_val(el, &el->values[j]) != &el->values[j]) {
740                                         ldb_asprintf_errstring(ldb, "%s: value #%d provided more than once", el->name, j);
741                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
742                                         goto failed;
743                                 }
744                         }
745
746                         /* add the replacement element, if not empty */
747                         if (el->num_values != 0 &&
748                             msg_add_element(ldb, msg2, el) != 0) {
749                                 ret = LDB_ERR_OTHER;
750                                 goto failed;
751                         }
752                         break;
753
754                 case LDB_FLAG_MOD_DELETE:
755
756                         /* we could be being asked to delete all
757                            values or just some values */
758                         if (msg->elements[i].num_values == 0) {
759                                 if (msg_delete_attribute(module, msg2, 
760                                                          msg->elements[i].name) != 0) {
761                                         const char *dn = ldb_dn_get_linearized(msg->dn);
762                                         ldb_asprintf_errstring(ldb, "No such attribute: %s for delete on %s", msg->elements[i].name, dn);
763                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
764                                         goto failed;
765                                 }
766                                 break;
767                         }
768                         for (j=0;j<msg->elements[i].num_values;j++) {
769                                 if (msg_delete_element(module,
770                                                        msg2, 
771                                                        msg->elements[i].name,
772                                                        &msg->elements[i].values[j]) != 0) {
773                                         const char *dn = ldb_dn_get_linearized(msg->dn);
774                                         ldb_asprintf_errstring(ldb, "No matching attribute value (%*.*s) when deleting attribute: %s on %s", 
775                                                                (int)msg->elements[i].values[j].length, (int)msg->elements[i].values[j].length, 
776                                                                (const char *)msg->elements[i].values[j].data,
777                                                                msg->elements[i].name, dn);
778                                         ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
779                                         goto failed;
780                                 }
781                                 ret = ltdb_index_del_value(module, tmp_ctx, msg->dn, &msg->elements[i], j);
782                                 if (ret != LDB_SUCCESS) {
783                                         goto failed;
784                                 }
785                         }
786                         break;
787                 default:
788                         ldb_asprintf_errstring(ldb,
789                                 "Invalid ldb_modify flags on %s: 0x%x",
790                                 msg->elements[i].name,
791                                 msg->elements[i].flags & LDB_FLAG_MOD_MASK);
792                         ret = LDB_ERR_PROTOCOL_ERROR;
793                         goto failed;
794                 }
795         }
796
797         /* we've made all the mods
798          * save the modified record back into the database */
799         ret = ltdb_store(module, mem_ctx, msg2, TDB_MODIFY);
800         if (ret != LDB_SUCCESS) {
801                 goto failed;
802         }
803
804         ret = ltdb_modified(module, msg->dn);
805         if (ret != LDB_SUCCESS) {
806                 goto failed;
807         }
808
809         free(tdb_data.dptr);
810         return ret;
811
812 failed:
813         talloc_free(tmp_ctx);
814         free(tdb_data.dptr);
815         return ret;
816 }
817
818 /*
819   modify a record
820 */
821 static int ltdb_modify(struct ltdb_context *ctx)
822 {
823         struct ldb_module *module = ctx->module;
824         struct ldb_request *req = ctx->req;
825         int tret;
826
827         ldb_request_set_state(req, LDB_ASYNC_PENDING);
828
829         tret = ltdb_check_special_dn(module, req->op.mod.message);
830         if (tret != LDB_SUCCESS) {
831                 return tret;
832         }
833
834         if (ltdb_cache_load(module) != 0) {
835                 return LDB_ERR_OPERATIONS_ERROR;
836         }
837
838         tret = ltdb_modify_internal(module, req, req->op.mod.message);
839         if (tret != LDB_SUCCESS) {
840                 return tret;
841         }
842
843         return LDB_SUCCESS;
844 }
845
846 /*
847   rename a record
848 */
849 static int ltdb_rename(struct ltdb_context *ctx)
850 {
851         struct ldb_module *module = ctx->module;
852         struct ldb_request *req = ctx->req;
853         struct ldb_message *msg;
854         int tret;
855
856         ldb_request_set_state(req, LDB_ASYNC_PENDING);
857
858         if (ltdb_cache_load(ctx->module) != 0) {
859                 return LDB_ERR_OPERATIONS_ERROR;
860         }
861
862         msg = talloc(ctx, struct ldb_message);
863         if (msg == NULL) {
864                 return LDB_ERR_OPERATIONS_ERROR;
865         }
866
867         /* in case any attribute of the message was indexed, we need
868            to fetch the old record */
869         tret = ltdb_search_dn1(module, req->op.rename.olddn, msg);
870         if (tret != LDB_SUCCESS) {
871                 talloc_free(msg);
872                 /* not finding the old record is an error */
873                 return tret;
874         }
875
876         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
877         if (!msg->dn) {
878                 talloc_free(msg);
879                 return LDB_ERR_OPERATIONS_ERROR;
880         }
881
882         /* Always delete first then add, to avoid conflicts with
883          * unique indexes. We rely on the transaction to make this
884          * atomic
885          */
886         tret = ltdb_delete_internal(module, msg, req->op.rename.olddn);
887         if (tret != LDB_SUCCESS) {
888                 talloc_free(msg);
889                 return tret;
890         }
891
892         tret = ltdb_add_internal(module, msg, msg);
893         talloc_free(msg);
894         if (tret != LDB_SUCCESS) {
895                 return tret;
896         }
897
898         return LDB_SUCCESS;
899 }
900
901 static int ltdb_start_trans(struct ldb_module *module)
902 {
903         void *data = ldb_module_get_private(module);
904         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
905
906         if (tdb_transaction_start(ltdb->tdb) != 0) {
907                 return ltdb_err_map(tdb_error(ltdb->tdb));
908         }
909
910         ltdb->in_transaction++;
911
912         ltdb_index_transaction_start(module);
913
914         return LDB_SUCCESS;
915 }
916
917 static int ltdb_prepare_commit(struct ldb_module *module)
918 {
919         void *data = ldb_module_get_private(module);
920         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
921
922         if (ltdb->in_transaction != 1) {
923                 return LDB_SUCCESS;
924         }
925
926         if (ltdb_index_transaction_prepare_commit(module) != 0) {
927                 tdb_transaction_cancel(ltdb->tdb);
928                 ltdb->in_transaction--;
929                 return ltdb_err_map(tdb_error(ltdb->tdb));
930         }
931
932         if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
933                 ltdb->in_transaction--;
934                 return ltdb_err_map(tdb_error(ltdb->tdb));
935         }
936
937         ltdb->prepared_commit = true;
938
939         return LDB_SUCCESS;
940 }
941
942 static int ltdb_end_trans(struct ldb_module *module)
943 {
944         void *data = ldb_module_get_private(module);
945         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
946
947         if (!ltdb->prepared_commit) {
948                 int ret = ltdb_prepare_commit(module);
949                 if (ret != LDB_SUCCESS) {
950                         return ret;
951                 }
952         }
953
954         ltdb->in_transaction--;
955         ltdb->prepared_commit = false;
956
957         if (tdb_transaction_commit(ltdb->tdb) != 0) {
958                 return ltdb_err_map(tdb_error(ltdb->tdb));
959         }
960
961         return LDB_SUCCESS;
962 }
963
964 static int ltdb_del_trans(struct ldb_module *module)
965 {
966         void *data = ldb_module_get_private(module);
967         struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
968
969         ltdb->in_transaction--;
970
971         if (ltdb_index_transaction_cancel(module) != 0) {
972                 tdb_transaction_cancel(ltdb->tdb);
973                 return ltdb_err_map(tdb_error(ltdb->tdb));
974         }
975
976         if (tdb_transaction_cancel(ltdb->tdb) != 0) {
977                 return ltdb_err_map(tdb_error(ltdb->tdb));
978         }
979
980         return LDB_SUCCESS;
981 }
982
983 /*
984   return sequenceNumber from @BASEINFO
985 */
986 static int ltdb_sequence_number(struct ltdb_context *ctx,
987                                 struct ldb_extended **ext)
988 {
989         struct ldb_context *ldb;
990         struct ldb_module *module = ctx->module;
991         struct ldb_request *req = ctx->req;
992         TALLOC_CTX *tmp_ctx;
993         struct ldb_seqnum_request *seq;
994         struct ldb_seqnum_result *res;
995         struct ldb_message *msg = NULL;
996         struct ldb_dn *dn;
997         const char *date;
998         int ret;
999
1000         ldb = ldb_module_get_ctx(module);
1001
1002         seq = talloc_get_type(req->op.extended.data,
1003                                 struct ldb_seqnum_request);
1004         if (seq == NULL) {
1005                 return LDB_ERR_OPERATIONS_ERROR;
1006         }
1007
1008         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1009
1010         if (ltdb_lock_read(module) != 0) {
1011                 return LDB_ERR_OPERATIONS_ERROR;
1012         }
1013
1014         res = talloc_zero(req, struct ldb_seqnum_result);
1015         if (res == NULL) {
1016                 ret = LDB_ERR_OPERATIONS_ERROR;
1017                 goto done;
1018         }
1019         tmp_ctx = talloc_new(req);
1020         if (tmp_ctx == NULL) {
1021                 ret = LDB_ERR_OPERATIONS_ERROR;
1022                 goto done;
1023         }
1024
1025         dn = ldb_dn_new(tmp_ctx, ldb, LTDB_BASEINFO);
1026
1027         msg = talloc(tmp_ctx, struct ldb_message);
1028         if (msg == NULL) {
1029                 ret = LDB_ERR_OPERATIONS_ERROR;
1030                 goto done;
1031         }
1032
1033         ret = ltdb_search_dn1(module, dn, msg);
1034         if (ret != LDB_SUCCESS) {
1035                 goto done;
1036         }
1037
1038         switch (seq->type) {
1039         case LDB_SEQ_HIGHEST_SEQ:
1040                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1041                 break;
1042         case LDB_SEQ_NEXT:
1043                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LTDB_SEQUENCE_NUMBER, 0);
1044                 res->seq_num++;
1045                 break;
1046         case LDB_SEQ_HIGHEST_TIMESTAMP:
1047                 date = ldb_msg_find_attr_as_string(msg, LTDB_MOD_TIMESTAMP, NULL);
1048                 if (date) {
1049                         res->seq_num = ldb_string_to_time(date);
1050                 } else {
1051                         res->seq_num = 0;
1052                         /* zero is as good as anything when we don't know */
1053                 }
1054                 break;
1055         }
1056
1057         *ext = talloc_zero(req, struct ldb_extended);
1058         if (*ext == NULL) {
1059                 ret = LDB_ERR_OPERATIONS_ERROR;
1060                 goto done;
1061         }
1062         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1063         (*ext)->data = talloc_steal(*ext, res);
1064
1065         ret = LDB_SUCCESS;
1066
1067 done:
1068         talloc_free(tmp_ctx);
1069         ltdb_unlock_read(module);
1070         return ret;
1071 }
1072
1073 static void ltdb_request_done(struct ltdb_context *ctx, int error)
1074 {
1075         struct ldb_context *ldb;
1076         struct ldb_request *req;
1077         struct ldb_reply *ares;
1078
1079         ldb = ldb_module_get_ctx(ctx->module);
1080         req = ctx->req;
1081
1082         /* if we already returned an error just return */
1083         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1084                 return;
1085         }
1086
1087         ares = talloc_zero(req, struct ldb_reply);
1088         if (!ares) {
1089                 ldb_oom(ldb);
1090                 req->callback(req, NULL);
1091                 return;
1092         }
1093         ares->type = LDB_REPLY_DONE;
1094         ares->error = error;
1095
1096         req->callback(req, ares);
1097 }
1098
1099 static void ltdb_timeout(struct tevent_context *ev,
1100                           struct tevent_timer *te,
1101                           struct timeval t,
1102                           void *private_data)
1103 {
1104         struct ltdb_context *ctx;
1105         ctx = talloc_get_type(private_data, struct ltdb_context);
1106
1107         if (!ctx->request_terminated) {
1108                 /* request is done now */
1109                 ltdb_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1110         }
1111
1112         if (!ctx->request_terminated) {
1113                 /* neutralize the spy */
1114                 ctx->spy->ctx = NULL;
1115         }
1116         talloc_free(ctx);
1117 }
1118
1119 static void ltdb_request_extended_done(struct ltdb_context *ctx,
1120                                         struct ldb_extended *ext,
1121                                         int error)
1122 {
1123         struct ldb_context *ldb;
1124         struct ldb_request *req;
1125         struct ldb_reply *ares;
1126
1127         ldb = ldb_module_get_ctx(ctx->module);
1128         req = ctx->req;
1129
1130         /* if we already returned an error just return */
1131         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1132                 return;
1133         }
1134
1135         ares = talloc_zero(req, struct ldb_reply);
1136         if (!ares) {
1137                 ldb_oom(ldb);
1138                 req->callback(req, NULL);
1139                 return;
1140         }
1141         ares->type = LDB_REPLY_DONE;
1142         ares->response = ext;
1143         ares->error = error;
1144
1145         req->callback(req, ares);
1146 }
1147
1148 static void ltdb_handle_extended(struct ltdb_context *ctx)
1149 {
1150         struct ldb_extended *ext = NULL;
1151         int ret;
1152
1153         if (strcmp(ctx->req->op.extended.oid,
1154                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1155                 /* get sequence number */
1156                 ret = ltdb_sequence_number(ctx, &ext);
1157         } else {
1158                 /* not recognized */
1159                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1160         }
1161
1162         ltdb_request_extended_done(ctx, ext, ret);
1163 }
1164
1165 static void ltdb_callback(struct tevent_context *ev,
1166                           struct tevent_timer *te,
1167                           struct timeval t,
1168                           void *private_data)
1169 {
1170         struct ltdb_context *ctx;
1171         int ret;
1172
1173         ctx = talloc_get_type(private_data, struct ltdb_context);
1174
1175         if (ctx->request_terminated) {
1176                 goto done;
1177         }
1178
1179         switch (ctx->req->operation) {
1180         case LDB_SEARCH:
1181                 ret = ltdb_search(ctx);
1182                 break;
1183         case LDB_ADD:
1184                 ret = ltdb_add(ctx);
1185                 break;
1186         case LDB_MODIFY:
1187                 ret = ltdb_modify(ctx);
1188                 break;
1189         case LDB_DELETE:
1190                 ret = ltdb_delete(ctx);
1191                 break;
1192         case LDB_RENAME:
1193                 ret = ltdb_rename(ctx);
1194                 break;
1195         case LDB_EXTENDED:
1196                 ltdb_handle_extended(ctx);
1197                 goto done;
1198         default:
1199                 /* no other op supported */
1200                 ret = LDB_ERR_UNWILLING_TO_PERFORM;
1201         }
1202
1203         if (!ctx->request_terminated) {
1204                 /* request is done now */
1205                 ltdb_request_done(ctx, ret);
1206         }
1207
1208 done:
1209         if (!ctx->request_terminated) {
1210                 /* neutralize the spy */
1211                 ctx->spy->ctx = NULL;
1212         }
1213         talloc_free(ctx);
1214 }
1215
1216 static int ltdb_request_destructor(void *ptr)
1217 {
1218         struct ltdb_req_spy *spy = talloc_get_type(ptr, struct ltdb_req_spy);
1219
1220         if (spy->ctx != NULL) {
1221                 spy->ctx->request_terminated = true;
1222         }
1223
1224         return 0;
1225 }
1226
1227 static int ltdb_handle_request(struct ldb_module *module,
1228                                struct ldb_request *req)
1229 {
1230         struct ldb_context *ldb;
1231         struct tevent_context *ev;
1232         struct ltdb_context *ac;
1233         struct tevent_timer *te;
1234         struct timeval tv;
1235
1236         if (check_critical_controls(req->controls)) {
1237                 return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1238         }
1239
1240         ldb = ldb_module_get_ctx(module);
1241
1242         if (req->starttime == 0 || req->timeout == 0) {
1243                 ldb_set_errstring(ldb, "Invalid timeout settings");
1244                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1245         }
1246
1247         ev = ldb_get_event_context(ldb);
1248
1249         ac = talloc_zero(ldb, struct ltdb_context);
1250         if (ac == NULL) {
1251                 ldb_set_errstring(ldb, "Out of Memory");
1252                 return LDB_ERR_OPERATIONS_ERROR;
1253         }
1254
1255         ac->module = module;
1256         ac->req = req;
1257
1258         tv.tv_sec = 0;
1259         tv.tv_usec = 0;
1260         te = tevent_add_timer(ev, ac, tv, ltdb_callback, ac);
1261         if (NULL == te) {
1262                 talloc_free(ac);
1263                 return LDB_ERR_OPERATIONS_ERROR;
1264         }
1265
1266         tv.tv_sec = req->starttime + req->timeout;
1267         ac->timeout_event = tevent_add_timer(ev, ac, tv, ltdb_timeout, ac);
1268         if (NULL == ac->timeout_event) {
1269                 talloc_free(ac);
1270                 return LDB_ERR_OPERATIONS_ERROR;
1271         }
1272
1273         /* set a spy so that we do not try to use the request context
1274          * if it is freed before ltdb_callback fires */
1275         ac->spy = talloc(req, struct ltdb_req_spy);
1276         if (NULL == ac->spy) {
1277                 talloc_free(ac);
1278                 return LDB_ERR_OPERATIONS_ERROR;
1279         }
1280         ac->spy->ctx = ac;
1281
1282         talloc_set_destructor((TALLOC_CTX *)ac->spy, ltdb_request_destructor);
1283
1284         return LDB_SUCCESS;
1285 }
1286
1287 static const struct ldb_module_ops ltdb_ops = {
1288         .name              = "tdb",
1289         .search            = ltdb_handle_request,
1290         .add               = ltdb_handle_request,
1291         .modify            = ltdb_handle_request,
1292         .del               = ltdb_handle_request,
1293         .rename            = ltdb_handle_request,
1294         .extended          = ltdb_handle_request,
1295         .start_transaction = ltdb_start_trans,
1296         .end_transaction   = ltdb_end_trans,
1297         .prepare_commit    = ltdb_prepare_commit,
1298         .del_transaction   = ltdb_del_trans,
1299 };
1300
1301 /*
1302   connect to the database
1303 */
1304 static int ltdb_connect(struct ldb_context *ldb, const char *url,
1305                         unsigned int flags, const char *options[],
1306                         struct ldb_module **_module)
1307 {
1308         struct ldb_module *module;
1309         const char *path;
1310         int tdb_flags, open_flags;
1311         struct ltdb_private *ltdb;
1312
1313         /* parse the url */
1314         if (strchr(url, ':')) {
1315                 if (strncmp(url, "tdb://", 6) != 0) {
1316                         ldb_debug(ldb, LDB_DEBUG_ERROR,
1317                                   "Invalid tdb URL '%s'", url);
1318                         return -1;
1319                 }
1320                 path = url+6;
1321         } else {
1322                 path = url;
1323         }
1324
1325         tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
1326
1327         /* check for the 'nosync' option */
1328         if (flags & LDB_FLG_NOSYNC) {
1329                 tdb_flags |= TDB_NOSYNC;
1330         }
1331
1332         /* and nommap option */
1333         if (flags & LDB_FLG_NOMMAP) {
1334                 tdb_flags |= TDB_NOMMAP;
1335         }
1336
1337         if (flags & LDB_FLG_RDONLY) {
1338                 open_flags = O_RDONLY;
1339         } else {
1340                 open_flags = O_CREAT | O_RDWR;
1341         }
1342
1343         ltdb = talloc_zero(ldb, struct ltdb_private);
1344         if (!ltdb) {
1345                 ldb_oom(ldb);
1346                 return -1;
1347         }
1348
1349         /* note that we use quite a large default hash size */
1350         ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
1351                                    tdb_flags, open_flags,
1352                                    ldb_get_create_perms(ldb), ldb);
1353         if (!ltdb->tdb) {
1354                 ldb_debug(ldb, LDB_DEBUG_ERROR,
1355                           "Unable to open tdb '%s'", path);
1356                 talloc_free(ltdb);
1357                 return -1;
1358         }
1359
1360         ltdb->sequence_number = 0;
1361
1362         module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
1363         if (!module) {
1364                 talloc_free(ltdb);
1365                 return -1;
1366         }
1367         ldb_module_set_private(module, ltdb);
1368
1369         if (ltdb_cache_load(module) != 0) {
1370                 talloc_free(module);
1371                 talloc_free(ltdb);
1372                 return -1;
1373         }
1374
1375         *_module = module;
1376         return 0;
1377 }
1378
1379 const struct ldb_backend_ops ldb_tdb_backend_ops = {
1380         .name = "tdb",
1381         .connect_fn = ltdb_connect
1382 };