r21722: Add the dead record functionality presented on samba-technical@samba.org. If
authorVolker Lendecke <vlendec@samba.org>
Tue, 6 Mar 2007 10:11:15 +0000 (10:11 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 19:49:18 +0000 (14:49 -0500)
you do a tdb_set_max_dead(tdb, n), then for this tdb a delete operation will
only mark a record as dead and re-use it if a new record is created. The
parameter n allows for at most n dead records per hash chain. If this number
is exceeded, all dead records are put on the central freelist.

Volker
(This used to be commit 98a27ab28a3cd554e370a9a0e3652f4dea8749e9)

source4/lib/tdb/common/open.c
source4/lib/tdb/common/tdb.c
source4/lib/tdb/common/tdb_private.h
source4/lib/tdb/include/tdb.h

index 3d6e222b96915055c73d96ee78c3d4cef5527950..47e3258c09fc07a001feb475f3722a75b579d1f5 100644 (file)
@@ -263,6 +263,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        tdb->map_size = st.st_size;
        tdb->device = st.st_dev;
        tdb->inode = st.st_ino;
+       tdb->max_dead_records = 0;
        tdb_mmap(tdb);
        if (locked) {
                if (tdb->methods->tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0, 1) == -1) {
@@ -321,6 +322,15 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        }
 }
 
+/*
+ * Set the maximum number of dead records per hash chain
+ */
+
+void tdb_set_max_dead(struct tdb_context *tdb, int max_dead)
+{
+       tdb->max_dead_records = max_dead;
+}
+
 /**
  * Close a database.
  *
index 03a66804b3fdd40bb7c03ed22c4e8e5f6110ab03..a6b472ae94f135f9841ae6b76cf42f4ddce2589b 100644 (file)
@@ -258,6 +258,66 @@ int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct
        return 0;
 }
 
+static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
+{
+       int res = 0;
+       tdb_off_t rec_ptr;
+       struct list_struct rec;
+       
+       /* read in the hash top */
+       if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
+               return 0;
+
+       while (rec_ptr) {
+               if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
+                       return 0;
+
+               if (rec.magic == TDB_DEAD_MAGIC) {
+                       res += 1;
+               }
+               rec_ptr = rec.next;
+       }
+       return res;
+}
+
+/*
+ * Purge all DEAD records from a hash chain
+ */
+static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
+{
+       int res = -1;
+       struct list_struct rec;
+       tdb_off_t rec_ptr;
+
+       if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
+               return -1;
+       }
+       
+       /* read in the hash top */
+       if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
+               goto fail;
+
+       while (rec_ptr) {
+               tdb_off_t next;
+
+               if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
+                       goto fail;
+               }
+
+               next = rec.next;
+
+               if (rec.magic == TDB_DEAD_MAGIC
+                   && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
+                       goto fail;
+               }
+               rec_ptr = next;
+       }
+       res = 0;
+ fail:
+       tdb_unlock(tdb, -1, F_WRLCK);
+       return res;
+}
+
 /* delete an entry in the database given a key */
 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
 {
@@ -265,9 +325,42 @@ static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
        struct list_struct rec;
        int ret;
 
-       if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
-               return -1;
-       ret = tdb_do_delete(tdb, rec_ptr, &rec);
+       if (tdb->max_dead_records != 0) {
+
+               /*
+                * Allow for some dead records per hash chain, mainly for
+                * tdb's with a very high create/delete rate like locking.tdb.
+                */
+
+               if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
+                       return -1;
+
+               if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
+                       /*
+                        * Don't let the per-chain freelist grow too large,
+                        * delete all existing dead records
+                        */
+                       tdb_purge_dead(tdb, hash);
+               }
+
+               if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
+                       tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
+                       return -1;
+               }
+
+               /*
+                * Just mark the record as dead.
+                */
+               rec.magic = TDB_DEAD_MAGIC;
+               ret = tdb_rec_write(tdb, rec_ptr, &rec);
+       }
+       else {
+               if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
+                                                  &rec)))
+                       return -1;
+
+               ret = tdb_do_delete(tdb, rec_ptr, &rec);
+       }
 
        if (ret == 0) {
                tdb_increment_seqnum(tdb);
@@ -284,6 +377,35 @@ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
        return tdb_delete_hash(tdb, key, hash);
 }
 
+/*
+ * See if we have a dead record around with enough space
+ */
+static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
+                              struct list_struct *r, tdb_len_t length)
+{
+       tdb_off_t rec_ptr;
+       
+       /* read in the hash top */
+       if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
+               return 0;
+
+       /* keep looking until we find the right record */
+       while (rec_ptr) {
+               if (tdb_rec_read(tdb, rec_ptr, r) == -1)
+                       return 0;
+
+               if (TDB_DEAD(r) && r->rec_len >= length) {
+                       /*
+                        * First fit for simple coding, TODO: change to best
+                        * fit
+                        */
+                       return rec_ptr;
+               }
+               rec_ptr = r->next;
+       }
+       return 0;
+}
+
 /* store an element in the database, replacing any existing element
    with the same key 
 
@@ -316,8 +438,7 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        } else {
                /* first try in-place update, on modify or replace. */
                if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
-                       ret = 0;
-                       goto fail; /* Well, not really failed */
+                       goto done;
                }
                if (tdb->ecode == TDB_ERR_NOEXIST &&
                    flag == TDB_MODIFY) {
@@ -347,9 +468,56 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        if (dbuf.dsize)
                memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
 
+       if (tdb->max_dead_records != 0) {
+               /*
+                * Allow for some dead records per hash chain, look if we can
+                * find one that can hold the new record. We need enough space
+                * for key, data and tailer. If we find one, we don't have to
+                * consult the central freelist.
+                */
+               rec_ptr = tdb_find_dead(
+                       tdb, hash, &rec,
+                       key.dsize + dbuf.dsize + sizeof(tdb_off_t));
+
+               if (rec_ptr != 0) {
+                       rec.key_len = key.dsize;
+                       rec.data_len = dbuf.dsize;
+                       rec.full_hash = hash;
+                       rec.magic = TDB_MAGIC;
+                       if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
+                           || tdb->methods->tdb_write(
+                                   tdb, rec_ptr + sizeof(rec),
+                                   p, key.dsize + dbuf.dsize) == -1) {
+                               goto fail;
+                       }
+                       goto done;
+               }
+       }
+
+       /*
+        * We have to allocate some space from the freelist, so this means we
+        * have to lock it. Use the chance to purge all the DEAD records from
+        * the hash chain under the freelist lock.
+        */
+
+       if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
+               goto fail;
+       }
+
+       if ((tdb->max_dead_records != 0)
+           && (tdb_purge_dead(tdb, hash) == -1)) {
+               tdb_unlock(tdb, -1, F_WRLCK);
+               goto fail;
+       }
+
        /* we have to allocate some space */
-       if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
+       rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
+
+       tdb_unlock(tdb, -1, F_WRLCK);
+
+       if (rec_ptr == 0) {
                goto fail;
+       }
 
        /* Read hash top into next ptr */
        if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
@@ -368,6 +536,7 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
                goto fail;
        }
 
+ done:
        ret = 0;
  fail:
        if (ret == 0) {
index a05f4da0cfd20a4c88303b8ac6339e7d7683ab29..7219a1a2e2c2173da37f51e6b5f38253d91fcc8f 100644 (file)
@@ -169,6 +169,7 @@ struct tdb_context {
        const struct tdb_methods *methods;
        struct tdb_transaction *transaction;
        int page_size;
+       int max_dead_records;
 };
 
 
index 07a8160f1039fa568116146322a4a2a0b8ec362c..e54c9b8da271f893b001ada51d56eae58c4a56b7 100644 (file)
@@ -94,6 +94,7 @@ struct tdb_context *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
                         int open_flags, mode_t mode,
                         const struct tdb_logging_context *log_ctx,
                         tdb_hash_func hash_fn);
+void tdb_set_max_dead(struct tdb_context *tdb, int max_dead);
 
 int tdb_reopen(struct tdb_context *tdb);
 int tdb_reopen_all(int parent_longlived);