dbwrap_rbt: fix modifying the db during traverse
authorStefan Metzmacher <metze@samba.org>
Wed, 25 Nov 2015 08:22:08 +0000 (09:22 +0100)
committerStefan Metzmacher <metze@samba.org>
Fri, 27 Nov 2015 09:10:18 +0000 (10:10 +0100)
We delete and add of records rebalace the tree, but our
traverse code doesn't handle that and skips records
randomly.

We maintain records in a linked list for now
in addition to the rbtree and use that list during
traverse.

This add a bit overhead, but at least it works reliable.
If someone finds a way to do reliable traverse with the
rebalanced tree, we can replace this commit.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=11375
BUG: https://bugzilla.samba.org/show_bug.cgi?id=11394

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Volker Lendecke <vl@samba.org>
lib/dbwrap/dbwrap_rbt.c

index 622ab0dd24b8d1ec599771855f9491534eef99df..3b5e5895cd5a48b671bf5c8af6858823280d7244 100644 (file)
 #include "dbwrap/dbwrap_private.h"
 #include "dbwrap/dbwrap_rbt.h"
 #include "../lib/util/rbtree.h"
+#include "../lib/util/dlinklist.h"
 
 #define DBWRAP_RBT_ALIGN(_size_) (((_size_)+15)&~15)
 
 struct db_rbt_ctx {
        struct rb_root tree;
+       struct db_rbt_node *nodes;
        size_t traverse_read;
-       bool traverse_write;
+       struct db_rbt_node **traverse_nextp;
 };
 
 struct db_rbt_rec {
@@ -40,6 +42,7 @@ struct db_rbt_rec {
 struct db_rbt_node {
        struct rb_node rb_node;
        size_t keysize, valuesize;
+       struct db_rbt_node *prev, *next;
 };
 
 /*
@@ -123,7 +126,8 @@ static NTSTATUS db_rbt_store(struct db_record *rec, TDB_DATA data, int flag)
        struct db_rbt_node *node;
 
        struct rb_node ** p;
-       struct rb_node * parent;
+       struct rb_node *parent = NULL;
+       struct db_rbt_node *parent_node = NULL;
 
        ssize_t reclen;
        TDB_DATA this_key, this_val;
@@ -165,12 +169,19 @@ static NTSTATUS db_rbt_store(struct db_record *rec, TDB_DATA data, int flag)
        }
 
        if (rec_priv->node != NULL) {
+               if (db_ctx->traverse_nextp != NULL) {
+                       if (*db_ctx->traverse_nextp == rec_priv->node) {
+                               *db_ctx->traverse_nextp = node;
+                       }
+               }
+
                /*
                 * We need to delete the key from the tree and start fresh,
                 * there's not enough space in the existing record
                 */
 
                rb_erase(&rec_priv->node->rb_node, &db_ctx->tree);
+               DLIST_REMOVE(db_ctx->nodes, rec_priv->node);
 
                /*
                 * Keep the existing node around for a while: If the record
@@ -197,10 +208,11 @@ static NTSTATUS db_rbt_store(struct db_record *rec, TDB_DATA data, int flag)
                TDB_DATA search_key, search_val;
                int res;
 
-               parent = (*p);
-
                r = db_rbt2node(*p);
 
+               parent = (*p);
+               parent_node = r;
+
                db_rbt_parse_node(r, &search_key, &search_val);
 
                res = db_rbt_compare(this_key, search_key);
@@ -217,6 +229,7 @@ static NTSTATUS db_rbt_store(struct db_record *rec, TDB_DATA data, int flag)
        }
 
        rb_link_node(&node->rb_node, parent, p);
+       DLIST_ADD_AFTER(db_ctx->nodes, node, parent_node);
        rb_insert_color(&node->rb_node, &db_ctx->tree);
 
        return NT_STATUS_OK;
@@ -236,7 +249,14 @@ static NTSTATUS db_rbt_delete(struct db_record *rec)
                return NT_STATUS_OK;
        }
 
+       if (db_ctx->traverse_nextp != NULL) {
+               if (*db_ctx->traverse_nextp == rec_priv->node) {
+                       *db_ctx->traverse_nextp = rec_priv->node->next;
+               }
+       }
+
        rb_erase(&rec_priv->node->rb_node, &db_ctx->tree);
+       DLIST_REMOVE(db_ctx->nodes, rec_priv->node);
        TALLOC_FREE(rec_priv->node);
 
        return NT_STATUS_OK;
@@ -383,56 +403,48 @@ static NTSTATUS db_rbt_parse_record(struct db_context *db, TDB_DATA key,
 }
 
 static int db_rbt_traverse_internal(struct db_context *db,
-                                   struct rb_node *n,
                                    int (*f)(struct db_record *db,
                                             void *private_data),
                                    void *private_data, uint32_t* count,
                                    bool rw)
 {
-       struct rb_node *rb_right;
-       struct rb_node *rb_left;
-       struct db_record rec;
-       struct db_rbt_rec rec_priv;
+       struct db_rbt_ctx *ctx = talloc_get_type_abort(
+               db->private_data, struct db_rbt_ctx);
+       struct db_rbt_node *cur = NULL;
+       struct db_rbt_node *next = NULL;
        int ret;
 
-       if (n == NULL) {
-               return 0;
-       }
-
-       rb_left = n->rb_left;
-       rb_right = n->rb_right;
+       for (cur = ctx->nodes; cur != NULL; cur = next) {
+               struct db_record rec;
+               struct db_rbt_rec rec_priv;
 
-       ret = db_rbt_traverse_internal(db, rb_left, f, private_data, count, rw);
-       if (ret != 0) {
-               return ret;
-       }
+               rec_priv.node = cur;
+               next = rec_priv.node->next;
 
-       rec_priv.node = db_rbt2node(n);
-       /* n might be altered by the callback function */
-       n = NULL;
+               ZERO_STRUCT(rec);
+               rec.db = db;
+               rec.private_data = &rec_priv;
+               rec.store = db_rbt_store;
+               rec.delete_rec = db_rbt_delete;
+               db_rbt_parse_node(rec_priv.node, &rec.key, &rec.value);
 
-       ZERO_STRUCT(rec);
-       rec.db = db;
-       rec.private_data = &rec_priv;
-       rec.store = db_rbt_store;
-       rec.delete_rec = db_rbt_delete;
-       db_rbt_parse_node(rec_priv.node, &rec.key, &rec.value);
-
-       ret = f(&rec, private_data);
-       (*count) ++;
-       if (ret != 0) {
-               return ret;
-       }
-
-       if (rec_priv.node != NULL) {
-               /*
-                * If the current record is still there
-                * we should take the current rb_right.
-                */
-               rb_right = rec_priv.node->rb_node.rb_right;
+               if (rw) {
+                       ctx->traverse_nextp = &next;
+               }
+               ret = f(&rec, private_data);
+               (*count) ++;
+               if (rw) {
+                       ctx->traverse_nextp = NULL;
+               }
+               if (ret != 0) {
+                       return ret;
+               }
+               if (rec_priv.node != NULL) {
+                       next = rec_priv.node->next;
+               }
        }
 
-       return db_rbt_traverse_internal(db, rb_right, f, private_data, count, rw);
+       return 0;
 }
 
 static int db_rbt_traverse_read(struct db_context *db,
@@ -446,7 +458,7 @@ static int db_rbt_traverse_read(struct db_context *db,
        int ret;
 
        ctx->traverse_read++;
-       ret = db_rbt_traverse_internal(db, ctx->tree.rb_node,
+       ret = db_rbt_traverse_internal(db,
                                       f, private_data, &count,
                                       false /* rw */);
        ctx->traverse_read--;
@@ -469,7 +481,7 @@ static int db_rbt_traverse(struct db_context *db,
        uint32_t count = 0;
        int ret;
 
-       if (ctx->traverse_write) {
+       if (ctx->traverse_nextp != NULL) {
                return -1;
        };
 
@@ -477,11 +489,9 @@ static int db_rbt_traverse(struct db_context *db,
                return db_rbt_traverse_read(db, f, private_data);
        }
 
-       ctx->traverse_write = true;
-       ret = db_rbt_traverse_internal(db, ctx->tree.rb_node,
+       ret = db_rbt_traverse_internal(db,
                                       f, private_data, &count,
                                       true /* rw */);
-       ctx->traverse_write = false;
        if (ret != 0) {
                return -1;
        }