bd35feed14b1a08454ed7c323dc593ec3d911dc3
[nivanova/samba-autobuild/.git] / lib / ldb / ldb_key_value / ldb_kv.c
1 /*
2    ldb database library
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2004
6    Copyright (C) Simo Sorce 2006-2008
7    Copyright (C) Matthias Dieter Wallnöfer 2009-2010
8
9      ** NOTE! The following LGPL license applies to the ldb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 /*
28  *  Name: ldb_kv
29  *
30  *  Component: ldb key value backend
31  *
32  *  Description: core functions for ldb key value backend
33  *
34  *  Author: Andrew Tridgell
35  *  Author: Stefan Metzmacher
36  *
37  *  Modifications:
38  *
39  *  - description: make the module use asynchronous calls
40  *    date: Feb 2006
41  *    Author: Simo Sorce
42  *
43  *  - description: make it possible to use event contexts
44  *    date: Jan 2008
45  *    Author: Simo Sorce
46  *
47  *  - description: fix up memory leaks and small bugs
48  *    date: Oct 2009
49  *    Author: Matthias Dieter Wallnöfer
50  */
51
52 #include "ldb_kv.h"
53 #include "ldb_private.h"
54
55 /*
56   prevent memory errors on callbacks
57 */
58 struct ldb_kv_req_spy {
59         struct ldb_kv_context *ctx;
60 };
61
62 /*
63  * Determine if this key could hold a record.  We allow the new GUID
64  * index, the old DN index and a possible future ID=
65  */
66 bool ldb_kv_key_is_record(struct ldb_val key)
67 {
68         if (key.length < 4) {
69                 return false;
70         }
71
72         if (memcmp(key.data, "DN=", 3) == 0) {
73                 return true;
74         }
75
76         if (memcmp(key.data, "ID=", 3) == 0) {
77                 return true;
78         }
79
80         if (key.length < sizeof(LDB_KV_GUID_KEY_PREFIX)) {
81                 return false;
82         }
83
84         if (memcmp(key.data, LDB_KV_GUID_KEY_PREFIX,
85                    sizeof(LDB_KV_GUID_KEY_PREFIX) - 1) == 0) {
86                 return true;
87         }
88
89         return false;
90 }
91
92 /*
93   form a ldb_val for a record key
94   caller frees
95
96   note that the key for a record can depend on whether the
97   dn refers to a case sensitive index record or not
98 */
99 struct ldb_val ldb_kv_key_dn(struct ldb_module *module,
100                              TALLOC_CTX *mem_ctx,
101                              struct ldb_dn *dn)
102 {
103         struct ldb_val key;
104         char *key_str = NULL;
105         const char *dn_folded = NULL;
106
107         /*
108           most DNs are case insensitive. The exception is index DNs for
109           case sensitive attributes
110
111           there are 3 cases dealt with in this code:
112
113           1) if the dn doesn't start with @ then uppercase the attribute
114              names and the attributes values of case insensitive attributes
115           2) if the dn starts with @ then leave it alone -
116              the indexing code handles the rest
117         */
118
119         dn_folded = ldb_dn_get_casefold(dn);
120         if (!dn_folded) {
121                 goto failed;
122         }
123
124         key_str = talloc_strdup(mem_ctx, "DN=");
125         if (!key_str) {
126                 goto failed;
127         }
128
129         key_str = talloc_strdup_append_buffer(key_str, dn_folded);
130         if (!key_str) {
131                 goto failed;
132         }
133
134         key.data = (uint8_t *)key_str;
135         key.length = strlen(key_str) + 1;
136
137         return key;
138
139 failed:
140         errno = ENOMEM;
141         key.data = NULL;
142         key.length = 0;
143         return key;
144 }
145
146 /* The caller is to provide a correctly sized key */
147 int ldb_kv_guid_to_key(struct ldb_module *module,
148                        struct ldb_kv_private *ldb_kv,
149                        const struct ldb_val *GUID_val,
150                        struct ldb_val *key)
151 {
152         const char *GUID_prefix = LDB_KV_GUID_KEY_PREFIX;
153         const int GUID_prefix_len = sizeof(LDB_KV_GUID_KEY_PREFIX) - 1;
154
155         if (key->length != (GUID_val->length+GUID_prefix_len)) {
156                 return LDB_ERR_OPERATIONS_ERROR;
157         }
158
159         memcpy(key->data, GUID_prefix, GUID_prefix_len);
160         memcpy(&key->data[GUID_prefix_len],
161                GUID_val->data, GUID_val->length);
162         return LDB_SUCCESS;
163 }
164
165 /*
166  * The caller is to provide a correctly sized key, used only in
167  * the GUID index mode
168  */
169 int ldb_kv_idx_to_key(struct ldb_module *module,
170                       struct ldb_kv_private *ldb_kv,
171                       TALLOC_CTX *mem_ctx,
172                       const struct ldb_val *idx_val,
173                       struct ldb_val *key)
174 {
175         struct ldb_context *ldb = ldb_module_get_ctx(module);
176         struct ldb_dn *dn;
177
178         if (ldb_kv->cache->GUID_index_attribute != NULL) {
179                 return ldb_kv_guid_to_key(module, ldb_kv, idx_val, key);
180         }
181
182         dn = ldb_dn_from_ldb_val(mem_ctx, ldb, idx_val);
183         if (dn == NULL) {
184                 /*
185                  * LDB_ERR_INVALID_DN_SYNTAX would just be confusing
186                  * to the caller, as this in an invalid index value
187                  */
188                 return LDB_ERR_OPERATIONS_ERROR;
189         }
190         /* form the key */
191         *key = ldb_kv_key_dn(module, mem_ctx, dn);
192         TALLOC_FREE(dn);
193         if (!key->data) {
194                 return ldb_module_oom(module);
195         }
196         return LDB_SUCCESS;
197 }
198
199 /*
200   form a TDB_DATA for a record key
201   caller frees mem_ctx, which may or may not have the key
202   as a child.
203
204   note that the key for a record can depend on whether a
205   GUID index is in use, or the DN is used as the key
206 */
207 struct ldb_val ldb_kv_key_msg(struct ldb_module *module,
208                         TALLOC_CTX *mem_ctx,
209                         const struct ldb_message *msg)
210 {
211         void *data = ldb_module_get_private(module);
212         struct ldb_kv_private *ldb_kv =
213             talloc_get_type(data, struct ldb_kv_private);
214         struct ldb_val key;
215         const struct ldb_val *guid_val;
216         int ret;
217
218         if (ldb_kv->cache->GUID_index_attribute == NULL) {
219                 return ldb_kv_key_dn(module, mem_ctx, msg->dn);
220         }
221
222         if (ldb_dn_is_special(msg->dn)) {
223                 return ldb_kv_key_dn(module, mem_ctx, msg->dn);
224         }
225
226         guid_val =
227             ldb_msg_find_ldb_val(msg, ldb_kv->cache->GUID_index_attribute);
228         if (guid_val == NULL) {
229                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
230                                        "Did not find GUID attribute %s "
231                                        "in %s, required for TDB record "
232                                        "key in " LDB_KV_IDXGUID " mode.",
233                                        ldb_kv->cache->GUID_index_attribute,
234                                        ldb_dn_get_linearized(msg->dn));
235                 errno = EINVAL;
236                 key.data = NULL;
237                 key.length = 0;
238                 return key;
239         }
240
241         /* In this case, allocate with talloc */
242         key.data = talloc_size(mem_ctx, LDB_KV_GUID_KEY_SIZE);
243         if (key.data == NULL) {
244                 errno = ENOMEM;
245                 key.data = NULL;
246                 key.length = 0;
247                 return key;
248         }
249         key.length = talloc_get_size(key.data);
250
251         ret = ldb_kv_guid_to_key(module, ldb_kv, guid_val, &key);
252
253         if (ret != LDB_SUCCESS) {
254                 errno = EINVAL;
255                 key.data = NULL;
256                 key.length = 0;
257                 return key;
258         }
259         return key;
260 }
261
262 /*
263   check special dn's have valid attributes
264   currently only @ATTRIBUTES is checked
265 */
266 static int ldb_kv_check_special_dn(struct ldb_module *module,
267                                    const struct ldb_message *msg)
268 {
269         struct ldb_context *ldb = ldb_module_get_ctx(module);
270         unsigned int i, j;
271
272         if (! ldb_dn_is_special(msg->dn) ||
273             ! ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
274                 return LDB_SUCCESS;
275         }
276
277         /* we have @ATTRIBUTES, let's check attributes are fine */
278         /* should we check that we deny multivalued attributes ? */
279         for (i = 0; i < msg->num_elements; i++) {
280                 if (ldb_attr_cmp(msg->elements[i].name, "distinguishedName") == 0) continue;
281
282                 for (j = 0; j < msg->elements[i].num_values; j++) {
283                         if (ldb_kv_check_at_attributes_values(
284                                 &msg->elements[i].values[j]) != 0) {
285                                 ldb_set_errstring(ldb, "Invalid attribute value in an @ATTRIBUTES entry");
286                                 return LDB_ERR_INVALID_ATTRIBUTE_SYNTAX;
287                         }
288                 }
289         }
290
291         return LDB_SUCCESS;
292 }
293
294
295 /*
296   we've made a modification to a dn - possibly reindex and
297   update sequence number
298 */
299 static int ldb_kv_modified(struct ldb_module *module, struct ldb_dn *dn)
300 {
301         int ret = LDB_SUCCESS;
302         struct ldb_kv_private *ldb_kv = talloc_get_type(
303             ldb_module_get_private(module), struct ldb_kv_private);
304
305         /* only allow modifies inside a transaction, otherwise the
306          * ldb is unsafe */
307         if (ldb_kv->kv_ops->transaction_active(ldb_kv) == false) {
308                 ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
309                 return LDB_ERR_OPERATIONS_ERROR;
310         }
311
312         if (ldb_dn_is_special(dn) &&
313             (ldb_dn_check_special(dn, LDB_KV_INDEXLIST) ||
314              ldb_dn_check_special(dn, LDB_KV_ATTRIBUTES)) )
315         {
316                 if (ldb_kv->warn_reindex) {
317                         ldb_debug(ldb_module_get_ctx(module),
318                                   LDB_DEBUG_ERROR,
319                                   "Reindexing %s due to modification on %s",
320                                   ldb_kv->kv_ops->name(ldb_kv),
321                                   ldb_dn_get_linearized(dn));
322                 }
323                 ret = ldb_kv_reindex(module);
324         }
325
326         /* If the modify was to a normal record, or any special except @BASEINFO, update the seq number */
327         if (ret == LDB_SUCCESS &&
328             !(ldb_dn_is_special(dn) &&
329               ldb_dn_check_special(dn, LDB_KV_BASEINFO)) ) {
330                 ret = ldb_kv_increase_sequence_number(module);
331         }
332
333         /* If the modify was to @OPTIONS, reload the cache */
334         if (ret == LDB_SUCCESS &&
335             ldb_dn_is_special(dn) &&
336             (ldb_dn_check_special(dn, LDB_KV_OPTIONS)) ) {
337                 ret = ldb_kv_cache_reload(module);
338         }
339
340         if (ret != LDB_SUCCESS) {
341                 ldb_kv->reindex_failed = true;
342         }
343
344         return ret;
345 }
346 /*
347   store a record into the db
348 */
349 int ldb_kv_store(struct ldb_module *module,
350                  const struct ldb_message *msg,
351                  int flgs)
352 {
353         void *data = ldb_module_get_private(module);
354         struct ldb_kv_private *ldb_kv =
355             talloc_get_type(data, struct ldb_kv_private);
356         struct ldb_val key;
357         struct ldb_val ldb_data;
358         int ret = LDB_SUCCESS;
359         TALLOC_CTX *key_ctx = talloc_new(module);
360
361         if (key_ctx == NULL) {
362                 return ldb_module_oom(module);
363         }
364
365         if (ldb_kv->read_only) {
366                 talloc_free(key_ctx);
367                 return LDB_ERR_UNWILLING_TO_PERFORM;
368         }
369
370         key = ldb_kv_key_msg(module, key_ctx, msg);
371         if (key.data == NULL) {
372                 TALLOC_FREE(key_ctx);
373                 return LDB_ERR_OTHER;
374         }
375
376         ret = ldb_pack_data(ldb_module_get_ctx(module),
377                             msg, &ldb_data);
378         if (ret == -1) {
379                 TALLOC_FREE(key_ctx);
380                 return LDB_ERR_OTHER;
381         }
382
383         ret = ldb_kv->kv_ops->store(ldb_kv, key, ldb_data, flgs);
384         if (ret != 0) {
385                 bool is_special = ldb_dn_is_special(msg->dn);
386                 ret = ldb_kv->kv_ops->error(ldb_kv);
387
388                 /*
389                  * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
390                  * the GUID, so re-map
391                  */
392                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS && !is_special &&
393                     ldb_kv->cache->GUID_index_attribute != NULL) {
394                         ret = LDB_ERR_CONSTRAINT_VIOLATION;
395                 }
396                 goto done;
397         }
398
399 done:
400         TALLOC_FREE(key_ctx);
401         talloc_free(ldb_data.data);
402
403         return ret;
404 }
405
406
407 /*
408   check if a attribute is a single valued, for a given element
409  */
410 static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
411                                  struct ldb_message_element *el)
412 {
413         if (!a) return false;
414         if (el != NULL) {
415                 if (el->flags & LDB_FLAG_INTERNAL_FORCE_SINGLE_VALUE_CHECK) {
416                         /* override from a ldb module, for example
417                            used for the description field, which is
418                            marked multi-valued in the schema but which
419                            should not actually accept multiple
420                            values */
421                         return true;
422                 }
423                 if (el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK) {
424                         /* override from a ldb module, for example used for
425                            deleted linked attribute entries */
426                         return false;
427                 }
428         }
429         if (a->flags & LDB_ATTR_FLAG_SINGLE_VALUE) {
430                 return true;
431         }
432         return false;
433 }
434
435 static int ldb_kv_add_internal(struct ldb_module *module,
436                                struct ldb_kv_private *ldb_kv,
437                                const struct ldb_message *msg,
438                                bool check_single_value)
439 {
440         struct ldb_context *ldb = ldb_module_get_ctx(module);
441         int ret = LDB_SUCCESS;
442         unsigned int i;
443         bool valid_dn = false;
444
445         /* Check the new DN is reasonable */
446         valid_dn = ldb_dn_validate(msg->dn);
447         if (valid_dn == false) {
448                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
449                                        "Invalid DN in ADD: %s",
450                                        ldb_dn_get_linearized(msg->dn));
451                 return LDB_ERR_INVALID_DN_SYNTAX;
452         }
453
454         for (i=0;i<msg->num_elements;i++) {
455                 struct ldb_message_element *el = &msg->elements[i];
456                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
457
458                 if (el->num_values == 0) {
459                         ldb_asprintf_errstring(ldb, "attribute '%s' on '%s' specified, but with 0 values (illegal)",
460                                                el->name, ldb_dn_get_linearized(msg->dn));
461                         return LDB_ERR_CONSTRAINT_VIOLATION;
462                 }
463                 if (check_single_value && el->num_values > 1 &&
464                     ldb_kv_single_valued(a, el)) {
465                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
466                                                el->name, ldb_dn_get_linearized(msg->dn));
467                         return LDB_ERR_CONSTRAINT_VIOLATION;
468                 }
469
470                 /* Do not check "@ATTRIBUTES" for duplicated values */
471                 if (ldb_dn_is_special(msg->dn) &&
472                     ldb_dn_check_special(msg->dn, LDB_KV_ATTRIBUTES)) {
473                         continue;
474                 }
475
476                 if (check_single_value &&
477                     !(el->flags &
478                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
479                         struct ldb_val *duplicate = NULL;
480
481                         ret = ldb_msg_find_duplicate_val(ldb, discard_const(msg),
482                                                          el, &duplicate, 0);
483                         if (ret != LDB_SUCCESS) {
484                                 return ret;
485                         }
486                         if (duplicate != NULL) {
487                                 ldb_asprintf_errstring(
488                                         ldb,
489                                         "attribute '%s': value '%.*s' on '%s' "
490                                         "provided more than once in ADD object",
491                                         el->name,
492                                         (int)duplicate->length,
493                                         duplicate->data,
494                                         ldb_dn_get_linearized(msg->dn));
495                                 return LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
496                         }
497                 }
498         }
499
500         ret = ldb_kv_store(module, msg, TDB_INSERT);
501         if (ret != LDB_SUCCESS) {
502                 /*
503                  * Try really hard to get the right error code for
504                  * a re-add situation, as this can matter!
505                  */
506                 if (ret == LDB_ERR_CONSTRAINT_VIOLATION) {
507                         int ret2;
508                         struct ldb_dn *dn2 = NULL;
509                         TALLOC_CTX *mem_ctx = talloc_new(module);
510                         if (mem_ctx == NULL) {
511                                 return ldb_module_operr(module);
512                         }
513                         ret2 =
514                             ldb_kv_search_base(module, mem_ctx, msg->dn, &dn2);
515                         TALLOC_FREE(mem_ctx);
516                         if (ret2 == LDB_SUCCESS) {
517                                 ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
518                         }
519                 }
520                 if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
521                         ldb_asprintf_errstring(ldb,
522                                                "Entry %s already exists",
523                                                ldb_dn_get_linearized(msg->dn));
524                 }
525                 return ret;
526         }
527
528         ret = ldb_kv_index_add_new(module, ldb_kv, msg);
529         if (ret != LDB_SUCCESS) {
530                 /*
531                  * If we failed to index, delete the message again.
532                  *
533                  * This is particularly important for the GUID index
534                  * case, which will only fail for a duplicate DN
535                  * in the index add.
536                  *
537                  * Note that the caller may not cancel the transation
538                  * and this means the above add might really show up!
539                  */
540                 ldb_kv_delete_noindex(module, msg);
541                 return ret;
542         }
543
544         ret = ldb_kv_modified(module, msg->dn);
545
546         return ret;
547 }
548
549 /*
550   add a record to the database
551 */
552 static int ldb_kv_add(struct ldb_kv_context *ctx)
553 {
554         struct ldb_module *module = ctx->module;
555         struct ldb_request *req = ctx->req;
556         void *data = ldb_module_get_private(module);
557         struct ldb_kv_private *ldb_kv =
558             talloc_get_type(data, struct ldb_kv_private);
559         int ret = LDB_SUCCESS;
560
561         if (ldb_kv->max_key_length != 0 &&
562             ldb_kv->cache->GUID_index_attribute == NULL &&
563             !ldb_dn_is_special(req->op.add.message->dn)) {
564                 ldb_set_errstring(ldb_module_get_ctx(module),
565                                   "Must operate ldb_mdb in GUID "
566                                   "index mode, but " LDB_KV_IDXGUID " not set.");
567                 return LDB_ERR_UNWILLING_TO_PERFORM;
568         }
569
570         ret = ldb_kv_check_special_dn(module, req->op.add.message);
571         if (ret != LDB_SUCCESS) {
572                 return ret;
573         }
574
575         ldb_request_set_state(req, LDB_ASYNC_PENDING);
576
577         if (ldb_kv_cache_load(module) != 0) {
578                 return LDB_ERR_OPERATIONS_ERROR;
579         }
580
581         ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
582
583         return ret;
584 }
585
586 /*
587   delete a record from the database, not updating indexes (used for deleting
588   index records)
589 */
590 int ldb_kv_delete_noindex(struct ldb_module *module,
591                           const struct ldb_message *msg)
592 {
593         void *data = ldb_module_get_private(module);
594         struct ldb_kv_private *ldb_kv =
595             talloc_get_type(data, struct ldb_kv_private);
596         struct ldb_val key;
597         int ret;
598         TALLOC_CTX *tdb_key_ctx = talloc_new(module);
599
600         if (tdb_key_ctx == NULL) {
601                 return ldb_module_oom(module);
602         }
603
604         if (ldb_kv->read_only) {
605                 talloc_free(tdb_key_ctx);
606                 return LDB_ERR_UNWILLING_TO_PERFORM;
607         }
608
609         key = ldb_kv_key_msg(module, tdb_key_ctx, msg);
610         if (!key.data) {
611                 TALLOC_FREE(tdb_key_ctx);
612                 return LDB_ERR_OTHER;
613         }
614
615         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
616         TALLOC_FREE(tdb_key_ctx);
617
618         if (ret != 0) {
619                 ret = ldb_kv->kv_ops->error(ldb_kv);
620         }
621
622         return ret;
623 }
624
625 static int ldb_kv_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
626 {
627         struct ldb_message *msg;
628         int ret = LDB_SUCCESS;
629
630         msg = ldb_msg_new(module);
631         if (msg == NULL) {
632                 return LDB_ERR_OPERATIONS_ERROR;
633         }
634
635         /* in case any attribute of the message was indexed, we need
636            to fetch the old record */
637         ret = ldb_kv_search_dn1(
638             module, dn, msg, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
639         if (ret != LDB_SUCCESS) {
640                 /* not finding the old record is an error */
641                 goto done;
642         }
643
644         ret = ldb_kv_delete_noindex(module, msg);
645         if (ret != LDB_SUCCESS) {
646                 goto done;
647         }
648
649         /* remove any indexed attributes */
650         ret = ldb_kv_index_delete(module, msg);
651         if (ret != LDB_SUCCESS) {
652                 goto done;
653         }
654
655         ret = ldb_kv_modified(module, dn);
656         if (ret != LDB_SUCCESS) {
657                 goto done;
658         }
659
660 done:
661         talloc_free(msg);
662         return ret;
663 }
664
665 /*
666   delete a record from the database
667 */
668 static int ldb_kv_delete(struct ldb_kv_context *ctx)
669 {
670         struct ldb_module *module = ctx->module;
671         struct ldb_request *req = ctx->req;
672         int ret = LDB_SUCCESS;
673
674         ldb_request_set_state(req, LDB_ASYNC_PENDING);
675
676         if (ldb_kv_cache_load(module) != 0) {
677                 return LDB_ERR_OPERATIONS_ERROR;
678         }
679
680         ret = ldb_kv_delete_internal(module, req->op.del.dn);
681
682         return ret;
683 }
684
685 /*
686   find an element by attribute name. At the moment this does a linear search,
687   it should be re-coded to use a binary search once all places that modify
688   records guarantee sorted order
689
690   return the index of the first matching element if found, otherwise -1
691 */
692 static int ldb_kv_find_element(const struct ldb_message *msg, const char *name)
693 {
694         unsigned int i;
695         for (i=0;i<msg->num_elements;i++) {
696                 if (ldb_attr_cmp(msg->elements[i].name, name) == 0) {
697                         return i;
698                 }
699         }
700         return -1;
701 }
702
703
704 /*
705   add an element to an existing record. Assumes a elements array that we
706   can call re-alloc on, and assumed that we can re-use the data pointers from
707   the passed in additional values. Use with care!
708
709   returns 0 on success, -1 on failure (and sets errno)
710 */
711 static int ldb_kv_msg_add_element(struct ldb_message *msg,
712                                   struct ldb_message_element *el)
713 {
714         struct ldb_message_element *e2;
715         unsigned int i;
716
717         if (el->num_values == 0) {
718                 /* nothing to do here - we don't add empty elements */
719                 return 0;
720         }
721
722         e2 = talloc_realloc(msg, msg->elements, struct ldb_message_element,
723                               msg->num_elements+1);
724         if (!e2) {
725                 errno = ENOMEM;
726                 return -1;
727         }
728
729         msg->elements = e2;
730
731         e2 = &msg->elements[msg->num_elements];
732
733         e2->name = el->name;
734         e2->flags = el->flags;
735         e2->values = talloc_array(msg->elements,
736                                   struct ldb_val, el->num_values);
737         if (!e2->values) {
738                 errno = ENOMEM;
739                 return -1;
740         }
741         for (i=0;i<el->num_values;i++) {
742                 e2->values[i] = el->values[i];
743         }
744         e2->num_values = el->num_values;
745
746         ++msg->num_elements;
747
748         return 0;
749 }
750
751 /*
752   delete all elements having a specified attribute name
753 */
754 static int ldb_kv_msg_delete_attribute(struct ldb_module *module,
755                                        struct ldb_kv_private *ldb_kv,
756                                        struct ldb_message *msg,
757                                        const char *name)
758 {
759         unsigned int i;
760         int ret;
761         struct ldb_message_element *el;
762         bool is_special = ldb_dn_is_special(msg->dn);
763
764         if (!is_special && ldb_kv->cache->GUID_index_attribute != NULL &&
765             ldb_attr_cmp(name, ldb_kv->cache->GUID_index_attribute) == 0) {
766                 struct ldb_context *ldb = ldb_module_get_ctx(module);
767                 ldb_asprintf_errstring(ldb,
768                                        "Must not modify GUID "
769                                        "attribute %s (used as DB index)",
770                                        ldb_kv->cache->GUID_index_attribute);
771                 return LDB_ERR_CONSTRAINT_VIOLATION;
772         }
773
774         el = ldb_msg_find_element(msg, name);
775         if (el == NULL) {
776                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
777         }
778         i = el - msg->elements;
779
780         ret = ldb_kv_index_del_element(module, ldb_kv, msg, el);
781         if (ret != LDB_SUCCESS) {
782                 return ret;
783         }
784
785         talloc_free(el->values);
786         if (msg->num_elements > (i+1)) {
787                 memmove(el, el+1, sizeof(*el) * (msg->num_elements - (i+1)));
788         }
789         msg->num_elements--;
790         msg->elements = talloc_realloc(msg, msg->elements,
791                                        struct ldb_message_element,
792                                        msg->num_elements);
793         return LDB_SUCCESS;
794 }
795
796 /*
797   delete all elements matching an attribute name/value
798
799   return LDB Error on failure
800 */
801 static int ldb_kv_msg_delete_element(struct ldb_module *module,
802                                      struct ldb_kv_private *ldb_kv,
803                                      struct ldb_message *msg,
804                                      const char *name,
805                                      const struct ldb_val *val)
806 {
807         struct ldb_context *ldb = ldb_module_get_ctx(module);
808         unsigned int i;
809         int found, ret;
810         struct ldb_message_element *el;
811         const struct ldb_schema_attribute *a;
812
813         found = ldb_kv_find_element(msg, name);
814         if (found == -1) {
815                 return LDB_ERR_NO_SUCH_ATTRIBUTE;
816         }
817
818         i = (unsigned int) found;
819         el = &(msg->elements[i]);
820
821         a = ldb_schema_attribute_by_name(ldb, el->name);
822
823         for (i=0;i<el->num_values;i++) {
824                 bool matched;
825                 if (a->syntax->operator_fn) {
826                         ret = a->syntax->operator_fn(ldb, LDB_OP_EQUALITY, a,
827                                                      &el->values[i], val, &matched);
828                         if (ret != LDB_SUCCESS) return ret;
829                 } else {
830                         matched = (a->syntax->comparison_fn(ldb, ldb,
831                                                             &el->values[i], val) == 0);
832                 }
833                 if (matched) {
834                         if (el->num_values == 1) {
835                                 return ldb_kv_msg_delete_attribute(
836                                     module, ldb_kv, msg, name);
837                         }
838
839                         ret =
840                             ldb_kv_index_del_value(module, ldb_kv, msg, el, i);
841                         if (ret != LDB_SUCCESS) {
842                                 return ret;
843                         }
844
845                         if (i<el->num_values-1) {
846                                 memmove(&el->values[i], &el->values[i+1],
847                                         sizeof(el->values[i])*
848                                                 (el->num_values-(i+1)));
849                         }
850                         el->num_values--;
851
852                         /* per definition we find in a canonicalised message an
853                            attribute value only once. So we are finished here */
854                         return LDB_SUCCESS;
855                 }
856         }
857
858         /* Not found */
859         return LDB_ERR_NO_SUCH_ATTRIBUTE;
860 }
861
862 /*
863   modify a record - internal interface
864
865   yuck - this is O(n^2). Luckily n is usually small so we probably
866   get away with it, but if we ever have really large attribute lists
867   then we'll need to look at this again
868
869   'req' is optional, and is used to specify controls if supplied
870 */
871 int ldb_kv_modify_internal(struct ldb_module *module,
872                            const struct ldb_message *msg,
873                            struct ldb_request *req)
874 {
875         struct ldb_context *ldb = ldb_module_get_ctx(module);
876         void *data = ldb_module_get_private(module);
877         struct ldb_kv_private *ldb_kv =
878             talloc_get_type(data, struct ldb_kv_private);
879         struct ldb_message *msg2;
880         unsigned int i, j;
881         int ret = LDB_SUCCESS, idx;
882         struct ldb_control *control_permissive = NULL;
883         TALLOC_CTX *mem_ctx = talloc_new(req);
884
885         if (mem_ctx == NULL) {
886                 return ldb_module_oom(module);
887         }
888
889         if (req) {
890                 control_permissive = ldb_request_get_control(req,
891                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
892         }
893
894         msg2 = ldb_msg_new(mem_ctx);
895         if (msg2 == NULL) {
896                 ret = LDB_ERR_OTHER;
897                 goto done;
898         }
899
900         ret = ldb_kv_search_dn1(
901             module, msg->dn, msg2, LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
902         if (ret != LDB_SUCCESS) {
903                 goto done;
904         }
905
906         for (i=0; i<msg->num_elements; i++) {
907                 struct ldb_message_element *el = &msg->elements[i], *el2;
908                 struct ldb_val *vals;
909                 const struct ldb_schema_attribute *a = ldb_schema_attribute_by_name(ldb, el->name);
910                 const char *dn;
911                 uint32_t options = 0;
912                 if (control_permissive != NULL) {
913                         options |= LDB_MSG_FIND_COMMON_REMOVE_DUPLICATES;
914                 }
915
916                 switch (msg->elements[i].flags & LDB_FLAG_MOD_MASK) {
917                 case LDB_FLAG_MOD_ADD:
918
919                         if (el->num_values == 0) {
920                                 ldb_asprintf_errstring(ldb,
921                                                        "attribute '%s': attribute on '%s' specified, but with 0 values (illegal)",
922                                                        el->name, ldb_dn_get_linearized(msg2->dn));
923                                 ret = LDB_ERR_CONSTRAINT_VIOLATION;
924                                 goto done;
925                         }
926
927                         /* make a copy of the array so that a permissive
928                          * control can remove duplicates without changing the
929                          * original values, but do not copy data as we do not
930                          * need to keep it around once the operation is
931                          * finished */
932                         if (control_permissive) {
933                                 el = talloc(msg2, struct ldb_message_element);
934                                 if (!el) {
935                                         ret = LDB_ERR_OTHER;
936                                         goto done;
937                                 }
938                                 *el = msg->elements[i];
939                                 el->values = talloc_array(el, struct ldb_val, el->num_values);
940                                 if (el->values == NULL) {
941                                         ret = LDB_ERR_OTHER;
942                                         goto done;
943                                 }
944                                 for (j = 0; j < el->num_values; j++) {
945                                         el->values[j] = msg->elements[i].values[j];
946                                 }
947                         }
948
949                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
950                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
951                                                        el->name, ldb_dn_get_linearized(msg2->dn));
952                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
953                                 goto done;
954                         }
955
956                         /* Checks if element already exists */
957                         idx = ldb_kv_find_element(msg2, el->name);
958                         if (idx == -1) {
959                                 if (ldb_kv_msg_add_element(msg2, el) != 0) {
960                                         ret = LDB_ERR_OTHER;
961                                         goto done;
962                                 }
963                                 ret = ldb_kv_index_add_element(
964                                     module, ldb_kv, msg2, el);
965                                 if (ret != LDB_SUCCESS) {
966                                         goto done;
967                                 }
968                         } else {
969                                 j = (unsigned int) idx;
970                                 el2 = &(msg2->elements[j]);
971
972                                 /* We cannot add another value on a existing one
973                                    if the attribute is single-valued */
974                                 if (ldb_kv_single_valued(a, el)) {
975                                         ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
976                                                                el->name, ldb_dn_get_linearized(msg2->dn));
977                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
978                                         goto done;
979                                 }
980
981                                 /* Check that values don't exist yet on multi-
982                                    valued attributes or aren't provided twice */
983                                 if (!(el->flags &
984                                       LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
985                                         struct ldb_val *duplicate = NULL;
986                                         ret = ldb_msg_find_common_values(ldb,
987                                                                          msg2,
988                                                                          el,
989                                                                          el2,
990                                                                          options);
991
992                                         if (ret ==
993                                             LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS) {
994                                                 ldb_asprintf_errstring(ldb,
995                                                         "attribute '%s': value "
996                                                         "#%u on '%s' already "
997                                                         "exists", el->name, j,
998                                                         ldb_dn_get_linearized(msg2->dn));
999                                                 goto done;
1000                                         } else if (ret != LDB_SUCCESS) {
1001                                                 goto done;
1002                                         }
1003
1004                                         ret = ldb_msg_find_duplicate_val(
1005                                                 ldb, msg2, el, &duplicate, 0);
1006                                         if (ret != LDB_SUCCESS) {
1007                                                 goto done;
1008                                         }
1009                                         if (duplicate != NULL) {
1010                                                 ldb_asprintf_errstring(
1011                                                         ldb,
1012                                                         "attribute '%s': value "
1013                                                         "'%.*s' on '%s' "
1014                                                         "provided more than "
1015                                                         "once in ADD",
1016                                                         el->name,
1017                                                         (int)duplicate->length,
1018                                                         duplicate->data,
1019                                                         ldb_dn_get_linearized(msg->dn));
1020                                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1021                                                 goto done;
1022                                         }
1023                                 }
1024
1025                                 /* Now combine existing and new values to a new
1026                                    attribute record */
1027                                 vals = talloc_realloc(msg2->elements,
1028                                                       el2->values, struct ldb_val,
1029                                                       el2->num_values + el->num_values);
1030                                 if (vals == NULL) {
1031                                         ldb_oom(ldb);
1032                                         ret = LDB_ERR_OTHER;
1033                                         goto done;
1034                                 }
1035
1036                                 for (j=0; j<el->num_values; j++) {
1037                                         vals[el2->num_values + j] =
1038                                                 ldb_val_dup(vals, &el->values[j]);
1039                                 }
1040
1041                                 el2->values = vals;
1042                                 el2->num_values += el->num_values;
1043
1044                                 ret = ldb_kv_index_add_element(
1045                                     module, ldb_kv, msg2, el);
1046                                 if (ret != LDB_SUCCESS) {
1047                                         goto done;
1048                                 }
1049                         }
1050
1051                         break;
1052
1053                 case LDB_FLAG_MOD_REPLACE:
1054
1055                         if (el->num_values > 1 && ldb_kv_single_valued(a, el)) {
1056                                 ldb_asprintf_errstring(ldb, "SINGLE-VALUE attribute %s on %s specified more than once",
1057                                                        el->name, ldb_dn_get_linearized(msg2->dn));
1058                                 ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1059                                 goto done;
1060                         }
1061
1062                         /*
1063                          * We don't need to check this if we have been
1064                          * pre-screened by the repl_meta_data module
1065                          * in Samba, or someone else who can claim to
1066                          * know what they are doing.
1067                          */
1068                         if (!(el->flags & LDB_FLAG_INTERNAL_DISABLE_SINGLE_VALUE_CHECK)) {
1069                                 struct ldb_val *duplicate = NULL;
1070
1071                                 ret = ldb_msg_find_duplicate_val(ldb, msg2, el,
1072                                                                  &duplicate, 0);
1073                                 if (ret != LDB_SUCCESS) {
1074                                         goto done;
1075                                 }
1076                                 if (duplicate != NULL) {
1077                                         ldb_asprintf_errstring(
1078                                                 ldb,
1079                                                 "attribute '%s': value '%.*s' "
1080                                                 "on '%s' provided more than "
1081                                                 "once in REPLACE",
1082                                                 el->name,
1083                                                 (int)duplicate->length,
1084                                                 duplicate->data,
1085                                                 ldb_dn_get_linearized(msg2->dn));
1086                                         ret = LDB_ERR_ATTRIBUTE_OR_VALUE_EXISTS;
1087                                         goto done;
1088                                 }
1089                         }
1090
1091                         /* Checks if element already exists */
1092                         idx = ldb_kv_find_element(msg2, el->name);
1093                         if (idx != -1) {
1094                                 j = (unsigned int) idx;
1095                                 el2 = &(msg2->elements[j]);
1096
1097                                 /* we consider two elements to be
1098                                  * equal only if the order
1099                                  * matches. This allows dbcheck to
1100                                  * fix the ordering on attributes
1101                                  * where order matters, such as
1102                                  * objectClass
1103                                  */
1104                                 if (ldb_msg_element_equal_ordered(el, el2)) {
1105                                         continue;
1106                                 }
1107
1108                                 /* Delete the attribute if it exists in the DB */
1109                                 if (ldb_kv_msg_delete_attribute(
1110                                         module, ldb_kv, msg2, el->name) != 0) {
1111                                         ret = LDB_ERR_OTHER;
1112                                         goto done;
1113                                 }
1114                         }
1115
1116                         /* Recreate it with the new values */
1117                         if (ldb_kv_msg_add_element(msg2, el) != 0) {
1118                                 ret = LDB_ERR_OTHER;
1119                                 goto done;
1120                         }
1121
1122                         ret =
1123                             ldb_kv_index_add_element(module, ldb_kv, msg2, el);
1124                         if (ret != LDB_SUCCESS) {
1125                                 goto done;
1126                         }
1127
1128                         break;
1129
1130                 case LDB_FLAG_MOD_DELETE:
1131                         dn = ldb_dn_get_linearized(msg2->dn);
1132                         if (dn == NULL) {
1133                                 ret = LDB_ERR_OTHER;
1134                                 goto done;
1135                         }
1136
1137                         if (msg->elements[i].num_values == 0) {
1138                                 /* Delete the whole attribute */
1139                                 ret = ldb_kv_msg_delete_attribute(
1140                                     module,
1141                                     ldb_kv,
1142                                     msg2,
1143                                     msg->elements[i].name);
1144                                 if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1145                                     control_permissive) {
1146                                         ret = LDB_SUCCESS;
1147                                 } else {
1148                                         ldb_asprintf_errstring(ldb,
1149                                                                "attribute '%s': no such attribute for delete on '%s'",
1150                                                                msg->elements[i].name, dn);
1151                                 }
1152                                 if (ret != LDB_SUCCESS) {
1153                                         goto done;
1154                                 }
1155                         } else {
1156                                 /* Delete specified values from an attribute */
1157                                 for (j=0; j < msg->elements[i].num_values; j++) {
1158                                         ret = ldb_kv_msg_delete_element(
1159                                             module,
1160                                             ldb_kv,
1161                                             msg2,
1162                                             msg->elements[i].name,
1163                                             &msg->elements[i].values[j]);
1164                                         if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE &&
1165                                             control_permissive) {
1166                                                 ret = LDB_SUCCESS;
1167                                         } else if (ret == LDB_ERR_NO_SUCH_ATTRIBUTE) {
1168                                                 ldb_asprintf_errstring(ldb,
1169                                                                        "attribute '%s': no matching attribute value while deleting attribute on '%s'",
1170                                                                        msg->elements[i].name, dn);
1171                                         }
1172                                         if (ret != LDB_SUCCESS) {
1173                                                 goto done;
1174                                         }
1175                                 }
1176                         }
1177                         break;
1178                 default:
1179                         ldb_asprintf_errstring(ldb,
1180                                                "attribute '%s': invalid modify flags on '%s': 0x%x",
1181                                                msg->elements[i].name, ldb_dn_get_linearized(msg->dn),
1182                                                msg->elements[i].flags & LDB_FLAG_MOD_MASK);
1183                         ret = LDB_ERR_PROTOCOL_ERROR;
1184                         goto done;
1185                 }
1186         }
1187
1188         ret = ldb_kv_store(module, msg2, TDB_MODIFY);
1189         if (ret != LDB_SUCCESS) {
1190                 goto done;
1191         }
1192
1193         ret = ldb_kv_modified(module, msg2->dn);
1194         if (ret != LDB_SUCCESS) {
1195                 goto done;
1196         }
1197
1198 done:
1199         TALLOC_FREE(mem_ctx);
1200         return ret;
1201 }
1202
1203 /*
1204   modify a record
1205 */
1206 static int ldb_kv_modify(struct ldb_kv_context *ctx)
1207 {
1208         struct ldb_module *module = ctx->module;
1209         struct ldb_request *req = ctx->req;
1210         int ret = LDB_SUCCESS;
1211
1212         ret = ldb_kv_check_special_dn(module, req->op.mod.message);
1213         if (ret != LDB_SUCCESS) {
1214                 return ret;
1215         }
1216
1217         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1218
1219         if (ldb_kv_cache_load(module) != 0) {
1220                 return LDB_ERR_OPERATIONS_ERROR;
1221         }
1222
1223         ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
1224
1225         return ret;
1226 }
1227
1228 /*
1229   rename a record
1230 */
1231 static int ldb_kv_rename(struct ldb_kv_context *ctx)
1232 {
1233         struct ldb_module *module = ctx->module;
1234         void *data = ldb_module_get_private(module);
1235         struct ldb_kv_private *ldb_kv =
1236             talloc_get_type(data, struct ldb_kv_private);
1237         struct ldb_request *req = ctx->req;
1238         struct ldb_message *msg;
1239         int ret = LDB_SUCCESS;
1240         struct ldb_val  key, key_old;
1241         struct ldb_dn *db_dn;
1242         bool valid_dn = false;
1243
1244         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1245
1246         if (ldb_kv_cache_load(ctx->module) != 0) {
1247                 return LDB_ERR_OPERATIONS_ERROR;
1248         }
1249
1250         msg = ldb_msg_new(ctx);
1251         if (msg == NULL) {
1252                 return LDB_ERR_OPERATIONS_ERROR;
1253         }
1254
1255         /* Check the new DN is reasonable */
1256         valid_dn = ldb_dn_validate(req->op.rename.newdn);
1257         if (valid_dn == false) {
1258                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1259                                        "Invalid New DN: %s",
1260                                        ldb_dn_get_linearized(req->op.rename.newdn));
1261                 return LDB_ERR_INVALID_DN_SYNTAX;
1262         }
1263
1264         /* we need to fetch the old record to re-add under the new name */
1265         ret = ldb_kv_search_dn1(module,
1266                                 req->op.rename.olddn,
1267                                 msg,
1268                                 LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC);
1269         if (ret == LDB_ERR_INVALID_DN_SYNTAX) {
1270                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1271                                        "Invalid Old DN: %s",
1272                                        ldb_dn_get_linearized(req->op.rename.newdn));
1273                 return ret;
1274         } else if (ret != LDB_SUCCESS) {
1275                 /* not finding the old record is an error */
1276                 return ret;
1277         }
1278
1279         /* We need to, before changing the DB, check if the new DN
1280          * exists, so we can return this error to the caller with an
1281          * unmodified DB
1282          *
1283          * Even in GUID index mode we use ltdb_key_dn() as we are
1284          * trying to figure out if this is just a case rename
1285          */
1286         key = ldb_kv_key_dn(module, msg, req->op.rename.newdn);
1287         if (!key.data) {
1288                 talloc_free(msg);
1289                 return LDB_ERR_OPERATIONS_ERROR;
1290         }
1291
1292         key_old = ldb_kv_key_dn(module, msg, req->op.rename.olddn);
1293         if (!key_old.data) {
1294                 talloc_free(msg);
1295                 talloc_free(key.data);
1296                 return LDB_ERR_OPERATIONS_ERROR;
1297         }
1298
1299         /*
1300          * Only declare a conflict if the new DN already exists,
1301          * and it isn't a case change on the old DN
1302          */
1303         if (key_old.length != key.length
1304             || memcmp(key.data, key_old.data, key.length) != 0) {
1305                 ret = ldb_kv_search_base(
1306                     module, msg, req->op.rename.newdn, &db_dn);
1307                 if (ret == LDB_SUCCESS) {
1308                         ret = LDB_ERR_ENTRY_ALREADY_EXISTS;
1309                 } else if (ret == LDB_ERR_NO_SUCH_OBJECT) {
1310                         ret = LDB_SUCCESS;
1311                 }
1312         }
1313
1314         /* finding the new record already in the DB is an error */
1315
1316         if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
1317                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1318                                        "Entry %s already exists",
1319                                        ldb_dn_get_linearized(req->op.rename.newdn));
1320         }
1321         if (ret != LDB_SUCCESS) {
1322                 talloc_free(key_old.data);
1323                 talloc_free(key.data);
1324                 talloc_free(msg);
1325                 return ret;
1326         }
1327
1328         talloc_free(key_old.data);
1329         talloc_free(key.data);
1330
1331         /* Always delete first then add, to avoid conflicts with
1332          * unique indexes. We rely on the transaction to make this
1333          * atomic
1334          */
1335         ret = ldb_kv_delete_internal(module, msg->dn);
1336         if (ret != LDB_SUCCESS) {
1337                 talloc_free(msg);
1338                 return ret;
1339         }
1340
1341         msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
1342         if (msg->dn == NULL) {
1343                 talloc_free(msg);
1344                 return LDB_ERR_OPERATIONS_ERROR;
1345         }
1346
1347         /* We don't check single value as we can have more than 1 with
1348          * deleted attributes. We could go through all elements but that's
1349          * maybe not the most efficient way
1350          */
1351         ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
1352
1353         talloc_free(msg);
1354
1355         return ret;
1356 }
1357
1358 static int ldb_kv_start_trans(struct ldb_module *module)
1359 {
1360         void *data = ldb_module_get_private(module);
1361         struct ldb_kv_private *ldb_kv =
1362             talloc_get_type(data, struct ldb_kv_private);
1363
1364         pid_t pid = getpid();
1365
1366         if (ldb_kv->pid != pid) {
1367                 ldb_asprintf_errstring(ldb_module_get_ctx(ldb_kv->module),
1368                                        __location__
1369                                        ": Reusing ldb opend by pid %d in "
1370                                        "process %d\n",
1371                                        ldb_kv->pid,
1372                                        pid);
1373                 return LDB_ERR_PROTOCOL_ERROR;
1374         }
1375
1376         /* Do not take out the transaction lock on a read-only DB */
1377         if (ldb_kv->read_only) {
1378                 return LDB_ERR_UNWILLING_TO_PERFORM;
1379         }
1380
1381         if (ldb_kv->kv_ops->begin_write(ldb_kv) != 0) {
1382                 return ldb_kv->kv_ops->error(ldb_kv);
1383         }
1384
1385         ldb_kv_index_transaction_start(
1386                 module,
1387                 ldb_kv->index_transaction_cache_size);
1388
1389         ldb_kv->reindex_failed = false;
1390
1391         return LDB_SUCCESS;
1392 }
1393
1394 /*
1395  * Forward declaration to allow prepare_commit to in fact abort the
1396  * transaction
1397  */
1398 static int ldb_kv_del_trans(struct ldb_module *module);
1399
1400 static int ldb_kv_prepare_commit(struct ldb_module *module)
1401 {
1402         int ret;
1403         void *data = ldb_module_get_private(module);
1404         struct ldb_kv_private *ldb_kv =
1405             talloc_get_type(data, struct ldb_kv_private);
1406         pid_t pid = getpid();
1407
1408         if (ldb_kv->pid != pid) {
1409                 ldb_asprintf_errstring(ldb_module_get_ctx(module),
1410                                        __location__
1411                                        ": Reusing ldb opend by pid %d in "
1412                                        "process %d\n",
1413                                        ldb_kv->pid,
1414                                        pid);
1415                 return LDB_ERR_PROTOCOL_ERROR;
1416         }
1417
1418         if (!ldb_kv->kv_ops->transaction_active(ldb_kv)) {
1419                 ldb_set_errstring(ldb_module_get_ctx(module),
1420                                   "ltdb_prepare_commit() called "
1421                                   "without transaction active");
1422                 return LDB_ERR_OPERATIONS_ERROR;
1423         }
1424
1425         /*
1426          * Check if the last re-index failed.
1427          *
1428          * This can happen if for example a duplicate value was marked
1429          * unique.  We must not write a partial re-index into the DB.
1430          */
1431         if (ldb_kv->reindex_failed) {
1432                 /*
1433                  * We must instead abort the transaction so we get the
1434                  * old values and old index back
1435                  */
1436                 ldb_kv_del_trans(module);
1437                 ldb_set_errstring(ldb_module_get_ctx(module),
1438                                   "Failure during re-index, so "
1439                                   "transaction must be aborted.");
1440                 return LDB_ERR_OPERATIONS_ERROR;
1441         }
1442
1443         ret = ldb_kv_index_transaction_commit(module);
1444         if (ret != LDB_SUCCESS) {
1445                 ldb_kv->kv_ops->abort_write(ldb_kv);
1446                 return ret;
1447         }
1448
1449         if (ldb_kv->kv_ops->prepare_write(ldb_kv) != 0) {
1450                 ret = ldb_kv->kv_ops->error(ldb_kv);
1451                 ldb_debug_set(ldb_module_get_ctx(module),
1452                               LDB_DEBUG_FATAL,
1453                               "Failure during "
1454                               "prepare_write): %s -> %s",
1455                               ldb_kv->kv_ops->errorstr(ldb_kv),
1456                               ldb_strerror(ret));
1457                 return ret;
1458         }
1459
1460         ldb_kv->prepared_commit = true;
1461
1462         return LDB_SUCCESS;
1463 }
1464
1465 static int ldb_kv_end_trans(struct ldb_module *module)
1466 {
1467         int ret;
1468         void *data = ldb_module_get_private(module);
1469         struct ldb_kv_private *ldb_kv =
1470             talloc_get_type(data, struct ldb_kv_private);
1471
1472         if (!ldb_kv->prepared_commit) {
1473                 ret = ldb_kv_prepare_commit(module);
1474                 if (ret != LDB_SUCCESS) {
1475                         return ret;
1476                 }
1477         }
1478
1479         ldb_kv->prepared_commit = false;
1480
1481         if (ldb_kv->kv_ops->finish_write(ldb_kv) != 0) {
1482                 ret = ldb_kv->kv_ops->error(ldb_kv);
1483                 ldb_asprintf_errstring(
1484                     ldb_module_get_ctx(module),
1485                     "Failure during tdb_transaction_commit(): %s -> %s",
1486                     ldb_kv->kv_ops->errorstr(ldb_kv),
1487                     ldb_strerror(ret));
1488                 return ret;
1489         }
1490
1491         return LDB_SUCCESS;
1492 }
1493
1494 static int ldb_kv_del_trans(struct ldb_module *module)
1495 {
1496         void *data = ldb_module_get_private(module);
1497         struct ldb_kv_private *ldb_kv =
1498             talloc_get_type(data, struct ldb_kv_private);
1499
1500         if (ldb_kv_index_transaction_cancel(module) != 0) {
1501                 ldb_kv->kv_ops->abort_write(ldb_kv);
1502                 return ldb_kv->kv_ops->error(ldb_kv);
1503         }
1504
1505         ldb_kv->kv_ops->abort_write(ldb_kv);
1506         return LDB_SUCCESS;
1507 }
1508
1509 /*
1510   return sequenceNumber from @BASEINFO
1511 */
1512 static int ldb_kv_sequence_number(struct ldb_kv_context *ctx,
1513                                   struct ldb_extended **ext)
1514 {
1515         struct ldb_context *ldb;
1516         struct ldb_module *module = ctx->module;
1517         struct ldb_request *req = ctx->req;
1518         void *data = ldb_module_get_private(module);
1519         struct ldb_kv_private *ldb_kv =
1520             talloc_get_type(data, struct ldb_kv_private);
1521         TALLOC_CTX *tmp_ctx = NULL;
1522         struct ldb_seqnum_request *seq;
1523         struct ldb_seqnum_result *res;
1524         struct ldb_message *msg = NULL;
1525         struct ldb_dn *dn;
1526         const char *date;
1527         int ret = LDB_SUCCESS;
1528
1529         ldb = ldb_module_get_ctx(module);
1530
1531         seq = talloc_get_type(req->op.extended.data,
1532                                 struct ldb_seqnum_request);
1533         if (seq == NULL) {
1534                 return LDB_ERR_OPERATIONS_ERROR;
1535         }
1536
1537         ldb_request_set_state(req, LDB_ASYNC_PENDING);
1538
1539         if (ldb_kv->kv_ops->lock_read(module) != 0) {
1540                 return LDB_ERR_OPERATIONS_ERROR;
1541         }
1542
1543         res = talloc_zero(req, struct ldb_seqnum_result);
1544         if (res == NULL) {
1545                 ret = LDB_ERR_OPERATIONS_ERROR;
1546                 goto done;
1547         }
1548
1549         tmp_ctx = talloc_new(req);
1550         if (tmp_ctx == NULL) {
1551                 ret = LDB_ERR_OPERATIONS_ERROR;
1552                 goto done;
1553         }
1554
1555         dn = ldb_dn_new(tmp_ctx, ldb, LDB_KV_BASEINFO);
1556         if (dn == NULL) {
1557                 ret = LDB_ERR_OPERATIONS_ERROR;
1558                 goto done;
1559         }
1560
1561         msg = ldb_msg_new(tmp_ctx);
1562         if (msg == NULL) {
1563                 ret = LDB_ERR_OPERATIONS_ERROR;
1564                 goto done;
1565         }
1566
1567         ret = ldb_kv_search_dn1(module, dn, msg, 0);
1568         if (ret != LDB_SUCCESS) {
1569                 goto done;
1570         }
1571
1572         switch (seq->type) {
1573         case LDB_SEQ_HIGHEST_SEQ:
1574                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1575                 break;
1576         case LDB_SEQ_NEXT:
1577                 res->seq_num = ldb_msg_find_attr_as_uint64(msg, LDB_KV_SEQUENCE_NUMBER, 0);
1578                 res->seq_num++;
1579                 break;
1580         case LDB_SEQ_HIGHEST_TIMESTAMP:
1581                 date = ldb_msg_find_attr_as_string(msg, LDB_KV_MOD_TIMESTAMP, NULL);
1582                 if (date) {
1583                         res->seq_num = ldb_string_to_time(date);
1584                 } else {
1585                         res->seq_num = 0;
1586                         /* zero is as good as anything when we don't know */
1587                 }
1588                 break;
1589         }
1590
1591         *ext = talloc_zero(req, struct ldb_extended);
1592         if (*ext == NULL) {
1593                 ret = LDB_ERR_OPERATIONS_ERROR;
1594                 goto done;
1595         }
1596         (*ext)->oid = LDB_EXTENDED_SEQUENCE_NUMBER;
1597         (*ext)->data = talloc_steal(*ext, res);
1598
1599 done:
1600         talloc_free(tmp_ctx);
1601
1602         ldb_kv->kv_ops->unlock_read(module);
1603         return ret;
1604 }
1605
1606 static void ldb_kv_request_done(struct ldb_kv_context *ctx, int error)
1607 {
1608         struct ldb_context *ldb;
1609         struct ldb_request *req;
1610         struct ldb_reply *ares;
1611
1612         ldb = ldb_module_get_ctx(ctx->module);
1613         req = ctx->req;
1614
1615         /* if we already returned an error just return */
1616         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1617                 return;
1618         }
1619
1620         ares = talloc_zero(req, struct ldb_reply);
1621         if (!ares) {
1622                 ldb_oom(ldb);
1623                 req->callback(req, NULL);
1624                 return;
1625         }
1626         ares->type = LDB_REPLY_DONE;
1627         ares->error = error;
1628
1629         req->callback(req, ares);
1630 }
1631
1632 static void ldb_kv_timeout(struct tevent_context *ev,
1633                            struct tevent_timer *te,
1634                            struct timeval t,
1635                            void *private_data)
1636 {
1637         struct ldb_kv_context *ctx;
1638         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1639
1640         if (!ctx->request_terminated) {
1641                 /* request is done now */
1642                 ldb_kv_request_done(ctx, LDB_ERR_TIME_LIMIT_EXCEEDED);
1643         }
1644
1645         if (ctx->spy) {
1646                 /* neutralize the spy */
1647                 ctx->spy->ctx = NULL;
1648                 ctx->spy = NULL;
1649         }
1650         talloc_free(ctx);
1651 }
1652
1653 static void ldb_kv_request_extended_done(struct ldb_kv_context *ctx,
1654                                          struct ldb_extended *ext,
1655                                          int error)
1656 {
1657         struct ldb_context *ldb;
1658         struct ldb_request *req;
1659         struct ldb_reply *ares;
1660
1661         ldb = ldb_module_get_ctx(ctx->module);
1662         req = ctx->req;
1663
1664         /* if we already returned an error just return */
1665         if (ldb_request_get_status(req) != LDB_SUCCESS) {
1666                 return;
1667         }
1668
1669         ares = talloc_zero(req, struct ldb_reply);
1670         if (!ares) {
1671                 ldb_oom(ldb);
1672                 req->callback(req, NULL);
1673                 return;
1674         }
1675         ares->type = LDB_REPLY_DONE;
1676         ares->response = ext;
1677         ares->error = error;
1678
1679         req->callback(req, ares);
1680 }
1681
1682 static void ldb_kv_handle_extended(struct ldb_kv_context *ctx)
1683 {
1684         struct ldb_extended *ext = NULL;
1685         int ret;
1686
1687         if (strcmp(ctx->req->op.extended.oid,
1688                    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
1689                 /* get sequence number */
1690                 ret = ldb_kv_sequence_number(ctx, &ext);
1691         } else {
1692                 /* not recognized */
1693                 ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1694         }
1695
1696         ldb_kv_request_extended_done(ctx, ext, ret);
1697 }
1698
1699 static void ldb_kv_callback(struct tevent_context *ev,
1700                             struct tevent_timer *te,
1701                             struct timeval t,
1702                             void *private_data)
1703 {
1704         struct ldb_kv_context *ctx;
1705         int ret;
1706
1707         ctx = talloc_get_type(private_data, struct ldb_kv_context);
1708
1709         if (ctx->request_terminated) {
1710                 goto done;
1711         }
1712
1713         switch (ctx->req->operation) {
1714         case LDB_SEARCH:
1715                 ret = ldb_kv_search(ctx);
1716                 break;
1717         case LDB_ADD:
1718                 ret = ldb_kv_add(ctx);
1719                 break;
1720         case LDB_MODIFY:
1721                 ret = ldb_kv_modify(ctx);
1722                 break;
1723         case LDB_DELETE:
1724                 ret = ldb_kv_delete(ctx);
1725                 break;
1726         case LDB_RENAME:
1727                 ret = ldb_kv_rename(ctx);
1728                 break;
1729         case LDB_EXTENDED:
1730                 ldb_kv_handle_extended(ctx);
1731                 goto done;
1732         default:
1733                 /* no other op supported */
1734                 ret = LDB_ERR_PROTOCOL_ERROR;
1735         }
1736
1737         if (!ctx->request_terminated) {
1738                 /* request is done now */
1739                 ldb_kv_request_done(ctx, ret);
1740         }
1741
1742 done:
1743         if (ctx->spy) {
1744                 /* neutralize the spy */
1745                 ctx->spy->ctx = NULL;
1746                 ctx->spy = NULL;
1747         }
1748         talloc_free(ctx);
1749 }
1750
1751 static int ldb_kv_request_destructor(void *ptr)
1752 {
1753         struct ldb_kv_req_spy *spy =
1754             talloc_get_type(ptr, struct ldb_kv_req_spy);
1755
1756         if (spy->ctx != NULL) {
1757                 spy->ctx->spy = NULL;
1758                 spy->ctx->request_terminated = true;
1759                 spy->ctx = NULL;
1760         }
1761
1762         return 0;
1763 }
1764
1765 static int ldb_kv_handle_request(struct ldb_module *module,
1766                                  struct ldb_request *req)
1767 {
1768         struct ldb_control *control_permissive;
1769         struct ldb_context *ldb;
1770         struct tevent_context *ev;
1771         struct ldb_kv_context *ac;
1772         struct tevent_timer *te;
1773         struct timeval tv;
1774         unsigned int i;
1775
1776         ldb = ldb_module_get_ctx(module);
1777
1778         control_permissive = ldb_request_get_control(req,
1779                                         LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1780
1781         for (i = 0; req->controls && req->controls[i]; i++) {
1782                 if (req->controls[i]->critical &&
1783                     req->controls[i] != control_permissive) {
1784                         ldb_asprintf_errstring(ldb, "Unsupported critical extension %s",
1785                                                req->controls[i]->oid);
1786                         return LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
1787                 }
1788         }
1789
1790         if (req->starttime == 0 || req->timeout == 0) {
1791                 ldb_set_errstring(ldb, "Invalid timeout settings");
1792                 return LDB_ERR_TIME_LIMIT_EXCEEDED;
1793         }
1794
1795         ev = ldb_handle_get_event_context(req->handle);
1796
1797         ac = talloc_zero(ldb, struct ldb_kv_context);
1798         if (ac == NULL) {
1799                 ldb_oom(ldb);
1800                 return LDB_ERR_OPERATIONS_ERROR;
1801         }
1802
1803         ac->module = module;
1804         ac->req = req;
1805
1806         tv.tv_sec = 0;
1807         tv.tv_usec = 0;
1808         te = tevent_add_timer(ev, ac, tv, ldb_kv_callback, ac);
1809         if (NULL == te) {
1810                 talloc_free(ac);
1811                 return LDB_ERR_OPERATIONS_ERROR;
1812         }
1813
1814         if (req->timeout > 0) {
1815                 tv.tv_sec = req->starttime + req->timeout;
1816                 tv.tv_usec = 0;
1817                 ac->timeout_event =
1818                     tevent_add_timer(ev, ac, tv, ldb_kv_timeout, ac);
1819                 if (NULL == ac->timeout_event) {
1820                         talloc_free(ac);
1821                         return LDB_ERR_OPERATIONS_ERROR;
1822                 }
1823         }
1824
1825         /* set a spy so that we do not try to use the request context
1826          * if it is freed before ltdb_callback fires */
1827         ac->spy = talloc(req, struct ldb_kv_req_spy);
1828         if (NULL == ac->spy) {
1829                 talloc_free(ac);
1830                 return LDB_ERR_OPERATIONS_ERROR;
1831         }
1832         ac->spy->ctx = ac;
1833
1834         talloc_set_destructor((TALLOC_CTX *)ac->spy, ldb_kv_request_destructor);
1835
1836         return LDB_SUCCESS;
1837 }
1838
1839 static int ldb_kv_init_rootdse(struct ldb_module *module)
1840 {
1841         /* ignore errors on this - we expect it for non-sam databases */
1842         ldb_mod_register_control(module, LDB_CONTROL_PERMISSIVE_MODIFY_OID);
1843
1844         /* there can be no module beyond the backend, just return */
1845         return LDB_SUCCESS;
1846 }
1847
1848 static int ldb_kv_lock_read(struct ldb_module *module)
1849 {
1850         void *data = ldb_module_get_private(module);
1851         struct ldb_kv_private *ldb_kv =
1852             talloc_get_type(data, struct ldb_kv_private);
1853         return ldb_kv->kv_ops->lock_read(module);
1854 }
1855
1856 static int ldb_kv_unlock_read(struct ldb_module *module)
1857 {
1858         void *data = ldb_module_get_private(module);
1859         struct ldb_kv_private *ldb_kv =
1860             talloc_get_type(data, struct ldb_kv_private);
1861         return ldb_kv->kv_ops->unlock_read(module);
1862 }
1863
1864 static const struct ldb_module_ops ldb_kv_ops = {
1865     .name = "tdb",
1866     .init_context = ldb_kv_init_rootdse,
1867     .search = ldb_kv_handle_request,
1868     .add = ldb_kv_handle_request,
1869     .modify = ldb_kv_handle_request,
1870     .del = ldb_kv_handle_request,
1871     .rename = ldb_kv_handle_request,
1872     .extended = ldb_kv_handle_request,
1873     .start_transaction = ldb_kv_start_trans,
1874     .end_transaction = ldb_kv_end_trans,
1875     .prepare_commit = ldb_kv_prepare_commit,
1876     .del_transaction = ldb_kv_del_trans,
1877     .read_lock = ldb_kv_lock_read,
1878     .read_unlock = ldb_kv_unlock_read,
1879 };
1880
1881 int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
1882                       const char *name,
1883                       struct ldb_context *ldb,
1884                       const char *options[],
1885                       struct ldb_module **_module)
1886 {
1887         if (getenv("LDB_WARN_UNINDEXED")) {
1888                 ldb_kv->warn_unindexed = true;
1889         }
1890
1891         if (getenv("LDB_WARN_REINDEX")) {
1892                 ldb_kv->warn_reindex = true;
1893         }
1894
1895         ldb_kv->sequence_number = 0;
1896
1897         ldb_kv->pid = getpid();
1898
1899         ldb_kv->module = ldb_module_new(ldb, ldb, name, &ldb_kv_ops);
1900         if (!ldb_kv->module) {
1901                 ldb_oom(ldb);
1902                 talloc_free(ldb_kv);
1903                 return LDB_ERR_OPERATIONS_ERROR;
1904         }
1905         ldb_module_set_private(ldb_kv->module, ldb_kv);
1906         talloc_steal(ldb_kv->module, ldb_kv);
1907
1908         if (ldb_kv_cache_load(ldb_kv->module) != 0) {
1909                 ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
1910                                        "records for backend '%s'", name);
1911                 talloc_free(ldb_kv->module);
1912                 return LDB_ERR_OPERATIONS_ERROR;
1913         }
1914
1915         *_module = ldb_kv->module;
1916         /*
1917          * Set or override the maximum key length
1918          *
1919          * The ldb_mdb code will have set this to 511, but our tests
1920          * set this even smaller (to make the tests more practical).
1921          *
1922          * This must only be used for the selftest as the length
1923          * becomes encoded in the index keys.
1924          */
1925         {
1926                 const char *len_str =
1927                         ldb_options_find(ldb, options,
1928                                          "max_key_len_for_self_test");
1929                 if (len_str != NULL) {
1930                         unsigned len = strtoul(len_str, NULL, 0);
1931                         ldb_kv->max_key_length = len;
1932                 }
1933         }
1934
1935         /*
1936          * Override full DB scans
1937          *
1938          * A full DB scan is expensive on a large database.  This
1939          * option is for testing to show that the full DB scan is not
1940          * triggered.
1941          */
1942         {
1943                 const char *len_str =
1944                         ldb_options_find(ldb, options,
1945                                          "disable_full_db_scan_for_self_test");
1946                 if (len_str != NULL) {
1947                         ldb_kv->disable_full_db_scan = true;
1948                 }
1949         }
1950
1951         /*
1952          * Set the size of the transaction index cache.
1953          * If the ldb option "transaction_index_cache_size" is set use that
1954          * otherwise use DEFAULT_INDEX_CACHE_SIZE
1955          */
1956         ldb_kv->index_transaction_cache_size = DEFAULT_INDEX_CACHE_SIZE;
1957         {
1958                 const char *size = ldb_options_find(
1959                         ldb,
1960                         options,
1961                         "transaction_index_cache_size");
1962                 if (size != NULL) {
1963                         size_t cache_size = 0;
1964                         errno = 0;
1965
1966                         cache_size = strtoul( size, NULL, 0);
1967                         if (cache_size == 0 || errno == ERANGE) {
1968                                 ldb_debug(
1969                                         ldb,
1970                                         LDB_DEBUG_WARNING,
1971                                         "Invalid transaction_index_cache_size "
1972                                         "value [%s], using default(%d)\n",
1973                                         size,
1974                                         DEFAULT_INDEX_CACHE_SIZE);
1975                         } else {
1976                                 ldb_kv->index_transaction_cache_size =
1977                                         cache_size;
1978                         }
1979                 }
1980         }
1981
1982         return LDB_SUCCESS;
1983 }