2 * Copyright (C) Catalyst.Net Ltd 2020
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 3 of the License, or
7 * (at your option) any later version.
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
14 * You should have received a copy of the GNU General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
20 * Tests confirming lmdb's handling of the free space list in the presence
21 * of active and stale readers. A stale reader is a process that opens a
22 * read lock and then exits without releasing the lock.
24 * lmdb uses MVCC to maintain databased consistency, new copies of updated
25 * records are written to the database. The old entries are only
26 * reused when they are no longer referenced in a read transaction.
28 * The tests all update a single record multiple times
30 * If there is a read transaction or a stale reader lmdb will report
33 * If no read transaction and no stale reader, lmdb reclaims space from the
39 * These headers or their equivalents should be included prior to
47 * This allows test applications to use custom definitions of C standard
48 * library functions and types.
63 #include <ldb_module.h>
64 #include <ldb_private.h>
70 #include "ldb_tdb/ldb_tdb.h"
71 #include "ldb_key_value/ldb_kv.h"
73 #define DEFAULT_BE "mdb"
76 #define TEST_BE DEFAULT_BE
79 const int RECORD_SIZE = 6144;
80 const int ITERATIONS = 3;
83 struct tevent_context *ev;
84 struct ldb_context *ldb;
87 const char *lockfile; /* lockfile is separate */
92 static void unlink_old_db(struct test_ctx *test_ctx)
97 ret = unlink(test_ctx->lockfile);
98 if (ret == -1 && errno != ENOENT) {
103 ret = unlink(test_ctx->dbfile);
104 if (ret == -1 && errno != ENOENT) {
109 static int noconn_setup(void **state)
111 struct test_ctx *test_ctx;
113 test_ctx = talloc_zero(NULL, struct test_ctx);
114 assert_non_null(test_ctx);
116 test_ctx->ev = tevent_context_init(test_ctx);
117 assert_non_null(test_ctx->ev);
119 test_ctx->ldb = ldb_init(test_ctx, test_ctx->ev);
120 assert_non_null(test_ctx->ldb);
122 test_ctx->dbfile = talloc_strdup(test_ctx, "lmdb_free_list_test.ldb");
123 assert_non_null(test_ctx->dbfile);
126 talloc_asprintf(test_ctx, "%s-lock", test_ctx->dbfile);
127 assert_non_null(test_ctx->lockfile);
130 talloc_asprintf(test_ctx, TEST_BE "://%s", test_ctx->dbfile);
131 assert_non_null(test_ctx->dbpath);
133 unlink_old_db(test_ctx);
138 static int noconn_teardown(void **state)
140 struct test_ctx *test_ctx =
141 talloc_get_type_abort(*state, struct test_ctx);
143 unlink_old_db(test_ctx);
144 talloc_free(test_ctx);
148 static int setup(void **state)
150 struct test_ctx *test_ctx;
152 struct ldb_ldif *ldif;
153 const char *index_ldif = "dn: @INDEXLIST\n"
154 "@IDXGUID: objectUUID\n"
155 "@IDX_DN_GUID: GUID\n"
158 * Use a 64KiB DB for this test
160 const char *options[] = {"lmdb_env_size:65536", NULL};
162 noconn_setup((void **)&test_ctx);
164 ret = ldb_connect(test_ctx->ldb, test_ctx->dbpath, 0, options);
165 assert_int_equal(ret, 0);
167 while ((ldif = ldb_ldif_read_string(test_ctx->ldb, &index_ldif))) {
168 ret = ldb_add(test_ctx->ldb, ldif->msg);
169 assert_int_equal(ret, LDB_SUCCESS);
175 static int teardown(void **state)
177 struct test_ctx *test_ctx =
178 talloc_get_type_abort(*state, struct test_ctx);
179 noconn_teardown((void **)&test_ctx);
183 static struct ldb_kv_private *get_ldb_kv(struct ldb_context *ldb)
186 struct ldb_kv_private *ldb_kv = NULL;
188 data = ldb_module_get_private(ldb->modules);
189 assert_non_null(data);
191 ldb_kv = talloc_get_type(data, struct ldb_kv_private);
192 assert_non_null(ldb_kv);
197 static int parse(struct ldb_val key, struct ldb_val data, void *private_data)
199 struct ldb_val *read = private_data;
201 /* Yes, we leak this. That is OK */
202 read->data = talloc_size(NULL, data.length);
203 assert_non_null(read->data);
205 memcpy(read->data, data.data, data.length);
206 read->length = data.length;
211 * This test has the same structure as the test_free_list_read_lock
212 * except the parent process does not keep the read lock open while the
213 * child process is performing an update.
215 static void test_free_list_no_read_lock(void **state)
218 struct test_ctx *test_ctx =
219 talloc_get_type_abort(*state, struct test_ctx);
220 struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
224 const char *KEY1 = "KEY01";
227 * Pipes etc to co-ordinate the processes
236 tmp_ctx = talloc_new(test_ctx);
237 assert_non_null(tmp_ctx);
239 ret = pipe(to_child);
240 assert_int_equal(ret, 0);
241 ret = pipe(to_parent);
242 assert_int_equal(ret, 0);
244 * Now fork a new process
253 struct ldb_context *ldb = NULL;
258 * Wait for the parent to get ready.
260 ret = read(to_child[0], buf, 2);
261 assert_int_equal(ret, 2);
263 ldb = ldb_init(test_ctx, test_ctx->ev);
264 assert_non_null(ldb);
266 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
267 assert_int_equal(ret, LDB_SUCCESS);
269 ldb_kv = get_ldb_kv(ldb);
270 assert_non_null(ldb_kv);
272 * Add a record to the database
274 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
275 key.length = strlen(KEY1) + 1;
276 val.data = talloc_zero_size(tmp_ctx, RECORD_SIZE);
277 assert_non_null(val.data);
278 memset(val.data, 'x', RECORD_SIZE);
279 val.length = RECORD_SIZE;
281 * Do more iterations than when a read lock, stale reader
282 * active to confirm that the space is being re-used.
284 for (i = 0; i < ITERATIONS * 10; i++) {
285 ret = ldb_kv->kv_ops->begin_write(ldb_kv);
286 assert_int_equal(ret, LDB_SUCCESS);
288 ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
289 assert_int_equal(ret, LDB_SUCCESS);
291 ret = ldb_kv->kv_ops->finish_write(ldb_kv);
292 assert_int_equal(ret, LDB_SUCCESS);
296 * Signal the parent that we've done the updates
298 ret = write(to_parent[1], "GO", 2);
299 assert_int_equal(ret, 2);
307 * Begin a read transaction
309 ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
310 assert_int_equal(ret, LDB_SUCCESS);
315 ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
316 assert_int_equal(ret, LDB_SUCCESS);
319 * Signal the child process
321 ret = write(to_child[1], "GO", 2);
322 assert_int_equal(2, ret);
325 * Wait for the child process to update the record
327 ret = read(to_parent[0], buf, 2);
328 assert_int_equal(2, ret);
331 * Begin a read transaction
333 ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
334 assert_int_equal(ret, LDB_SUCCESS);
337 * and close the transaction
339 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
340 key.length = strlen(KEY1) + 1;
342 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
343 assert_int_equal(ret, LDB_SUCCESS);
345 ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
346 assert_int_equal(ret, LDB_SUCCESS);
350 TALLOC_FREE(tmp_ctx);
354 * This test has the same structure as the test_free_list_read_lock
355 * except the parent process keeps the read lock open while the
356 * child process is performing an update.
358 static void test_free_list_read_lock(void **state)
361 struct test_ctx *test_ctx =
362 talloc_get_type_abort(*state, struct test_ctx);
363 struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
367 const char *KEY1 = "KEY01";
370 * Pipes etc to co-ordinate the processes
379 tmp_ctx = talloc_new(test_ctx);
380 assert_non_null(tmp_ctx);
382 ret = pipe(to_child);
383 assert_int_equal(ret, 0);
384 ret = pipe(to_parent);
385 assert_int_equal(ret, 0);
387 * Now fork a new process
396 struct ldb_context *ldb = NULL;
401 * Wait for the transaction to start
403 ret = read(to_child[0], buf, 2);
404 assert_int_equal(ret, 2);
406 ldb = ldb_init(test_ctx, test_ctx->ev);
407 assert_non_null(ldb);
409 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
410 assert_int_equal(ret, LDB_SUCCESS);
412 ldb_kv = get_ldb_kv(ldb);
413 assert_non_null(ldb_kv);
415 * Add a record to the database
417 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
418 key.length = strlen(KEY1) + 1;
419 val.data = talloc_zero_size(tmp_ctx, RECORD_SIZE);
420 assert_non_null(val.data);
421 memset(val.data, 'x', RECORD_SIZE);
422 val.length = RECORD_SIZE;
423 for (i = 0; i < ITERATIONS; i++) {
424 ret = ldb_kv->kv_ops->begin_write(ldb_kv);
425 assert_int_equal(ret, 0);
426 ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
427 if (ret == LDB_ERR_BUSY && i > 0) {
428 int rc = ldb_kv->kv_ops->abort_write(ldb_kv);
429 assert_int_equal(rc, LDB_SUCCESS);
432 assert_int_equal(ret, LDB_SUCCESS);
433 ret = ldb_kv->kv_ops->finish_write(ldb_kv);
434 assert_int_equal(ret, LDB_SUCCESS);
436 assert_int_equal(ret, LDB_ERR_BUSY);
437 assert_int_not_equal(i, 0);
440 * Begin a read transaction
442 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
443 assert_int_equal(ret, LDB_SUCCESS);
446 * and close the transaction
448 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
449 key.length = strlen(KEY1) + 1;
451 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
452 assert_int_equal(ret, LDB_SUCCESS);
454 ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
455 assert_int_equal(ret, LDB_SUCCESS);
458 * Signal the the parent that we've done the update
460 ret = write(to_parent[1], "GO", 2);
461 assert_int_equal(ret, 2);
469 * Begin a read transaction
471 ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
472 assert_int_equal(ret, LDB_SUCCESS);
475 * Signal the child process
477 ret = write(to_child[1], "GO", 2);
478 assert_int_equal(ret, 2);
481 * Wait for the child process to update the record
483 ret = read(to_parent[0], buf, 2);
484 assert_int_equal(ret, 2);
488 * and close the transaction
490 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
491 key.length = strlen(KEY1) + 1;
493 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
494 assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
495 ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
496 assert_int_equal(ret, 0);
500 TALLOC_FREE(tmp_ctx);
504 * This tests forks a child process that opens a read lock and then
505 * exits. This results in a stale reader entry in the lmdb lock file.
507 static void test_free_list_stale_reader(void **state)
510 struct test_ctx *test_ctx =
511 talloc_get_type_abort(*state, struct test_ctx);
512 struct ldb_kv_private *ldb_kv = get_ldb_kv(test_ctx->ldb);
516 const char *KEY1 = "KEY01";
519 * Pipes etc to co-ordinate the processes
528 tmp_ctx = talloc_new(test_ctx);
529 assert_non_null(tmp_ctx);
531 ret = pipe(to_child);
532 assert_int_equal(ret, 0);
533 ret = pipe(to_parent);
534 assert_int_equal(ret, 0);
536 * Now fork a new process
545 struct ldb_context *ldb = NULL;
550 * Wait for the parent to get ready
552 ret = read(to_child[0], buf, 2);
553 assert_int_equal(ret, 2);
555 ldb = ldb_init(test_ctx, test_ctx->ev);
556 assert_non_null(ldb);
558 ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
559 assert_int_equal(ret, LDB_SUCCESS);
561 ldb_kv = get_ldb_kv(ldb);
562 assert_non_null(ldb_kv);
565 * Begin a read transaction
567 ret = ldb_kv->kv_ops->lock_read(ldb->modules);
568 assert_int_equal(ret, LDB_SUCCESS);
571 * Now exit with out releasing the read lock
572 * this will result in a stale entry in the
583 * Tell the child to start
585 ret = write(to_child[1], "GO", 2);
586 assert_int_equal(ret, 2);
592 * Now wait for the child process to complete
594 waitpid(pid, NULL, 0);
597 * Add a record to the database
599 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
600 key.length = strlen(KEY1) + 1;
601 val.data = talloc_zero_size(tmp_ctx, RECORD_SIZE);
602 assert_non_null(val.data);
603 memset(val.data, 'x', RECORD_SIZE);
604 val.length = RECORD_SIZE;
605 for (i = 0; i < ITERATIONS; i++) {
606 ret = ldb_kv->kv_ops->begin_write(ldb_kv);
607 assert_int_equal(ret, LDB_SUCCESS);
609 ret = ldb_kv->kv_ops->store(ldb_kv, key, val, 0);
610 if (ret == LDB_ERR_BUSY && i > 0) {
611 int rc = ldb_kv->kv_ops->abort_write(ldb_kv);
612 assert_int_equal(rc, LDB_SUCCESS);
615 assert_int_equal(ret, LDB_SUCCESS);
617 ret = ldb_kv->kv_ops->finish_write(ldb_kv);
618 assert_int_equal(ret, LDB_SUCCESS);
620 assert_int_equal(ret, LDB_ERR_BUSY);
621 assert_int_not_equal(i, 0);
624 * Begin a read transaction
626 ret = ldb_kv->kv_ops->lock_read(test_ctx->ldb->modules);
627 assert_int_equal(ret, LDB_SUCCESS);
630 * and close the transaction
632 key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
633 key.length = strlen(KEY1) + 1;
635 ret = ldb_kv->kv_ops->fetch_and_parse(ldb_kv, key, parse, &val);
636 assert_int_equal(ret, LDB_SUCCESS);
638 ret = ldb_kv->kv_ops->unlock_read(test_ctx->ldb->modules);
639 assert_int_equal(ret, LDB_SUCCESS);
641 TALLOC_FREE(tmp_ctx);
644 int main(int argc, const char **argv)
646 const struct CMUnitTest tests[] = {
647 cmocka_unit_test_setup_teardown(
648 test_free_list_no_read_lock, setup, teardown),
649 cmocka_unit_test_setup_teardown(
650 test_free_list_read_lock, setup, teardown),
651 cmocka_unit_test_setup_teardown(
652 test_free_list_stale_reader, setup, teardown),
655 return cmocka_run_group_tests(tests, NULL, NULL);