lmdb: iterate_range implementation
authorGarming Sam <garming@catalyst.net.nz>
Sun, 3 Mar 2019 23:50:24 +0000 (12:50 +1300)
committerAndrew Bartlett <abartlet@samba.org>
Mon, 8 Apr 2019 02:07:22 +0000 (02:07 +0000)
Adding iterate_range to LDB API and implementing in LMDB.  This
operation takes a start_key and end_key and returns all records between
the two, inclusive of both.  This will be used to implementing indexing
for <= and >= expressions.

Signed-off-by: Garming Sam <garming@catalyst.net.nz>
Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
lib/ldb/ldb_key_value/ldb_kv.h
lib/ldb/ldb_mdb/ldb_mdb.c
lib/ldb/ldb_tdb/ldb_tdb.c

index a4aa5ed9e62ae4ca73c748f0386b1b93e60811aa..2fa931a0b7ab39c8854bf9d07fa1d082d1ee4332 100644 (file)
@@ -32,6 +32,11 @@ struct kv_db_ops {
                                             struct ldb_val data,
                                             void *private_data),
                               void *ctx);
+       int (*iterate_range)(struct ldb_kv_private *ldb_kv,
+                            struct ldb_val start_key,
+                            struct ldb_val end_key,
+                            ldb_kv_traverse_fn fn,
+                            void *ctx);
        int (*lock_read)(struct ldb_module *);
        int (*unlock_read)(struct ldb_module *);
        int (*begin_write)(struct ldb_kv_private *);
index 1e94d6608624d8d60b996506bd9eb05f03dfa17c..fd13bc3dcc32166a0ed9d4ea213f680681da398f 100644 (file)
@@ -415,6 +415,119 @@ static int lmdb_parse_record(struct ldb_kv_private *ldb_kv,
        return parser(key, data, ctx);
 }
 
+/*
+ * Exactly the same as iterate, except we have a start key and an end key
+ * (which are both included in the results if present).
+ *
+ * If start > end, return MDB_PANIC.
+ */
+static int lmdb_iterate_range(struct ldb_kv_private *ldb_kv,
+                             struct ldb_val start_key,
+                             struct ldb_val end_key,
+                             ldb_kv_traverse_fn fn,
+                             void *ctx)
+{
+       struct lmdb_private *lmdb = ldb_kv->lmdb_private;
+       MDB_val mdb_key;
+       MDB_val mdb_data;
+       MDB_txn *txn = NULL;
+       MDB_dbi dbi = 0;
+       MDB_cursor *cursor = NULL;
+       int ret;
+
+       MDB_val mdb_s_key;
+       MDB_val mdb_e_key;
+
+       txn = get_current_txn(lmdb);
+       if (txn == NULL) {
+               ldb_debug(lmdb->ldb, LDB_DEBUG_FATAL, "No transaction");
+               lmdb->error = MDB_PANIC;
+               return ldb_mdb_error(lmdb->ldb, lmdb->error);
+       }
+
+       lmdb->error = mdb_dbi_open(txn, NULL, 0, &dbi);
+       if (lmdb->error != MDB_SUCCESS) {
+               return ldb_mdb_error(lmdb->ldb, lmdb->error);
+       }
+
+       mdb_s_key.mv_size = start_key.length;
+       mdb_s_key.mv_data = start_key.data;
+
+       mdb_e_key.mv_size = end_key.length;
+       mdb_e_key.mv_data = end_key.data;
+
+       if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) {
+               lmdb->error = MDB_PANIC;
+               return ldb_mdb_error(lmdb->ldb, lmdb->error);
+       }
+
+       lmdb->error = mdb_cursor_open(txn, dbi, &cursor);
+       if (lmdb->error != MDB_SUCCESS) {
+               goto done;
+       }
+
+       lmdb->error = mdb_cursor_get(cursor, &mdb_s_key, &mdb_data, MDB_SET_RANGE);
+
+       if (lmdb->error != MDB_SUCCESS) {
+               if (lmdb->error == MDB_NOTFOUND) {
+                       lmdb->error = MDB_SUCCESS;
+               }
+               goto done;
+       } else {
+               struct ldb_val key = {
+                       .length = mdb_s_key.mv_size,
+                       .data = mdb_s_key.mv_data,
+               };
+               struct ldb_val data = {
+                       .length = mdb_data.mv_size,
+                       .data = mdb_data.mv_data,
+               };
+
+               if (mdb_cmp(txn, dbi, &mdb_s_key, &mdb_e_key) > 0) {
+                       goto done;
+               }
+
+               ret = fn(ldb_kv, key, data, ctx);
+               if (ret != 0) {
+                       goto done;
+               }
+       }
+
+       while ((lmdb->error = mdb_cursor_get(
+                       cursor, &mdb_key,
+                       &mdb_data, MDB_NEXT)) == MDB_SUCCESS) {
+
+               struct ldb_val key = {
+                       .length = mdb_key.mv_size,
+                       .data = mdb_key.mv_data,
+               };
+               struct ldb_val data = {
+                       .length = mdb_data.mv_size,
+                       .data = mdb_data.mv_data,
+               };
+
+               if (mdb_cmp(txn, dbi, &mdb_key, &mdb_e_key) > 0) {
+                       goto done;
+               }
+
+               ret = fn(ldb_kv, key, data, ctx);
+               if (ret != 0) {
+                       goto done;
+               }
+       }
+       if (lmdb->error == MDB_NOTFOUND) {
+               lmdb->error = MDB_SUCCESS;
+       }
+done:
+       if (cursor != NULL) {
+               mdb_cursor_close(cursor);
+       }
+
+       if (lmdb->error != MDB_SUCCESS) {
+               return ldb_mdb_error(lmdb->ldb, lmdb->error);
+       }
+       return ldb_mdb_err_map(lmdb->error);
+}
 
 static int lmdb_lock_read(struct ldb_module *module)
 {
@@ -603,6 +716,7 @@ static struct kv_db_ops lmdb_key_value_ops = {
        .iterate            = lmdb_traverse_fn,
        .update_in_iterate  = lmdb_update_in_iterate,
        .fetch_and_parse    = lmdb_parse_record,
+       .iterate_range      = lmdb_iterate_range,
        .lock_read          = lmdb_lock_read,
        .unlock_read        = lmdb_unlock_read,
        .begin_write        = lmdb_transaction_start,
index 9f16040659ba627a9bc8cb1ee74e8c8755c19c74..3dcc158729a1f27300ca7dbc357c7444c51333e9 100644 (file)
@@ -380,6 +380,22 @@ static int ltdb_parse_record(struct ldb_kv_private *ldb_kv,
        return ltdb_err_map(tdb_error(ldb_kv->tdb));
 }
 
+static int ltdb_iterate_range(struct ldb_kv_private *ldb_kv,
+                             struct ldb_val start_key,
+                             struct ldb_val end_key,
+                             ldb_kv_traverse_fn fn,
+                             void *ctx)
+{
+       /*
+        * We do not implement this operation because we do not know how to
+        * iterate from one key to the next (in a sorted fashion).
+        *
+        * We could mimic it potentially, but it would violate boundaries of
+        * knowledge (data type representation).
+        */
+       return LDB_ERR_OPERATIONS_ERROR;
+}
+
 static const char *ltdb_name(struct ldb_kv_private *ldb_kv)
 {
        return tdb_name(ldb_kv->tdb);
@@ -423,6 +439,7 @@ static const struct kv_db_ops key_value_ops = {
     .iterate = ltdb_traverse_fn,
     .update_in_iterate = ltdb_update_in_iterate,
     .fetch_and_parse = ltdb_parse_record,
+    .iterate_range = ltdb_iterate_range,
     .lock_read = ltdb_lock_read,
     .unlock_read = ltdb_unlock_read,
     .begin_write = ltdb_transaction_start,