ldb tests: Confirm lmdb free list handling
[samba.git] / lib / ldb / tests / ldb_lmdb_free_list_test.c
1 /*
2  * Copyright (C) Catalyst.Net Ltd 2020
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 3 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
16  *
17  */
18
19 /*
20  * Tests confirming lmdb's handling of the free space list in the presence
21  * of active and stale readers.  A stale reader is a process that opens a
22  * read lock and then exits without releasing the lock.
23  *
24  * lmdb uses MVCC to maintain databased consistency, new copies of updated
25  * records are written to the database. The old entries are only
26  * reused when they are no longer referenced in a read transaction.
27  *
28  * The tests all update a single record multiple times
29  *
30  * If there is a read transaction or a stale reader lmdb will report
31  * out of space.
32  *
33  * If no read transaction and no stale reader, lmdb reclaims space from the
34  * free list.
35  */
36
37 /*
38  * from cmocka.c:
39  * These headers or their equivalents should be included prior to
40  * including
41  * this header file.
42  *
43  * #include <stdarg.h>
44  * #include <stddef.h>
45  * #include <setjmp.h>
46  *
47  * This allows test applications to use custom definitions of C standard
48  * library functions and types.
49  *
50  */
51
52 #include <stdarg.h>
53 #include <stddef.h>
54 #include <stdint.h>
55 #include <setjmp.h>
56 #include <cmocka.h>
57
58 #include <errno.h>
59 #include <unistd.h>
60 #include <talloc.h>
61 #include <tevent.h>
62 #include <ldb.h>
63 #include <ldb_module.h>
64 #include <ldb_private.h>
65 #include <string.h>
66 #include <ctype.h>
67
68 #include <sys/wait.h>
69
70 #include "ldb_tdb/ldb_tdb.h"
71 #include "ldb_key_value/ldb_kv.h"
72
73 #define DEFAULT_BE "mdb"
74
75 #ifndef TEST_BE
76 #define TEST_BE DEFAULT_BE
77 #endif /* TEST_BE */
78
79 const int RECORD_SIZE = 6144;
80 const int ITERATIONS = 3;
81
82 struct test_ctx {
83         struct tevent_context *ev;
84         struct ldb_context *ldb;
85
86         const char *dbfile;
87         const char *lockfile; /* lockfile is separate */
88
89         const char *dbpath;
90 };
91
92 static void unlink_old_db(struct test_ctx *test_ctx)
93 {
94         int ret;
95
96         errno = 0;
97         ret = unlink(test_ctx->lockfile);
98         if (ret == -1 && errno != ENOENT) {
99                 fail();
100         }
101
102         errno = 0;
103         ret = unlink(test_ctx->dbfile);
104         if (ret == -1 && errno != ENOENT) {
105                 fail();
106         }
107 }
108
109 static int noconn_setup(void **state)
110 {
111         struct test_ctx *test_ctx;
112
113         test_ctx = talloc_zero(NULL, struct test_ctx);
114         assert_non_null(test_ctx);
115
116         test_ctx->ev = tevent_context_init(test_ctx);
117         assert_non_null(test_ctx->ev);
118
119         test_ctx->ldb = ldb_init(test_ctx, test_ctx->ev);
120         assert_non_null(test_ctx->ldb);
121
122         test_ctx->dbfile = talloc_strdup(test_ctx, "lmdb_free_list_test.ldb");
123         assert_non_null(test_ctx->dbfile);
124
125         test_ctx->lockfile =
126             talloc_asprintf(test_ctx, "%s-lock", test_ctx->dbfile);
127         assert_non_null(test_ctx->lockfile);
128
129         test_ctx->dbpath =
130             talloc_asprintf(test_ctx, TEST_BE "://%s", test_ctx->dbfile);
131         assert_non_null(test_ctx->dbpath);
132
133         unlink_old_db(test_ctx);
134         *state = test_ctx;
135         return 0;
136 }
137
138 static int noconn_teardown(void **state)
139 {
140         struct test_ctx *test_ctx =
141             talloc_get_type_abort(*state, struct test_ctx);
142
143         unlink_old_db(test_ctx);
144         talloc_free(test_ctx);
145         return 0;
146 }
147
148 static int setup(void **state)
149 {
150         struct test_ctx *test_ctx;
151         int ret;
152         struct ldb_ldif *ldif;
153         const char *index_ldif = "dn: @INDEXLIST\n"
154                                  "@IDXGUID: objectUUID\n"
155                                  "@IDX_DN_GUID: GUID\n"
156                                  "\n";
157         /*
158          * Use a 64KiB DB for this test
159          */
160         const char *options[] = {"lmdb_env_size:65536", NULL};
161
162         noconn_setup((void **)&test_ctx);
163
164         ret = ldb_connect(test_ctx->ldb, test_ctx->dbpath, 0, options);
165         assert_int_equal(ret, 0);
166
167         while ((ldif = ldb_ldif_read_string(test_ctx->ldb, &index_ldif))) {
168                 ret = ldb_add(test_ctx->ldb, ldif->msg);
169                 assert_int_equal(ret, LDB_SUCCESS);
170         }
171         *state = test_ctx;
172         return 0;
173 }
174
175 static int teardown(void **state)
176 {
177         struct test_ctx *test_ctx =
178             talloc_get_type_abort(*state, struct test_ctx);
179         noconn_teardown((void **)&test_ctx);
180         return 0;
181 }
182
183 static struct ldb_kv_private *get_ldb_kv(struct ldb_context *ldb)
184 {
185         void *data = NULL;
186         struct ldb_kv_private *ldb_kv = NULL;
187
188         data = ldb_module_get_private(ldb->modules);
189         assert_non_null(data);
190
191         ldb_kv = talloc_get_type(data, struct ldb_kv_private);
192         assert_non_null(ldb_kv);
193
194         return ldb_kv;
195 }
196
197 static int parse(struct ldb_val key, struct ldb_val data, void *private_data)
198 {
199         struct ldb_val *read = private_data;
200
201         /* Yes, we leak this.  That is OK */
202         read->data = talloc_size(NULL, data.length);
203         assert_non_null(read->data);
204
205         memcpy(read->data, data.data, data.length);
206         read->length = data.length;
207         return LDB_SUCCESS;
208 }
209
210 /*
211  * This test has the same structure as the test_free_list_read_lock
212  * except the parent process does not keep the read lock open while the
213  * child process is performing an update.
214  */
215 static void test_free_list_no_read_lock(void **state)
216 {
217         int ret;
218         struct test_ctx *test_ctx =
219             talloc_get_type_abort(*state, struct test_ctx);
220         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
221         struct ldb_val key;
222         struct ldb_val val;
223
224         const char *KEY1 = "KEY01";
225
226         /*
227          * Pipes etc to co-ordinate the processes
228          */
229         int to_child[2];
230         int to_parent[2];
231         char buf[2];
232         pid_t pid;
233         size_t i;
234
235         TALLOC_CTX *tmp_ctx;
236         tmp_ctx = talloc_new(test_ctx);
237         assert_non_null(tmp_ctx);
238
239         ret = pipe(to_child);
240         assert_int_equal(ret, 0);
241         ret = pipe(to_parent);
242         assert_int_equal(ret, 0);
243         /*
244          * Now fork a new process
245          */
246
247         pid = fork();
248         if (pid == 0) {
249                 /*
250                  * Child process
251                  */
252
253                 struct ldb_context *ldb = NULL;
254                 close(to_child[1]);
255                 close(to_parent[0]);
256
257                 /*
258                  * Wait for the parent to get ready.
259                  */
260                 ret = read(to_child[0], buf, 2);
261                 assert_int_equal(ret, 2);
262
263                 ldb = ldb_init(test_ctx, test_ctx->ev);
264                 assert_non_null(ldb);
265
266                 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
267                 assert_int_equal(ret, LDB_SUCCESS);
268
269                 ldb_kv = get_ldb_kv(ldb);
270                 assert_non_null(ldb_kv);
271                 /*
272                  * Add a record to the database
273                  */
274                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
275                 key.length = strlen(KEY1) + 1;
276                 val.data = talloc_zero_size(tmp_ctx, RECORD_SIZE);
277                 assert_non_null(val.data);
278                 memset(val.data, 'x', RECORD_SIZE);
279                 val.length = RECORD_SIZE;
280                 /*
281                  * Do more iterations than when a read lock, stale reader
282                  * active to confirm that the space is being re-used.
283                  */
284                 for (i = 0; i < ITERATIONS * 10; i++) {
285                         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
286                         assert_int_equal(ret, LDB_SUCCESS);
287
288                         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
289                         assert_int_equal(ret, LDB_SUCCESS);
290
291                         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
292                         assert_int_equal(ret, LDB_SUCCESS);
293                 }
294
295                 /*
296                  * Signal the parent that we've done the updates
297                  */
298                 ret = write(to_parent[1], "GO", 2);
299                 assert_int_equal(ret, 2);
300                 exit(0);
301         }
302
303         close(to_child[0]);
304         close(to_parent[1]);
305
306         /*
307          * Begin a read transaction
308          */
309         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
310         assert_int_equal(ret, LDB_SUCCESS);
311
312         /*
313          * Now close it
314          */
315         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
316         assert_int_equal(ret, LDB_SUCCESS);
317
318         /*
319          * Signal the child process
320          */
321         ret = write(to_child[1], "GO", 2);
322         assert_int_equal(2, ret);
323
324         /*
325          * Wait for the child process to update the record
326          */
327         ret = read(to_parent[0], buf, 2);
328         assert_int_equal(2, ret);
329
330         /*
331          * Begin a read transaction
332          */
333         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
334         assert_int_equal(ret, LDB_SUCCESS);
335         /*
336          * read the record
337          * and close the transaction
338          */
339         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
340         key.length = strlen(KEY1) + 1;
341
342         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
343         assert_int_equal(ret, LDB_SUCCESS);
344
345         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
346         assert_int_equal(ret, LDB_SUCCESS);
347
348         close(to_child[1]);
349         close(to_parent[0]);
350         TALLOC_FREE(tmp_ctx);
351 }
352
353 /*
354  * This test has the same structure as the test_free_list_read_lock
355  * except the parent process keeps the read lock open while the
356  * child process is performing an update.
357  */
358 static void test_free_list_read_lock(void **state)
359 {
360         int ret;
361         struct test_ctx *test_ctx =
362             talloc_get_type_abort(*state, struct test_ctx);
363         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
364         struct ldb_val key;
365         struct ldb_val val;
366
367         const char *KEY1 = "KEY01";
368
369         /*
370          * Pipes etc to co-ordinate the processes
371          */
372         int to_child[2];
373         int to_parent[2];
374         char buf[2];
375         pid_t pid;
376         size_t i;
377
378         TALLOC_CTX *tmp_ctx;
379         tmp_ctx = talloc_new(test_ctx);
380         assert_non_null(tmp_ctx);
381
382         ret = pipe(to_child);
383         assert_int_equal(ret, 0);
384         ret = pipe(to_parent);
385         assert_int_equal(ret, 0);
386         /*
387          * Now fork a new process
388          */
389
390         pid = fork();
391         if (pid == 0) {
392                 /*
393                  * Child process
394                  */
395
396                 struct ldb_context *ldb = NULL;
397                 close(to_child[1]);
398                 close(to_parent[0]);
399
400                 /*
401                  * Wait for the transaction to start
402                  */
403                 ret = read(to_child[0], buf, 2);
404                 assert_int_equal(ret, 2);
405
406                 ldb = ldb_init(test_ctx, test_ctx->ev);
407                 assert_non_null(ldb);
408
409                 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
410                 assert_int_equal(ret, LDB_SUCCESS);
411
412                 ldb_kv = get_ldb_kv(ldb);
413                 assert_non_null(ldb_kv);
414                 /*
415                  * Add a record to the database
416                  */
417                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
418                 key.length = strlen(KEY1) + 1;
419                 val.data = talloc_zero_size(tmp_ctx, RECORD_SIZE);
420                 assert_non_null(val.data);
421                 memset(val.data, 'x', RECORD_SIZE);
422                 val.length = RECORD_SIZE;
423                 for (i = 0; i < ITERATIONS; i++) {
424                         ret = ldb_kv->kv_ops->begin_write(ldb_kv);
425                         assert_int_equal(ret, 0);
426                         ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
427                         if (ret == LDB_ERR_BUSY && i > 0) {
428                                 int rc = ldb_kv->kv_ops->abort_write(ldb_kv);
429                                 assert_int_equal(rc, LDB_SUCCESS);
430                                 break;
431                         }
432                         assert_int_equal(ret, LDB_SUCCESS);
433                         ret = ldb_kv->kv_ops->finish_write(ldb_kv);
434                         assert_int_equal(ret, LDB_SUCCESS);
435                 }
436                 assert_int_equal(ret, LDB_ERR_BUSY);
437                 assert_int_not_equal(i, 0);
438
439                 /*
440                  * Begin a read transaction
441                  */
442                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
443                 assert_int_equal(ret, LDB_SUCCESS);
444                 /*
445                  * read the record
446                  * and close the transaction
447                  */
448                 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
449                 key.length = strlen(KEY1) + 1;
450
451                 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
452                 assert_int_equal(ret, LDB_SUCCESS);
453
454                 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
455                 assert_int_equal(ret, LDB_SUCCESS);
456
457                 /*
458                  * Signal the the parent that we've done the update
459                  */
460                 ret = write(to_parent[1], "GO", 2);
461                 assert_int_equal(ret, 2);
462                 exit(0);
463         }
464
465         close(to_child[0]);
466         close(to_parent[1]);
467
468         /*
469          * Begin a read transaction
470          */
471         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
472         assert_int_equal(ret, LDB_SUCCESS);
473
474         /*
475          * Signal the child process
476          */
477         ret = write(to_child[1], "GO", 2);
478         assert_int_equal(ret, 2);
479
480         /*
481          * Wait for the child process to update the record
482          */
483         ret = read(to_parent[0], buf, 2);
484         assert_int_equal(ret, 2);
485
486         /*
487          * read the record
488          * and close the transaction
489          */
490         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
491         key.length = strlen(KEY1) + 1;
492
493         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
494         assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
495         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
496         assert_int_equal(ret, 0);
497
498         close(to_child[1]);
499         close(to_parent[0]);
500         TALLOC_FREE(tmp_ctx);
501 }
502
503 /*
504  * This tests forks a child process that opens a read lock and then
505  * exits. This results in a stale reader entry in the lmdb lock file.
506  */
507 static void test_free_list_stale_reader(void **state)
508 {
509         int ret;
510         struct test_ctx *test_ctx =
511             talloc_get_type_abort(*state, struct test_ctx);
512         struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
513         struct ldb_val key;
514         struct ldb_val val;
515
516         const char *KEY1 = "KEY01";
517
518         /*
519          * Pipes etc to co-ordinate the processes
520          */
521         int to_child[2];
522         int to_parent[2];
523         char buf[2];
524         pid_t pid;
525         size_t i;
526
527         TALLOC_CTX *tmp_ctx;
528         tmp_ctx = talloc_new(test_ctx);
529         assert_non_null(tmp_ctx);
530
531         ret = pipe(to_child);
532         assert_int_equal(ret, 0);
533         ret = pipe(to_parent);
534         assert_int_equal(ret, 0);
535         /*
536          * Now fork a new process
537          */
538
539         pid = fork();
540         if (pid == 0) {
541                 /*
542                  * Child process
543                  */
544
545                 struct ldb_context *ldb = NULL;
546                 close(to_child[1]);
547                 close(to_parent[0]);
548
549                 /*
550                  * Wait for the parent to get ready
551                  */
552                 ret = read(to_child[0], buf, 2);
553                 assert_int_equal(ret, 2);
554
555                 ldb = ldb_init(test_ctx, test_ctx->ev);
556                 assert_non_null(ldb);
557
558                 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
559                 assert_int_equal(ret, LDB_SUCCESS);
560
561                 ldb_kv = get_ldb_kv(ldb);
562                 assert_non_null(ldb_kv);
563
564                 /*
565                  * Begin a read transaction
566                  */
567                 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
568                 assert_int_equal(ret, LDB_SUCCESS);
569
570                 /*
571                  * Now exit with out releasing the read lock
572                  * this will result in a stale entry in the
573                  * read lock table.
574                  */
575
576                 exit(0);
577         }
578
579         close(to_child[0]);
580         close(to_parent[1]);
581
582         /*
583          * Tell the child to start
584          */
585         ret = write(to_child[1], "GO", 2);
586         assert_int_equal(ret, 2);
587
588         close(to_child[1]);
589         close(to_parent[0]);
590
591         /*
592          * Now wait for the child process to complete
593          */
594         waitpid(pid, NULL, 0);
595
596         /*
597          * Add a record to the database
598          */
599         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
600         key.length = strlen(KEY1) + 1;
601         val.data = talloc_zero_size(tmp_ctx, RECORD_SIZE);
602         assert_non_null(val.data);
603         memset(val.data, 'x', RECORD_SIZE);
604         val.length = RECORD_SIZE;
605         for (i = 0; i < ITERATIONS; i++) {
606                 ret = ldb_kv->kv_ops->begin_write(ldb_kv);
607                 assert_int_equal(ret, LDB_SUCCESS);
608
609                 ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
610                 if (ret == LDB_ERR_BUSY && i > 0) {
611                         int rc = ldb_kv->kv_ops->abort_write(ldb_kv);
612                         assert_int_equal(rc, LDB_SUCCESS);
613                         break;
614                 }
615                 assert_int_equal(ret, LDB_SUCCESS);
616
617                 ret = ldb_kv->kv_ops->finish_write(ldb_kv);
618                 assert_int_equal(ret, LDB_SUCCESS);
619         }
620         assert_int_equal(ret, LDB_ERR_BUSY);
621         assert_int_not_equal(i, 0);
622
623         /*
624          * Begin a read transaction
625          */
626         ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
627         assert_int_equal(ret, LDB_SUCCESS);
628         /*
629          * read the record
630          * and close the transaction
631          */
632         key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
633         key.length = strlen(KEY1) + 1;
634
635         ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
636         assert_int_equal(ret, LDB_SUCCESS);
637
638         ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
639         assert_int_equal(ret, LDB_SUCCESS);
640
641         TALLOC_FREE(tmp_ctx);
642 }
643
644 int main(int argc, const char **argv)
645 {
646         const struct CMUnitTest tests[] = {
647             cmocka_unit_test_setup_teardown(
648                 test_free_list_no_read_lock, setup, teardown),
649             cmocka_unit_test_setup_teardown(
650                 test_free_list_read_lock, setup, teardown),
651             cmocka_unit_test_setup_teardown(
652                 test_free_list_stale_reader, setup, teardown),
653         };
654
655         return cmocka_run_group_tests(tests, NULL, NULL);
656 }