lib ldb: rename LTDB_* constants to LDB_KV_*
[bbaumbach/samba-autobuild/.git] / lib / ldb / ldb_key_value / ldb_kv.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_kv
29  *
30  *  Component: ldb key value backend
31  *
32  *  Description: core functions for ldb key value backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_kv.h"
53 #include "ldb_private.h"
54
55 /*
56   prevent memory errors on callbacks
57 */
58 struct ldb_kv_req_spy {
59         struct ldb_kv_context *ctx;
60 };
61
62 /*
63  * Determine if this key could hold a record.  We allow the new GUID
64  * index, the old DN index and a possible future ID=
65  */
66 bool ldb_kv_key_is_record(TDB_DATA key)
67 {
68         if (key.dsize < 4) {
69                 return false;
70         }
71
72         if (memcmp(key.dptr, "DN=", 3) == 0) {
73                 return true;
74         }
75
76         if (memcmp(key.dptr, "ID=", 3) == 0) {
77                 return true;
78         }
79
80         if (key.dsize < sizeof(LDB_KV_GUID_KEY_PREFIX)) {
81                 return false;
82         }
83
84         if (memcmp(key.dptr, LDB_KV_GUID_KEY_PREFIX,
85                    sizeof(LDB_KV_GUID_KEY_PREFIX) - 1) == 0) {
86                 return true;
87         }
88
89         return false;
90 }
91
92 /*
93   form a TDB_DATA for a record key
94   caller frees
95
96   note that the key for a record can depend on whether the
97   dn refers to a case sensitive index record or not
98 */
99 TDB_DATA ldb_kv_key_dn(struct ldb_module *module,
100                        TALLOC_CTX *mem_ctx,
101                        struct ldb_dn *dn)
102 {
103         TDB_DATA key;
104         char *key_str = NULL;
105         const char *dn_folded = NULL;
106
107         /*
108           most DNs are case insensitive. The exception is index DNs for
109           case sensitive attributes
110
111           there are 3 cases dealt with in this code:
112
113           1) if the dn doesn't start with @ then uppercase the attribute
114              names and the attributes values of case insensitive attributes
115           2) if the dn starts with @ then leave it alone -
116              the indexing code handles the rest
117         */
118
119         dn_folded = ldb_dn_get_casefold(dn);
120         if (!dn_folded) {
121                 goto failed;
122         }
123
124         key_str = talloc_strdup(mem_ctx, "DN=");
125         if (!key_str) {
126                 goto failed;
127         }
128
129         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
130         if (!key_str) {
131                 goto failed;
132         }
133
134         key.dptr = (uint8_t *)key_str;
135         key.dsize = strlen(key_str) + 1;
136
137         return key;
138
139 failed:
140         errno = ENOMEM;
141         key.dptr = NULL;
142         key.dsize = 0;
143         return key;
144 }
145
146 /* The caller is to provide a correctly sized key */
147 int ldb_kv_guid_to_key(struct ldb_module *module,
148                        struct ldb_kv_private *ldb_kv,
149                        const struct ldb_val *GUID_val,
150                        TDB_DATA *key)
151 {
152         const char *GUID_prefix = LDB_KV_GUID_KEY_PREFIX;
153         const int GUID_prefix_len = sizeof(LDB_KV_GUID_KEY_PREFIX) - 1;
154
155         if (key->dsize != (GUID_val->length+GUID_prefix_len)) {
156                 return LDB_ERR_OPERATIONS_ERROR;
157         }
158
159         memcpy(key->dptr, GUID_prefix, GUID_prefix_len);
160         memcpy(&key->dptr[GUID_prefix_len],
161                GUID_val->data, GUID_val->length);
162         return LDB_SUCCESS;
163 }
164
165 /*
166  * The caller is to provide a correctly sized key, used only in
167  * the GUID index mode
168  */
169 int ldb_kv_idx_to_key(struct ldb_module *module,
170                       struct ldb_kv_private *ldb_kv,
171                       TALLOC_CTX *mem_ctx,
172                       const struct ldb_val *idx_val,
173                       TDB_DATA *key)
174 {
175         struct ldb_context *ldb = ldb_module_get_ctx(module);
176         struct ldb_dn *dn;
177
178         if (ldb_kv->cache->GUID_index_attribute != NULL) {
179                 return ldb_kv_guid_to_key(module, ldb_kv, idx_val, key);
180         }
181
182         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
183         if (dn == NULL) {
184                 /*
185                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
186                  * to the caller, as this in an invalid index value
187                  */
188                 return LDB_ERR_OPERATIONS_ERROR;
189         }
190         /* form the key */
191         *key = ldb_kv_key_dn(module, mem_ctx, dn);
192         TALLOC_FREE(dn);
193         if (!key->dptr) {
194                 return ldb_module_oom(module);
195         }
196         return LDB_SUCCESS;
197 }
198
199 /*
200   form a TDB_DATA for a record key
201   caller frees mem_ctx, which may or may not have the key
202   as a child.
203
204   note that the key for a record can depend on whether a
205   GUID index is in use, or the DN is used as the key
206 */
207 TDB_DATA ldb_kv_key_msg(struct ldb_module *module,
208                         TALLOC_CTX *mem_ctx,
209                         const struct ldb_message *msg)
210 {
211         void *data = ldb_module_get_private(module);
212         struct ldb_kv_private *ldb_kv =
213             talloc_get_type(data, struct ldb_kv_private);
214         TDB_DATA key;
215         const struct ldb_val *guid_val;
216         int ret;
217
218         if (ldb_kv->cache->GUID_index_attribute == NULL) {
219                 return ldb_kv_key_dn(module, mem_ctx, msg->dn);
220         }
221
222         if (ldb_dn_is_special(msg->dn)) {
223                 return ldb_kv_key_dn(module, mem_ctx, msg->dn);
224         }
225
226         guid_val =
227             ldb_msg_find_ldb_val(msg, ldb_kv->cache->GUID_index_attribute);
228         if (guid_val == NULL) {
229                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
230                                        "Did not find GUID attribute %s "
231                                        "in %s, required for TDB record "
232                                        "key in " LDB_KV_IDXGUID " mode.",
233                                        ldb_kv->cache->GUID_index_attribute,
234                                        ldb_dn_get_linearized(msg->dn));
235                 errno = EINVAL;
236                 key.dptr = NULL;
237                 key.dsize = 0;
238                 return key;
239         }
240
241         /* In this case, allocate with talloc */
242         key.dptr = talloc_size(mem_ctx, LDB_KV_GUID_KEY_SIZE);
243         if (key.dptr == NULL) {
244                 errno = ENOMEM;
245                 key.dptr = NULL;
246                 key.dsize = 0;
247                 return key;
248         }
249         key.dsize = talloc_get_size(key.dptr);
250
251         ret = ldb_kv_guid_to_key(module, ldb_kv, guid_val, &key);
252
253         if (ret != LDB_SUCCESS) {
254                 errno = EINVAL;
255                 key.dptr = NULL;
256                 key.dsize = 0;
257                 return key;
258         }
259         return key;
260 }
261
262 /*
263   check special dn's have valid attributes
264   currently only @ATTRIBUTES is checked
265 */
266 static int ldb_kv_check_special_dn(struct ldb_module *module,
267                                    const struct ldb_message *msg)
268 {
269         struct ldb_context *ldb = ldb_module_get_ctx(module);
270         unsigned int i, j;
271
272         if (! ldb_dn_is_special(msg->dn) ||
273             ! ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
274                 return LDB_SUCCESS;
275         }
276
277         /* we have @ATTRIBUTES, let's check attributes are fine */
278         /* should we check that we deny multivalued attributes ? */
279         for (i = 0; i < msg->num_elements; i++) {
280                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
281
282                 for (j = 0; j < msg->elements[i].num_values; j++) {
283                         if (ldb_kv_check_at_attributes_values(
284                                 &msg->elements[i].values[j]) != 0) {
285                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
286                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
287                         }
288                 }
289         }
290
291         return LDB_SUCCESS;
292 }
293
294
295 /*
296   we've made a modification to a dn - possibly reindex and
297   update sequence number
298 */
299 static int ldb_kv_modified(struct ldb_module *module, struct ldb_dn *dn)
300 {
301         int ret = LDB_SUCCESS;
302         struct ldb_kv_private *ldb_kv = talloc_get_type(
303             ldb_module_get_private(module), struct ldb_kv_private);
304
305         /* only allow modifies inside a transaction, otherwise the
306          * ldb is unsafe */
307         if (ldb_kv->kv_ops->transaction_active(ldb_kv) == false) {
308                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
309                 return LDB_ERR_OPERATIONS_ERROR;
310         }
311
312         if (ldb_dn_is_special(dn) &&
313             (ldb_dn_check_special(dn, LDB_KV_INDEXLIST) ||
314              ldb_dn_check_special(dn, LDB_KV_ATTRIBUTES)) )
315         {
316                 if (ldb_kv->warn_reindex) {
317                         ldb_debug(ldb_module_get_ctx(module),
318                                   LDB_DEBUG_ERROR,
319                                   "Reindexing %s due to modification on %s",
320                                   ldb_kv->kv_ops->name(ldb_kv),
321                                   ldb_dn_get_linearized(dn));
322                 }
323                 ret = ldb_kv_reindex(module);
324         }
325
326         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
327         if (ret == LDB_SUCCESS &&
328             !(ldb_dn_is_special(dn) &&
329               ldb_dn_check_special(dn, LDB_KV_BASEINFO)) ) {
330                 ret = ldb_kv_increase_sequence_number(module);
331         }
332
333         /* If the modify was to @OPTIONS, reload the cache */
334         if (ret == LDB_SUCCESS &&
335             ldb_dn_is_special(dn) &&
336             (ldb_dn_check_special(dn, LDB_KV_OPTIONS)) ) {
337                 ret = ldb_kv_cache_reload(module);
338         }
339
340         if (ret != LDB_SUCCESS) {
341                 ldb_kv->reindex_failed = true;
342         }
343
344         return ret;
345 }
346 /*
347   store a record into the db
348 */
349 int ldb_kv_store(struct ldb_module *module,
350                  const struct ldb_message *msg,
351                  int flgs)
352 {
353         void *data = ldb_module_get_private(module);
354         struct ldb_kv_private *ldb_kv =
355             talloc_get_type(data, struct ldb_kv_private);
356         TDB_DATA tdb_key;
357         struct ldb_val ldb_key;
358         struct ldb_val ldb_data;
359         int ret = LDB_SUCCESS;
360         TALLOC_CTX *key_ctx = talloc_new(module);
361
362         if (key_ctx == NULL) {
363                 return ldb_module_oom(module);
364         }
365
366         if (ldb_kv->read_only) {
367                 talloc_free(key_ctx);
368                 return LDB_ERR_UNWILLING_TO_PERFORM;
369         }
370
371         tdb_key = ldb_kv_key_msg(module, key_ctx, msg);
372         if (tdb_key.dptr == NULL) {
373                 TALLOC_FREE(key_ctx);
374                 return LDB_ERR_OTHER;
375         }
376
377         ret = ldb_pack_data(ldb_module_get_ctx(module),
378                             msg, &ldb_data);
379         if (ret == -1) {
380                 TALLOC_FREE(key_ctx);
381                 return LDB_ERR_OTHER;
382         }
383
384         ldb_key.data = tdb_key.dptr;
385         ldb_key.length = tdb_key.dsize;
386
387         ret = ldb_kv->kv_ops->store(ldb_kv, ldb_key, ldb_data, flgs);
388         if (ret != 0) {
389                 bool is_special = ldb_dn_is_special(msg->dn);
390                 ret = ldb_kv->kv_ops->error(ldb_kv);
391
392                 /*
393                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
394                  * the GUID, so re-map
395                  */
396                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS && !is_special &&
397                     ldb_kv->cache->GUID_index_attribute != NULL) {
398                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
399                 }
400                 goto done;
401         }
402
403 done:
404         TALLOC_FREE(key_ctx);
405         talloc_free(ldb_data.data);
406
407         return ret;
408 }
409
410
411 /*
412   check if a attribute is a single valued, for a given element
413  */
414 static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
415                                  struct ldb_message_element *el)
416 {
417         if (!a) return false;
418         if (el != NULL) {
419                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
420                         /* override from a ldb module, for example
421                            used for the description field, which is
422                            marked multi-valued in the schema but which
423                            should not actually accept multiple
424                            values */
425                         return true;
426                 }
427                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
428                         /* override from a ldb module, for example used for
429                            deleted linked attribute entries */
430                         return false;
431                 }
432         }
433         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
434                 return true;
435         }
436         return false;
437 }
438
439 static int ldb_kv_add_internal(struct ldb_module *module,
440                                struct ldb_kv_private *ldb_kv,
441                                const struct ldb_message *msg,
442                                bool check_single_value)
443 {
444         struct ldb_context *ldb = ldb_module_get_ctx(module);
445         int ret = LDB_SUCCESS;
446         unsigned int i;
447
448         for (i=0;i<msg->num_elements;i++) {
449                 struct ldb_message_element *el = &msg->elements[i];
450                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
451
452                 if (el->num_values == 0) {
453                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
454                                                el->name, ldb_dn_get_linearized(msg->dn));
455                         return LDB_ERR_CONSTRAINT_VIOLATION;
456                 }
457                 if (check_single_value && el->num_values > 1 &&
458                     ldb_kv_single_valued(a, el)) {
459                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
460                                                el->name, ldb_dn_get_linearized(msg->dn));
461                         return LDB_ERR_CONSTRAINT_VIOLATION;
462                 }
463
464                 /* Do not check "@ATTRIBUTES" for duplicated values */
465                 if (ldb_dn_is_special(msg->dn) &&
466                     ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
467                         continue;
468                 }
469
470                 if (check_single_value &&
471                     !(el->flags &
472                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
473                         struct ldb_val *duplicate = NULL;
474
475                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
476                                                          el, &duplicate, 0);
477                         if (ret != LDB_SUCCESS) {
478                                 return ret;
479                         }
480                         if (duplicate != NULL) {
481                                 ldb_asprintf_errstring(
482                                         ldb,
483                                         "attribute '%s': value '%.*s' on '%s' "
484                                         "provided more than once in ADD object",
485                                         el->name,
486                                         (int)duplicate->length,
487                                         duplicate->data,
488                                         ldb_dn_get_linearized(msg->dn));
489                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
490                         }
491                 }
492         }
493
494         ret = ldb_kv_store(module, msg, TDB_INSERT);
495         if (ret != LDB_SUCCESS) {
496                 /*
497                  * Try really hard to get the right error code for
498                  * a re-add situation, as this can matter!
499                  */
500                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
501                         int ret2;
502                         struct ldb_dn *dn2 = NULL;
503                         TALLOC_CTX *mem_ctx = talloc_new(module);
504                         if (mem_ctx == NULL) {
505                                 return ldb_module_operr(module);
506                         }
507                         ret2 =
508                             ldb_kv_search_base(module, mem_ctx, msg->dn, &dn2);
509                         TALLOC_FREE(mem_ctx);
510                         if (ret2 == LDB_SUCCESS) {
511                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
512                         }
513                 }
514                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
515                         ldb_asprintf_errstring(ldb,
516                                                "Entry %s already exists",
517                                                ldb_dn_get_linearized(msg->dn));
518                 }
519                 return ret;
520         }
521
522         ret = ldb_kv_index_add_new(module, ldb_kv, msg);
523         if (ret != LDB_SUCCESS) {
524                 /*
525                  * If we failed to index, delete the message again.
526                  *
527                  * This is particularly important for the GUID index
528                  * case, which will only fail for a duplicate DN
529                  * in the index add.
530                  *
531                  * Note that the caller may not cancel the transation
532                  * and this means the above add might really show up!
533                  */
534                 ldb_kv_delete_noindex(module, msg);
535                 return ret;
536         }
537
538         ret = ldb_kv_modified(module, msg->dn);
539
540         return ret;
541 }
542
543 /*
544   add a record to the database
545 */
546 static int ldb_kv_add(struct ldb_kv_context *ctx)
547 {
548         struct ldb_module *module = ctx->module;
549         struct ldb_request *req = ctx->req;
550         void *data = ldb_module_get_private(module);
551         struct ldb_kv_private *ldb_kv =
552             talloc_get_type(data, struct ldb_kv_private);
553         int ret = LDB_SUCCESS;
554
555         if (ldb_kv->max_key_length != 0 &&
556             ldb_kv->cache->GUID_index_attribute == NULL &&
557             !ldb_dn_is_special(req->op.add.message->dn)) {
558                 ldb_set_errstring(ldb_module_get_ctx(module),
559                                   "Must operate ldb_mdb in GUID "
560                                   "index mode, but " LDB_KV_IDXGUID " not set.");
561                 return LDB_ERR_UNWILLING_TO_PERFORM;
562         }
563
564         ret = ldb_kv_check_special_dn(module, req->op.add.message);
565         if (ret != LDB_SUCCESS) {
566                 return ret;
567         }
568
569         ldb_request_set_state(req, LDB_ASYNC_PENDING);
570
571         if (ldb_kv_cache_load(module) != 0) {
572                 return LDB_ERR_OPERATIONS_ERROR;
573         }
574
575         ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
576
577         return ret;
578 }
579
580 /*
581   delete a record from the database, not updating indexes (used for deleting
582   index records)
583 */
584 int ldb_kv_delete_noindex(struct ldb_module *module,
585                           const struct ldb_message *msg)
586 {
587         void *data = ldb_module_get_private(module);
588         struct ldb_kv_private *ldb_kv =
589             talloc_get_type(data, struct ldb_kv_private);
590         struct ldb_val ldb_key;
591         TDB_DATA tdb_key;
592         int ret;
593         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
594
595         if (tdb_key_ctx == NULL) {
596                 return ldb_module_oom(module);
597         }
598
599         if (ldb_kv->read_only) {
600                 talloc_free(tdb_key_ctx);
601                 return LDB_ERR_UNWILLING_TO_PERFORM;
602         }
603
604         tdb_key = ldb_kv_key_msg(module, tdb_key_ctx, msg);
605         if (!tdb_key.dptr) {
606                 TALLOC_FREE(tdb_key_ctx);
607                 return LDB_ERR_OTHER;
608         }
609
610         ldb_key.data = tdb_key.dptr;
611         ldb_key.length = tdb_key.dsize;
612
613         ret = ldb_kv->kv_ops->delete (ldb_kv, ldb_key);
614         TALLOC_FREE(tdb_key_ctx);
615
616         if (ret != 0) {
617                 ret = ldb_kv->kv_ops->error(ldb_kv);
618         }
619
620         return ret;
621 }
622
623 static int ldb_kv_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
624 {
625         struct ldb_message *msg;
626         int ret = LDB_SUCCESS;
627
628         msg = ldb_msg_new(module);
629         if (msg == NULL) {
630                 return LDB_ERR_OPERATIONS_ERROR;
631         }
632
633         /* in case any attribute of the message was indexed, we need
634            to fetch the old record */
635         ret = ldb_kv_search_dn1(
636             module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
637         if (ret != LDB_SUCCESS) {
638                 /* not finding the old record is an error */
639                 goto done;
640         }
641
642         ret = ldb_kv_delete_noindex(module, msg);
643         if (ret != LDB_SUCCESS) {
644                 goto done;
645         }
646
647         /* remove any indexed attributes */
648         ret = ldb_kv_index_delete(module, msg);
649         if (ret != LDB_SUCCESS) {
650                 goto done;
651         }
652
653         ret = ldb_kv_modified(module, dn);
654         if (ret != LDB_SUCCESS) {
655                 goto done;
656         }
657
658 done:
659         talloc_free(msg);
660         return ret;
661 }
662
663 /*
664   delete a record from the database
665 */
666 static int ldb_kv_delete(struct ldb_kv_context *ctx)
667 {
668         struct ldb_module *module = ctx->module;
669         struct ldb_request *req = ctx->req;
670         int ret = LDB_SUCCESS;
671
672         ldb_request_set_state(req, LDB_ASYNC_PENDING);
673
674         if (ldb_kv_cache_load(module) != 0) {
675                 return LDB_ERR_OPERATIONS_ERROR;
676         }
677
678         ret = ldb_kv_delete_internal(module, req->op.del.dn);
679
680         return ret;
681 }
682
683 /*
684   find an element by attribute name. At the moment this does a linear search,
685   it should be re-coded to use a binary search once all places that modify
686   records guarantee sorted order
687
688   return the index of the first matching element if found, otherwise -1
689 */
690 static int ldb_kv_find_element(const struct ldb_message *msg, const char *name)
691 {
692         unsigned int i;
693         for (i=0;i<msg->num_elements;i++) {
694                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
695                         return i;
696                 }
697         }
698         return -1;
699 }
700
701
702 /*
703   add an element to an existing record. Assumes a elements array that we
704   can call re-alloc on, and assumed that we can re-use the data pointers from
705   the passed in additional values. Use with care!
706
707   returns 0 on success, -1 on failure (and sets errno)
708 */
709 static int ldb_kv_msg_add_element(struct ldb_message *msg,
710                                   struct ldb_message_element *el)
711 {
712         struct ldb_message_element *e2;
713         unsigned int i;
714
715         if (el->num_values == 0) {
716                 /* nothing to do here - we don't add empty elements */
717                 return 0;
718         }
719
720         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
721                               msg->num_elements+1);
722         if (!e2) {
723                 errno = ENOMEM;
724                 return -1;
725         }
726
727         msg->elements = e2;
728
729         e2 = &msg->elements[msg->num_elements];
730
731         e2->name = el->name;
732         e2->flags = el->flags;
733         e2->values = talloc_array(msg->elements,
734                                   struct ldb_val, el->num_values);
735         if (!e2->values) {
736                 errno = ENOMEM;
737                 return -1;
738         }
739         for (i=0;i<el->num_values;i++) {
740                 e2->values[i] = el->values[i];
741         }
742         e2->num_values = el->num_values;
743
744         ++msg->num_elements;
745
746         return 0;
747 }
748
749 /*
750   delete all elements having a specified attribute name
751 */
752 static int ldb_kv_msg_delete_attribute(struct ldb_module *module,
753                                        struct ldb_kv_private *ldb_kv,
754                                        struct ldb_message *msg,
755                                        const char *name)
756 {
757         unsigned int i;
758         int ret;
759         struct ldb_message_element *el;
760         bool is_special = ldb_dn_is_special(msg->dn);
761
762         if (!is_special && ldb_kv->cache->GUID_index_attribute != NULL &&
763             ldb_attr_cmp(name, ldb_kv->cache->GUID_index_attribute) == 0) {
764                 struct ldb_context *ldb = ldb_module_get_ctx(module);
765                 ldb_asprintf_errstring(ldb,
766                                        "Must not modify GUID "
767                                        "attribute %s (used as DB index)",
768                                        ldb_kv->cache->GUID_index_attribute);
769                 return LDB_ERR_CONSTRAINT_VIOLATION;
770         }
771
772         el = ldb_msg_find_element(msg, name);
773         if (el == NULL) {
774                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
775         }
776         i = el - msg->elements;
777
778         ret = ldb_kv_index_del_element(module, ldb_kv, msg, el);
779         if (ret != LDB_SUCCESS) {
780                 return ret;
781         }
782
783         talloc_free(el->values);
784         if (msg->num_elements > (i+1)) {
785                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
786         }
787         msg->num_elements--;
788         msg->elements = talloc_realloc(msg, msg->elements,
789                                        struct ldb_message_element,
790                                        msg->num_elements);
791         return LDB_SUCCESS;
792 }
793
794 /*
795   delete all elements matching an attribute name/value
796
797   return LDB Error on failure
798 */
799 static int ldb_kv_msg_delete_element(struct ldb_module *module,
800                                      struct ldb_kv_private *ldb_kv,
801                                      struct ldb_message *msg,
802                                      const char *name,
803                                      const struct ldb_val *val)
804 {
805         struct ldb_context *ldb = ldb_module_get_ctx(module);
806         unsigned int i;
807         int found, ret;
808         struct ldb_message_element *el;
809         const struct ldb_schema_attribute *a;
810
811         found = ldb_kv_find_element(msg, name);
812         if (found == -1) {
813                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
814         }
815
816         i = (unsigned int) found;
817         el = &(msg->elements[i]);
818
819         a = ldb_schema_attribute_by_name(ldb, el->name);
820
821         for (i=0;i<el->num_values;i++) {
822                 bool matched;
823                 if (a->syntax->operator_fn) {
824                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
825                                                      &el->values[i], val, &matched);
826                         if (ret != LDB_SUCCESS) return ret;
827                 } else {
828                         matched = (a->syntax->comparison_fn(ldb, ldb,
829                                                             &el->values[i], val) == 0);
830                 }
831                 if (matched) {
832                         if (el->num_values == 1) {
833                                 return ldb_kv_msg_delete_attribute(
834                                     module, ldb_kv, msg, name);
835                         }
836
837                         ret =
838                             ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
839                         if (ret != LDB_SUCCESS) {
840                                 return ret;
841                         }
842
843                         if (i<el->num_values-1) {
844                                 memmove(&el->values[i], &el->values[i+1],
845                                         sizeof(el->values[i])*
846                                                 (el->num_values-(i+1)));
847                         }
848                         el->num_values--;
849
850                         /* per definition we find in a canonicalised message an
851                            attribute value only once. So we are finished here */
852                         return LDB_SUCCESS;
853                 }
854         }
855
856         /* Not found */
857         return LDB_ERR_NO_SUCH_ATTRIBUTE;
858 }
859
860 /*
861   modify a record - internal interface
862
863   yuck - this is O(n^2). Luckily n is usually small so we probably
864   get away with it, but if we ever have really large attribute lists
865   then we'll need to look at this again
866
867   'req' is optional, and is used to specify controls if supplied
868 */
869 int ldb_kv_modify_internal(struct ldb_module *module,
870                            const struct ldb_message *msg,
871                            struct ldb_request *req)
872 {
873         struct ldb_context *ldb = ldb_module_get_ctx(module);
874         void *data = ldb_module_get_private(module);
875         struct ldb_kv_private *ldb_kv =
876             talloc_get_type(data, struct ldb_kv_private);
877         struct ldb_message *msg2;
878         unsigned int i, j;
879         int ret = LDB_SUCCESS, idx;
880         struct ldb_control *control_permissive = NULL;
881         TALLOC_CTX *mem_ctx = talloc_new(req);
882
883         if (mem_ctx == NULL) {
884                 return ldb_module_oom(module);
885         }
886
887         if (req) {
888                 control_permissive = ldb_request_get_control(req,
889                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
890         }
891
892         msg2 = ldb_msg_new(mem_ctx);
893         if (msg2 == NULL) {
894                 ret = LDB_ERR_OTHER;
895                 goto done;
896         }
897
898         ret = ldb_kv_search_dn1(
899             module, msg->dn, msg2, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
900         if (ret != LDB_SUCCESS) {
901                 goto done;
902         }
903
904         for (i=0; i<msg->num_elements; i++) {
905                 struct ldb_message_element *el = &msg->elements[i], *el2;
906                 struct ldb_val *vals;
907                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
908                 const char *dn;
909                 uint32_t options = 0;
910                 if (control_permissive != NULL) {
911                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
912                 }
913
914                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
915                 case LDB_FLAG_MOD_ADD:
916
917                         if (el->num_values == 0) {
918                                 ldb_asprintf_errstring(ldb,
919                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
920                                                        el->name, ldb_dn_get_linearized(msg2->dn));
921                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
922                                 goto done;
923                         }
924
925                         /* make a copy of the array so that a permissive
926                          * control can remove duplicates without changing the
927                          * original values, but do not copy data as we do not
928                          * need to keep it around once the operation is
929                          * finished */
930                         if (control_permissive) {
931                                 el = talloc(msg2, struct ldb_message_element);
932                                 if (!el) {
933                                         ret = LDB_ERR_OTHER;
934                                         goto done;
935                                 }
936                                 *el = msg->elements[i];
937                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
938                                 if (el->values == NULL) {
939                                         ret = LDB_ERR_OTHER;
940                                         goto done;
941                                 }
942                                 for (j = 0; j < el->num_values; j++) {
943                                         el->values[j] = msg->elements[i].values[j];
944                                 }
945                         }
946
947                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
948                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
949                                                        el->name, ldb_dn_get_linearized(msg2->dn));
950                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
951                                 goto done;
952                         }
953
954                         /* Checks if element already exists */
955                         idx = ldb_kv_find_element(msg2, el->name);
956                         if (idx == -1) {
957                                 if (ldb_kv_msg_add_element(msg2, el) != 0) {
958                                         ret = LDB_ERR_OTHER;
959                                         goto done;
960                                 }
961                                 ret = ldb_kv_index_add_element(
962                                     module, ldb_kv, msg2, el);
963                                 if (ret != LDB_SUCCESS) {
964                                         goto done;
965                                 }
966                         } else {
967                                 j = (unsigned int) idx;
968                                 el2 = &(msg2->elements[j]);
969
970                                 /* We cannot add another value on a existing one
971                                    if the attribute is single-valued */
972                                 if (ldb_kv_single_valued(a, el)) {
973                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
974                                                                el->name, ldb_dn_get_linearized(msg2->dn));
975                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
976                                         goto done;
977                                 }
978
979                                 /* Check that values don't exist yet on multi-
980                                    valued attributes or aren't provided twice */
981                                 if (!(el->flags &
982                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
983                                         struct ldb_val *duplicate = NULL;
984                                         ret = ldb_msg_find_common_values(ldb,
985                                                                          msg2,
986                                                                          el,
987                                                                          el2,
988                                                                          options);
989
990                                         if (ret ==
991                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
992                                                 ldb_asprintf_errstring(ldb,
993                                                         "attribute '%s': value "
994                                                         "#%u on '%s' already "
995                                                         "exists", el->name, j,
996                                                         ldb_dn_get_linearized(msg2->dn));
997                                                 goto done;
998                                         } else if (ret != LDB_SUCCESS) {
999                                                 goto done;
1000                                         }
1001
1002                                         ret = ldb_msg_find_duplicate_val(
1003                                                 ldb, msg2, el, &duplicate, 0);
1004                                         if (ret != LDB_SUCCESS) {
1005                                                 goto done;
1006                                         }
1007                                         if (duplicate != NULL) {
1008                                                 ldb_asprintf_errstring(
1009                                                         ldb,
1010                                                         "attribute '%s': value "
1011                                                         "'%.*s' on '%s' "
1012                                                         "provided more than "
1013                                                         "once in ADD",
1014                                                         el->name,
1015                                                         (int)duplicate->length,
1016                                                         duplicate->data,
1017                                                         ldb_dn_get_linearized(msg->dn));
1018                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1019                                                 goto done;
1020                                         }
1021                                 }
1022
1023                                 /* Now combine existing and new values to a new
1024                                    attribute record */
1025                                 vals = talloc_realloc(msg2->elements,
1026                                                       el2->values, struct ldb_val,
1027                                                       el2->num_values + el->num_values);
1028                                 if (vals == NULL) {
1029                                         ldb_oom(ldb);
1030                                         ret = LDB_ERR_OTHER;
1031                                         goto done;
1032                                 }
1033
1034                                 for (j=0; j<el->num_values; j++) {
1035                                         vals[el2->num_values + j] =
1036                                                 ldb_val_dup(vals, &el->values[j]);
1037                                 }
1038
1039                                 el2->values = vals;
1040                                 el2->num_values += el->num_values;
1041
1042                                 ret = ldb_kv_index_add_element(
1043                                     module, ldb_kv, msg2, el);
1044                                 if (ret != LDB_SUCCESS) {
1045                                         goto done;
1046                                 }
1047                         }
1048
1049                         break;
1050
1051                 case LDB_FLAG_MOD_REPLACE:
1052
1053                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
1054                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1055                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1056                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1057                                 goto done;
1058                         }
1059
1060                         /*
1061                          * We don't need to check this if we have been
1062                          * pre-screened by the repl_meta_data module
1063                          * in Samba, or someone else who can claim to
1064                          * know what they are doing.
1065                          */
1066                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1067                                 struct ldb_val *duplicate = NULL;
1068
1069                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1070                                                                  &duplicate, 0);
1071                                 if (ret != LDB_SUCCESS) {
1072                                         goto done;
1073                                 }
1074                                 if (duplicate != NULL) {
1075                                         ldb_asprintf_errstring(
1076                                                 ldb,
1077                                                 "attribute '%s': value '%.*s' "
1078                                                 "on '%s' provided more than "
1079                                                 "once in REPLACE",
1080                                                 el->name,
1081                                                 (int)duplicate->length,
1082                                                 duplicate->data,
1083                                                 ldb_dn_get_linearized(msg2->dn));
1084                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1085                                         goto done;
1086                                 }
1087                         }
1088
1089                         /* Checks if element already exists */
1090                         idx = ldb_kv_find_element(msg2, el->name);
1091                         if (idx != -1) {
1092                                 j = (unsigned int) idx;
1093                                 el2 = &(msg2->elements[j]);
1094
1095                                 /* we consider two elements to be
1096                                  * equal only if the order
1097                                  * matches. This allows dbcheck to
1098                                  * fix the ordering on attributes
1099                                  * where order matters, such as
1100                                  * objectClass
1101                                  */
1102                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1103                                         continue;
1104                                 }
1105
1106                                 /* Delete the attribute if it exists in the DB */
1107                                 if (ldb_kv_msg_delete_attribute(
1108                                         module, ldb_kv, msg2, el->name) != 0) {
1109                                         ret = LDB_ERR_OTHER;
1110                                         goto done;
1111                                 }
1112                         }
1113
1114                         /* Recreate it with the new values */
1115                         if (ldb_kv_msg_add_element(msg2, el) != 0) {
1116                                 ret = LDB_ERR_OTHER;
1117                                 goto done;
1118                         }
1119
1120                         ret =
1121                             ldb_kv_index_add_element(module, ldb_kv, msg2, el);
1122                         if (ret != LDB_SUCCESS) {
1123                                 goto done;
1124                         }
1125
1126                         break;
1127
1128                 case LDB_FLAG_MOD_DELETE:
1129                         dn = ldb_dn_get_linearized(msg2->dn);
1130                         if (dn == NULL) {
1131                                 ret = LDB_ERR_OTHER;
1132                                 goto done;
1133                         }
1134
1135                         if (msg->elements[i].num_values == 0) {
1136                                 /* Delete the whole attribute */
1137                                 ret = ldb_kv_msg_delete_attribute(
1138                                     module,
1139                                     ldb_kv,
1140                                     msg2,
1141                                     msg->elements[i].name);
1142                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1143                                     control_permissive) {
1144                                         ret = LDB_SUCCESS;
1145                                 } else {
1146                                         ldb_asprintf_errstring(ldb,
1147                                                                "attribute '%s': no such attribute for delete on '%s'",
1148                                                                msg->elements[i].name, dn);
1149                                 }
1150                                 if (ret != LDB_SUCCESS) {
1151                                         goto done;
1152                                 }
1153                         } else {
1154                                 /* Delete specified values from an attribute */
1155                                 for (j=0; j < msg->elements[i].num_values; j++) {
1156                                         ret = ldb_kv_msg_delete_element(
1157                                             module,
1158                                             ldb_kv,
1159                                             msg2,
1160                                             msg->elements[i].name,
1161                                             &msg->elements[i].values[j]);
1162                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1163                                             control_permissive) {
1164                                                 ret = LDB_SUCCESS;
1165                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1166                                                 ldb_asprintf_errstring(ldb,
1167                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1168                                                                        msg->elements[i].name, dn);
1169                                         }
1170                                         if (ret != LDB_SUCCESS) {
1171                                                 goto done;
1172                                         }
1173                                 }
1174                         }
1175                         break;
1176                 default:
1177                         ldb_asprintf_errstring(ldb,
1178                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1179                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1180                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1181                         ret = LDB_ERR_PROTOCOL_ERROR;
1182                         goto done;
1183                 }
1184         }
1185
1186         ret = ldb_kv_store(module, msg2, TDB_MODIFY);
1187         if (ret != LDB_SUCCESS) {
1188                 goto done;
1189         }
1190
1191         ret = ldb_kv_modified(module, msg2->dn);
1192         if (ret != LDB_SUCCESS) {
1193                 goto done;
1194         }
1195
1196 done:
1197         TALLOC_FREE(mem_ctx);
1198         return ret;
1199 }
1200
1201 /*
1202   modify a record
1203 */
1204 static int ldb_kv_modify(struct ldb_kv_context *ctx)
1205 {
1206         struct ldb_module *module = ctx->module;
1207         struct ldb_request *req = ctx->req;
1208         int ret = LDB_SUCCESS;
1209
1210         ret = ldb_kv_check_special_dn(module, req->op.mod.message);
1211         if (ret != LDB_SUCCESS) {
1212                 return ret;
1213         }
1214
1215         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1216
1217         if (ldb_kv_cache_load(module) != 0) {
1218                 return LDB_ERR_OPERATIONS_ERROR;
1219         }
1220
1221         ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
1222
1223         return ret;
1224 }
1225
1226 /*
1227   rename a record
1228 */
1229 static int ldb_kv_rename(struct ldb_kv_context *ctx)
1230 {
1231         struct ldb_module *module = ctx->module;
1232         void *data = ldb_module_get_private(module);
1233         struct ldb_kv_private *ldb_kv =
1234             talloc_get_type(data, struct ldb_kv_private);
1235         struct ldb_request *req = ctx->req;
1236         struct ldb_message *msg;
1237         int ret = LDB_SUCCESS;
1238         TDB_DATA tdb_key, tdb_key_old;
1239         struct ldb_dn *db_dn;
1240
1241         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1242
1243         if (ldb_kv_cache_load(ctx->module) != 0) {
1244                 return LDB_ERR_OPERATIONS_ERROR;
1245         }
1246
1247         msg = ldb_msg_new(ctx);
1248         if (msg == NULL) {
1249                 return LDB_ERR_OPERATIONS_ERROR;
1250         }
1251
1252         /* we need to fetch the old record to re-add under the new name */
1253         ret = ldb_kv_search_dn1(module,
1254                                 req->op.rename.olddn,
1255                                 msg,
1256                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1257         if (ret != LDB_SUCCESS) {
1258                 /* not finding the old record is an error */
1259                 return ret;
1260         }
1261
1262         /* We need to, before changing the DB, check if the new DN
1263          * exists, so we can return this error to the caller with an
1264          * unmodified DB
1265          *
1266          * Even in GUID index mode we use ltdb_key_dn() as we are
1267          * trying to figure out if this is just a case rename
1268          */
1269         tdb_key = ldb_kv_key_dn(module, msg, req->op.rename.newdn);
1270         if (!tdb_key.dptr) {
1271                 talloc_free(msg);
1272                 return LDB_ERR_OPERATIONS_ERROR;
1273         }
1274
1275         tdb_key_old = ldb_kv_key_dn(module, msg, req->op.rename.olddn);
1276         if (!tdb_key_old.dptr) {
1277                 talloc_free(msg);
1278                 talloc_free(tdb_key.dptr);
1279                 return LDB_ERR_OPERATIONS_ERROR;
1280         }
1281
1282         /*
1283          * Only declare a conflict if the new DN already exists,
1284          * and it isn't a case change on the old DN
1285          */
1286         if (tdb_key_old.dsize != tdb_key.dsize
1287             || memcmp(tdb_key.dptr, tdb_key_old.dptr, tdb_key.dsize) != 0) {
1288                 ret = ldb_kv_search_base(
1289                     module, msg, req->op.rename.newdn, &db_dn);
1290                 if (ret == LDB_SUCCESS) {
1291                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1292                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1293                         ret = LDB_SUCCESS;
1294                 }
1295         }
1296
1297         /* finding the new record already in the DB is an error */
1298
1299         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1300                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1301                                        "Entry %s already exists",
1302                                        ldb_dn_get_linearized(req->op.rename.newdn));
1303         }
1304         if (ret != LDB_SUCCESS) {
1305                 talloc_free(tdb_key_old.dptr);
1306                 talloc_free(tdb_key.dptr);
1307                 talloc_free(msg);
1308                 return ret;
1309         }
1310
1311         talloc_free(tdb_key_old.dptr);
1312         talloc_free(tdb_key.dptr);
1313
1314         /* Always delete first then add, to avoid conflicts with
1315          * unique indexes. We rely on the transaction to make this
1316          * atomic
1317          */
1318         ret = ldb_kv_delete_internal(module, msg->dn);
1319         if (ret != LDB_SUCCESS) {
1320                 talloc_free(msg);
1321                 return ret;
1322         }
1323
1324         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1325         if (msg->dn == NULL) {
1326                 talloc_free(msg);
1327                 return LDB_ERR_OPERATIONS_ERROR;
1328         }
1329
1330         /* We don't check single value as we can have more than 1 with
1331          * deleted attributes. We could go through all elements but that's
1332          * maybe not the most efficient way
1333          */
1334         ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
1335
1336         talloc_free(msg);
1337
1338         return ret;
1339 }
1340
1341 static int ldb_kv_start_trans(struct ldb_module *module)
1342 {
1343         void *data = ldb_module_get_private(module);
1344         struct ldb_kv_private *ldb_kv =
1345             talloc_get_type(data, struct ldb_kv_private);
1346
1347         pid_t pid = getpid();
1348
1349         if (ldb_kv->pid != pid) {
1350                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
1351                                        __location__
1352                                        ": Reusing ldb opend by pid %d in "
1353                                        "process %d\n",
1354                                        ldb_kv->pid,
1355                                        pid);
1356                 return LDB_ERR_PROTOCOL_ERROR;
1357         }
1358
1359         /* Do not take out the transaction lock on a read-only DB */
1360         if (ldb_kv->read_only) {
1361                 return LDB_ERR_UNWILLING_TO_PERFORM;
1362         }
1363
1364         if (ldb_kv->kv_ops->begin_write(ldb_kv) != 0) {
1365                 return ldb_kv->kv_ops->error(ldb_kv);
1366         }
1367
1368         ldb_kv_index_transaction_start(module);
1369
1370         ldb_kv->reindex_failed = false;
1371
1372         return LDB_SUCCESS;
1373 }
1374
1375 /*
1376  * Forward declaration to allow prepare_commit to in fact abort the
1377  * transaction
1378  */
1379 static int ldb_kv_del_trans(struct ldb_module *module);
1380
1381 static int ldb_kv_prepare_commit(struct ldb_module *module)
1382 {
1383         int ret;
1384         void *data = ldb_module_get_private(module);
1385         struct ldb_kv_private *ldb_kv =
1386             talloc_get_type(data, struct ldb_kv_private);
1387         pid_t pid = getpid();
1388
1389         if (ldb_kv->pid != pid) {
1390                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1391                                        __location__
1392                                        ": Reusing ldb opend by pid %d in "
1393                                        "process %d\n",
1394                                        ldb_kv->pid,
1395                                        pid);
1396                 return LDB_ERR_PROTOCOL_ERROR;
1397         }
1398
1399         if (!ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1400                 ldb_set_errstring(ldb_module_get_ctx(module),
1401                                   "ltdb_prepare_commit() called "
1402                                   "without transaction active");
1403                 return LDB_ERR_OPERATIONS_ERROR;
1404         }
1405
1406         /*
1407          * Check if the last re-index failed.
1408          *
1409          * This can happen if for example a duplicate value was marked
1410          * unique.  We must not write a partial re-index into the DB.
1411          */
1412         if (ldb_kv->reindex_failed) {
1413                 /*
1414                  * We must instead abort the transaction so we get the
1415                  * old values and old index back
1416                  */
1417                 ldb_kv_del_trans(module);
1418                 ldb_set_errstring(ldb_module_get_ctx(module),
1419                                   "Failure during re-index, so "
1420                                   "transaction must be aborted.");
1421                 return LDB_ERR_OPERATIONS_ERROR;
1422         }
1423
1424         ret = ldb_kv_index_transaction_commit(module);
1425         if (ret != LDB_SUCCESS) {
1426                 ldb_kv->kv_ops->abort_write(ldb_kv);
1427                 return ret;
1428         }
1429
1430         if (ldb_kv->kv_ops->prepare_write(ldb_kv) != 0) {
1431                 ret = ldb_kv->kv_ops->error(ldb_kv);
1432                 ldb_debug_set(ldb_module_get_ctx(module),
1433                               LDB_DEBUG_FATAL,
1434                               "Failure during "
1435                               "prepare_write): %s -> %s",
1436                               ldb_kv->kv_ops->errorstr(ldb_kv),
1437                               ldb_strerror(ret));
1438                 return ret;
1439         }
1440
1441         ldb_kv->prepared_commit = true;
1442
1443         return LDB_SUCCESS;
1444 }
1445
1446 static int ldb_kv_end_trans(struct ldb_module *module)
1447 {
1448         int ret;
1449         void *data = ldb_module_get_private(module);
1450         struct ldb_kv_private *ldb_kv =
1451             talloc_get_type(data, struct ldb_kv_private);
1452
1453         if (!ldb_kv->prepared_commit) {
1454                 ret = ldb_kv_prepare_commit(module);
1455                 if (ret != LDB_SUCCESS) {
1456                         return ret;
1457                 }
1458         }
1459
1460         ldb_kv->prepared_commit = false;
1461
1462         if (ldb_kv->kv_ops->finish_write(ldb_kv) != 0) {
1463                 ret = ldb_kv->kv_ops->error(ldb_kv);
1464                 ldb_asprintf_errstring(
1465                     ldb_module_get_ctx(module),
1466                     "Failure during tdb_transaction_commit(): %s -> %s",
1467                     ldb_kv->kv_ops->errorstr(ldb_kv),
1468                     ldb_strerror(ret));
1469                 return ret;
1470         }
1471
1472         return LDB_SUCCESS;
1473 }
1474
1475 static int ldb_kv_del_trans(struct ldb_module *module)
1476 {
1477         void *data = ldb_module_get_private(module);
1478         struct ldb_kv_private *ldb_kv =
1479             talloc_get_type(data, struct ldb_kv_private);
1480
1481         if (ldb_kv_index_transaction_cancel(module) != 0) {
1482                 ldb_kv->kv_ops->abort_write(ldb_kv);
1483                 return ldb_kv->kv_ops->error(ldb_kv);
1484         }
1485
1486         ldb_kv->kv_ops->abort_write(ldb_kv);
1487         return LDB_SUCCESS;
1488 }
1489
1490 /*
1491   return sequenceNumber from @BASEINFO
1492 */
1493 static int ldb_kv_sequence_number(struct ldb_kv_context *ctx,
1494                                   struct ldb_extended **ext)
1495 {
1496         struct ldb_context *ldb;
1497         struct ldb_module *module = ctx->module;
1498         struct ldb_request *req = ctx->req;
1499         void *data = ldb_module_get_private(module);
1500         struct ldb_kv_private *ldb_kv =
1501             talloc_get_type(data, struct ldb_kv_private);
1502         TALLOC_CTX *tmp_ctx = NULL;
1503         struct ldb_seqnum_request *seq;
1504         struct ldb_seqnum_result *res;
1505         struct ldb_message *msg = NULL;
1506         struct ldb_dn *dn;
1507         const char *date;
1508         int ret = LDB_SUCCESS;
1509
1510         ldb = ldb_module_get_ctx(module);
1511
1512         seq = talloc_get_type(req->op.extended.data,
1513                                 struct ldb_seqnum_request);
1514         if (seq == NULL) {
1515                 return LDB_ERR_OPERATIONS_ERROR;
1516         }
1517
1518         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1519
1520         if (ldb_kv->kv_ops->lock_read(module) != 0) {
1521                 return LDB_ERR_OPERATIONS_ERROR;
1522         }
1523
1524         res = talloc_zero(req, struct ldb_seqnum_result);
1525         if (res == NULL) {
1526                 ret = LDB_ERR_OPERATIONS_ERROR;
1527                 goto done;
1528         }
1529
1530         tmp_ctx = talloc_new(req);
1531         if (tmp_ctx == NULL) {
1532                 ret = LDB_ERR_OPERATIONS_ERROR;
1533                 goto done;
1534         }
1535
1536         dn = ldb_dn_new(tmp_ctx, ldb, LDB_KV_BASEINFO);
1537         if (dn == NULL) {
1538                 ret = LDB_ERR_OPERATIONS_ERROR;
1539                 goto done;
1540         }
1541
1542         msg = ldb_msg_new(tmp_ctx);
1543         if (msg == NULL) {
1544                 ret = LDB_ERR_OPERATIONS_ERROR;
1545                 goto done;
1546         }
1547
1548         ret = ldb_kv_search_dn1(module, dn, msg, 0);
1549         if (ret != LDB_SUCCESS) {
1550                 goto done;
1551         }
1552
1553         switch (seq->type) {
1554         case LDB_SEQ_HIGHEST_SEQ:
1555                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1556                 break;
1557         case LDB_SEQ_NEXT:
1558                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1559                 res->seq_num++;
1560                 break;
1561         case LDB_SEQ_HIGHEST_TIMESTAMP:
1562                 date = ldb_msg_find_attr_as_string(msg, LDB_KV_MOD_TIMESTAMP, NULL);
1563                 if (date) {
1564                         res->seq_num = ldb_string_to_time(date);
1565                 } else {
1566                         res->seq_num = 0;
1567                         /* zero is as good as anything when we don't know */
1568                 }
1569                 break;
1570         }
1571
1572         *ext = talloc_zero(req, struct ldb_extended);
1573         if (*ext == NULL) {
1574                 ret = LDB_ERR_OPERATIONS_ERROR;
1575                 goto done;
1576         }
1577         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1578         (*ext)->data = talloc_steal(*ext, res);
1579
1580 done:
1581         talloc_free(tmp_ctx);
1582
1583         ldb_kv->kv_ops->unlock_read(module);
1584         return ret;
1585 }
1586
1587 static void ldb_kv_request_done(struct ldb_kv_context *ctx, int error)
1588 {
1589         struct ldb_context *ldb;
1590         struct ldb_request *req;
1591         struct ldb_reply *ares;
1592
1593         ldb = ldb_module_get_ctx(ctx->module);
1594         req = ctx->req;
1595
1596         /* if we already returned an error just return */
1597         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1598                 return;
1599         }
1600
1601         ares = talloc_zero(req, struct ldb_reply);
1602         if (!ares) {
1603                 ldb_oom(ldb);
1604                 req->callback(req, NULL);
1605                 return;
1606         }
1607         ares->type = LDB_REPLY_DONE;
1608         ares->error = error;
1609
1610         req->callback(req, ares);
1611 }
1612
1613 static void ldb_kv_timeout(struct tevent_context *ev,
1614                            struct tevent_timer *te,
1615                            struct timeval t,
1616                            void *private_data)
1617 {
1618         struct ldb_kv_context *ctx;
1619         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1620
1621         if (!ctx->request_terminated) {
1622                 /* request is done now */
1623                 ldb_kv_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1624         }
1625
1626         if (ctx->spy) {
1627                 /* neutralize the spy */
1628                 ctx->spy->ctx = NULL;
1629                 ctx->spy = NULL;
1630         }
1631         talloc_free(ctx);
1632 }
1633
1634 static void ldb_kv_request_extended_done(struct ldb_kv_context *ctx,
1635                                          struct ldb_extended *ext,
1636                                          int error)
1637 {
1638         struct ldb_context *ldb;
1639         struct ldb_request *req;
1640         struct ldb_reply *ares;
1641
1642         ldb = ldb_module_get_ctx(ctx->module);
1643         req = ctx->req;
1644
1645         /* if we already returned an error just return */
1646         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1647                 return;
1648         }
1649
1650         ares = talloc_zero(req, struct ldb_reply);
1651         if (!ares) {
1652                 ldb_oom(ldb);
1653                 req->callback(req, NULL);
1654                 return;
1655         }
1656         ares->type = LDB_REPLY_DONE;
1657         ares->response = ext;
1658         ares->error = error;
1659
1660         req->callback(req, ares);
1661 }
1662
1663 static void ldb_kv_handle_extended(struct ldb_kv_context *ctx)
1664 {
1665         struct ldb_extended *ext = NULL;
1666         int ret;
1667
1668         if (strcmp(ctx->req->op.extended.oid,
1669                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1670                 /* get sequence number */
1671                 ret = ldb_kv_sequence_number(ctx, &ext);
1672         } else {
1673                 /* not recognized */
1674                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1675         }
1676
1677         ldb_kv_request_extended_done(ctx, ext, ret);
1678 }
1679
1680 static void ldb_kv_callback(struct tevent_context *ev,
1681                             struct tevent_timer *te,
1682                             struct timeval t,
1683                             void *private_data)
1684 {
1685         struct ldb_kv_context *ctx;
1686         int ret;
1687
1688         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1689
1690         if (ctx->request_terminated) {
1691                 goto done;
1692         }
1693
1694         switch (ctx->req->operation) {
1695         case LDB_SEARCH:
1696                 ret = ldb_kv_search(ctx);
1697                 break;
1698         case LDB_ADD:
1699                 ret = ldb_kv_add(ctx);
1700                 break;
1701         case LDB_MODIFY:
1702                 ret = ldb_kv_modify(ctx);
1703                 break;
1704         case LDB_DELETE:
1705                 ret = ldb_kv_delete(ctx);
1706                 break;
1707         case LDB_RENAME:
1708                 ret = ldb_kv_rename(ctx);
1709                 break;
1710         case LDB_EXTENDED:
1711                 ldb_kv_handle_extended(ctx);
1712                 goto done;
1713         default:
1714                 /* no other op supported */
1715                 ret = LDB_ERR_PROTOCOL_ERROR;
1716         }
1717
1718         if (!ctx->request_terminated) {
1719                 /* request is done now */
1720                 ldb_kv_request_done(ctx, ret);
1721         }
1722
1723 done:
1724         if (ctx->spy) {
1725                 /* neutralize the spy */
1726                 ctx->spy->ctx = NULL;
1727                 ctx->spy = NULL;
1728         }
1729         talloc_free(ctx);
1730 }
1731
1732 static int ldb_kv_request_destructor(void *ptr)
1733 {
1734         struct ldb_kv_req_spy *spy =
1735             talloc_get_type(ptr, struct ldb_kv_req_spy);
1736
1737         if (spy->ctx != NULL) {
1738                 spy->ctx->spy = NULL;
1739                 spy->ctx->request_terminated = true;
1740                 spy->ctx = NULL;
1741         }
1742
1743         return 0;
1744 }
1745
1746 static int ldb_kv_handle_request(struct ldb_module *module,
1747                                  struct ldb_request *req)
1748 {
1749         struct ldb_control *control_permissive;
1750         struct ldb_context *ldb;
1751         struct tevent_context *ev;
1752         struct ldb_kv_context *ac;
1753         struct tevent_timer *te;
1754         struct timeval tv;
1755         unsigned int i;
1756
1757         ldb = ldb_module_get_ctx(module);
1758
1759         control_permissive = ldb_request_get_control(req,
1760                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1761
1762         for (i = 0; req->controls && req->controls[i]; i++) {
1763                 if (req->controls[i]->critical &&
1764                     req->controls[i] != control_permissive) {
1765                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1766                                                req->controls[i]->oid);
1767                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1768                 }
1769         }
1770
1771         if (req->starttime == 0 || req->timeout == 0) {
1772                 ldb_set_errstring(ldb, "Invalid timeout settings");
1773                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1774         }
1775
1776         ev = ldb_handle_get_event_context(req->handle);
1777
1778         ac = talloc_zero(ldb, struct ldb_kv_context);
1779         if (ac == NULL) {
1780                 ldb_oom(ldb);
1781                 return LDB_ERR_OPERATIONS_ERROR;
1782         }
1783
1784         ac->module = module;
1785         ac->req = req;
1786
1787         tv.tv_sec = 0;
1788         tv.tv_usec = 0;
1789         te = tevent_add_timer(ev, ac, tv, ldb_kv_callback, ac);
1790         if (NULL == te) {
1791                 talloc_free(ac);
1792                 return LDB_ERR_OPERATIONS_ERROR;
1793         }
1794
1795         if (req->timeout > 0) {
1796                 tv.tv_sec = req->starttime + req->timeout;
1797                 tv.tv_usec = 0;
1798                 ac->timeout_event =
1799                     tevent_add_timer(ev, ac, tv, ldb_kv_timeout, ac);
1800                 if (NULL == ac->timeout_event) {
1801                         talloc_free(ac);
1802                         return LDB_ERR_OPERATIONS_ERROR;
1803                 }
1804         }
1805
1806         /* set a spy so that we do not try to use the request context
1807          * if it is freed before ltdb_callback fires */
1808         ac->spy = talloc(req, struct ldb_kv_req_spy);
1809         if (NULL == ac->spy) {
1810                 talloc_free(ac);
1811                 return LDB_ERR_OPERATIONS_ERROR;
1812         }
1813         ac->spy->ctx = ac;
1814
1815         talloc_set_destructor((TALLOC_CTX *)ac->spy, ldb_kv_request_destructor);
1816
1817         return LDB_SUCCESS;
1818 }
1819
1820 static int ldb_kv_init_rootdse(struct ldb_module *module)
1821 {
1822         /* ignore errors on this - we expect it for non-sam databases */
1823         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1824
1825         /* there can be no module beyond the backend, just return */
1826         return LDB_SUCCESS;
1827 }
1828
1829 static int ldb_kv_lock_read(struct ldb_module *module)
1830 {
1831         void *data = ldb_module_get_private(module);
1832         struct ldb_kv_private *ldb_kv =
1833             talloc_get_type(data, struct ldb_kv_private);
1834         return ldb_kv->kv_ops->lock_read(module);
1835 }
1836
1837 static int ldb_kv_unlock_read(struct ldb_module *module)
1838 {
1839         void *data = ldb_module_get_private(module);
1840         struct ldb_kv_private *ldb_kv =
1841             talloc_get_type(data, struct ldb_kv_private);
1842         return ldb_kv->kv_ops->unlock_read(module);
1843 }
1844
1845 static const struct ldb_module_ops ldb_kv_ops = {
1846     .name = "tdb",
1847     .init_context = ldb_kv_init_rootdse,
1848     .search = ldb_kv_handle_request,
1849     .add = ldb_kv_handle_request,
1850     .modify = ldb_kv_handle_request,
1851     .del = ldb_kv_handle_request,
1852     .rename = ldb_kv_handle_request,
1853     .extended = ldb_kv_handle_request,
1854     .start_transaction = ldb_kv_start_trans,
1855     .end_transaction = ldb_kv_end_trans,
1856     .prepare_commit = ldb_kv_prepare_commit,
1857     .del_transaction = ldb_kv_del_trans,
1858     .read_lock = ldb_kv_lock_read,
1859     .read_unlock = ldb_kv_unlock_read,
1860 };
1861
1862 int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
1863                       const char *name,
1864                       struct ldb_context *ldb,
1865                       const char *options[],
1866                       struct ldb_module **_module)
1867 {
1868         if (getenv("LDB_WARN_UNINDEXED")) {
1869                 ldb_kv->warn_unindexed = true;
1870         }
1871
1872         if (getenv("LDB_WARN_REINDEX")) {
1873                 ldb_kv->warn_reindex = true;
1874         }
1875
1876         ldb_kv->sequence_number = 0;
1877
1878         ldb_kv->pid = getpid();
1879
1880         ldb_kv->module = ldb_module_new(ldb, ldb, name, &ldb_kv_ops);
1881         if (!ldb_kv->module) {
1882                 ldb_oom(ldb);
1883                 talloc_free(ldb_kv);
1884                 return LDB_ERR_OPERATIONS_ERROR;
1885         }
1886         ldb_module_set_private(ldb_kv->module, ldb_kv);
1887         talloc_steal(ldb_kv->module, ldb_kv);
1888
1889         if (ldb_kv_cache_load(ldb_kv->module) != 0) {
1890                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
1891                                        "records for backend '%s'", name);
1892                 talloc_free(ldb_kv->module);
1893                 return LDB_ERR_OPERATIONS_ERROR;
1894         }
1895
1896         *_module = ldb_kv->module;
1897         /*
1898          * Set or override the maximum key length
1899          *
1900          * The ldb_mdb code will have set this to 511, but our tests
1901          * set this even smaller (to make the tests more practical).
1902          *
1903          * This must only be used for the selftest as the length
1904          * becomes encoded in the index keys.
1905          */
1906         {
1907                 const char *len_str =
1908                         ldb_options_find(ldb, options,
1909                                          "max_key_len_for_self_test");
1910                 if (len_str != NULL) {
1911                         unsigned len = strtoul(len_str, NULL, 0);
1912                         ldb_kv->max_key_length = len;
1913                 }
1914         }
1915
1916         /*
1917          * Override full DB scans
1918          *
1919          * A full DB scan is expensive on a large database.  This
1920          * option is for testing to show that the full DB scan is not
1921          * triggered.
1922          */
1923         {
1924                 const char *len_str =
1925                         ldb_options_find(ldb, options,
1926                                          "disable_full_db_scan_for_self_test");
1927                 if (len_str != NULL) {
1928                         ldb_kv->disable_full_db_scan = true;
1929                 }
1930         }
1931
1932         return LDB_SUCCESS;
1933 }