lib ldb: move key value code to lib/ldb/ldb_key_value
[amitay/samba.git] / lib / ldb / tests / ldb_kv_ops_test.c
1 /*
2  * Tests exercising the ldb key value operations.
3  *
4  *  Copyright (C) Andrew Bartlett <abartlet@samba.org> 2018
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  *
19  */
20
21 /*
22  * from cmocka.c:
23  * These headers or their equivalents should be included prior to
24  * including
25  * this header file.
26  *
27  * #include <stdarg.h>
28  * #include <stddef.h>
29  * #include <setjmp.h>
30  *
31  * This allows test applications to use custom definitions of C standard
32  * library functions and types.
33  *
34  */
35
36 /*
37  * A KV module is expected to have the following behaviour
38  *
39  * - A transaction must be open to perform any read, write or delete operation
40  * - Writes and Deletes should not be visible until a transaction is committed
41  * - Nested transactions are not permitted
42  * - transactions can be rolled back and committed.
43  * - supports iteration over all records in the database
44  * - supports the update_in_iterate operation allowing entries to be
45  *   re-keyed.
46  */
47 #include <stdarg.h>
48 #include <stddef.h>
49 #include <setjmp.h>
50 #include <cmocka.h>
51
52 #include <errno.h>
53 #include <unistd.h>
54 #include <talloc.h>
55 #include <tevent.h>
56 #include <ldb.h>
57 #include <ldb_module.h>
58 #include <ldb_private.h>
59 #include <string.h>
60 #include <ctype.h>
61
62 #include <sys/wait.h>
63
64 #include "ldb_tdb/ldb_tdb.h"
65 #include "ldb_key_value/ldb_kv.h"
66
67
/* Backend name used when TEST_BE has not been defined beforehand. */
#define DEFAULT_BE  "tdb"

/* Backend under test; an earlier definition of TEST_BE takes precedence. */
#ifndef TEST_BE
#define TEST_BE DEFAULT_BE
#endif /* TEST_BE */

/*
 * Size of update_context.visits and the number of records written by
 * test_update_in_iterate().
 */
#define NUM_RECS 1024
75
76
/*
 * Per-test fixture state; filled in by noconn_setup() and released by
 * noconn_teardown().
 */
struct test_ctx {
	struct tevent_context *ev;	/* event context given to ldb_init() */
	struct ldb_context *ldb;	/* ldb handle used by the tests */

	const char *dbfile;		/* name of the database file */
	const char *lockfile;		/* "<dbfile>-lock"; lockfile is separate */

	const char *dbpath;		/* TEST_BE "://" dbfile, for ldb_connect() */
};
86
87 static void unlink_old_db(struct test_ctx *test_ctx)
88 {
89         int ret;
90
91         errno = 0;
92         ret = unlink(test_ctx->lockfile);
93         if (ret == -1 && errno != ENOENT) {
94                 fail();
95         }
96
97         errno = 0;
98         ret = unlink(test_ctx->dbfile);
99         if (ret == -1 && errno != ENOENT) {
100                 fail();
101         }
102 }
103
104 static int noconn_setup(void **state)
105 {
106         struct test_ctx *test_ctx;
107
108         test_ctx = talloc_zero(NULL, struct test_ctx);
109         assert_non_null(test_ctx);
110
111         test_ctx->ev = tevent_context_init(test_ctx);
112         assert_non_null(test_ctx->ev);
113
114         test_ctx->ldb = ldb_init(test_ctx, test_ctx->ev);
115         assert_non_null(test_ctx->ldb);
116
117         test_ctx->dbfile = talloc_strdup(test_ctx, "kvopstest.ldb");
118         assert_non_null(test_ctx->dbfile);
119
120         test_ctx->lockfile = talloc_asprintf(test_ctx, "%s-lock",
121                                              test_ctx->dbfile);
122         assert_non_null(test_ctx->lockfile);
123
124         test_ctx->dbpath = talloc_asprintf(test_ctx,
125                         TEST_BE"://%s", test_ctx->dbfile);
126         assert_non_null(test_ctx->dbpath);
127
128         unlink_old_db(test_ctx);
129         *state = test_ctx;
130         return 0;
131 }
132
133 static int noconn_teardown(void **state)
134 {
135         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
136                                                           struct test_ctx);
137
138         unlink_old_db(test_ctx);
139         talloc_free(test_ctx);
140         return 0;
141 }
142
143 static int setup(void **state)
144 {
145         struct test_ctx *test_ctx;
146         int ret;
147         struct ldb_ldif *ldif;
148         const char *index_ldif =                \
149                 "dn: @INDEXLIST\n"
150                 "@IDXGUID: objectUUID\n"
151                 "@IDX_DN_GUID: GUID\n"
152                 "\n";
153
154         noconn_setup((void **) &test_ctx);
155
156         ret = ldb_connect(test_ctx->ldb, test_ctx->dbpath, 0, NULL);
157         assert_int_equal(ret, 0);
158
159         while ((ldif = ldb_ldif_read_string(test_ctx->ldb, &index_ldif))) {
160                 ret = ldb_add(test_ctx->ldb, ldif->msg);
161                 assert_int_equal(ret, LDB_SUCCESS);
162         }
163         *state = test_ctx;
164         return 0;
165 }
166
/*
 * cmocka teardown matching setup().
 *
 * The fixture pointer is forwarded unchanged to noconn_teardown(), which
 * performs the talloc type check itself, so no struct test_ctx ** has to
 * be cast to void **.  The result of noconn_teardown() is returned
 * instead of being discarded.
 */
static int teardown(void **state)
{
	return noconn_teardown(state);
}
174
175 static struct ldb_kv_private *get_ldb_kv(struct ldb_context *ldb)
176 {
177         void *data = NULL;
178         struct ldb_kv_private *ldb_kv = NULL;
179
180         data = ldb_module_get_private(ldb->modules);
181         assert_non_null(data);
182
183         ldb_kv = talloc_get_type(data, struct ldb_kv_private);
184         assert_non_null(ldb_kv);
185
186         return ldb_kv;
187 }
188
189 static int parse(struct ldb_val key,
190                  struct ldb_val data,
191                  void *private_data)
192 {
193         struct ldb_val* read = private_data;
194
195         /* Yes, we essentially leak this.  That is OK */
196         read->data = talloc_size(talloc_autofree_context(),
197                                  data.length);
198         assert_non_null(read->data);
199
200         memcpy(read->data, data.data, data.length);
201         read->length = data.length;
202         return LDB_SUCCESS;
203 }
204
205 /*
206  * Test that data can be written to the kv store and be read back.
207  */
208 static void test_add_get(void **state)
209 {
210         int ret;
211         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
212                                                           struct test_ctx);
213         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
214         uint8_t key_val[] = "TheKey";
215         struct ldb_val key = {
216                 .data   = key_val,
217                 .length = sizeof(key_val)
218         };
219
220         uint8_t value[] = "The record contents";
221         struct ldb_val data = {
222                 .data    = value,
223                 .length = sizeof(value)
224         };
225
226         struct ldb_val read;
227
228         int flags = 0;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(test_ctx);
232         assert_non_null(tmp_ctx);
233
234         /*
235          * Begin a transaction
236          */
237         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
238         assert_int_equal(ret, 0);
239
240         /*
241          * Write the record
242          */
243         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
244         assert_int_equal(ret, 0);
245
246         /*
247          * Commit the transaction
248          */
249         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
250         assert_int_equal(ret, 0);
251
252         /*
253          * And now read it back
254          */
255         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
256         assert_int_equal(ret, 0);
257
258         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
259         assert_int_equal(ret, 0);
260
261         assert_int_equal(sizeof(value), read.length);
262         assert_memory_equal(value, read.data, sizeof(value));
263
264         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
265         assert_int_equal(ret, 0);
266         talloc_free(tmp_ctx);
267 }
268
269 /*
270  * Test that attempts to read data without a read transaction fail.
271  */
272 static void test_read_outside_transaction(void **state)
273 {
274         int ret;
275         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
276                                                           struct test_ctx);
277         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
278         uint8_t key_val[] = "TheKey";
279         struct ldb_val key = {
280                 .data   = key_val,
281                 .length = sizeof(key_val)
282         };
283
284         uint8_t value[] = "The record contents";
285         struct ldb_val data = {
286                 .data    = value,
287                 .length = sizeof(value)
288         };
289
290         struct ldb_val read;
291
292         int flags = 0;
293         TALLOC_CTX *tmp_ctx;
294
295         tmp_ctx = talloc_new(test_ctx);
296         assert_non_null(tmp_ctx);
297
298         /*
299          * Begin a transaction
300          */
301         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
302         assert_int_equal(ret, 0);
303
304         /*
305          * Write the record
306          */
307         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
308         assert_int_equal(ret, 0);
309
310         /*
311          * Commit the transaction
312          */
313         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
314         assert_int_equal(ret, 0);
315
316         /*
317          * And now read it back
318          * Note there is no read transaction active
319          */
320         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
321         assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
322
323         talloc_free(tmp_ctx);
324 }
325
326 /*
327  * Test that data can be deleted from the kv store
328  */
329 static void test_delete(void **state)
330 {
331         int ret;
332         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
333                                                           struct test_ctx);
334         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
335         uint8_t key_val[] = "TheKey";
336         struct ldb_val key = {
337                 .data   = key_val,
338                 .length = sizeof(key_val)
339         };
340
341         uint8_t value[] = "The record contents";
342         struct ldb_val data = {
343                 .data    = value,
344                 .length = sizeof(value)
345         };
346
347         struct ldb_val read;
348
349         int flags = 0;
350         TALLOC_CTX *tmp_ctx;
351
352         tmp_ctx = talloc_new(test_ctx);
353         assert_non_null(tmp_ctx);
354
355         /*
356          * Begin a transaction
357          */
358         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
359         assert_int_equal(ret, 0);
360
361         /*
362          * Write the record
363          */
364         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
365         assert_int_equal(ret, 0);
366
367         /*
368          * Commit the transaction
369          */
370         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
371         assert_int_equal(ret, 0);
372
373         /*
374          * And now read it back
375          */
376         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
377         assert_int_equal(ret, 0);
378         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
379         assert_int_equal(ret, 0);
380         assert_int_equal(sizeof(value), read.length);
381         assert_memory_equal(value, read.data, sizeof(value));
382         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
383         assert_int_equal(ret, 0);
384
385         /*
386          * Begin a transaction
387          */
388         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
389         assert_int_equal(ret, 0);
390
391         /*
392          * Now delete it.
393          */
394         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
395         assert_int_equal(ret, 0);
396
397         /*
398          * Commit the transaction
399          */
400         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
401         assert_int_equal(ret, 0);
402
403         /*
404          * And now try to read it back
405          */
406         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
407         assert_int_equal(ret, 0);
408         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
409         assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
410         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
411         assert_int_equal(ret, 0);
412
413         talloc_free(tmp_ctx);
414 }
415
416 /*
417  * Check that writes are correctly rolled back when a transaction
418  * is rolled back.
419  */
420 static void test_transaction_abort_write(void **state)
421 {
422         int ret;
423         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
424                                                           struct test_ctx);
425         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
426         uint8_t key_val[] = "TheKey";
427         struct ldb_val key = {
428                 .data   = key_val,
429                 .length = sizeof(key_val)
430         };
431
432         uint8_t value[] = "The record contents";
433         struct ldb_val data = {
434                 .data    = value,
435                 .length = sizeof(value)
436         };
437
438         struct ldb_val read;
439
440         int flags = 0;
441         TALLOC_CTX *tmp_ctx;
442
443         tmp_ctx = talloc_new(test_ctx);
444         assert_non_null(tmp_ctx);
445
446         /*
447          * Begin a transaction
448          */
449         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
450         assert_int_equal(ret, 0);
451
452         /*
453          * Write the record
454          */
455         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
456         assert_int_equal(ret, 0);
457
458         /*
459          * And now read it back
460          */
461         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
462         assert_int_equal(ret, 0);
463         assert_int_equal(sizeof(value), read.length);
464         assert_memory_equal(value, read.data, sizeof(value));
465
466
467         /*
468          * Now abort the transaction
469          */
470         ret = ldb_kv->kv_ops->abort_write(ldb_kv);
471         assert_int_equal(ret, 0);
472
473         /*
474          * And now read it back, should not be there
475          */
476         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
477         assert_int_equal(ret, 0);
478         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
479         assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
480         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
481         assert_int_equal(ret, 0);
482
483         talloc_free(tmp_ctx);
484 }
485
486 /*
487  * Check that deletes are correctly rolled back when a transaction is
488  * aborted.
489  */
490 static void test_transaction_abort_delete(void **state)
491 {
492         int ret;
493         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
494                                                           struct test_ctx);
495         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
496         uint8_t key_val[] = "TheKey";
497         struct ldb_val key = {
498                 .data   = key_val,
499                 .length = sizeof(key_val)
500         };
501
502         uint8_t value[] = "The record contents";
503         struct ldb_val data = {
504                 .data    = value,
505                 .length = sizeof(value)
506         };
507
508         struct ldb_val read;
509
510         int flags = 0;
511         TALLOC_CTX *tmp_ctx;
512
513         tmp_ctx = talloc_new(test_ctx);
514         assert_non_null(tmp_ctx);
515
516         /*
517          * Begin a transaction
518          */
519         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
520         assert_int_equal(ret, 0);
521
522         /*
523          * Write the record
524          */
525         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
526         assert_int_equal(ret, 0);
527
528         /*
529          * Commit the transaction
530          */
531         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
532         assert_int_equal(ret, 0);
533
534         /*
535          * And now read it back
536          */
537         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
538         assert_int_equal(ret, 0);
539         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
540         assert_int_equal(ret, 0);
541         assert_int_equal(sizeof(value), read.length);
542         assert_memory_equal(value, read.data, sizeof(value));
543         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
544         assert_int_equal(ret, 0);
545
546         /*
547          * Begin a transaction
548          */
549         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
550         assert_int_equal(ret, 0);
551
552         /*
553          * Now delete it.
554          */
555         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
556         assert_int_equal(ret, 0);
557
558         /*
559          * And now read it back
560          */
561         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
562         assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
563
564         /*
565          * Abort the transaction
566          */
567         ret = ldb_kv->kv_ops->abort_write(ldb_kv);
568         assert_int_equal(ret, 0);
569
570         /*
571          * And now try to read it back
572          */
573         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
574         assert_int_equal(ret, 0);
575         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
576         assert_int_equal(ret, 0);
577         assert_int_equal(sizeof(value), read.length);
578         assert_memory_equal(value, read.data, sizeof(value));
579         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
580         assert_int_equal(ret, 0);
581
582         talloc_free(tmp_ctx);
583 }
584
585 /*
586  * Test that writes outside a transaction fail
587  */
588 static void test_write_outside_transaction(void **state)
589 {
590         int ret;
591         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
592                                                           struct test_ctx);
593         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
594         uint8_t key_val[] = "TheKey";
595         struct ldb_val key = {
596                 .data   = key_val,
597                 .length = sizeof(key_val)
598         };
599
600         uint8_t value[] = "The record contents";
601         struct ldb_val data = {
602                 .data    = value,
603                 .length = sizeof(value)
604         };
605
606
607         int flags = 0;
608         TALLOC_CTX *tmp_ctx;
609
610         tmp_ctx = talloc_new(test_ctx);
611         assert_non_null(tmp_ctx);
612
613         /*
614          * Attempt to write the record
615          */
616         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
617         assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
618
619         talloc_free(tmp_ctx);
620 }
621
622 /*
623  * Test data can not be deleted outside a transaction
624  */
625 static void test_delete_outside_transaction(void **state)
626 {
627         int ret;
628         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
629                                                           struct test_ctx);
630         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
631         uint8_t key_val[] = "TheKey";
632         struct ldb_val key = {
633                 .data   = key_val,
634                 .length = sizeof(key_val)
635         };
636
637         uint8_t value[] = "The record contents";
638         struct ldb_val data = {
639                 .data    = value,
640                 .length = sizeof(value)
641         };
642
643         struct ldb_val read;
644
645         int flags = 0;
646         TALLOC_CTX *tmp_ctx;
647
648         tmp_ctx = talloc_new(test_ctx);
649         assert_non_null(tmp_ctx);
650
651         /*
652          * Begin a transaction
653          */
654         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
655         assert_int_equal(ret, 0);
656
657         /*
658          * Write the record
659          */
660         ret = ldb_kv->kv_ops->store(ldb_kv, key, data, flags);
661         assert_int_equal(ret, 0);
662
663         /*
664          * Commit the transaction
665          */
666         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
667         assert_int_equal(ret, 0);
668
669         /*
670          * And now read it back
671          */
672         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
673         assert_int_equal(ret, 0);
674         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
675         assert_int_equal(ret, 0);
676         assert_int_equal(sizeof(value), read.length);
677         assert_memory_equal(value, read.data, sizeof(value));
678         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
679         assert_int_equal(ret, 0);
680
681         /*
682          * Now attempt to delete a record
683          */
684         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
685         assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
686
687         /*
688          * And now read it back
689          */
690         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
691         assert_int_equal(ret, 0);
692         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &read);
693         assert_int_equal(ret, 0);
694         assert_int_equal(sizeof(value), read.length);
695         assert_memory_equal(value, read.data, sizeof(value));
696         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
697         assert_int_equal(ret, 0);
698
699         talloc_free(tmp_ctx);
700 }
701
702 static int traverse_fn(struct ldb_kv_private *ldb_kv,
703                        struct ldb_val key,
704                        struct ldb_val data,
705                        void *ctx)
706 {
707
708         int *visits = ctx;
709         int i;
710
711         if (strncmp("key ", (char *) key.data, 4) == 0) {
712                 i = strtol((char *) &key.data[4], NULL, 10);
713                 visits[i]++;
714         }
715         return LDB_SUCCESS;
716 }
717
718
719 /*
720  * Test that iterate visits all the records.
721  */
722 static void test_iterate(void **state)
723 {
724         int ret;
725         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
726                                                           struct test_ctx);
727         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
728         int i;
729         int num_recs = 1024;
730         int visits[num_recs];
731
732         TALLOC_CTX *tmp_ctx;
733
734         tmp_ctx = talloc_new(test_ctx);
735         assert_non_null(tmp_ctx);
736
737         /*
738          * Begin a transaction
739          */
740         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
741         assert_int_equal(ret, 0);
742
743         /*
744          * Write the records
745          */
746         for (i = 0; i < num_recs; i++) {
747                 struct ldb_val key;
748                 struct ldb_val rec;
749                 int flags = 0;
750
751                 visits[i] = 0;
752                 key.data   = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", i);
753                 key.length = strlen((char *)key.data) + 1;
754
755                 rec.data = (uint8_t *) talloc_asprintf(tmp_ctx,
756                                                        "data for record (%04d)",
757                                                        i);
758                 rec.length = strlen((char *)rec.data) + 1;
759
760                 ret = ldb_kv->kv_ops->store(ldb_kv, key, rec, flags);
761                 assert_int_equal(ret, 0);
762
763                 TALLOC_FREE(key.data);
764                 TALLOC_FREE(rec.data);
765         }
766
767         /*
768          * Commit the transaction
769          */
770         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
771         assert_int_equal(ret, 0);
772
773         /*
774          * Now iterate over the kv store and ensure that all the
775          * records are visited.
776          */
777         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
778         assert_int_equal(ret, 0);
779         ret = ldb_kv->kv_ops->iterate(ldb_kv, traverse_fn, visits);
780         for (i = 0; i <num_recs; i++) {
781                 assert_int_equal(1, visits[i]);
782         }
783         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
784         assert_int_equal(ret, 0);
785
786         TALLOC_FREE(tmp_ctx);
787 }
788
789 struct update_context {
790         struct ldb_context* ldb;
791         int visits[NUM_RECS];
792 };
793
794 static int update_fn(struct ldb_kv_private *ldb_kv,
795                      struct ldb_val key,
796                      struct ldb_val data,
797                      void *ctx)
798 {
799
800         struct ldb_val new_key;
801         struct ldb_module *module = NULL;
802         struct update_context *context =NULL;
803         int ret = LDB_SUCCESS;
804         TALLOC_CTX *tmp_ctx;
805
806         tmp_ctx = talloc_new(ldb_kv);
807         assert_non_null(tmp_ctx);
808
809         context = talloc_get_type_abort(ctx, struct update_context);
810
811         module = talloc_zero(tmp_ctx, struct ldb_module);
812         module->ldb = context->ldb;
813
814         if (strncmp("key ", (char *) key.data, 4) == 0) {
815                 int i = strtol((char *) &key.data[4], NULL, 10);
816                 context->visits[i]++;
817                 new_key.data = talloc_memdup(tmp_ctx, key.data, key.length);
818                 new_key.length  = key.length;
819                 new_key.data[0] = 'K';
820
821                 ret = ldb_kv->kv_ops->update_in_iterate(
822                     ldb_kv, key, new_key, data, &module);
823         }
824         TALLOC_FREE(tmp_ctx);
825         return ret;
826 }
827
828 /*
829  * Test that update_in_iterate behaves as expected.
830  */
831 static void test_update_in_iterate(void **state)
832 {
833         int ret;
834         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
835                                                           struct test_ctx);
836         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
837         int i;
838         struct update_context *context = NULL;
839
840
841         TALLOC_CTX *tmp_ctx;
842
843         tmp_ctx = talloc_new(test_ctx);
844         assert_non_null(tmp_ctx);
845
846         context = talloc_zero(tmp_ctx, struct update_context);
847         assert_non_null(context);
848         context->ldb = test_ctx->ldb;
849         /*
850          * Begin a transaction
851          */
852         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
853         assert_int_equal(ret, 0);
854
855         /*
856          * Write the records
857          */
858         for (i = 0; i < NUM_RECS; i++) {
859                 struct ldb_val key;
860                 struct ldb_val rec;
861                 int flags = 0;
862
863                 key.data   = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", i);
864                 key.length = strlen((char *)key.data) + 1;
865
866                 rec.data   = (uint8_t *) talloc_asprintf(tmp_ctx,
867                                                          "data for record (%04d)",
868                                                          i);
869                 rec.length = strlen((char *)rec.data) + 1;
870
871                 ret = ldb_kv->kv_ops->store(ldb_kv, key, rec, flags);
872                 assert_int_equal(ret, 0);
873
874                 TALLOC_FREE(key.data);
875                 TALLOC_FREE(rec.data);
876         }
877
878         /*
879          * Commit the transaction
880          */
881         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
882         assert_int_equal(ret, 0);
883
884         /*
885          * Now iterate over the kv store and ensure that all the
886          * records are visited.
887          */
888
889         /*
890          * Needs to be done inside a transaction
891          */
892         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
893         assert_int_equal(ret, 0);
894
895         ret = ldb_kv->kv_ops->iterate(ldb_kv, update_fn, context);
896         for (i = 0; i < NUM_RECS; i++) {
897                 assert_int_equal(1, context->visits[i]);
898         }
899
900         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
901         assert_int_equal(ret, 0);
902
903         TALLOC_FREE(tmp_ctx);
904 }
905
906 /*
907  * Ensure that writes are not visible until the transaction has been
908  * committed.
909  */
910 static void test_write_transaction_isolation(void **state)
911 {
912         int ret;
913         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
914                                                           struct test_ctx);
915         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
916         struct ldb_val key;
917         struct ldb_val val;
918
919         const char *KEY1 = "KEY01";
920         const char *VAL1 = "VALUE01";
921
922         const char *KEY2 = "KEY02";
923         const char *VAL2 = "VALUE02";
924
925         /*
926          * Pipes etc to co-ordinate the processes
927          */
928         int to_child[2];
929         int to_parent[2];
930         char buf[2];
931         pid_t pid, w_pid;
932         int wstatus;
933
934         TALLOC_CTX *tmp_ctx;
935         tmp_ctx = talloc_new(test_ctx);
936         assert_non_null(tmp_ctx);
937
938
939         /*
940          * Add a record to the database
941          */
942         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
943         assert_int_equal(ret, 0);
944
945         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
946         key.length = strlen(KEY1) + 1;
947
948         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL1);
949         val.length = strlen(VAL1) + 1;
950
951         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
952         assert_int_equal(ret, 0);
953
954         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
955         assert_int_equal(ret, 0);
956
957
958         ret = pipe(to_child);
959         assert_int_equal(ret, 0);
960         ret = pipe(to_parent);
961         assert_int_equal(ret, 0);
962         /*
963          * Now fork a new process
964          */
965
966         pid = fork();
967         if (pid == 0) {
968
969                 struct ldb_context *ldb = NULL;
970                 close(to_child[1]);
971                 close(to_parent[0]);
972
973                 /*
974                  * Wait for the transaction to start
975                  */
976                 ret = read(to_child[0], buf, 2);
977                 if (ret != 2) {
978                         print_error(__location__": read returned (%d)\n",
979                                     ret);
980                         exit(LDB_ERR_OPERATIONS_ERROR);
981                 }
982                 ldb = ldb_init(test_ctx, test_ctx->ev);
983                 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
984                 if (ret != LDB_SUCCESS) {
985                         print_error(__location__": ldb_connect returned (%d)\n",
986                                     ret);
987                         exit(ret);
988                 }
989
990                 ldb_kv = get_ldb_kv(ldb);
991
992                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
993                 if (ret != LDB_SUCCESS) {
994                         print_error(__location__": lock_read returned (%d)\n",
995                                     ret);
996                         exit(ret);
997                 }
998
999                 /*
1000                  * Check that KEY1 is there
1001                  */
1002                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1003                 key.length = strlen(KEY1) + 1;
1004
1005                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1006                 if (ret != LDB_SUCCESS) {
1007                         print_error(__location__": fetch_and_parse returned "
1008                                     "(%d)\n",
1009                                     ret);
1010                         exit(ret);
1011                 }
1012
1013                 if ((strlen(VAL1) + 1) != val.length) {
1014                         print_error(__location__": KEY1 value lengths different"
1015                                     ", expected (%d) actual(%d)\n",
1016                                     (int)(strlen(VAL1) + 1), (int)val.length);
1017                         exit(LDB_ERR_OPERATIONS_ERROR);
1018                 }
1019                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1020                         print_error(__location__": KEY1 values different, "
1021                                     "expected (%s) actual(%s)\n",
1022                                     VAL1,
1023                                     val.data);
1024                         exit(LDB_ERR_OPERATIONS_ERROR);
1025                 }
1026
1027                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1028                 if (ret != LDB_SUCCESS) {
1029                         print_error(__location__": unlock_read returned (%d)\n",
1030                                     ret);
1031                         exit(ret);
1032                 }
1033
1034                 /*
1035                  * Check that KEY2 is not there
1036                  */
1037                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1038                 key.length = strlen(KEY2 + 1);
1039
1040                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1041                 if (ret != LDB_SUCCESS) {
1042                         print_error(__location__": lock_read returned (%d)\n",
1043                                     ret);
1044                         exit(ret);
1045                 }
1046
1047                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1048                 if (ret != LDB_ERR_NO_SUCH_OBJECT) {
1049                         print_error(__location__": fetch_and_parse returned "
1050                                     "(%d)\n",
1051                                     ret);
1052                         exit(ret);
1053                 }
1054
1055                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1056                 if (ret != LDB_SUCCESS) {
1057                         print_error(__location__": unlock_read returned (%d)\n",
1058                                     ret);
1059                         exit(ret);
1060                 }
1061
1062                 /*
1063                  * Signal the other process to commit the transaction
1064                  */
1065                 ret = write(to_parent[1], "GO", 2);
1066                 if (ret != 2) {
1067                         print_error(__location__": write returned (%d)\n",
1068                                     ret);
1069                         exit(LDB_ERR_OPERATIONS_ERROR);
1070                 }
1071
1072                 /*
1073                  * Wait for the transaction to be commited
1074                  */
1075                 ret = read(to_child[0], buf, 2);
1076                 if (ret != 2) {
1077                         print_error(__location__": read returned (%d)\n",
1078                                     ret);
1079                         exit(LDB_ERR_OPERATIONS_ERROR);
1080                 }
1081
1082                 /*
1083                  * Check that KEY1 is there
1084                  */
1085                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1086                 if (ret != LDB_SUCCESS) {
1087                         print_error(__location__": unlock_read returned (%d)\n",
1088                                     ret);
1089                         exit(ret);
1090                 }
1091                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1092                 key.length = strlen(KEY1) + 1;
1093
1094                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1095                 if (ret != LDB_SUCCESS) {
1096                         print_error(__location__": fetch_and_parse returned "
1097                                     "(%d)\n",
1098                                     ret);
1099                         exit(ret);
1100                 }
1101
1102                 if ((strlen(VAL1) + 1) != val.length) {
1103                         print_error(__location__": KEY1 value lengths different"
1104                                     ", expected (%d) actual(%d)\n",
1105                                     (int)(strlen(VAL1) + 1), (int)val.length);
1106                         exit(LDB_ERR_OPERATIONS_ERROR);
1107                 }
1108                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1109                         print_error(__location__": KEY1 values different, "
1110                                     "expected (%s) actual(%s)\n",
1111                                     VAL1,
1112                                     val.data);
1113                         exit(LDB_ERR_OPERATIONS_ERROR);
1114                 }
1115
1116                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1117                 if (ret != LDB_SUCCESS) {
1118                         print_error(__location__": unlock_read returned (%d)\n",
1119                                     ret);
1120                         exit(ret);
1121                 }
1122
1123
1124                 /*
1125                  * Check that KEY2 is there
1126                  */
1127                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1128                 if (ret != LDB_SUCCESS) {
1129                         print_error(__location__": unlock_read returned (%d)\n",
1130                                     ret);
1131                         exit(ret);
1132                 }
1133
1134                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1135                 key.length = strlen(KEY2) + 1;
1136
1137                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1138                 if (ret != LDB_SUCCESS) {
1139                         print_error(__location__": fetch_and_parse returned "
1140                                     "(%d)\n",
1141                                     ret);
1142                         exit(ret);
1143                 }
1144
1145                 if ((strlen(VAL2) + 1) != val.length) {
1146                         print_error(__location__": KEY2 value lengths different"
1147                                     ", expected (%d) actual(%d)\n",
1148                                     (int)(strlen(VAL2) + 1), (int)val.length);
1149                         exit(LDB_ERR_OPERATIONS_ERROR);
1150                 }
1151                 if (memcmp(VAL2, val.data, strlen(VAL2)) != 0) {
1152                         print_error(__location__": KEY2 values different, "
1153                                     "expected (%s) actual(%s)\n",
1154                                     VAL2,
1155                                     val.data);
1156                         exit(LDB_ERR_OPERATIONS_ERROR);
1157                 }
1158
1159                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1160                 if (ret != LDB_SUCCESS) {
1161                         print_error(__location__": unlock_read returned (%d)\n",
1162                                     ret);
1163                         exit(ret);
1164                 }
1165
1166                 exit(0);
1167         }
1168         close(to_child[0]);
1169         close(to_parent[1]);
1170
1171         /*
1172          * Begin a transaction and add a record to the database
1173          * but leave the transaction open
1174          */
1175         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1176         assert_int_equal(ret, 0);
1177
1178         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1179         key.length = strlen(KEY2) + 1;
1180
1181         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL2);
1182         val.length = strlen(VAL2) + 1;
1183
1184         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
1185         assert_int_equal(ret, 0);
1186
1187         /*
1188          * Signal the child process
1189          */
1190         ret = write(to_child[1], "GO", 2);
1191         assert_int_equal(2, ret);
1192
1193         /*
1194          * Wait for the child process to check the DB state while the
1195          * transaction is active
1196          */
1197         ret = read(to_parent[0], buf, 2);
1198         assert_int_equal(2, ret);
1199
1200         /*
1201          * commit the transaction
1202          */
1203         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1204         assert_int_equal(0, ret);
1205
1206         /*
1207          * Signal the child process
1208          */
1209         ret = write(to_child[1], "GO", 2);
1210         assert_int_equal(2, ret);
1211
1212         w_pid = waitpid(pid, &wstatus, 0);
1213         assert_int_equal(pid, w_pid);
1214
1215         assert_true(WIFEXITED(wstatus));
1216
1217         assert_int_equal(WEXITSTATUS(wstatus), 0);
1218
1219
1220         TALLOC_FREE(tmp_ctx);
1221 }
1222
1223 /*
1224  * Ensure that deletes are not visible until the transaction has been
1225  * committed.
1226  */
1227 static void test_delete_transaction_isolation(void **state)
1228 {
1229         int ret;
1230         struct test_ctx *test_ctx = talloc_get_type_abort(*state,
1231                                                           struct test_ctx);
1232         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
1233         struct ldb_val key;
1234         struct ldb_val val;
1235
1236         const char *KEY1 = "KEY01";
1237         const char *VAL1 = "VALUE01";
1238
1239         const char *KEY2 = "KEY02";
1240         const char *VAL2 = "VALUE02";
1241
1242         /*
1243          * Pipes etc to co-ordinate the processes
1244          */
1245         int to_child[2];
1246         int to_parent[2];
1247         char buf[2];
1248         pid_t pid, w_pid;
1249         int wstatus;
1250
1251         TALLOC_CTX *tmp_ctx;
1252         tmp_ctx = talloc_new(test_ctx);
1253         assert_non_null(tmp_ctx);
1254
1255
1256         /*
1257          * Add records to the database
1258          */
1259         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1260         assert_int_equal(ret, 0);
1261
1262         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1263         key.length = strlen(KEY1) + 1;
1264
1265         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL1);
1266         val.length = strlen(VAL1) + 1;
1267
1268         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
1269         assert_int_equal(ret, 0);
1270
1271         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1272         key.length = strlen(KEY2) + 1;
1273
1274         val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL2);
1275         val.length = strlen(VAL2) + 1;
1276
1277         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
1278         assert_int_equal(ret, 0);
1279
1280         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1281         assert_int_equal(ret, 0);
1282
1283
1284         ret = pipe(to_child);
1285         assert_int_equal(ret, 0);
1286         ret = pipe(to_parent);
1287         assert_int_equal(ret, 0);
1288         /*
1289          * Now fork a new process
1290          */
1291
1292         pid = fork();
1293         if (pid == 0) {
1294
1295                 struct ldb_context *ldb = NULL;
1296                 close(to_child[1]);
1297                 close(to_parent[0]);
1298
1299                 /*
1300                  * Wait for the transaction to be started
1301                  */
1302                 ret = read(to_child[0], buf, 2);
1303                 if (ret != 2) {
1304                         print_error(__location__": read returned (%d)\n",
1305                                     ret);
1306                         exit(LDB_ERR_OPERATIONS_ERROR);
1307                 }
1308
1309                 ldb = ldb_init(test_ctx, test_ctx->ev);
1310                 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
1311                 if (ret != LDB_SUCCESS) {
1312                         print_error(__location__": ldb_connect returned (%d)\n",
1313                                     ret);
1314                         exit(ret);
1315                 }
1316
1317                 ldb_kv = get_ldb_kv(ldb);
1318
1319                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1320                 if (ret != LDB_SUCCESS) {
1321                         print_error(__location__": lock_read returned (%d)\n",
1322                                     ret);
1323                         exit(ret);
1324                 }
1325
1326                 /*
1327                  * Check that KEY1 is there
1328                  */
1329                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1330                 key.length = strlen(KEY1) + 1;
1331
1332                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1333                 if (ret != LDB_SUCCESS) {
1334                         print_error(__location__": fetch_and_parse returned "
1335                                     "(%d)\n",
1336                                     ret);
1337                         exit(ret);
1338                 }
1339
1340                 if ((strlen(VAL1) + 1) != val.length) {
1341                         print_error(__location__": KEY1 value lengths different"
1342                                     ", expected (%d) actual(%d)\n",
1343                                     (int)(strlen(VAL1) + 1), (int)val.length);
1344                         exit(LDB_ERR_OPERATIONS_ERROR);
1345                 }
1346                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1347                         print_error(__location__": KEY1 values different, "
1348                                     "expected (%s) actual(%s)\n",
1349                                     VAL1,
1350                                     val.data);
1351                         exit(LDB_ERR_OPERATIONS_ERROR);
1352                 }
1353
1354                 /*
1355                  * Check that KEY2 is there
1356                  */
1357
1358                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1359                 key.length = strlen(KEY2) + 1;
1360
1361                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1362                 if (ret != LDB_SUCCESS) {
1363                         print_error(__location__": fetch_and_parse returned "
1364                                     "(%d)\n",
1365                                     ret);
1366                         exit(ret);
1367                 }
1368
1369                 if ((strlen(VAL2) + 1) != val.length) {
1370                         print_error(__location__": KEY2 value lengths different"
1371                                     ", expected (%d) actual(%d)\n",
1372                                     (int)(strlen(VAL2) + 1), (int)val.length);
1373                         exit(LDB_ERR_OPERATIONS_ERROR);
1374                 }
1375                 if (memcmp(VAL2, val.data, strlen(VAL2)) != 0) {
1376                         print_error(__location__": KEY2 values different, "
1377                                     "expected (%s) actual(%s)\n",
1378                                     VAL2,
1379                                     val.data);
1380                         exit(LDB_ERR_OPERATIONS_ERROR);
1381                 }
1382
1383                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1384                 if (ret != LDB_SUCCESS) {
1385                         print_error(__location__": unlock_read returned (%d)\n",
1386                                     ret);
1387                         exit(ret);
1388                 }
1389
1390                 /*
1391                  * Signal the other process to commit the transaction
1392                  */
1393                 ret = write(to_parent[1], "GO", 2);
1394                 if (ret != 2) {
1395                         print_error(__location__": write returned (%d)\n",
1396                                     ret);
1397                         exit(LDB_ERR_OPERATIONS_ERROR);
1398                 }
1399
1400                 /*
1401                  * Wait for the transaction to be commited
1402                  */
1403                 ret = read(to_child[0], buf, 2);
1404                 if (ret != 2) {
1405                         print_error(__location__": read returned (%d)\n",
1406                                     ret);
1407                         exit(LDB_ERR_OPERATIONS_ERROR);
1408                 }
1409
1410                 /*
1411                  * Check that KEY1 is there
1412                  */
1413                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1414                 if (ret != LDB_SUCCESS) {
1415                         print_error(__location__": unlock_read returned (%d)\n",
1416                                     ret);
1417                         exit(ret);
1418                 }
1419                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
1420                 key.length = strlen(KEY1) + 1;
1421
1422                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1423                 if (ret != LDB_SUCCESS) {
1424                         print_error(__location__": fetch_and_parse returned "
1425                                     "(%d)\n",
1426                                     ret);
1427                         exit(ret);
1428                 }
1429
1430                 if ((strlen(VAL1) + 1) != val.length) {
1431                         print_error(__location__": KEY1 value lengths different"
1432                                     ", expected (%d) actual(%d)\n",
1433                                     (int)(strlen(VAL1) + 1), (int)val.length);
1434                         exit(LDB_ERR_OPERATIONS_ERROR);
1435                 }
1436                 if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
1437                         print_error(__location__": KEY1 values different, "
1438                                     "expected (%s) actual(%s)\n",
1439                                     VAL1,
1440                                     val.data);
1441                         exit(LDB_ERR_OPERATIONS_ERROR);
1442                 }
1443
1444                 /*
1445                  * Check that KEY2 is not there
1446                  */
1447                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1448                 key.length = strlen(KEY2 + 1);
1449
1450                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
1451                 if (ret != LDB_SUCCESS) {
1452                         print_error(__location__": lock_read returned (%d)\n",
1453                                     ret);
1454                         exit(ret);
1455                 }
1456
1457                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
1458                 if (ret != LDB_ERR_NO_SUCH_OBJECT) {
1459                         print_error(__location__": fetch_and_parse returned "
1460                                     "(%d)\n",
1461                                     ret);
1462                         exit(ret);
1463                 }
1464
1465                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
1466                 if (ret != LDB_SUCCESS) {
1467                         print_error(__location__": unlock_read returned (%d)\n",
1468                                     ret);
1469                         exit(ret);
1470                 }
1471
1472                 exit(0);
1473         }
1474         close(to_child[0]);
1475         close(to_parent[1]);
1476
1477         /*
1478          * Begin a transaction and delete a record from the database
1479          * but leave the transaction open
1480          */
1481         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
1482         assert_int_equal(ret, 0);
1483
1484         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
1485         key.length = strlen(KEY2) + 1;
1486
1487         ret = ldb_kv->kv_ops->delete (ldb_kv, key);
1488         assert_int_equal(ret, 0);
1489         /*
1490          * Signal the child process
1491          */
1492         ret = write(to_child[1], "GO", 2);
1493         assert_int_equal(2, ret);
1494
1495         /*
1496          * Wait for the child process to check the DB state while the
1497          * transaction is active
1498          */
1499         ret = read(to_parent[0], buf, 2);
1500         assert_int_equal(2, ret);
1501
1502         /*
1503          * commit the transaction
1504          */
1505         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
1506         assert_int_equal(0, ret);
1507
1508         /*
1509          * Signal the child process
1510          */
1511         ret = write(to_child[1], "GO", 2);
1512         assert_int_equal(2, ret);
1513
1514         w_pid = waitpid(pid, &wstatus, 0);
1515         assert_int_equal(pid, w_pid);
1516
1517         assert_true(WIFEXITED(wstatus));
1518
1519         assert_int_equal(WEXITSTATUS(wstatus), 0);
1520
1521
1522         TALLOC_FREE(tmp_ctx);
1523 }
1524
1525
1526 int main(int argc, const char **argv)
1527 {
1528         const struct CMUnitTest tests[] = {
1529                 cmocka_unit_test_setup_teardown(
1530                         test_add_get,
1531                         setup,
1532                         teardown),
1533                 cmocka_unit_test_setup_teardown(
1534                         test_delete,
1535                         setup,
1536                         teardown),
1537                 cmocka_unit_test_setup_teardown(
1538                         test_transaction_abort_write,
1539                         setup,
1540                         teardown),
1541                 cmocka_unit_test_setup_teardown(
1542                         test_transaction_abort_delete,
1543                         setup,
1544                         teardown),
1545                 cmocka_unit_test_setup_teardown(
1546                         test_read_outside_transaction,
1547                         setup,
1548                         teardown),
1549                 cmocka_unit_test_setup_teardown(
1550                         test_write_outside_transaction,
1551                         setup,
1552                         teardown),
1553                 cmocka_unit_test_setup_teardown(
1554                         test_delete_outside_transaction,
1555                         setup,
1556                         teardown),
1557                 cmocka_unit_test_setup_teardown(
1558                         test_iterate,
1559                         setup,
1560                         teardown),
1561                 cmocka_unit_test_setup_teardown(
1562                         test_update_in_iterate,
1563                         setup,
1564                         teardown),
1565                 cmocka_unit_test_setup_teardown(
1566                         test_write_transaction_isolation,
1567                         setup,
1568                         teardown),
1569                 cmocka_unit_test_setup_teardown(
1570                         test_delete_transaction_isolation,
1571                         setup,
1572                         teardown),
1573         };
1574
1575         return cmocka_run_group_tests(tests, NULL, NULL);
1576 }