3ea8d5e2ed4044343c56c81a8aa92afd398309f8
[sfrench/samba-autobuild/.git] / lib / ldb / ldb_key_value / ldb_kv.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_kv
29  *
30  *  Component: ldb key value backend
31  *
32  *  Description: core functions for ldb key value backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_kv.h"
53 #include "ldb_private.h"
54
55 /*
56   prevent memory errors on callbacks
57 */
58 struct ldb_kv_req_spy {
59         struct ldb_kv_context *ctx;
60 };
61
62 /*
63  * Determine if this key could hold a record.  We allow the new GUID
64  * index, the old DN index and a possible future ID=
65  */
66 bool ldb_kv_key_is_record(struct ldb_val key)
67 {
68         if (key.length < 4) {
69                 return false;
70         }
71
72         if (memcmp(key.data, "DN=", 3) == 0) {
73                 return true;
74         }
75
76         if (memcmp(key.data, "ID=", 3) == 0) {
77                 return true;
78         }
79
80         if (key.length < sizeof(LDB_KV_GUID_KEY_PREFIX)) {
81                 return false;
82         }
83
84         if (memcmp(key.data, LDB_KV_GUID_KEY_PREFIX,
85                    sizeof(LDB_KV_GUID_KEY_PREFIX) - 1) == 0) {
86                 return true;
87         }
88
89         return false;
90 }
91
92 /*
93   form a ldb_val for a record key
94   caller frees
95
96   note that the key for a record can depend on whether the
97   dn refers to a case sensitive index record or not
98 */
99 struct ldb_val ldb_kv_key_dn(struct ldb_module *module,
100                              TALLOC_CTX *mem_ctx,
101                              struct ldb_dn *dn)
102 {
103         struct ldb_val key;
104         char *key_str = NULL;
105         const char *dn_folded = NULL;
106
107         /*
108           most DNs are case insensitive. The exception is index DNs for
109           case sensitive attributes
110
111           there are 3 cases dealt with in this code:
112
113           1) if the dn doesn't start with @ then uppercase the attribute
114              names and the attributes values of case insensitive attributes
115           2) if the dn starts with @ then leave it alone -
116              the indexing code handles the rest
117         */
118
119         dn_folded = ldb_dn_get_casefold(dn);
120         if (!dn_folded) {
121                 goto failed;
122         }
123
124         key_str = talloc_strdup(mem_ctx, "DN=");
125         if (!key_str) {
126                 goto failed;
127         }
128
129         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
130         if (!key_str) {
131                 goto failed;
132         }
133
134         key.data = (uint8_t *)key_str;
135         key.length = strlen(key_str) + 1;
136
137         return key;
138
139 failed:
140         errno = ENOMEM;
141         key.data = NULL;
142         key.length = 0;
143         return key;
144 }
145
146 /* The caller is to provide a correctly sized key */
147 int ldb_kv_guid_to_key(struct ldb_module *module,
148                        struct ldb_kv_private *ldb_kv,
149                        const struct ldb_val *GUID_val,
150                        struct ldb_val *key)
151 {
152         const char *GUID_prefix = LDB_KV_GUID_KEY_PREFIX;
153         const int GUID_prefix_len = sizeof(LDB_KV_GUID_KEY_PREFIX) - 1;
154
155         if (key->length != (GUID_val->length+GUID_prefix_len)) {
156                 return LDB_ERR_OPERATIONS_ERROR;
157         }
158
159         memcpy(key->data, GUID_prefix, GUID_prefix_len);
160         memcpy(&key->data[GUID_prefix_len],
161                GUID_val->data, GUID_val->length);
162         return LDB_SUCCESS;
163 }
164
165 /*
166  * The caller is to provide a correctly sized key, used only in
167  * the GUID index mode
168  */
169 int ldb_kv_idx_to_key(struct ldb_module *module,
170                       struct ldb_kv_private *ldb_kv,
171                       TALLOC_CTX *mem_ctx,
172                       const struct ldb_val *idx_val,
173                       struct ldb_val *key)
174 {
175         struct ldb_context *ldb = ldb_module_get_ctx(module);
176         struct ldb_dn *dn;
177
178         if (ldb_kv->cache->GUID_index_attribute != NULL) {
179                 return ldb_kv_guid_to_key(module, ldb_kv, idx_val, key);
180         }
181
182         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
183         if (dn == NULL) {
184                 /*
185                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
186                  * to the caller, as this in an invalid index value
187                  */
188                 return LDB_ERR_OPERATIONS_ERROR;
189         }
190         /* form the key */
191         *key = ldb_kv_key_dn(module, mem_ctx, dn);
192         TALLOC_FREE(dn);
193         if (!key->data) {
194                 return ldb_module_oom(module);
195         }
196         return LDB_SUCCESS;
197 }
198
199 /*
200   form a TDB_DATA for a record key
201   caller frees mem_ctx, which may or may not have the key
202   as a child.
203
204   note that the key for a record can depend on whether a
205   GUID index is in use, or the DN is used as the key
206 */
207 struct ldb_val ldb_kv_key_msg(struct ldb_module *module,
208                         TALLOC_CTX *mem_ctx,
209                         const struct ldb_message *msg)
210 {
211         void *data = ldb_module_get_private(module);
212         struct ldb_kv_private *ldb_kv =
213             talloc_get_type(data, struct ldb_kv_private);
214         struct ldb_val key;
215         const struct ldb_val *guid_val;
216         int ret;
217
218         if (ldb_kv->cache->GUID_index_attribute == NULL) {
219                 return ldb_kv_key_dn(module, mem_ctx, msg->dn);
220         }
221
222         if (ldb_dn_is_special(msg->dn)) {
223                 return ldb_kv_key_dn(module, mem_ctx, msg->dn);
224         }
225
226         guid_val =
227             ldb_msg_find_ldb_val(msg, ldb_kv->cache->GUID_index_attribute);
228         if (guid_val == NULL) {
229                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
230                                        "Did not find GUID attribute %s "
231                                        "in %s, required for TDB record "
232                                        "key in " LDB_KV_IDXGUID " mode.",
233                                        ldb_kv->cache->GUID_index_attribute,
234                                        ldb_dn_get_linearized(msg->dn));
235                 errno = EINVAL;
236                 key.data = NULL;
237                 key.length = 0;
238                 return key;
239         }
240
241         /* In this case, allocate with talloc */
242         key.data = talloc_size(mem_ctx, LDB_KV_GUID_KEY_SIZE);
243         if (key.data == NULL) {
244                 errno = ENOMEM;
245                 key.data = NULL;
246                 key.length = 0;
247                 return key;
248         }
249         key.length = talloc_get_size(key.data);
250
251         ret = ldb_kv_guid_to_key(module, ldb_kv, guid_val, &key);
252
253         if (ret != LDB_SUCCESS) {
254                 errno = EINVAL;
255                 key.data = NULL;
256                 key.length = 0;
257                 return key;
258         }
259         return key;
260 }
261
262 /*
263   check special dn's have valid attributes
264   currently only @ATTRIBUTES is checked
265 */
266 static int ldb_kv_check_special_dn(struct ldb_module *module,
267                                    const struct ldb_message *msg)
268 {
269         struct ldb_context *ldb = ldb_module_get_ctx(module);
270         unsigned int i, j;
271
272         if (! ldb_dn_is_special(msg->dn) ||
273             ! ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
274                 return LDB_SUCCESS;
275         }
276
277         /* we have @ATTRIBUTES, let's check attributes are fine */
278         /* should we check that we deny multivalued attributes ? */
279         for (i = 0; i < msg->num_elements; i++) {
280                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
281
282                 for (j = 0; j < msg->elements[i].num_values; j++) {
283                         if (ldb_kv_check_at_attributes_values(
284                                 &msg->elements[i].values[j]) != 0) {
285                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
286                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
287                         }
288                 }
289         }
290
291         return LDB_SUCCESS;
292 }
293
294
295 /*
296   we've made a modification to a dn - possibly reindex and
297   update sequence number
298 */
299 static int ldb_kv_modified(struct ldb_module *module, struct ldb_dn *dn)
300 {
301         int ret = LDB_SUCCESS;
302         struct ldb_kv_private *ldb_kv = talloc_get_type(
303             ldb_module_get_private(module), struct ldb_kv_private);
304
305         /* only allow modifies inside a transaction, otherwise the
306          * ldb is unsafe */
307         if (ldb_kv->kv_ops->transaction_active(ldb_kv) == false) {
308                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
309                 return LDB_ERR_OPERATIONS_ERROR;
310         }
311
312         if (ldb_dn_is_special(dn) &&
313             (ldb_dn_check_special(dn, LDB_KV_INDEXLIST) ||
314              ldb_dn_check_special(dn, LDB_KV_ATTRIBUTES)) )
315         {
316                 if (ldb_kv->warn_reindex) {
317                         ldb_debug(ldb_module_get_ctx(module),
318                                   LDB_DEBUG_ERROR,
319                                   "Reindexing %s due to modification on %s",
320                                   ldb_kv->kv_ops->name(ldb_kv),
321                                   ldb_dn_get_linearized(dn));
322                 }
323                 ret = ldb_kv_reindex(module);
324         }
325
326         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
327         if (ret == LDB_SUCCESS &&
328             !(ldb_dn_is_special(dn) &&
329               ldb_dn_check_special(dn, LDB_KV_BASEINFO)) ) {
330                 ret = ldb_kv_increase_sequence_number(module);
331         }
332
333         /* If the modify was to @OPTIONS, reload the cache */
334         if (ret == LDB_SUCCESS &&
335             ldb_dn_is_special(dn) &&
336             (ldb_dn_check_special(dn, LDB_KV_OPTIONS)) ) {
337                 ret = ldb_kv_cache_reload(module);
338         }
339
340         if (ret != LDB_SUCCESS) {
341                 ldb_kv->reindex_failed = true;
342         }
343
344         return ret;
345 }
346 /*
347   store a record into the db
348 */
349 int ldb_kv_store(struct ldb_module *module,
350                  const struct ldb_message *msg,
351                  int flgs)
352 {
353         void *data = ldb_module_get_private(module);
354         struct ldb_kv_private *ldb_kv =
355             talloc_get_type(data, struct ldb_kv_private);
356         struct ldb_val key;
357         struct ldb_val ldb_data;
358         int ret = LDB_SUCCESS;
359         TALLOC_CTX *key_ctx = talloc_new(module);
360
361         if (key_ctx == NULL) {
362                 return ldb_module_oom(module);
363         }
364
365         if (ldb_kv->read_only) {
366                 talloc_free(key_ctx);
367                 return LDB_ERR_UNWILLING_TO_PERFORM;
368         }
369
370         key = ldb_kv_key_msg(module, key_ctx, msg);
371         if (key.data == NULL) {
372                 TALLOC_FREE(key_ctx);
373                 return LDB_ERR_OTHER;
374         }
375
376         ret = ldb_pack_data(ldb_module_get_ctx(module),
377                             msg, &ldb_data);
378         if (ret == -1) {
379                 TALLOC_FREE(key_ctx);
380                 return LDB_ERR_OTHER;
381         }
382
383         ret = ldb_kv->kv_ops->store(ldb_kv, key, ldb_data, flgs);
384         if (ret != 0) {
385                 bool is_special = ldb_dn_is_special(msg->dn);
386                 ret = ldb_kv->kv_ops->error(ldb_kv);
387
388                 /*
389                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
390                  * the GUID, so re-map
391                  */
392                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS && !is_special &&
393                     ldb_kv->cache->GUID_index_attribute != NULL) {
394                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
395                 }
396                 goto done;
397         }
398
399 done:
400         TALLOC_FREE(key_ctx);
401         talloc_free(ldb_data.data);
402
403         return ret;
404 }
405
406
407 /*
408   check if a attribute is a single valued, for a given element
409  */
410 static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
411                                  struct ldb_message_element *el)
412 {
413         if (!a) return false;
414         if (el != NULL) {
415                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
416                         /* override from a ldb module, for example
417                            used for the description field, which is
418                            marked multi-valued in the schema but which
419                            should not actually accept multiple
420                            values */
421                         return true;
422                 }
423                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
424                         /* override from a ldb module, for example used for
425                            deleted linked attribute entries */
426                         return false;
427                 }
428         }
429         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
430                 return true;
431         }
432         return false;
433 }
434
435 static int ldb_kv_add_internal(struct ldb_module *module,
436                                struct ldb_kv_private *ldb_kv,
437                                const struct ldb_message *msg,
438                                bool check_single_value)
439 {
440         struct ldb_context *ldb = ldb_module_get_ctx(module);
441         int ret = LDB_SUCCESS;
442         unsigned int i;
443
444         for (i=0;i<msg->num_elements;i++) {
445                 struct ldb_message_element *el = &msg->elements[i];
446                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
447
448                 if (el->num_values == 0) {
449                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
450                                                el->name, ldb_dn_get_linearized(msg->dn));
451                         return LDB_ERR_CONSTRAINT_VIOLATION;
452                 }
453                 if (check_single_value && el->num_values > 1 &&
454                     ldb_kv_single_valued(a, el)) {
455                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
456                                                el->name, ldb_dn_get_linearized(msg->dn));
457                         return LDB_ERR_CONSTRAINT_VIOLATION;
458                 }
459
460                 /* Do not check "@ATTRIBUTES" for duplicated values */
461                 if (ldb_dn_is_special(msg->dn) &&
462                     ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
463                         continue;
464                 }
465
466                 if (check_single_value &&
467                     !(el->flags &
468                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
469                         struct ldb_val *duplicate = NULL;
470
471                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
472                                                          el, &duplicate, 0);
473                         if (ret != LDB_SUCCESS) {
474                                 return ret;
475                         }
476                         if (duplicate != NULL) {
477                                 ldb_asprintf_errstring(
478                                         ldb,
479                                         "attribute '%s': value '%.*s' on '%s' "
480                                         "provided more than once in ADD object",
481                                         el->name,
482                                         (int)duplicate->length,
483                                         duplicate->data,
484                                         ldb_dn_get_linearized(msg->dn));
485                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
486                         }
487                 }
488         }
489
490         ret = ldb_kv_store(module, msg, TDB_INSERT);
491         if (ret != LDB_SUCCESS) {
492                 /*
493                  * Try really hard to get the right error code for
494                  * a re-add situation, as this can matter!
495                  */
496                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
497                         int ret2;
498                         struct ldb_dn *dn2 = NULL;
499                         TALLOC_CTX *mem_ctx = talloc_new(module);
500                         if (mem_ctx == NULL) {
501                                 return ldb_module_operr(module);
502                         }
503                         ret2 =
504                             ldb_kv_search_base(module, mem_ctx, msg->dn, &dn2);
505                         TALLOC_FREE(mem_ctx);
506                         if (ret2 == LDB_SUCCESS) {
507                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
508                         }
509                 }
510                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
511                         ldb_asprintf_errstring(ldb,
512                                                "Entry %s already exists",
513                                                ldb_dn_get_linearized(msg->dn));
514                 }
515                 return ret;
516         }
517
518         ret = ldb_kv_index_add_new(module, ldb_kv, msg);
519         if (ret != LDB_SUCCESS) {
520                 /*
521                  * If we failed to index, delete the message again.
522                  *
523                  * This is particularly important for the GUID index
524                  * case, which will only fail for a duplicate DN
525                  * in the index add.
526                  *
527                  * Note that the caller may not cancel the transation
528                  * and this means the above add might really show up!
529                  */
530                 ldb_kv_delete_noindex(module, msg);
531                 return ret;
532         }
533
534         ret = ldb_kv_modified(module, msg->dn);
535
536         return ret;
537 }
538
539 /*
540   add a record to the database
541 */
542 static int ldb_kv_add(struct ldb_kv_context *ctx)
543 {
544         struct ldb_module *module = ctx->module;
545         struct ldb_request *req = ctx->req;
546         void *data = ldb_module_get_private(module);
547         struct ldb_kv_private *ldb_kv =
548             talloc_get_type(data, struct ldb_kv_private);
549         int ret = LDB_SUCCESS;
550
551         if (ldb_kv->max_key_length != 0 &&
552             ldb_kv->cache->GUID_index_attribute == NULL &&
553             !ldb_dn_is_special(req->op.add.message->dn)) {
554                 ldb_set_errstring(ldb_module_get_ctx(module),
555                                   "Must operate ldb_mdb in GUID "
556                                   "index mode, but " LDB_KV_IDXGUID " not set.");
557                 return LDB_ERR_UNWILLING_TO_PERFORM;
558         }
559
560         ret = ldb_kv_check_special_dn(module, req->op.add.message);
561         if (ret != LDB_SUCCESS) {
562                 return ret;
563         }
564
565         ldb_request_set_state(req, LDB_ASYNC_PENDING);
566
567         if (ldb_kv_cache_load(module) != 0) {
568                 return LDB_ERR_OPERATIONS_ERROR;
569         }
570
571         ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
572
573         return ret;
574 }
575
576 /*
577   delete a record from the database, not updating indexes (used for deleting
578   index records)
579 */
580 int ldb_kv_delete_noindex(struct ldb_module *module,
581                           const struct ldb_message *msg)
582 {
583         void *data = ldb_module_get_private(module);
584         struct ldb_kv_private *ldb_kv =
585             talloc_get_type(data, struct ldb_kv_private);
586         struct ldb_val key;
587         int ret;
588         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
589
590         if (tdb_key_ctx == NULL) {
591                 return ldb_module_oom(module);
592         }
593
594         if (ldb_kv->read_only) {
595                 talloc_free(tdb_key_ctx);
596                 return LDB_ERR_UNWILLING_TO_PERFORM;
597         }
598
599         key = ldb_kv_key_msg(module, tdb_key_ctx, msg);
600         if (!key.data) {
601                 TALLOC_FREE(tdb_key_ctx);
602                 return LDB_ERR_OTHER;
603         }
604
605         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
606         TALLOC_FREE(tdb_key_ctx);
607
608         if (ret != 0) {
609                 ret = ldb_kv->kv_ops->error(ldb_kv);
610         }
611
612         return ret;
613 }
614
615 static int ldb_kv_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
616 {
617         struct ldb_message *msg;
618         int ret = LDB_SUCCESS;
619
620         msg = ldb_msg_new(module);
621         if (msg == NULL) {
622                 return LDB_ERR_OPERATIONS_ERROR;
623         }
624
625         /* in case any attribute of the message was indexed, we need
626            to fetch the old record */
627         ret = ldb_kv_search_dn1(
628             module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
629         if (ret != LDB_SUCCESS) {
630                 /* not finding the old record is an error */
631                 goto done;
632         }
633
634         ret = ldb_kv_delete_noindex(module, msg);
635         if (ret != LDB_SUCCESS) {
636                 goto done;
637         }
638
639         /* remove any indexed attributes */
640         ret = ldb_kv_index_delete(module, msg);
641         if (ret != LDB_SUCCESS) {
642                 goto done;
643         }
644
645         ret = ldb_kv_modified(module, dn);
646         if (ret != LDB_SUCCESS) {
647                 goto done;
648         }
649
650 done:
651         talloc_free(msg);
652         return ret;
653 }
654
655 /*
656   delete a record from the database
657 */
658 static int ldb_kv_delete(struct ldb_kv_context *ctx)
659 {
660         struct ldb_module *module = ctx->module;
661         struct ldb_request *req = ctx->req;
662         int ret = LDB_SUCCESS;
663
664         ldb_request_set_state(req, LDB_ASYNC_PENDING);
665
666         if (ldb_kv_cache_load(module) != 0) {
667                 return LDB_ERR_OPERATIONS_ERROR;
668         }
669
670         ret = ldb_kv_delete_internal(module, req->op.del.dn);
671
672         return ret;
673 }
674
675 /*
676   find an element by attribute name. At the moment this does a linear search,
677   it should be re-coded to use a binary search once all places that modify
678   records guarantee sorted order
679
680   return the index of the first matching element if found, otherwise -1
681 */
682 static int ldb_kv_find_element(const struct ldb_message *msg, const char *name)
683 {
684         unsigned int i;
685         for (i=0;i<msg->num_elements;i++) {
686                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
687                         return i;
688                 }
689         }
690         return -1;
691 }
692
693
694 /*
695   add an element to an existing record. Assumes a elements array that we
696   can call re-alloc on, and assumed that we can re-use the data pointers from
697   the passed in additional values. Use with care!
698
699   returns 0 on success, -1 on failure (and sets errno)
700 */
701 static int ldb_kv_msg_add_element(struct ldb_message *msg,
702                                   struct ldb_message_element *el)
703 {
704         struct ldb_message_element *e2;
705         unsigned int i;
706
707         if (el->num_values == 0) {
708                 /* nothing to do here - we don't add empty elements */
709                 return 0;
710         }
711
712         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
713                               msg->num_elements+1);
714         if (!e2) {
715                 errno = ENOMEM;
716                 return -1;
717         }
718
719         msg->elements = e2;
720
721         e2 = &msg->elements[msg->num_elements];
722
723         e2->name = el->name;
724         e2->flags = el->flags;
725         e2->values = talloc_array(msg->elements,
726                                   struct ldb_val, el->num_values);
727         if (!e2->values) {
728                 errno = ENOMEM;
729                 return -1;
730         }
731         for (i=0;i<el->num_values;i++) {
732                 e2->values[i] = el->values[i];
733         }
734         e2->num_values = el->num_values;
735
736         ++msg->num_elements;
737
738         return 0;
739 }
740
741 /*
742   delete all elements having a specified attribute name
743 */
744 static int ldb_kv_msg_delete_attribute(struct ldb_module *module,
745                                        struct ldb_kv_private *ldb_kv,
746                                        struct ldb_message *msg,
747                                        const char *name)
748 {
749         unsigned int i;
750         int ret;
751         struct ldb_message_element *el;
752         bool is_special = ldb_dn_is_special(msg->dn);
753
754         if (!is_special && ldb_kv->cache->GUID_index_attribute != NULL &&
755             ldb_attr_cmp(name, ldb_kv->cache->GUID_index_attribute) == 0) {
756                 struct ldb_context *ldb = ldb_module_get_ctx(module);
757                 ldb_asprintf_errstring(ldb,
758                                        "Must not modify GUID "
759                                        "attribute %s (used as DB index)",
760                                        ldb_kv->cache->GUID_index_attribute);
761                 return LDB_ERR_CONSTRAINT_VIOLATION;
762         }
763
764         el = ldb_msg_find_element(msg, name);
765         if (el == NULL) {
766                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
767         }
768         i = el - msg->elements;
769
770         ret = ldb_kv_index_del_element(module, ldb_kv, msg, el);
771         if (ret != LDB_SUCCESS) {
772                 return ret;
773         }
774
775         talloc_free(el->values);
776         if (msg->num_elements > (i+1)) {
777                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
778         }
779         msg->num_elements--;
780         msg->elements = talloc_realloc(msg, msg->elements,
781                                        struct ldb_message_element,
782                                        msg->num_elements);
783         return LDB_SUCCESS;
784 }
785
786 /*
787   delete all elements matching an attribute name/value
788
789   return LDB Error on failure
790 */
791 static int ldb_kv_msg_delete_element(struct ldb_module *module,
792                                      struct ldb_kv_private *ldb_kv,
793                                      struct ldb_message *msg,
794                                      const char *name,
795                                      const struct ldb_val *val)
796 {
797         struct ldb_context *ldb = ldb_module_get_ctx(module);
798         unsigned int i;
799         int found, ret;
800         struct ldb_message_element *el;
801         const struct ldb_schema_attribute *a;
802
803         found = ldb_kv_find_element(msg, name);
804         if (found == -1) {
805                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
806         }
807
808         i = (unsigned int) found;
809         el = &(msg->elements[i]);
810
811         a = ldb_schema_attribute_by_name(ldb, el->name);
812
813         for (i=0;i<el->num_values;i++) {
814                 bool matched;
815                 if (a->syntax->operator_fn) {
816                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
817                                                      &el->values[i], val, &matched);
818                         if (ret != LDB_SUCCESS) return ret;
819                 } else {
820                         matched = (a->syntax->comparison_fn(ldb, ldb,
821                                                             &el->values[i], val) == 0);
822                 }
823                 if (matched) {
824                         if (el->num_values == 1) {
825                                 return ldb_kv_msg_delete_attribute(
826                                     module, ldb_kv, msg, name);
827                         }
828
829                         ret =
830                             ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
831                         if (ret != LDB_SUCCESS) {
832                                 return ret;
833                         }
834
835                         if (i<el->num_values-1) {
836                                 memmove(&el->values[i], &el->values[i+1],
837                                         sizeof(el->values[i])*
838                                                 (el->num_values-(i+1)));
839                         }
840                         el->num_values--;
841
842                         /* per definition we find in a canonicalised message an
843                            attribute value only once. So we are finished here */
844                         return LDB_SUCCESS;
845                 }
846         }
847
848         /* Not found */
849         return LDB_ERR_NO_SUCH_ATTRIBUTE;
850 }
851
852 /*
853   modify a record - internal interface
854
855   yuck - this is O(n^2). Luckily n is usually small so we probably
856   get away with it, but if we ever have really large attribute lists
857   then we'll need to look at this again
858
859   'req' is optional, and is used to specify controls if supplied
860 */
861 int ldb_kv_modify_internal(struct ldb_module *module,
862                            const struct ldb_message *msg,
863                            struct ldb_request *req)
864 {
865         struct ldb_context *ldb = ldb_module_get_ctx(module);
866         void *data = ldb_module_get_private(module);
867         struct ldb_kv_private *ldb_kv =
868             talloc_get_type(data, struct ldb_kv_private);
869         struct ldb_message *msg2;
870         unsigned int i, j;
871         int ret = LDB_SUCCESS, idx;
872         struct ldb_control *control_permissive = NULL;
873         TALLOC_CTX *mem_ctx = talloc_new(req);
874
875         if (mem_ctx == NULL) {
876                 return ldb_module_oom(module);
877         }
878
879         if (req) {
880                 control_permissive = ldb_request_get_control(req,
881                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
882         }
883
884         msg2 = ldb_msg_new(mem_ctx);
885         if (msg2 == NULL) {
886                 ret = LDB_ERR_OTHER;
887                 goto done;
888         }
889
890         ret = ldb_kv_search_dn1(
891             module, msg->dn, msg2, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
892         if (ret != LDB_SUCCESS) {
893                 goto done;
894         }
895
896         for (i=0; i<msg->num_elements; i++) {
897                 struct ldb_message_element *el = &msg->elements[i], *el2;
898                 struct ldb_val *vals;
899                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
900                 const char *dn;
901                 uint32_t options = 0;
902                 if (control_permissive != NULL) {
903                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
904                 }
905
906                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
907                 case LDB_FLAG_MOD_ADD:
908
909                         if (el->num_values == 0) {
910                                 ldb_asprintf_errstring(ldb,
911                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
912                                                        el->name, ldb_dn_get_linearized(msg2->dn));
913                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
914                                 goto done;
915                         }
916
917                         /* make a copy of the array so that a permissive
918                          * control can remove duplicates without changing the
919                          * original values, but do not copy data as we do not
920                          * need to keep it around once the operation is
921                          * finished */
922                         if (control_permissive) {
923                                 el = talloc(msg2, struct ldb_message_element);
924                                 if (!el) {
925                                         ret = LDB_ERR_OTHER;
926                                         goto done;
927                                 }
928                                 *el = msg->elements[i];
929                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
930                                 if (el->values == NULL) {
931                                         ret = LDB_ERR_OTHER;
932                                         goto done;
933                                 }
934                                 for (j = 0; j < el->num_values; j++) {
935                                         el->values[j] = msg->elements[i].values[j];
936                                 }
937                         }
938
939                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
940                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
941                                                        el->name, ldb_dn_get_linearized(msg2->dn));
942                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
943                                 goto done;
944                         }
945
946                         /* Checks if element already exists */
947                         idx = ldb_kv_find_element(msg2, el->name);
948                         if (idx == -1) {
949                                 if (ldb_kv_msg_add_element(msg2, el) != 0) {
950                                         ret = LDB_ERR_OTHER;
951                                         goto done;
952                                 }
953                                 ret = ldb_kv_index_add_element(
954                                     module, ldb_kv, msg2, el);
955                                 if (ret != LDB_SUCCESS) {
956                                         goto done;
957                                 }
958                         } else {
959                                 j = (unsigned int) idx;
960                                 el2 = &(msg2->elements[j]);
961
962                                 /* We cannot add another value on a existing one
963                                    if the attribute is single-valued */
964                                 if (ldb_kv_single_valued(a, el)) {
965                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
966                                                                el->name, ldb_dn_get_linearized(msg2->dn));
967                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
968                                         goto done;
969                                 }
970
971                                 /* Check that values don't exist yet on multi-
972                                    valued attributes or aren't provided twice */
973                                 if (!(el->flags &
974                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
975                                         struct ldb_val *duplicate = NULL;
976                                         ret = ldb_msg_find_common_values(ldb,
977                                                                          msg2,
978                                                                          el,
979                                                                          el2,
980                                                                          options);
981
982                                         if (ret ==
983                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
984                                                 ldb_asprintf_errstring(ldb,
985                                                         "attribute '%s': value "
986                                                         "#%u on '%s' already "
987                                                         "exists", el->name, j,
988                                                         ldb_dn_get_linearized(msg2->dn));
989                                                 goto done;
990                                         } else if (ret != LDB_SUCCESS) {
991                                                 goto done;
992                                         }
993
994                                         ret = ldb_msg_find_duplicate_val(
995                                                 ldb, msg2, el, &duplicate, 0);
996                                         if (ret != LDB_SUCCESS) {
997                                                 goto done;
998                                         }
999                                         if (duplicate != NULL) {
1000                                                 ldb_asprintf_errstring(
1001                                                         ldb,
1002                                                         "attribute '%s': value "
1003                                                         "'%.*s' on '%s' "
1004                                                         "provided more than "
1005                                                         "once in ADD",
1006                                                         el->name,
1007                                                         (int)duplicate->length,
1008                                                         duplicate->data,
1009                                                         ldb_dn_get_linearized(msg->dn));
1010                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1011                                                 goto done;
1012                                         }
1013                                 }
1014
1015                                 /* Now combine existing and new values to a new
1016                                    attribute record */
1017                                 vals = talloc_realloc(msg2->elements,
1018                                                       el2->values, struct ldb_val,
1019                                                       el2->num_values + el->num_values);
1020                                 if (vals == NULL) {
1021                                         ldb_oom(ldb);
1022                                         ret = LDB_ERR_OTHER;
1023                                         goto done;
1024                                 }
1025
1026                                 for (j=0; j<el->num_values; j++) {
1027                                         vals[el2->num_values + j] =
1028                                                 ldb_val_dup(vals, &el->values[j]);
1029                                 }
1030
1031                                 el2->values = vals;
1032                                 el2->num_values += el->num_values;
1033
1034                                 ret = ldb_kv_index_add_element(
1035                                     module, ldb_kv, msg2, el);
1036                                 if (ret != LDB_SUCCESS) {
1037                                         goto done;
1038                                 }
1039                         }
1040
1041                         break;
1042
1043                 case LDB_FLAG_MOD_REPLACE:
1044
1045                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
1046                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1047                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1048                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1049                                 goto done;
1050                         }
1051
1052                         /*
1053                          * We don't need to check this if we have been
1054                          * pre-screened by the repl_meta_data module
1055                          * in Samba, or someone else who can claim to
1056                          * know what they are doing.
1057                          */
1058                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1059                                 struct ldb_val *duplicate = NULL;
1060
1061                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1062                                                                  &duplicate, 0);
1063                                 if (ret != LDB_SUCCESS) {
1064                                         goto done;
1065                                 }
1066                                 if (duplicate != NULL) {
1067                                         ldb_asprintf_errstring(
1068                                                 ldb,
1069                                                 "attribute '%s': value '%.*s' "
1070                                                 "on '%s' provided more than "
1071                                                 "once in REPLACE",
1072                                                 el->name,
1073                                                 (int)duplicate->length,
1074                                                 duplicate->data,
1075                                                 ldb_dn_get_linearized(msg2->dn));
1076                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1077                                         goto done;
1078                                 }
1079                         }
1080
1081                         /* Checks if element already exists */
1082                         idx = ldb_kv_find_element(msg2, el->name);
1083                         if (idx != -1) {
1084                                 j = (unsigned int) idx;
1085                                 el2 = &(msg2->elements[j]);
1086
1087                                 /* we consider two elements to be
1088                                  * equal only if the order
1089                                  * matches. This allows dbcheck to
1090                                  * fix the ordering on attributes
1091                                  * where order matters, such as
1092                                  * objectClass
1093                                  */
1094                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1095                                         continue;
1096                                 }
1097
1098                                 /* Delete the attribute if it exists in the DB */
1099                                 if (ldb_kv_msg_delete_attribute(
1100                                         module, ldb_kv, msg2, el->name) != 0) {
1101                                         ret = LDB_ERR_OTHER;
1102                                         goto done;
1103                                 }
1104                         }
1105
1106                         /* Recreate it with the new values */
1107                         if (ldb_kv_msg_add_element(msg2, el) != 0) {
1108                                 ret = LDB_ERR_OTHER;
1109                                 goto done;
1110                         }
1111
1112                         ret =
1113                             ldb_kv_index_add_element(module, ldb_kv, msg2, el);
1114                         if (ret != LDB_SUCCESS) {
1115                                 goto done;
1116                         }
1117
1118                         break;
1119
1120                 case LDB_FLAG_MOD_DELETE:
1121                         dn = ldb_dn_get_linearized(msg2->dn);
1122                         if (dn == NULL) {
1123                                 ret = LDB_ERR_OTHER;
1124                                 goto done;
1125                         }
1126
1127                         if (msg->elements[i].num_values == 0) {
1128                                 /* Delete the whole attribute */
1129                                 ret = ldb_kv_msg_delete_attribute(
1130                                     module,
1131                                     ldb_kv,
1132                                     msg2,
1133                                     msg->elements[i].name);
1134                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1135                                     control_permissive) {
1136                                         ret = LDB_SUCCESS;
1137                                 } else {
1138                                         ldb_asprintf_errstring(ldb,
1139                                                                "attribute '%s': no such attribute for delete on '%s'",
1140                                                                msg->elements[i].name, dn);
1141                                 }
1142                                 if (ret != LDB_SUCCESS) {
1143                                         goto done;
1144                                 }
1145                         } else {
1146                                 /* Delete specified values from an attribute */
1147                                 for (j=0; j < msg->elements[i].num_values; j++) {
1148                                         ret = ldb_kv_msg_delete_element(
1149                                             module,
1150                                             ldb_kv,
1151                                             msg2,
1152                                             msg->elements[i].name,
1153                                             &msg->elements[i].values[j]);
1154                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1155                                             control_permissive) {
1156                                                 ret = LDB_SUCCESS;
1157                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1158                                                 ldb_asprintf_errstring(ldb,
1159                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1160                                                                        msg->elements[i].name, dn);
1161                                         }
1162                                         if (ret != LDB_SUCCESS) {
1163                                                 goto done;
1164                                         }
1165                                 }
1166                         }
1167                         break;
1168                 default:
1169                         ldb_asprintf_errstring(ldb,
1170                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1171                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1172                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1173                         ret = LDB_ERR_PROTOCOL_ERROR;
1174                         goto done;
1175                 }
1176         }
1177
1178         ret = ldb_kv_store(module, msg2, TDB_MODIFY);
1179         if (ret != LDB_SUCCESS) {
1180                 goto done;
1181         }
1182
1183         ret = ldb_kv_modified(module, msg2->dn);
1184         if (ret != LDB_SUCCESS) {
1185                 goto done;
1186         }
1187
1188 done:
1189         TALLOC_FREE(mem_ctx);
1190         return ret;
1191 }
1192
1193 /*
1194   modify a record
1195 */
1196 static int ldb_kv_modify(struct ldb_kv_context *ctx)
1197 {
1198         struct ldb_module *module = ctx->module;
1199         struct ldb_request *req = ctx->req;
1200         int ret = LDB_SUCCESS;
1201
1202         ret = ldb_kv_check_special_dn(module, req->op.mod.message);
1203         if (ret != LDB_SUCCESS) {
1204                 return ret;
1205         }
1206
1207         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1208
1209         if (ldb_kv_cache_load(module) != 0) {
1210                 return LDB_ERR_OPERATIONS_ERROR;
1211         }
1212
1213         ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
1214
1215         return ret;
1216 }
1217
1218 /*
1219   rename a record
1220 */
1221 static int ldb_kv_rename(struct ldb_kv_context *ctx)
1222 {
1223         struct ldb_module *module = ctx->module;
1224         void *data = ldb_module_get_private(module);
1225         struct ldb_kv_private *ldb_kv =
1226             talloc_get_type(data, struct ldb_kv_private);
1227         struct ldb_request *req = ctx->req;
1228         struct ldb_message *msg;
1229         int ret = LDB_SUCCESS;
1230         struct ldb_val  key, key_old;
1231         struct ldb_dn *db_dn;
1232
1233         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1234
1235         if (ldb_kv_cache_load(ctx->module) != 0) {
1236                 return LDB_ERR_OPERATIONS_ERROR;
1237         }
1238
1239         msg = ldb_msg_new(ctx);
1240         if (msg == NULL) {
1241                 return LDB_ERR_OPERATIONS_ERROR;
1242         }
1243
1244         /* we need to fetch the old record to re-add under the new name */
1245         ret = ldb_kv_search_dn1(module,
1246                                 req->op.rename.olddn,
1247                                 msg,
1248                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1249         if (ret != LDB_SUCCESS) {
1250                 /* not finding the old record is an error */
1251                 return ret;
1252         }
1253
1254         /* We need to, before changing the DB, check if the new DN
1255          * exists, so we can return this error to the caller with an
1256          * unmodified DB
1257          *
1258          * Even in GUID index mode we use ltdb_key_dn() as we are
1259          * trying to figure out if this is just a case rename
1260          */
1261         key = ldb_kv_key_dn(module, msg, req->op.rename.newdn);
1262         if (!key.data) {
1263                 talloc_free(msg);
1264                 return LDB_ERR_OPERATIONS_ERROR;
1265         }
1266
1267         key_old = ldb_kv_key_dn(module, msg, req->op.rename.olddn);
1268         if (!key_old.data) {
1269                 talloc_free(msg);
1270                 talloc_free(key.data);
1271                 return LDB_ERR_OPERATIONS_ERROR;
1272         }
1273
1274         /*
1275          * Only declare a conflict if the new DN already exists,
1276          * and it isn't a case change on the old DN
1277          */
1278         if (key_old.length != key.length
1279             || memcmp(key.data, key_old.data, key.length) != 0) {
1280                 ret = ldb_kv_search_base(
1281                     module, msg, req->op.rename.newdn, &db_dn);
1282                 if (ret == LDB_SUCCESS) {
1283                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1284                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1285                         ret = LDB_SUCCESS;
1286                 }
1287         }
1288
1289         /* finding the new record already in the DB is an error */
1290
1291         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1292                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1293                                        "Entry %s already exists",
1294                                        ldb_dn_get_linearized(req->op.rename.newdn));
1295         }
1296         if (ret != LDB_SUCCESS) {
1297                 talloc_free(key_old.data);
1298                 talloc_free(key.data);
1299                 talloc_free(msg);
1300                 return ret;
1301         }
1302
1303         talloc_free(key_old.data);
1304         talloc_free(key.data);
1305
1306         /* Always delete first then add, to avoid conflicts with
1307          * unique indexes. We rely on the transaction to make this
1308          * atomic
1309          */
1310         ret = ldb_kv_delete_internal(module, msg->dn);
1311         if (ret != LDB_SUCCESS) {
1312                 talloc_free(msg);
1313                 return ret;
1314         }
1315
1316         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1317         if (msg->dn == NULL) {
1318                 talloc_free(msg);
1319                 return LDB_ERR_OPERATIONS_ERROR;
1320         }
1321
1322         /* We don't check single value as we can have more than 1 with
1323          * deleted attributes. We could go through all elements but that's
1324          * maybe not the most efficient way
1325          */
1326         ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
1327
1328         talloc_free(msg);
1329
1330         return ret;
1331 }
1332
1333 static int ldb_kv_start_trans(struct ldb_module *module)
1334 {
1335         void *data = ldb_module_get_private(module);
1336         struct ldb_kv_private *ldb_kv =
1337             talloc_get_type(data, struct ldb_kv_private);
1338
1339         pid_t pid = getpid();
1340
1341         if (ldb_kv->pid != pid) {
1342                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
1343                                        __location__
1344                                        ": Reusing ldb opend by pid %d in "
1345                                        "process %d\n",
1346                                        ldb_kv->pid,
1347                                        pid);
1348                 return LDB_ERR_PROTOCOL_ERROR;
1349         }
1350
1351         /* Do not take out the transaction lock on a read-only DB */
1352         if (ldb_kv->read_only) {
1353                 return LDB_ERR_UNWILLING_TO_PERFORM;
1354         }
1355
1356         if (ldb_kv->kv_ops->begin_write(ldb_kv) != 0) {
1357                 return ldb_kv->kv_ops->error(ldb_kv);
1358         }
1359
1360         ldb_kv_index_transaction_start(module);
1361
1362         ldb_kv->reindex_failed = false;
1363
1364         return LDB_SUCCESS;
1365 }
1366
1367 /*
1368  * Forward declaration to allow prepare_commit to in fact abort the
1369  * transaction
1370  */
1371 static int ldb_kv_del_trans(struct ldb_module *module);
1372
1373 static int ldb_kv_prepare_commit(struct ldb_module *module)
1374 {
1375         int ret;
1376         void *data = ldb_module_get_private(module);
1377         struct ldb_kv_private *ldb_kv =
1378             talloc_get_type(data, struct ldb_kv_private);
1379         pid_t pid = getpid();
1380
1381         if (ldb_kv->pid != pid) {
1382                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1383                                        __location__
1384                                        ": Reusing ldb opend by pid %d in "
1385                                        "process %d\n",
1386                                        ldb_kv->pid,
1387                                        pid);
1388                 return LDB_ERR_PROTOCOL_ERROR;
1389         }
1390
1391         if (!ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1392                 ldb_set_errstring(ldb_module_get_ctx(module),
1393                                   "ltdb_prepare_commit() called "
1394                                   "without transaction active");
1395                 return LDB_ERR_OPERATIONS_ERROR;
1396         }
1397
1398         /*
1399          * Check if the last re-index failed.
1400          *
1401          * This can happen if for example a duplicate value was marked
1402          * unique.  We must not write a partial re-index into the DB.
1403          */
1404         if (ldb_kv->reindex_failed) {
1405                 /*
1406                  * We must instead abort the transaction so we get the
1407                  * old values and old index back
1408                  */
1409                 ldb_kv_del_trans(module);
1410                 ldb_set_errstring(ldb_module_get_ctx(module),
1411                                   "Failure during re-index, so "
1412                                   "transaction must be aborted.");
1413                 return LDB_ERR_OPERATIONS_ERROR;
1414         }
1415
1416         ret = ldb_kv_index_transaction_commit(module);
1417         if (ret != LDB_SUCCESS) {
1418                 ldb_kv->kv_ops->abort_write(ldb_kv);
1419                 return ret;
1420         }
1421
1422         if (ldb_kv->kv_ops->prepare_write(ldb_kv) != 0) {
1423                 ret = ldb_kv->kv_ops->error(ldb_kv);
1424                 ldb_debug_set(ldb_module_get_ctx(module),
1425                               LDB_DEBUG_FATAL,
1426                               "Failure during "
1427                               "prepare_write): %s -> %s",
1428                               ldb_kv->kv_ops->errorstr(ldb_kv),
1429                               ldb_strerror(ret));
1430                 return ret;
1431         }
1432
1433         ldb_kv->prepared_commit = true;
1434
1435         return LDB_SUCCESS;
1436 }
1437
1438 static int ldb_kv_end_trans(struct ldb_module *module)
1439 {
1440         int ret;
1441         void *data = ldb_module_get_private(module);
1442         struct ldb_kv_private *ldb_kv =
1443             talloc_get_type(data, struct ldb_kv_private);
1444
1445         if (!ldb_kv->prepared_commit) {
1446                 ret = ldb_kv_prepare_commit(module);
1447                 if (ret != LDB_SUCCESS) {
1448                         return ret;
1449                 }
1450         }
1451
1452         ldb_kv->prepared_commit = false;
1453
1454         if (ldb_kv->kv_ops->finish_write(ldb_kv) != 0) {
1455                 ret = ldb_kv->kv_ops->error(ldb_kv);
1456                 ldb_asprintf_errstring(
1457                     ldb_module_get_ctx(module),
1458                     "Failure during tdb_transaction_commit(): %s -> %s",
1459                     ldb_kv->kv_ops->errorstr(ldb_kv),
1460                     ldb_strerror(ret));
1461                 return ret;
1462         }
1463
1464         return LDB_SUCCESS;
1465 }
1466
1467 static int ldb_kv_del_trans(struct ldb_module *module)
1468 {
1469         void *data = ldb_module_get_private(module);
1470         struct ldb_kv_private *ldb_kv =
1471             talloc_get_type(data, struct ldb_kv_private);
1472
1473         if (ldb_kv_index_transaction_cancel(module) != 0) {
1474                 ldb_kv->kv_ops->abort_write(ldb_kv);
1475                 return ldb_kv->kv_ops->error(ldb_kv);
1476         }
1477
1478         ldb_kv->kv_ops->abort_write(ldb_kv);
1479         return LDB_SUCCESS;
1480 }
1481
1482 /*
1483   return sequenceNumber from @BASEINFO
1484 */
1485 static int ldb_kv_sequence_number(struct ldb_kv_context *ctx,
1486                                   struct ldb_extended **ext)
1487 {
1488         struct ldb_context *ldb;
1489         struct ldb_module *module = ctx->module;
1490         struct ldb_request *req = ctx->req;
1491         void *data = ldb_module_get_private(module);
1492         struct ldb_kv_private *ldb_kv =
1493             talloc_get_type(data, struct ldb_kv_private);
1494         TALLOC_CTX *tmp_ctx = NULL;
1495         struct ldb_seqnum_request *seq;
1496         struct ldb_seqnum_result *res;
1497         struct ldb_message *msg = NULL;
1498         struct ldb_dn *dn;
1499         const char *date;
1500         int ret = LDB_SUCCESS;
1501
1502         ldb = ldb_module_get_ctx(module);
1503
1504         seq = talloc_get_type(req->op.extended.data,
1505                                 struct ldb_seqnum_request);
1506         if (seq == NULL) {
1507                 return LDB_ERR_OPERATIONS_ERROR;
1508         }
1509
1510         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1511
1512         if (ldb_kv->kv_ops->lock_read(module) != 0) {
1513                 return LDB_ERR_OPERATIONS_ERROR;
1514         }
1515
1516         res = talloc_zero(req, struct ldb_seqnum_result);
1517         if (res == NULL) {
1518                 ret = LDB_ERR_OPERATIONS_ERROR;
1519                 goto done;
1520         }
1521
1522         tmp_ctx = talloc_new(req);
1523         if (tmp_ctx == NULL) {
1524                 ret = LDB_ERR_OPERATIONS_ERROR;
1525                 goto done;
1526         }
1527
1528         dn = ldb_dn_new(tmp_ctx, ldb, LDB_KV_BASEINFO);
1529         if (dn == NULL) {
1530                 ret = LDB_ERR_OPERATIONS_ERROR;
1531                 goto done;
1532         }
1533
1534         msg = ldb_msg_new(tmp_ctx);
1535         if (msg == NULL) {
1536                 ret = LDB_ERR_OPERATIONS_ERROR;
1537                 goto done;
1538         }
1539
1540         ret = ldb_kv_search_dn1(module, dn, msg, 0);
1541         if (ret != LDB_SUCCESS) {
1542                 goto done;
1543         }
1544
1545         switch (seq->type) {
1546         case LDB_SEQ_HIGHEST_SEQ:
1547                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1548                 break;
1549         case LDB_SEQ_NEXT:
1550                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1551                 res->seq_num++;
1552                 break;
1553         case LDB_SEQ_HIGHEST_TIMESTAMP:
1554                 date = ldb_msg_find_attr_as_string(msg, LDB_KV_MOD_TIMESTAMP, NULL);
1555                 if (date) {
1556                         res->seq_num = ldb_string_to_time(date);
1557                 } else {
1558                         res->seq_num = 0;
1559                         /* zero is as good as anything when we don't know */
1560                 }
1561                 break;
1562         }
1563
1564         *ext = talloc_zero(req, struct ldb_extended);
1565         if (*ext == NULL) {
1566                 ret = LDB_ERR_OPERATIONS_ERROR;
1567                 goto done;
1568         }
1569         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1570         (*ext)->data = talloc_steal(*ext, res);
1571
1572 done:
1573         talloc_free(tmp_ctx);
1574
1575         ldb_kv->kv_ops->unlock_read(module);
1576         return ret;
1577 }
1578
1579 static void ldb_kv_request_done(struct ldb_kv_context *ctx, int error)
1580 {
1581         struct ldb_context *ldb;
1582         struct ldb_request *req;
1583         struct ldb_reply *ares;
1584
1585         ldb = ldb_module_get_ctx(ctx->module);
1586         req = ctx->req;
1587
1588         /* if we already returned an error just return */
1589         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1590                 return;
1591         }
1592
1593         ares = talloc_zero(req, struct ldb_reply);
1594         if (!ares) {
1595                 ldb_oom(ldb);
1596                 req->callback(req, NULL);
1597                 return;
1598         }
1599         ares->type = LDB_REPLY_DONE;
1600         ares->error = error;
1601
1602         req->callback(req, ares);
1603 }
1604
1605 static void ldb_kv_timeout(struct tevent_context *ev,
1606                            struct tevent_timer *te,
1607                            struct timeval t,
1608                            void *private_data)
1609 {
1610         struct ldb_kv_context *ctx;
1611         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1612
1613         if (!ctx->request_terminated) {
1614                 /* request is done now */
1615                 ldb_kv_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1616         }
1617
1618         if (ctx->spy) {
1619                 /* neutralize the spy */
1620                 ctx->spy->ctx = NULL;
1621                 ctx->spy = NULL;
1622         }
1623         talloc_free(ctx);
1624 }
1625
1626 static void ldb_kv_request_extended_done(struct ldb_kv_context *ctx,
1627                                          struct ldb_extended *ext,
1628                                          int error)
1629 {
1630         struct ldb_context *ldb;
1631         struct ldb_request *req;
1632         struct ldb_reply *ares;
1633
1634         ldb = ldb_module_get_ctx(ctx->module);
1635         req = ctx->req;
1636
1637         /* if we already returned an error just return */
1638         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1639                 return;
1640         }
1641
1642         ares = talloc_zero(req, struct ldb_reply);
1643         if (!ares) {
1644                 ldb_oom(ldb);
1645                 req->callback(req, NULL);
1646                 return;
1647         }
1648         ares->type = LDB_REPLY_DONE;
1649         ares->response = ext;
1650         ares->error = error;
1651
1652         req->callback(req, ares);
1653 }
1654
1655 static void ldb_kv_handle_extended(struct ldb_kv_context *ctx)
1656 {
1657         struct ldb_extended *ext = NULL;
1658         int ret;
1659
1660         if (strcmp(ctx->req->op.extended.oid,
1661                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1662                 /* get sequence number */
1663                 ret = ldb_kv_sequence_number(ctx, &ext);
1664         } else {
1665                 /* not recognized */
1666                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1667         }
1668
1669         ldb_kv_request_extended_done(ctx, ext, ret);
1670 }
1671
1672 static void ldb_kv_callback(struct tevent_context *ev,
1673                             struct tevent_timer *te,
1674                             struct timeval t,
1675                             void *private_data)
1676 {
1677         struct ldb_kv_context *ctx;
1678         int ret;
1679
1680         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1681
1682         if (ctx->request_terminated) {
1683                 goto done;
1684         }
1685
1686         switch (ctx->req->operation) {
1687         case LDB_SEARCH:
1688                 ret = ldb_kv_search(ctx);
1689                 break;
1690         case LDB_ADD:
1691                 ret = ldb_kv_add(ctx);
1692                 break;
1693         case LDB_MODIFY:
1694                 ret = ldb_kv_modify(ctx);
1695                 break;
1696         case LDB_DELETE:
1697                 ret = ldb_kv_delete(ctx);
1698                 break;
1699         case LDB_RENAME:
1700                 ret = ldb_kv_rename(ctx);
1701                 break;
1702         case LDB_EXTENDED:
1703                 ldb_kv_handle_extended(ctx);
1704                 goto done;
1705         default:
1706                 /* no other op supported */
1707                 ret = LDB_ERR_PROTOCOL_ERROR;
1708         }
1709
1710         if (!ctx->request_terminated) {
1711                 /* request is done now */
1712                 ldb_kv_request_done(ctx, ret);
1713         }
1714
1715 done:
1716         if (ctx->spy) {
1717                 /* neutralize the spy */
1718                 ctx->spy->ctx = NULL;
1719                 ctx->spy = NULL;
1720         }
1721         talloc_free(ctx);
1722 }
1723
1724 static int ldb_kv_request_destructor(void *ptr)
1725 {
1726         struct ldb_kv_req_spy *spy =
1727             talloc_get_type(ptr, struct ldb_kv_req_spy);
1728
1729         if (spy->ctx != NULL) {
1730                 spy->ctx->spy = NULL;
1731                 spy->ctx->request_terminated = true;
1732                 spy->ctx = NULL;
1733         }
1734
1735         return 0;
1736 }
1737
1738 static int ldb_kv_handle_request(struct ldb_module *module,
1739                                  struct ldb_request *req)
1740 {
1741         struct ldb_control *control_permissive;
1742         struct ldb_context *ldb;
1743         struct tevent_context *ev;
1744         struct ldb_kv_context *ac;
1745         struct tevent_timer *te;
1746         struct timeval tv;
1747         unsigned int i;
1748
1749         ldb = ldb_module_get_ctx(module);
1750
1751         control_permissive = ldb_request_get_control(req,
1752                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1753
1754         for (i = 0; req->controls && req->controls[i]; i++) {
1755                 if (req->controls[i]->critical &&
1756                     req->controls[i] != control_permissive) {
1757                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1758                                                req->controls[i]->oid);
1759                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1760                 }
1761         }
1762
1763         if (req->starttime == 0 || req->timeout == 0) {
1764                 ldb_set_errstring(ldb, "Invalid timeout settings");
1765                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1766         }
1767
1768         ev = ldb_handle_get_event_context(req->handle);
1769
1770         ac = talloc_zero(ldb, struct ldb_kv_context);
1771         if (ac == NULL) {
1772                 ldb_oom(ldb);
1773                 return LDB_ERR_OPERATIONS_ERROR;
1774         }
1775
1776         ac->module = module;
1777         ac->req = req;
1778
1779         tv.tv_sec = 0;
1780         tv.tv_usec = 0;
1781         te = tevent_add_timer(ev, ac, tv, ldb_kv_callback, ac);
1782         if (NULL == te) {
1783                 talloc_free(ac);
1784                 return LDB_ERR_OPERATIONS_ERROR;
1785         }
1786
1787         if (req->timeout > 0) {
1788                 tv.tv_sec = req->starttime + req->timeout;
1789                 tv.tv_usec = 0;
1790                 ac->timeout_event =
1791                     tevent_add_timer(ev, ac, tv, ldb_kv_timeout, ac);
1792                 if (NULL == ac->timeout_event) {
1793                         talloc_free(ac);
1794                         return LDB_ERR_OPERATIONS_ERROR;
1795                 }
1796         }
1797
1798         /* set a spy so that we do not try to use the request context
1799          * if it is freed before ltdb_callback fires */
1800         ac->spy = talloc(req, struct ldb_kv_req_spy);
1801         if (NULL == ac->spy) {
1802                 talloc_free(ac);
1803                 return LDB_ERR_OPERATIONS_ERROR;
1804         }
1805         ac->spy->ctx = ac;
1806
1807         talloc_set_destructor((TALLOC_CTX *)ac->spy, ldb_kv_request_destructor);
1808
1809         return LDB_SUCCESS;
1810 }
1811
1812 static int ldb_kv_init_rootdse(struct ldb_module *module)
1813 {
1814         /* ignore errors on this - we expect it for non-sam databases */
1815         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1816
1817         /* there can be no module beyond the backend, just return */
1818         return LDB_SUCCESS;
1819 }
1820
1821 static int ldb_kv_lock_read(struct ldb_module *module)
1822 {
1823         void *data = ldb_module_get_private(module);
1824         struct ldb_kv_private *ldb_kv =
1825             talloc_get_type(data, struct ldb_kv_private);
1826         return ldb_kv->kv_ops->lock_read(module);
1827 }
1828
1829 static int ldb_kv_unlock_read(struct ldb_module *module)
1830 {
1831         void *data = ldb_module_get_private(module);
1832         struct ldb_kv_private *ldb_kv =
1833             talloc_get_type(data, struct ldb_kv_private);
1834         return ldb_kv->kv_ops->unlock_read(module);
1835 }
1836
1837 static const struct ldb_module_ops ldb_kv_ops = {
1838     .name = "tdb",
1839     .init_context = ldb_kv_init_rootdse,
1840     .search = ldb_kv_handle_request,
1841     .add = ldb_kv_handle_request,
1842     .modify = ldb_kv_handle_request,
1843     .del = ldb_kv_handle_request,
1844     .rename = ldb_kv_handle_request,
1845     .extended = ldb_kv_handle_request,
1846     .start_transaction = ldb_kv_start_trans,
1847     .end_transaction = ldb_kv_end_trans,
1848     .prepare_commit = ldb_kv_prepare_commit,
1849     .del_transaction = ldb_kv_del_trans,
1850     .read_lock = ldb_kv_lock_read,
1851     .read_unlock = ldb_kv_unlock_read,
1852 };
1853
1854 int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
1855                       const char *name,
1856                       struct ldb_context *ldb,
1857                       const char *options[],
1858                       struct ldb_module **_module)
1859 {
1860         if (getenv("LDB_WARN_UNINDEXED")) {
1861                 ldb_kv->warn_unindexed = true;
1862         }
1863
1864         if (getenv("LDB_WARN_REINDEX")) {
1865                 ldb_kv->warn_reindex = true;
1866         }
1867
1868         ldb_kv->sequence_number = 0;
1869
1870         ldb_kv->pid = getpid();
1871
1872         ldb_kv->module = ldb_module_new(ldb, ldb, name, &ldb_kv_ops);
1873         if (!ldb_kv->module) {
1874                 ldb_oom(ldb);
1875                 talloc_free(ldb_kv);
1876                 return LDB_ERR_OPERATIONS_ERROR;
1877         }
1878         ldb_module_set_private(ldb_kv->module, ldb_kv);
1879         talloc_steal(ldb_kv->module, ldb_kv);
1880
1881         if (ldb_kv_cache_load(ldb_kv->module) != 0) {
1882                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
1883                                        "records for backend '%s'", name);
1884                 talloc_free(ldb_kv->module);
1885                 return LDB_ERR_OPERATIONS_ERROR;
1886         }
1887
1888         *_module = ldb_kv->module;
1889         /*
1890          * Set or override the maximum key length
1891          *
1892          * The ldb_mdb code will have set this to 511, but our tests
1893          * set this even smaller (to make the tests more practical).
1894          *
1895          * This must only be used for the selftest as the length
1896          * becomes encoded in the index keys.
1897          */
1898         {
1899                 const char *len_str =
1900                         ldb_options_find(ldb, options,
1901                                          "max_key_len_for_self_test");
1902                 if (len_str != NULL) {
1903                         unsigned len = strtoul(len_str, NULL, 0);
1904                         ldb_kv->max_key_length = len;
1905                 }
1906         }
1907
1908         /*
1909          * Override full DB scans
1910          *
1911          * A full DB scan is expensive on a large database.  This
1912          * option is for testing to show that the full DB scan is not
1913          * triggered.
1914          */
1915         {
1916                 const char *len_str =
1917                         ldb_options_find(ldb, options,
1918                                          "disable_full_db_scan_for_self_test");
1919                 if (len_str != NULL) {
1920                         ldb_kv->disable_full_db_scan = true;
1921                 }
1922         }
1923
1924         return LDB_SUCCESS;
1925 }