tests/ldb_kv: Add another case for completeness
[metze/samba-autobuild/.git] / lib / ldb / tests / ldb_kv_ops_test.c
1 /*
2  * Tests exercising the ldb key value operations.
3  *
4  *  Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20
21 /*
22  * from cmocka.c:
23  * These headers or their equivalents should be included prior to
24  * including
25  * this header file.
26  *
27  * #include <stdarg.h>
28  * #include <stddef.h>
29  * #include <setjmp.h>
30  *
31  * This allows test applications to use custom definitions of C standard
32  * library functions and types.
33  *
34  */
35
36 /*
37  * A KV module is expected to have the following behaviour
38  *
39  * - A transaction must be open to perform any read, write or delete operation
40  * - Writes and Deletes should not be visible until a transaction is commited
41  * - Nested transactions are not permitted
42  * - transactions can be rolled back and commited.
43  * - supports iteration over all records in the database
44  * - supports the update_in_iterate operation allowing entries to be
45  *   re-keyed.
46  * - has a get_size implementation that returns an estimate of the number of
47  *   records in the database.  Note that this can be an estimate rather than
48  *   an accurate size.
49  */
50 #include <stdarg.h>
51 #include <stddef.h>
52 #include <setjmp.h>
53 #include <cmocka.h>
54
55 #include <errno.h>
56 #include <unistd.h>
57 #include <talloc.h>
58 #include <tevent.h>
59 #include <ldb.h>
60 #include <ldb_module.h>
61 #include <ldb_private.h>
62 #include <string.h>
63 #include <ctype.h>
64
65 #include <sys/wait.h>
66
67 #include "ldb_tdb/ldb_tdb.h"
68 #include "ldb_key_value/ldb_kv.h"
69
70
71 #define DEFAULT_BE  "tdb"
72
73 #ifndef TEST_BE
74 #define TEST_BE DEFAULT_BE
75 #endif /* TEST_BE */
76
77 #define NUM_RECS 1024
78
79
80 struct test_ctx {
81         struct tevent_context *ev;
82         struct ldb_context *ldb;
83
84         const char *dbfile;
85         const char *lockfile;   /* lockfile is separate */
86
87         const char *dbpath;
88 };
89
90 static void unlink_old_db(struct test_ctx *test_ctx)
91 {
92         int ret;
93
94         errno = 0;
95         ret = unlink(test_ctx->lockfile);
96         if (ret == -1 && errno != ENOENT) {
97                 fail();
98         }
99
100         errno = 0;
101         ret = unlink(test_ctx->dbfile);
102         if (ret == -1 && errno != ENOENT) {
103                 fail();
104         }
105 }
106
107 static int noconn_setup(void **state)
108 {
109         struct test_ctx *test_ctx;
110
111         test_ctx = talloc_zero(NULL, struct test_ctx);
112         assert_non_null(test_ctx);
113
114         test_ctx->ev = tevent_context_init(test_ctx);
115         assert_non_null(test_ctx->ev);
116
117         test_ctx->ldb = ldb_init(test_ctx, test_ctx->ev);
118         assert_non_null(test_ctx->ldb);
119
120         test_ctx->dbfile = talloc_strdup(test_ctx, "kvopstest.ldb");
121         assert_non_null(test_ctx->dbfile);
122
123         test_ctx->lockfile = talloc_asprintf(test_ctx, "%s-lock",
124                                              test_ctx->dbfile);
125         assert_non_null(test_ctx->lockfile);
126
127         test_ctx->dbpath = talloc_asprintf(test_ctx,
128                         TEST_BE"://%s", test_ctx->dbfile);
129         assert_non_null(test_ctx->dbpath);
130
131         unlink_old_db(test_ctx);
132         *state = test_ctx;
133         return 0;
134 }
135
136 static int noconn_teardown(void **state)
137 {
138         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
139                                                           struct test_ctx);
140
141         unlink_old_db(test_ctx);
142         talloc_free(test_ctx);
143         return 0;
144 }
145
146 static int setup(void **state)
147 {
148         struct test_ctx *test_ctx;
149         int ret;
150         struct ldb_ldif *ldif;
151         const char *index_ldif =                \
152                 "dn: @INDEXLIST\n"
153                 "@IDXGUID: objectUUID\n"
154                 "@IDX_DN_GUID: GUID\n"
155                 "\n";
156
157         noconn_setup((void **) &test_ctx);
158
159         ret = ldb_connect(test_ctx->ldb, test_ctx->dbpath, 0, NULL);
160         assert_int_equal(ret, 0);
161
162         while ((ldif = ldb_ldif_read_string(test_ctx->ldb, &index_ldif))) {
163                 ret = ldb_add(test_ctx->ldb, ldif->msg);
164                 assert_int_equal(ret, LDB_SUCCESS);
165         }
166         *state = test_ctx;
167         return 0;
168 }
169
170 static int teardown(void **state)
171 {
172         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
173                                                           struct test_ctx);
174         noconn_teardown((void **) &test_ctx);
175         return 0;
176 }
177
178 static struct ldb_kv_private *get_ldb_kv(struct ldb_context *ldb)
179 {
180         void *data = NULL;
181         struct ldb_kv_private *ldb_kv = NULL;
182
183         data = ldb_module_get_private(ldb->modules);
184         assert_non_null(data);
185
186         ldb_kv = talloc_get_type(data, struct ldb_kv_private);
187         assert_non_null(ldb_kv);
188
189         return ldb_kv;
190 }
191
192 static int parse(struct ldb_val key,
193                  struct ldb_val data,
194                  void *private_data)
195 {
196         struct ldb_val* read = private_data;
197
198         /* Yes, we leak this.  That is OK */
199         read->data = talloc_size(NULL,
200                                  data.length);
201         assert_non_null(read->data);
202
203         memcpy(read->data, data.data, data.length);
204         read->length = data.length;
205         return LDB_SUCCESS;
206 }
207
208 /*
209  * Test that data can be written to the kv store and be read back.
210  */
211 static void test_add_get(void **state)
212 {
213         int ret;
214         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
215                                                           struct test_ctx);
216         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
217         uint8_t key_val[] = "TheKey";
218         struct ldb_val key = {
219                 .data   = key_val,
220                 .length = sizeof(key_val)
221         };
222
223         uint8_t value[] = "The record contents";
224         struct ldb_val data = {
225                 .data    = value,
226                 .length = sizeof(value)
227         };
228
229         struct ldb_val read;
230
231         int flags = 0;
232         TALLOC_CTX *tmp_ctx;
233
234         tmp_ctx = talloc_new(test_ctx);
235         assert_non_null(tmp_ctx);
236
237         /*
238          * Begin a transaction
239          */
240         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
241         assert_int_equal(ret, 0);
242
243         /*
244          * Write the record
245          */
246         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
247         assert_int_equal(ret, 0);
248
249         /*
250          * Commit the transaction
251          */
252         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
253         assert_int_equal(ret, 0);
254
255         /*
256          * And now read it back
257          */
258         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
259         assert_int_equal(ret, 0);
260
261         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
262         assert_int_equal(ret, 0);
263
264         assert_int_equal(sizeof(value), read.length);
265         assert_memory_equal(value, read.data, sizeof(value));
266
267         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
268         assert_int_equal(ret, 0);
269         talloc_free(tmp_ctx);
270 }
271
272 /*
273  * Test that attempts to read data without a read transaction fail.
274  */
275 static void test_read_outside_transaction(void **state)
276 {
277         int ret;
278         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
279                                                           struct test_ctx);
280         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
281         uint8_t key_val[] = "TheKey";
282         struct ldb_val key = {
283                 .data   = key_val,
284                 .length = sizeof(key_val)
285         };
286
287         uint8_t value[] = "The record contents";
288         struct ldb_val data = {
289                 .data    = value,
290                 .length = sizeof(value)
291         };
292
293         struct ldb_val read;
294
295         int flags = 0;
296         TALLOC_CTX *tmp_ctx;
297
298         tmp_ctx = talloc_new(test_ctx);
299         assert_non_null(tmp_ctx);
300
301         /*
302          * Begin a transaction
303          */
304         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
305         assert_int_equal(ret, 0);
306
307         /*
308          * Write the record
309          */
310         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
311         assert_int_equal(ret, 0);
312
313         /*
314          * Commit the transaction
315          */
316         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
317         assert_int_equal(ret, 0);
318
319         /*
320          * And now read it back
321          * Note there is no read transaction active
322          */
323         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
324         assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
325
326         talloc_free(tmp_ctx);
327 }
328
329 /*
330  * Test that data can be deleted from the kv store
331  */
332 static void test_delete(void **state)
333 {
334         int ret;
335         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
336                                                           struct test_ctx);
337         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
338         uint8_t key_val[] = "TheKey";
339         struct ldb_val key = {
340                 .data   = key_val,
341                 .length = sizeof(key_val)
342         };
343
344         uint8_t value[] = "The record contents";
345         struct ldb_val data = {
346                 .data    = value,
347                 .length = sizeof(value)
348         };
349
350         struct ldb_val read;
351
352         int flags = 0;
353         TALLOC_CTX *tmp_ctx;
354
355         tmp_ctx = talloc_new(test_ctx);
356         assert_non_null(tmp_ctx);
357
358         /*
359          * Begin a transaction
360          */
361         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
362         assert_int_equal(ret, 0);
363
364         /*
365          * Write the record
366          */
367         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
368         assert_int_equal(ret, 0);
369
370         /*
371          * Commit the transaction
372          */
373         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
374         assert_int_equal(ret, 0);
375
376         /*
377          * And now read it back
378          */
379         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
380         assert_int_equal(ret, 0);
381         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
382         assert_int_equal(ret, 0);
383         assert_int_equal(sizeof(value), read.length);
384         assert_memory_equal(value, read.data, sizeof(value));
385         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
386         assert_int_equal(ret, 0);
387
388         /*
389          * Begin a transaction
390          */
391         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
392         assert_int_equal(ret, 0);
393
394         /*
395          * Now delete it.
396          */
397         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
398         assert_int_equal(ret, 0);
399
400         /*
401          * Commit the transaction
402          */
403         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
404         assert_int_equal(ret, 0);
405
406         /*
407          * And now try to read it back
408          */
409         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
410         assert_int_equal(ret, 0);
411         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
412         assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
413         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
414         assert_int_equal(ret, 0);
415
416         talloc_free(tmp_ctx);
417 }
418
419 /*
420  * Check that writes are correctly rolled back when a transaction
421  * is rolled back.
422  */
423 static void test_transaction_abort_write(void **state)
424 {
425         int ret;
426         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
427                                                           struct test_ctx);
428         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
429         uint8_t key_val[] = "TheKey";
430         struct ldb_val key = {
431                 .data   = key_val,
432                 .length = sizeof(key_val)
433         };
434
435         uint8_t value[] = "The record contents";
436         struct ldb_val data = {
437                 .data    = value,
438                 .length = sizeof(value)
439         };
440
441         struct ldb_val read;
442
443         int flags = 0;
444         TALLOC_CTX *tmp_ctx;
445
446         tmp_ctx = talloc_new(test_ctx);
447         assert_non_null(tmp_ctx);
448
449         /*
450          * Begin a transaction
451          */
452         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
453         assert_int_equal(ret, 0);
454
455         /*
456          * Write the record
457          */
458         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
459         assert_int_equal(ret, 0);
460
461         /*
462          * And now read it back
463          */
464         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
465         assert_int_equal(ret, 0);
466         assert_int_equal(sizeof(value), read.length);
467         assert_memory_equal(value, read.data, sizeof(value));
468
469
470         /*
471          * Now abort the transaction
472          */
473         ret = ldb_kv->kv_ops->abort_write(ldb_kv);
474         assert_int_equal(ret, 0);
475
476         /*
477          * And now read it back, should not be there
478          */
479         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
480         assert_int_equal(ret, 0);
481         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
482         assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
483         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
484         assert_int_equal(ret, 0);
485
486         talloc_free(tmp_ctx);
487 }
488
489 /*
490  * Check that deletes are correctly rolled back when a transaction is
491  * aborted.
492  */
493 static void test_transaction_abort_delete(void **state)
494 {
495         int ret;
496         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
497                                                           struct test_ctx);
498         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
499         uint8_t key_val[] = "TheKey";
500         struct ldb_val key = {
501                 .data   = key_val,
502                 .length = sizeof(key_val)
503         };
504
505         uint8_t value[] = "The record contents";
506         struct ldb_val data = {
507                 .data    = value,
508                 .length = sizeof(value)
509         };
510
511         struct ldb_val read;
512
513         int flags = 0;
514         TALLOC_CTX *tmp_ctx;
515
516         tmp_ctx = talloc_new(test_ctx);
517         assert_non_null(tmp_ctx);
518
519         /*
520          * Begin a transaction
521          */
522         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
523         assert_int_equal(ret, 0);
524
525         /*
526          * Write the record
527          */
528         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
529         assert_int_equal(ret, 0);
530
531         /*
532          * Commit the transaction
533          */
534         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
535         assert_int_equal(ret, 0);
536
537         /*
538          * And now read it back
539          */
540         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
541         assert_int_equal(ret, 0);
542         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
543         assert_int_equal(ret, 0);
544         assert_int_equal(sizeof(value), read.length);
545         assert_memory_equal(value, read.data, sizeof(value));
546         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
547         assert_int_equal(ret, 0);
548
549         /*
550          * Begin a transaction
551          */
552         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
553         assert_int_equal(ret, 0);
554
555         /*
556          * Now delete it.
557          */
558         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
559         assert_int_equal(ret, 0);
560
561         /*
562          * And now read it back
563          */
564         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
565         assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
566
567         /*
568          * Abort the transaction
569          */
570         ret = ldb_kv->kv_ops->abort_write(ldb_kv);
571         assert_int_equal(ret, 0);
572
573         /*
574          * And now try to read it back
575          */
576         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
577         assert_int_equal(ret, 0);
578         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
579         assert_int_equal(ret, 0);
580         assert_int_equal(sizeof(value), read.length);
581         assert_memory_equal(value, read.data, sizeof(value));
582         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
583         assert_int_equal(ret, 0);
584
585         talloc_free(tmp_ctx);
586 }
587
588 /*
589  * Test that writes outside a transaction fail
590  */
591 static void test_write_outside_transaction(void **state)
592 {
593         int ret;
594         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
595                                                           struct test_ctx);
596         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
597         uint8_t key_val[] = "TheKey";
598         struct ldb_val key = {
599                 .data   = key_val,
600                 .length = sizeof(key_val)
601         };
602
603         uint8_t value[] = "The record contents";
604         struct ldb_val data = {
605                 .data    = value,
606                 .length = sizeof(value)
607         };
608
609
610         int flags = 0;
611         TALLOC_CTX *tmp_ctx;
612
613         tmp_ctx = talloc_new(test_ctx);
614         assert_non_null(tmp_ctx);
615
616         /*
617          * Attempt to write the record
618          */
619         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
620         assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
621
622         talloc_free(tmp_ctx);
623 }
624
625 /*
626  * Test data can not be deleted outside a transaction
627  */
628 static void test_delete_outside_transaction(void **state)
629 {
630         int ret;
631         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
632                                                           struct test_ctx);
633         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
634         uint8_t key_val[] = "TheKey";
635         struct ldb_val key = {
636                 .data   = key_val,
637                 .length = sizeof(key_val)
638         };
639
640         uint8_t value[] = "The record contents";
641         struct ldb_val data = {
642                 .data    = value,
643                 .length = sizeof(value)
644         };
645
646         struct ldb_val read;
647
648         int flags = 0;
649         TALLOC_CTX *tmp_ctx;
650
651         tmp_ctx = talloc_new(test_ctx);
652         assert_non_null(tmp_ctx);
653
654         /*
655          * Begin a transaction
656          */
657         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
658         assert_int_equal(ret, 0);
659
660         /*
661          * Write the record
662          */
663         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
664         assert_int_equal(ret, 0);
665
666         /*
667          * Commit the transaction
668          */
669         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
670         assert_int_equal(ret, 0);
671
672         /*
673          * And now read it back
674          */
675         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
676         assert_int_equal(ret, 0);
677         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
678         assert_int_equal(ret, 0);
679         assert_int_equal(sizeof(value), read.length);
680         assert_memory_equal(value, read.data, sizeof(value));
681         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
682         assert_int_equal(ret, 0);
683
684         /*
685          * Now attempt to delete a record
686          */
687         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
688         assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
689
690         /*
691          * And now read it back
692          */
693         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
694         assert_int_equal(ret, 0);
695         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
696         assert_int_equal(ret, 0);
697         assert_int_equal(sizeof(value), read.length);
698         assert_memory_equal(value, read.data, sizeof(value));
699         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
700         assert_int_equal(ret, 0);
701
702         talloc_free(tmp_ctx);
703 }
704
705 static int traverse_fn(struct ldb_kv_private *ldb_kv,
706                        struct ldb_val key,
707                        struct ldb_val data,
708                        void *ctx)
709 {
710
711         int *visits = ctx;
712         int i;
713
714         if (strncmp("key ", (char *) key.data, 4) == 0) {
715                 i = strtol((char *) &key.data[4], NULL, 10);
716                 visits[i]++;
717         }
718         return LDB_SUCCESS;
719 }
720
721 /*
722  * Test that iterate visits all the records.
723  */
724 static void test_iterate(void **state)
725 {
726         int ret;
727         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
728                                                           struct test_ctx);
729         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
730         int i;
731         int num_recs = 1024;
732         int visits[num_recs];
733
734         TALLOC_CTX *tmp_ctx;
735
736         tmp_ctx = talloc_new(test_ctx);
737         assert_non_null(tmp_ctx);
738
739         /*
740          * Begin a transaction
741          */
742         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
743         assert_int_equal(ret, 0);
744
745         /*
746          * Write the records
747          */
748         for (i = 0; i < num_recs; i++) {
749                 struct ldb_val key;
750                 struct ldb_val rec;
751                 int flags = 0;
752
753                 visits[i] = 0;
754                 key.data   = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", i);
755                 key.length = strlen((char *)key.data) + 1;
756
757                 rec.data = (uint8_t *) talloc_asprintf(tmp_ctx,
758                                                        "data for record (%04d)",
759                                                        i);
760                 rec.length = strlen((char *)rec.data) + 1;
761
762                 ret = ldb_kv->kv_ops->store(ldb_kv, key, rec, flags);
763                 assert_int_equal(ret, 0);
764
765                 TALLOC_FREE(key.data);
766                 TALLOC_FREE(rec.data);
767         }
768
769         /*
770          * Commit the transaction
771          */
772         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
773         assert_int_equal(ret, 0);
774
775         /*
776          * Now iterate over the kv store and ensure that all the
777          * records are visited.
778          */
779         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
780         assert_int_equal(ret, 0);
781         ret = ldb_kv->kv_ops->iterate(ldb_kv, traverse_fn, visits);
782         for (i = 0; i <num_recs; i++) {
783                 assert_int_equal(1, visits[i]);
784         }
785         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
786         assert_int_equal(ret, 0);
787
788         TALLOC_FREE(tmp_ctx);
789 }
790
791 static void do_iterate_range_test(void **state, int range_start,
792                                   int range_end, bool fail)
793 {
794         int ret;
795         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
796                                                           struct test_ctx);
797         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
798         int i;
799         int num_recs = 1024;
800         int skip_recs = 10;
801         int visits[num_recs];
802         struct ldb_val sk, ek;
803
804         TALLOC_CTX *tmp_ctx;
805
806         for (i = 0; i < num_recs; i++){
807                 visits[i] = 0;
808         }
809
810         /*
811          * No iterate_range on tdb
812          */
813         if (strcmp(TEST_BE, "tdb") == 0) {
814                 return;
815         }
816
817         tmp_ctx = talloc_new(test_ctx);
818         assert_non_null(tmp_ctx);
819
820         /*
821          * Begin a transaction
822          */
823         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
824         assert_int_equal(ret, 0);
825
826         /*
827          * Write the records
828          */
829         for (i = skip_recs; i <= num_recs - skip_recs; i++) {
830                 struct ldb_val key;
831                 struct ldb_val rec;
832                 int flags = 0;
833
834                 key.data   = (uint8_t *)talloc_asprintf(tmp_ctx,
835                                                         "key %04d",
836                                                         i);
837                 key.length = strlen((char *)key.data);
838
839                 rec.data = (uint8_t *)talloc_asprintf(tmp_ctx,
840                                                       "data for record (%04d)",
841                                                       i);
842                 rec.length = strlen((char *)rec.data) + 1;
843
844                 ret = ldb_kv->kv_ops->store(ldb_kv, key, rec, flags);
845                 assert_int_equal(ret, 0);
846
847                 TALLOC_FREE(key.data);
848                 TALLOC_FREE(rec.data);
849         }
850
851         /*
852          * Commit the transaction
853          */
854         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
855         assert_int_equal(ret, 0);
856
857         sk.data = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", range_start);
858         sk.length = strlen((char *)sk.data);
859
860         ek.data = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", range_end);
861         ek.length = strlen((char *)ek.data) + 1;
862
863         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
864         assert_int_equal(ret, 0);
865         ret = ldb_kv->kv_ops->iterate_range(ldb_kv, sk, ek,
866                                             traverse_fn, visits);
867         if (fail){
868                 assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
869                 TALLOC_FREE(tmp_ctx);
870                 return;
871         } else{
872                 assert_int_equal(ret, 0);
873         }
874         for (i = 0; i < num_recs; i++) {
875                 if (i >= skip_recs && i <= num_recs - skip_recs &&
876                     i >= range_start && i <= range_end){
877                         assert_int_equal(1, visits[i]);
878                 } else {
879                         assert_int_equal(0, visits[i]);
880                 }
881         }
882
883         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
884         assert_int_equal(ret, 0);
885
886         TALLOC_FREE(tmp_ctx);
887 }
888
889 /*
890  * Test that iterate_range visits all the records between two keys.
891  */
892 static void test_iterate_range(void **state)
893 {
894         do_iterate_range_test(state, 300, 900, false);
895
896         /*
897          * test start_key = end_key
898          */
899         do_iterate_range_test(state, 20, 20, false);
900
901         /*
902          * test reverse range fails
903          */
904         do_iterate_range_test(state, 50, 40, true);
905
906         /*
907          * keys are between 10-1014 so test with keys outside that range
908          */
909         do_iterate_range_test(state, 0, 20, false);
910         do_iterate_range_test(state, 1010, 1030, false);
911         do_iterate_range_test(state, 0, 1030, false);
912 }
913
914 struct update_context {
915         struct ldb_context* ldb;
916         int visits[NUM_RECS];
917 };
918
919 static int update_fn(struct ldb_kv_private *ldb_kv,
920                      struct ldb_val key,
921                      struct ldb_val data,
922                      void *ctx)
923 {
924
925         struct ldb_val new_key;
926         struct ldb_module *module = NULL;
927         struct update_context *context =NULL;
928         int ret = LDB_SUCCESS;
929         TALLOC_CTX *tmp_ctx;
930
931         tmp_ctx = talloc_new(ldb_kv);
932         assert_non_null(tmp_ctx);
933
934         context = talloc_get_type_abort(ctx, struct update_context);
935
936         module = talloc_zero(tmp_ctx, struct ldb_module);
937         module->ldb = context->ldb;
938
939         if (strncmp("key ", (char *) key.data, 4) == 0) {
940                 int i = strtol((char *) &key.data[4], NULL, 10);
941                 context->visits[i]++;
942                 new_key.data = talloc_memdup(tmp_ctx, key.data, key.length);
943                 new_key.length  = key.length;
944                 new_key.data[0] = 'K';
945
946                 ret = ldb_kv->kv_ops->update_in_iterate(
947                     ldb_kv, key, new_key, data, &module);
948         }
949         TALLOC_FREE(tmp_ctx);
950         return ret;
951 }
952
953 /*
954  * Test that update_in_iterate behaves as expected.
955  */
956 static void test_update_in_iterate(void **state)
957 {
958         int ret;
959         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
960                                                           struct test_ctx);
961         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
962         int i;
963         struct update_context *context = NULL;
964
965
966         TALLOC_CTX *tmp_ctx;
967
968         tmp_ctx = talloc_new(test_ctx);
969         assert_non_null(tmp_ctx);
970
971         context = talloc_zero(tmp_ctx, struct update_context);
972         assert_non_null(context);
973         context->ldb = test_ctx->ldb;
974         /*
975          * Begin a transaction
976          */
977         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
978         assert_int_equal(ret, 0);
979
980         /*
981          * Write the records
982          */
983         for (i = 0; i < NUM_RECS; i++) {
984                 struct ldb_val key;
985                 struct ldb_val rec;
986                 int flags = 0;
987
988                 key.data   = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", i);
989                 key.length = strlen((char *)key.data) + 1;
990
991                 rec.data   = (uint8_t *) talloc_asprintf(tmp_ctx,
992                                                          "data for record (%04d)",
993                                                          i);
994                 rec.length = strlen((char *)rec.data) + 1;
995
996                 ret = ldb_kv->kv_ops->store(ldb_kv, key, rec, flags);
997                 assert_int_equal(ret, 0);
998
999                 TALLOC_FREE(key.data);
1000                 TALLOC_FREE(rec.data);
1001         }
1002
1003         /*
1004          * Commit the transaction
1005          */
1006         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1007         assert_int_equal(ret, 0);
1008
1009         /*
1010          * Now iterate over the kv store and ensure that all the
1011          * records are visited.
1012          */
1013
1014         /*
1015          * Needs to be done inside a transaction
1016          */
1017         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1018         assert_int_equal(ret, 0);
1019
1020         ret = ldb_kv->kv_ops->iterate(ldb_kv, update_fn, context);
1021         for (i = 0; i < NUM_RECS; i++) {
1022                 assert_int_equal(1, context->visits[i]);
1023         }
1024
1025         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1026         assert_int_equal(ret, 0);
1027
1028         TALLOC_FREE(tmp_ctx);
1029 }
1030
1031 /*
1032  * Ensure that writes are not visible until the transaction has been
1033  * committed.
1034  */
1035 static void test_write_transaction_isolation(void **state)
1036 {
1037         int ret;
1038         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
1039                                                           struct test_ctx);
1040         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
1041         struct ldb_val key;
1042         struct ldb_val val;
1043
1044         const char *KEY1 = "KEY01";
1045         const char *VAL1 = "VALUE01";
1046
1047         const char *KEY2 = "KEY02";
1048         const char *VAL2 = "VALUE02";
1049
1050         /*
1051          * Pipes etc to co-ordinate the processes
1052          */
1053         int to_child[2];
1054         int to_parent[2];
1055         char buf[2];
1056         pid_t pid, w_pid;
1057         int wstatus;
1058
1059         TALLOC_CTX *tmp_ctx;
1060         tmp_ctx = talloc_new(test_ctx);
1061         assert_non_null(tmp_ctx);
1062
1063
1064         /*
1065          * Add a record to the database
1066          */
1067         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1068         assert_int_equal(ret, 0);
1069
1070         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1071         key.length = strlen(KEY1) + 1;
1072
1073         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL1);
1074         val.length = strlen(VAL1) + 1;
1075
1076         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
1077         assert_int_equal(ret, 0);
1078
1079         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1080         assert_int_equal(ret, 0);
1081
1082
1083         ret = pipe(to_child);
1084         assert_int_equal(ret, 0);
1085         ret = pipe(to_parent);
1086         assert_int_equal(ret, 0);
1087         /*
1088          * Now fork a new process
1089          */
1090
1091         pid = fork();
1092         if (pid == 0) {
1093
1094                 struct ldb_context *ldb = NULL;
1095                 close(to_child[1]);
1096                 close(to_parent[0]);
1097
1098                 /*
1099                  * Wait for the transaction to start
1100                  */
1101                 ret = read(to_child[0], buf, 2);
1102                 if (ret != 2) {
1103                         print_error(__location__": read returned (%d)\n",
1104                                     ret);
1105                         exit(LDB_ERR_OPERATIONS_ERROR);
1106                 }
1107                 ldb = ldb_init(test_ctx, test_ctx->ev);
1108                 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
1109                 if (ret != LDB_SUCCESS) {
1110                         print_error(__location__": ldb_connect returned (%d)\n",
1111                                     ret);
1112                         exit(ret);
1113                 }
1114
1115                 ldb_kv = get_ldb_kv(ldb);
1116
1117                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1118                 if (ret != LDB_SUCCESS) {
1119                         print_error(__location__": lock_read returned (%d)\n",
1120                                     ret);
1121                         exit(ret);
1122                 }
1123
1124                 /*
1125                  * Check that KEY1 is there
1126                  */
1127                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1128                 key.length = strlen(KEY1) + 1;
1129
1130                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1131                 if (ret != LDB_SUCCESS) {
1132                         print_error(__location__": fetch_and_parse returned "
1133                                     "(%d)\n",
1134                                     ret);
1135                         exit(ret);
1136                 }
1137
1138                 if ((strlen(VAL1) + 1) != val.length) {
1139                         print_error(__location__": KEY1 value lengths different"
1140                                     ", expected (%d) actual(%d)\n",
1141                                     (int)(strlen(VAL1) + 1), (int)val.length);
1142                         exit(LDB_ERR_OPERATIONS_ERROR);
1143                 }
1144                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1145                         print_error(__location__": KEY1 values different, "
1146                                     "expected (%s) actual(%s)\n",
1147                                     VAL1,
1148                                     val.data);
1149                         exit(LDB_ERR_OPERATIONS_ERROR);
1150                 }
1151
1152                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1153                 if (ret != LDB_SUCCESS) {
1154                         print_error(__location__": unlock_read returned (%d)\n",
1155                                     ret);
1156                         exit(ret);
1157                 }
1158
1159                 /*
1160                  * Check that KEY2 is not there
1161                  */
1162                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1163                 key.length = strlen(KEY2 + 1);
1164
1165                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1166                 if (ret != LDB_SUCCESS) {
1167                         print_error(__location__": lock_read returned (%d)\n",
1168                                     ret);
1169                         exit(ret);
1170                 }
1171
1172                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1173                 if (ret != LDB_ERR_NO_SUCH_OBJECT) {
1174                         print_error(__location__": fetch_and_parse returned "
1175                                     "(%d)\n",
1176                                     ret);
1177                         exit(ret);
1178                 }
1179
1180                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1181                 if (ret != LDB_SUCCESS) {
1182                         print_error(__location__": unlock_read returned (%d)\n",
1183                                     ret);
1184                         exit(ret);
1185                 }
1186
1187                 /*
1188                  * Signal the other process to commit the transaction
1189                  */
1190                 ret = write(to_parent[1], "GO", 2);
1191                 if (ret != 2) {
1192                         print_error(__location__": write returned (%d)\n",
1193                                     ret);
1194                         exit(LDB_ERR_OPERATIONS_ERROR);
1195                 }
1196
1197                 /*
1198                  * Wait for the transaction to be commited
1199                  */
1200                 ret = read(to_child[0], buf, 2);
1201                 if (ret != 2) {
1202                         print_error(__location__": read returned (%d)\n",
1203                                     ret);
1204                         exit(LDB_ERR_OPERATIONS_ERROR);
1205                 }
1206
1207                 /*
1208                  * Check that KEY1 is there
1209                  */
1210                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1211                 if (ret != LDB_SUCCESS) {
1212                         print_error(__location__": unlock_read returned (%d)\n",
1213                                     ret);
1214                         exit(ret);
1215                 }
1216                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1217                 key.length = strlen(KEY1) + 1;
1218
1219                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1220                 if (ret != LDB_SUCCESS) {
1221                         print_error(__location__": fetch_and_parse returned "
1222                                     "(%d)\n",
1223                                     ret);
1224                         exit(ret);
1225                 }
1226
1227                 if ((strlen(VAL1) + 1) != val.length) {
1228                         print_error(__location__": KEY1 value lengths different"
1229                                     ", expected (%d) actual(%d)\n",
1230                                     (int)(strlen(VAL1) + 1), (int)val.length);
1231                         exit(LDB_ERR_OPERATIONS_ERROR);
1232                 }
1233                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1234                         print_error(__location__": KEY1 values different, "
1235                                     "expected (%s) actual(%s)\n",
1236                                     VAL1,
1237                                     val.data);
1238                         exit(LDB_ERR_OPERATIONS_ERROR);
1239                 }
1240
1241                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1242                 if (ret != LDB_SUCCESS) {
1243                         print_error(__location__": unlock_read returned (%d)\n",
1244                                     ret);
1245                         exit(ret);
1246                 }
1247
1248
1249                 /*
1250                  * Check that KEY2 is there
1251                  */
1252                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1253                 if (ret != LDB_SUCCESS) {
1254                         print_error(__location__": unlock_read returned (%d)\n",
1255                                     ret);
1256                         exit(ret);
1257                 }
1258
1259                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1260                 key.length = strlen(KEY2) + 1;
1261
1262                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1263                 if (ret != LDB_SUCCESS) {
1264                         print_error(__location__": fetch_and_parse returned "
1265                                     "(%d)\n",
1266                                     ret);
1267                         exit(ret);
1268                 }
1269
1270                 if ((strlen(VAL2) + 1) != val.length) {
1271                         print_error(__location__": KEY2 value lengths different"
1272                                     ", expected (%d) actual(%d)\n",
1273                                     (int)(strlen(VAL2) + 1), (int)val.length);
1274                         exit(LDB_ERR_OPERATIONS_ERROR);
1275                 }
1276                 if (memcmp(VAL2, val.data, strlen(VAL2)) != 0) {
1277                         print_error(__location__": KEY2 values different, "
1278                                     "expected (%s) actual(%s)\n",
1279                                     VAL2,
1280                                     val.data);
1281                         exit(LDB_ERR_OPERATIONS_ERROR);
1282                 }
1283
1284                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1285                 if (ret != LDB_SUCCESS) {
1286                         print_error(__location__": unlock_read returned (%d)\n",
1287                                     ret);
1288                         exit(ret);
1289                 }
1290
1291                 exit(0);
1292         }
1293         close(to_child[0]);
1294         close(to_parent[1]);
1295
1296         /*
1297          * Begin a transaction and add a record to the database
1298          * but leave the transaction open
1299          */
1300         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1301         assert_int_equal(ret, 0);
1302
1303         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1304         key.length = strlen(KEY2) + 1;
1305
1306         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL2);
1307         val.length = strlen(VAL2) + 1;
1308
1309         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
1310         assert_int_equal(ret, 0);
1311
1312         /*
1313          * Signal the child process
1314          */
1315         ret = write(to_child[1], "GO", 2);
1316         assert_int_equal(2, ret);
1317
1318         /*
1319          * Wait for the child process to check the DB state while the
1320          * transaction is active
1321          */
1322         ret = read(to_parent[0], buf, 2);
1323         assert_int_equal(2, ret);
1324
1325         /*
1326          * commit the transaction
1327          */
1328         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1329         assert_int_equal(0, ret);
1330
1331         /*
1332          * Signal the child process
1333          */
1334         ret = write(to_child[1], "GO", 2);
1335         assert_int_equal(2, ret);
1336
1337         w_pid = waitpid(pid, &wstatus, 0);
1338         assert_int_equal(pid, w_pid);
1339
1340         assert_true(WIFEXITED(wstatus));
1341
1342         assert_int_equal(WEXITSTATUS(wstatus), 0);
1343
1344
1345         TALLOC_FREE(tmp_ctx);
1346 }
1347
1348 /*
1349  * Ensure that deletes are not visible until the transaction has been
1350  * committed.
1351  */
1352 static void test_delete_transaction_isolation(void **state)
1353 {
1354         int ret;
1355         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
1356                                                           struct test_ctx);
1357         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
1358         struct ldb_val key;
1359         struct ldb_val val;
1360
1361         const char *KEY1 = "KEY01";
1362         const char *VAL1 = "VALUE01";
1363
1364         const char *KEY2 = "KEY02";
1365         const char *VAL2 = "VALUE02";
1366
1367         /*
1368          * Pipes etc to co-ordinate the processes
1369          */
1370         int to_child[2];
1371         int to_parent[2];
1372         char buf[2];
1373         pid_t pid, w_pid;
1374         int wstatus;
1375
1376         TALLOC_CTX *tmp_ctx;
1377         tmp_ctx = talloc_new(test_ctx);
1378         assert_non_null(tmp_ctx);
1379
1380
1381         /*
1382          * Add records to the database
1383          */
1384         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1385         assert_int_equal(ret, 0);
1386
1387         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1388         key.length = strlen(KEY1) + 1;
1389
1390         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL1);
1391         val.length = strlen(VAL1) + 1;
1392
1393         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
1394         assert_int_equal(ret, 0);
1395
1396         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1397         key.length = strlen(KEY2) + 1;
1398
1399         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL2);
1400         val.length = strlen(VAL2) + 1;
1401
1402         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
1403         assert_int_equal(ret, 0);
1404
1405         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1406         assert_int_equal(ret, 0);
1407
1408
1409         ret = pipe(to_child);
1410         assert_int_equal(ret, 0);
1411         ret = pipe(to_parent);
1412         assert_int_equal(ret, 0);
1413         /*
1414          * Now fork a new process
1415          */
1416
1417         pid = fork();
1418         if (pid == 0) {
1419
1420                 struct ldb_context *ldb = NULL;
1421                 close(to_child[1]);
1422                 close(to_parent[0]);
1423
1424                 /*
1425                  * Wait for the transaction to be started
1426                  */
1427                 ret = read(to_child[0], buf, 2);
1428                 if (ret != 2) {
1429                         print_error(__location__": read returned (%d)\n",
1430                                     ret);
1431                         exit(LDB_ERR_OPERATIONS_ERROR);
1432                 }
1433
1434                 ldb = ldb_init(test_ctx, test_ctx->ev);
1435                 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
1436                 if (ret != LDB_SUCCESS) {
1437                         print_error(__location__": ldb_connect returned (%d)\n",
1438                                     ret);
1439                         exit(ret);
1440                 }
1441
1442                 ldb_kv = get_ldb_kv(ldb);
1443
1444                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1445                 if (ret != LDB_SUCCESS) {
1446                         print_error(__location__": lock_read returned (%d)\n",
1447                                     ret);
1448                         exit(ret);
1449                 }
1450
1451                 /*
1452                  * Check that KEY1 is there
1453                  */
1454                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1455                 key.length = strlen(KEY1) + 1;
1456
1457                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1458                 if (ret != LDB_SUCCESS) {
1459                         print_error(__location__": fetch_and_parse returned "
1460                                     "(%d)\n",
1461                                     ret);
1462                         exit(ret);
1463                 }
1464
1465                 if ((strlen(VAL1) + 1) != val.length) {
1466                         print_error(__location__": KEY1 value lengths different"
1467                                     ", expected (%d) actual(%d)\n",
1468                                     (int)(strlen(VAL1) + 1), (int)val.length);
1469                         exit(LDB_ERR_OPERATIONS_ERROR);
1470                 }
1471                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1472                         print_error(__location__": KEY1 values different, "
1473                                     "expected (%s) actual(%s)\n",
1474                                     VAL1,
1475                                     val.data);
1476                         exit(LDB_ERR_OPERATIONS_ERROR);
1477                 }
1478
1479                 /*
1480                  * Check that KEY2 is there
1481                  */
1482
1483                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1484                 key.length = strlen(KEY2) + 1;
1485
1486                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1487                 if (ret != LDB_SUCCESS) {
1488                         print_error(__location__": fetch_and_parse returned "
1489                                     "(%d)\n",
1490                                     ret);
1491                         exit(ret);
1492                 }
1493
1494                 if ((strlen(VAL2) + 1) != val.length) {
1495                         print_error(__location__": KEY2 value lengths different"
1496                                     ", expected (%d) actual(%d)\n",
1497                                     (int)(strlen(VAL2) + 1), (int)val.length);
1498                         exit(LDB_ERR_OPERATIONS_ERROR);
1499                 }
1500                 if (memcmp(VAL2, val.data, strlen(VAL2)) != 0) {
1501                         print_error(__location__": KEY2 values different, "
1502                                     "expected (%s) actual(%s)\n",
1503                                     VAL2,
1504                                     val.data);
1505                         exit(LDB_ERR_OPERATIONS_ERROR);
1506                 }
1507
1508                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1509                 if (ret != LDB_SUCCESS) {
1510                         print_error(__location__": unlock_read returned (%d)\n",
1511                                     ret);
1512                         exit(ret);
1513                 }
1514
1515                 /*
1516                  * Signal the other process to commit the transaction
1517                  */
1518                 ret = write(to_parent[1], "GO", 2);
1519                 if (ret != 2) {
1520                         print_error(__location__": write returned (%d)\n",
1521                                     ret);
1522                         exit(LDB_ERR_OPERATIONS_ERROR);
1523                 }
1524
1525                 /*
1526                  * Wait for the transaction to be commited
1527                  */
1528                 ret = read(to_child[0], buf, 2);
1529                 if (ret != 2) {
1530                         print_error(__location__": read returned (%d)\n",
1531                                     ret);
1532                         exit(LDB_ERR_OPERATIONS_ERROR);
1533                 }
1534
1535                 /*
1536                  * Check that KEY1 is there
1537                  */
1538                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1539                 if (ret != LDB_SUCCESS) {
1540                         print_error(__location__": unlock_read returned (%d)\n",
1541                                     ret);
1542                         exit(ret);
1543                 }
1544                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1545                 key.length = strlen(KEY1) + 1;
1546
1547                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1548                 if (ret != LDB_SUCCESS) {
1549                         print_error(__location__": fetch_and_parse returned "
1550                                     "(%d)\n",
1551                                     ret);
1552                         exit(ret);
1553                 }
1554
1555                 if ((strlen(VAL1) + 1) != val.length) {
1556                         print_error(__location__": KEY1 value lengths different"
1557                                     ", expected (%d) actual(%d)\n",
1558                                     (int)(strlen(VAL1) + 1), (int)val.length);
1559                         exit(LDB_ERR_OPERATIONS_ERROR);
1560                 }
1561                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1562                         print_error(__location__": KEY1 values different, "
1563                                     "expected (%s) actual(%s)\n",
1564                                     VAL1,
1565                                     val.data);
1566                         exit(LDB_ERR_OPERATIONS_ERROR);
1567                 }
1568
1569                 /*
1570                  * Check that KEY2 is not there
1571                  */
1572                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1573                 key.length = strlen(KEY2 + 1);
1574
1575                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1576                 if (ret != LDB_SUCCESS) {
1577                         print_error(__location__": lock_read returned (%d)\n",
1578                                     ret);
1579                         exit(ret);
1580                 }
1581
1582                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1583                 if (ret != LDB_ERR_NO_SUCH_OBJECT) {
1584                         print_error(__location__": fetch_and_parse returned "
1585                                     "(%d)\n",
1586                                     ret);
1587                         exit(ret);
1588                 }
1589
1590                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1591                 if (ret != LDB_SUCCESS) {
1592                         print_error(__location__": unlock_read returned (%d)\n",
1593                                     ret);
1594                         exit(ret);
1595                 }
1596
1597                 exit(0);
1598         }
1599         close(to_child[0]);
1600         close(to_parent[1]);
1601
1602         /*
1603          * Begin a transaction and delete a record from the database
1604          * but leave the transaction open
1605          */
1606         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1607         assert_int_equal(ret, 0);
1608
1609         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1610         key.length = strlen(KEY2) + 1;
1611
1612         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
1613         assert_int_equal(ret, 0);
1614         /*
1615          * Signal the child process
1616          */
1617         ret = write(to_child[1], "GO", 2);
1618         assert_int_equal(2, ret);
1619
1620         /*
1621          * Wait for the child process to check the DB state while the
1622          * transaction is active
1623          */
1624         ret = read(to_parent[0], buf, 2);
1625         assert_int_equal(2, ret);
1626
1627         /*
1628          * commit the transaction
1629          */
1630         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1631         assert_int_equal(0, ret);
1632
1633         /*
1634          * Signal the child process
1635          */
1636         ret = write(to_child[1], "GO", 2);
1637         assert_int_equal(2, ret);
1638
1639         w_pid = waitpid(pid, &wstatus, 0);
1640         assert_int_equal(pid, w_pid);
1641
1642         assert_true(WIFEXITED(wstatus));
1643
1644         assert_int_equal(WEXITSTATUS(wstatus), 0);
1645
1646
1647         TALLOC_FREE(tmp_ctx);
1648 }
1649
1650
1651 /*
1652  * Test that get_size returns a sensible estimate of the number of records
1653  * in the database.
1654  */
1655 static void test_get_size(void **state)
1656 {
1657         int ret;
1658         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
1659                                                           struct test_ctx);
1660         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
1661         uint8_t key_val[] = "TheKey";
1662         struct ldb_val key = {
1663                 .data   = key_val,
1664                 .length = sizeof(key_val)
1665         };
1666
1667         uint8_t value[] = "The record contents";
1668         struct ldb_val data = {
1669                 .data    = value,
1670                 .length = sizeof(value)
1671         };
1672         size_t size = 0;
1673
1674         int flags = 0;
1675         TALLOC_CTX *tmp_ctx;
1676
1677         tmp_ctx = talloc_new(test_ctx);
1678         assert_non_null(tmp_ctx);
1679
1680         size = ldb_kv->kv_ops->get_size(ldb_kv);
1681 #if defined(TEST_LMDB)
1682         assert_int_equal(2, size);
1683 #else
1684         /*
1685          * The tdb implementation of get_size over estimates for sparse files
1686          * which is perfectly acceptable for it's intended use.
1687          */
1688         assert_true( size > 2500);
1689 #endif
1690
1691         /*
1692          * Begin a transaction
1693          */
1694         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1695         assert_int_equal(ret, 0);
1696
1697         /*
1698          * Write the record
1699          */
1700         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
1701         assert_int_equal(ret, 0);
1702
1703         /*
1704          * Commit the transaction
1705          */
1706         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1707         assert_int_equal(ret, 0);
1708
1709         size = ldb_kv->kv_ops->get_size(ldb_kv);
1710 #ifdef TEST_LMDB
1711         assert_int_equal(3, size);
1712 #endif
1713         talloc_free(tmp_ctx);
1714 }
1715
1716 int main(int argc, const char **argv)
1717 {
1718         const struct CMUnitTest tests[] = {
1719                 cmocka_unit_test_setup_teardown(
1720                         test_add_get,
1721                         setup,
1722                         teardown),
1723                 cmocka_unit_test_setup_teardown(
1724                         test_delete,
1725                         setup,
1726                         teardown),
1727                 cmocka_unit_test_setup_teardown(
1728                         test_transaction_abort_write,
1729                         setup,
1730                         teardown),
1731                 cmocka_unit_test_setup_teardown(
1732                         test_transaction_abort_delete,
1733                         setup,
1734                         teardown),
1735                 cmocka_unit_test_setup_teardown(
1736                         test_read_outside_transaction,
1737                         setup,
1738                         teardown),
1739                 cmocka_unit_test_setup_teardown(
1740                         test_write_outside_transaction,
1741                         setup,
1742                         teardown),
1743                 cmocka_unit_test_setup_teardown(
1744                         test_delete_outside_transaction,
1745                         setup,
1746                         teardown),
1747                 cmocka_unit_test_setup_teardown(
1748                         test_iterate,
1749                         setup,
1750                         teardown),
1751                 cmocka_unit_test_setup_teardown(
1752                         test_iterate_range,
1753                         setup,
1754                         teardown),
1755                 cmocka_unit_test_setup_teardown(
1756                         test_update_in_iterate,
1757                         setup,
1758                         teardown),
1759                 cmocka_unit_test_setup_teardown(
1760                         test_write_transaction_isolation,
1761                         setup,
1762                         teardown),
1763                 cmocka_unit_test_setup_teardown(
1764                         test_delete_transaction_isolation,
1765                         setup,
1766                         teardown),
1767                 cmocka_unit_test_setup_teardown(
1768                         test_get_size,
1769                         setup,
1770                         teardown),
1771         };
1772
1773         return cmocka_run_group_tests(tests, NULL, NULL);
1774 }