r22041: merge trivial changes from samba3
[ira/wip.git] / source / lib / tdb / common / tdb.c
index f099c2d1aa26916a0a994cfab85d195d57be1e87..25103d826e5f0e9398e76f371ac9ef437b87a993 100644 (file)
 
 TDB_DATA tdb_null;
 
+/*
+  increment the tdb sequence number if the tdb has been opened using
+  the TDB_SEQNUM flag
+*/
+static void tdb_increment_seqnum(struct tdb_context *tdb)
+{
+       tdb_off_t seqnum=0;
+       
+       if (!(tdb->flags & TDB_SEQNUM)) {
+               return;
+       }
+
+       if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
+               return;
+       }
+
+       /* we ignore errors from this, as we have no sane way of
+          dealing with them.
+       */
+       tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
+       seqnum++;
+       tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
+
+       tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
+}
+
+static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
+{
+       return memcmp(data.dptr, key.dptr, data.dsize);
+}
+
 /* Returns 0 on fail.  On success, return offset of record, and fills
    in rec */
 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
@@ -46,19 +77,12 @@ static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
                if (tdb_rec_read(tdb, rec_ptr, r) == -1)
                        return 0;
 
-               if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
-                       unsigned char *k;
-                       /* a very likely hit - read the key */
-                       k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
-                                          r->key_len);
-                       if (!k)
-                               return 0;
-
-                       if (memcmp(key.dptr, k, key.dsize) == 0) {
-                               SAFE_FREE(k);
-                               return rec_ptr;
-                       }
-                       SAFE_FREE(k);
+               if (!TDB_DEAD(r) && hash==r->full_hash
+                   && key.dsize==r->key_len
+                   && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
+                                     r->key_len, tdb_key_compare,
+                                     NULL) == 0) {
+                       return rec_ptr;
                }
                rec_ptr = r->next;
        }
@@ -98,7 +122,7 @@ static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_
                return -1;
        }
 
-       if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
+       if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
                      dbuf.dptr, dbuf.dsize) == -1)
                return -1;
 
@@ -136,6 +160,47 @@ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
        return ret;
 }
 
+/*
+ * Find an entry in the database and hand the record's data to a parsing
+ * function. The parsing function is executed under the chain read lock, so it
+ * should be fast and should not block on other syscalls.
+ *
+ * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
+ *
+ * For mmapped tdb's that do not have a transaction open it points the parsing
+ * function directly at the mmap area, it avoids the malloc/memcpy in this
+ * case. If a transaction is open or no mmap is available, it has to do
+ * malloc/read/parse/free.
+ *
+ * This is interesting for all readers of potentially large data structures in
+ * the tdb records, ldb indexes being one example.
+ */
+
+int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
+                    int (*parser)(TDB_DATA key, TDB_DATA data,
+                                  void *private_data),
+                    void *private_data)
+{
+       tdb_off_t rec_ptr;
+       struct list_struct rec;
+       int ret;
+       u32 hash;
+
+       /* find which hash bucket it is in */
+       hash = tdb->hash_fn(&key);
+
+       if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
+               return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
+       }
+
+       ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
+                            rec.data_len, parser, private_data);
+
+       tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
+
+       return ret;
+}
+
 /* check if an entry in the database exists 
 
    note that 1 is returned if the key is found and 0 is returned if not found
@@ -164,7 +229,7 @@ int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct
        tdb_off_t last_ptr, i;
        struct list_struct lastrec;
 
-       if (tdb->read_only) return -1;
+       if (tdb->read_only || tdb->traverse_read) return -1;
 
        if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
                /* Someone traversing here: mark it as dead */
@@ -193,6 +258,66 @@ int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct
        return 0;
 }
 
+static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
+{
+       int res = 0;
+       tdb_off_t rec_ptr;
+       struct list_struct rec;
+       
+       /* read in the hash top */
+       if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
+               return 0;
+
+       while (rec_ptr) {
+               if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
+                       return 0;
+
+               if (rec.magic == TDB_DEAD_MAGIC) {
+                       res += 1;
+               }
+               rec_ptr = rec.next;
+       }
+       return res;
+}
+
+/*
+ * Purge all DEAD records from a hash chain
+ */
+static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
+{
+       int res = -1;
+       struct list_struct rec;
+       tdb_off_t rec_ptr;
+
+       if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
+               return -1;
+       }
+       
+       /* read in the hash top */
+       if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
+               goto fail;
+
+       while (rec_ptr) {
+               tdb_off_t next;
+
+               if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
+                       goto fail;
+               }
+
+               next = rec.next;
+
+               if (rec.magic == TDB_DEAD_MAGIC
+                   && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
+                       goto fail;
+               }
+               rec_ptr = next;
+       }
+       res = 0;
+ fail:
+       tdb_unlock(tdb, -1, F_WRLCK);
+       return res;
+}
+
 /* delete an entry in the database given a key */
 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
 {
@@ -200,11 +325,49 @@ static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
        struct list_struct rec;
        int ret;
 
-       if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
-               return -1;
-       ret = tdb_do_delete(tdb, rec_ptr, &rec);
+       if (tdb->max_dead_records != 0) {
+
+               /*
+                * Allow for some dead records per hash chain, mainly for
+                * tdb's with a very high create/delete rate like locking.tdb.
+                */
+
+               if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
+                       return -1;
+
+               if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
+                       /*
+                        * Don't let the per-chain freelist grow too large,
+                        * delete all existing dead records
+                        */
+                       tdb_purge_dead(tdb, hash);
+               }
+
+               if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
+                       tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
+                       return -1;
+               }
+
+               /*
+                * Just mark the record as dead.
+                */
+               rec.magic = TDB_DEAD_MAGIC;
+               ret = tdb_rec_write(tdb, rec_ptr, &rec);
+       }
+       else {
+               if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
+                                                  &rec)))
+                       return -1;
+
+               ret = tdb_do_delete(tdb, rec_ptr, &rec);
+       }
+
+       if (ret == 0) {
+               tdb_increment_seqnum(tdb);
+       }
+
        if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
-               TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
+               TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
        return ret;
 }
 
@@ -214,6 +377,35 @@ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
        return tdb_delete_hash(tdb, key, hash);
 }
 
+/*
+ * See if we have a dead record around with enough space
+ */
+static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
+                              struct list_struct *r, tdb_len_t length)
+{
+       tdb_off_t rec_ptr;
+       
+       /* read in the hash top */
+       if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
+               return 0;
+
+       /* keep looking until we find the right record */
+       while (rec_ptr) {
+               if (tdb_rec_read(tdb, rec_ptr, r) == -1)
+                       return 0;
+
+               if (TDB_DEAD(r) && r->rec_len >= length) {
+                       /*
+                        * First fit for simple coding, TODO: change to best
+                        * fit
+                        */
+                       return rec_ptr;
+               }
+               rec_ptr = r->next;
+       }
+       return 0;
+}
+
 /* store an element in the database, replacing any existing element
    with the same key 
 
@@ -225,7 +417,12 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        u32 hash;
        tdb_off_t rec_ptr;
        char *p = NULL;
-       int ret = 0;
+       int ret = -1;
+
+       if (tdb->read_only || tdb->traverse_read) {
+               tdb->ecode = TDB_ERR_RDONLY;
+               return -1;
+       }
 
        /* find which hash bucket it is in */
        hash = tdb->hash_fn(&key);
@@ -240,8 +437,9 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
                }
        } else {
                /* first try in-place update, on modify or replace. */
-               if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
-                       goto out;
+               if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
+                       goto done;
+               }
                if (tdb->ecode == TDB_ERR_NOEXIST &&
                    flag == TDB_MODIFY) {
                        /* if the record doesn't exist and we are in TDB_MODIFY mode then
@@ -270,9 +468,56 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        if (dbuf.dsize)
                memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
 
+       if (tdb->max_dead_records != 0) {
+               /*
+                * Allow for some dead records per hash chain, look if we can
+                * find one that can hold the new record. We need enough space
+                * for key, data and tailer. If we find one, we don't have to
+                * consult the central freelist.
+                */
+               rec_ptr = tdb_find_dead(
+                       tdb, hash, &rec,
+                       key.dsize + dbuf.dsize + sizeof(tdb_off_t));
+
+               if (rec_ptr != 0) {
+                       rec.key_len = key.dsize;
+                       rec.data_len = dbuf.dsize;
+                       rec.full_hash = hash;
+                       rec.magic = TDB_MAGIC;
+                       if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
+                           || tdb->methods->tdb_write(
+                                   tdb, rec_ptr + sizeof(rec),
+                                   p, key.dsize + dbuf.dsize) == -1) {
+                               goto fail;
+                       }
+                       goto done;
+               }
+       }
+
+       /*
+        * We have to allocate some space from the freelist, so this means we
+        * have to lock it. Use the chance to purge all the DEAD records from
+        * the hash chain under the freelist lock.
+        */
+
+       if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
+               goto fail;
+       }
+
+       if ((tdb->max_dead_records != 0)
+           && (tdb_purge_dead(tdb, hash) == -1)) {
+               tdb_unlock(tdb, -1, F_WRLCK);
+               goto fail;
+       }
+
        /* we have to allocate some space */
-       if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
+       rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
+
+       tdb_unlock(tdb, -1, F_WRLCK);
+
+       if (rec_ptr == 0) {
                goto fail;
+       }
 
        /* Read hash top into next ptr */
        if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
@@ -285,18 +530,22 @@ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
 
        /* write out and point the top of the hash chain at it */
        if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
-           || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
+           || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
            || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
                /* Need to tdb_unallocate() here */
                goto fail;
        }
- out:
+
+ done:
+       ret = 0;
+ fail:
+       if (ret == 0) {
+               tdb_increment_seqnum(tdb);
+       }
+
        SAFE_FREE(p); 
        tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
        return ret;
-fail:
-       ret = -1;
-       goto out;
 }
 
 
@@ -315,9 +564,10 @@ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
        dbuf = tdb_fetch(tdb, key);
 
        if (dbuf.dptr == NULL) {
-               dbuf.dptr = malloc(new_dbuf.dsize);
+               dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
        } else {
-               dbuf.dptr = realloc(dbuf.dptr, dbuf.dsize + new_dbuf.dsize);
+               dbuf.dptr = (unsigned char *)realloc(dbuf.dptr,
+                                                    dbuf.dsize + new_dbuf.dsize);
        }
 
        if (dbuf.dptr == NULL) {
@@ -362,5 +612,40 @@ int tdb_fd(struct tdb_context *tdb)
 */
 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
 {
-       return tdb->log_fn;
+       return tdb->log.log_fn;
+}
+
+
+/*
+  get the tdb sequence number. Only makes sense if the writers opened
+  with TDB_SEQNUM set. Note that this sequence number will wrap quite
+  quickly, so it should only be used for a 'has something changed'
+  test, not for code that relies on the count of the number of changes
+  made. If you want a counter then use a tdb record.
+
+  The aim of this sequence number is to allow for a very lightweight
+  test of a possible tdb change.
+*/
+int tdb_get_seqnum(struct tdb_context *tdb)
+{
+       tdb_off_t seqnum=0;
+
+       tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
+       return seqnum;
+}
+
+int tdb_hash_size(struct tdb_context *tdb)
+{
+       return tdb->header.hash_size;
+}
+
+size_t tdb_map_size(struct tdb_context *tdb)
+{
+       return tdb->map_size;
 }
+
+int tdb_get_flags(struct tdb_context *tdb)
+{
+       return tdb->flags;
+}
+