return parser(key, data, ctx);
}
+/*
+ * Exactly the same as iterate, except we have a start key and an end key
+ * (which are both included in the results if present).
+ *
+ * If start > end, return MDB_PANIC.
+ */
+static int lmdb_iterate_range(struct ldb_kv_private *ldb_kv,
+ struct ldb_val start_key,
+ struct ldb_val end_key,
+ ldb_kv_traverse_fn fn,
+ void *ctx)
+{
+ struct lmdb_private *lmdb = ldb_kv->lmdb_private;
+ MDB_val mdb_key;
+ MDB_val mdb_data;
+ MDB_txn *txn = NULL;
+ MDB_dbi dbi = 0;
+ MDB_cursor *cursor = NULL;
+ int ret;
+
+ MDB_val mdb_s_key;
+ MDB_val mdb_e_key;
+
+ txn = get_current_txn(lmdb);
+ if (txn == NULL) {
+ ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
+ lmdb->error = MDB_PANIC;
+ return ldb_mdb_error(lmdb->ldb, lmdb->error);
+ }
+
+ lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
+ if (lmdb->error != MDB_SUCCESS) {
+ return ldb_mdb_error(lmdb->ldb, lmdb->error);
+ }
+
+ mdb_s_key.mv_size = start_key.length;
+ mdb_s_key.mv_data = start_key.data;
+
+ mdb_e_key.mv_size = end_key.length;
+ mdb_e_key.mv_data = end_key.data;
+
+ if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) {
+ lmdb->error = MDB_PANIC;
+ return ldb_mdb_error(lmdb->ldb, lmdb->error);
+ }
+
+ lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
+ if (lmdb->error != MDB_SUCCESS) {
+ goto done;
+ }
+
+ lmdb->error = mdb_cursor_get(cursor, &mdb_s_key, &mdb_data, MDB_SET_RANGE);
+
+ if (lmdb->error != MDB_SUCCESS) {
+ if (lmdb->error == MDB_NOTFOUND) {
+ lmdb->error = MDB_SUCCESS;
+ }
+ goto done;
+ } else {
+ struct ldb_val key = {
+ .length = mdb_s_key.mv_size,
+ .data = mdb_s_key.mv_data,
+ };
+ struct ldb_val data = {
+ .length = mdb_data.mv_size,
+ .data = mdb_data.mv_data,
+ };
+
+ if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) {
+ goto done;
+ }
+
+ ret = fn(ldb_kv, key, data, ctx);
+ if (ret != 0) {
+ goto done;
+ }
+ }
+
+ while ((lmdb->error = mdb_cursor_get(
+ cursor, &mdb_key,
+ &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
+
+ struct ldb_val key = {
+ .length = mdb_key.mv_size,
+ .data = mdb_key.mv_data,
+ };
+ struct ldb_val data = {
+ .length = mdb_data.mv_size,
+ .data = mdb_data.mv_data,
+ };
+
+ if (mdb_cmp(txn, dbi, &mdb_key, &mdb_e_key) > 0) {
+ goto done;
+ }
+
+ ret = fn(ldb_kv, key, data, ctx);
+ if (ret != 0) {
+ goto done;
+ }
+ }
+ if (lmdb->error == MDB_NOTFOUND) {
+ lmdb->error = MDB_SUCCESS;
+ }
+done:
+ if (cursor != NULL) {
+ mdb_cursor_close(cursor);
+ }
+
+ if (lmdb->error != MDB_SUCCESS) {
+ return ldb_mdb_error(lmdb->ldb, lmdb->error);
+ }
+ return ldb_mdb_err_map(lmdb->error);
+}
static int lmdb_lock_read(struct ldb_module *module)
{
.iterate = lmdb_traverse_fn,
.update_in_iterate = lmdb_update_in_iterate,
.fetch_and_parse = lmdb_parse_record,
+ .iterate_range = lmdb_iterate_range,
.lock_read = lmdb_lock_read,
.unlock_read = lmdb_unlock_read,
.begin_write = lmdb_transaction_start,
return ltdb_err_map(tdb_error(ldb_kv->tdb));
}
+static int ltdb_iterate_range(struct ldb_kv_private *ldb_kv,
+ struct ldb_val start_key,
+ struct ldb_val end_key,
+ ldb_kv_traverse_fn fn,
+ void *ctx)
+{
+ /*
+ * We do not implement this operation because we do not know how to
+ * iterate from one key to the next (in a sorted fashion).
+ *
+ * We could mimic it potentially, but it would violate boundaries of
+ * knowledge (data type representation).
+ */
+ return LDB_ERR_OPERATIONS_ERROR;
+}
+
static const char *ltdb_name(struct ldb_kv_private *ldb_kv)
{
return tdb_name(ldb_kv->tdb);
.iterate = ltdb_traverse_fn,
.update_in_iterate = ltdb_update_in_iterate,
.fetch_and_parse = ltdb_parse_record,
+ .iterate_range = ltdb_iterate_range,
.lock_read = ltdb_lock_read,
.unlock_read = ltdb_unlock_read,
.begin_write = ltdb_transaction_start,