r4088: Get medieval on our ass about malloc.... :-). Take control of all our allocation
[tprouty/samba.git] / source / tdb / tdb.c
index 98caca82a100ba702cdbe6d03ad9ce2532058397..45895d2ec71bc4a84d53a7a40729415db385a553 100644 (file)
@@ -1,25 +1,51 @@
  /* 
    Unix SMB/CIFS implementation.
-   Samba database functions
-   Copyright (C) Andrew Tridgell              1999-2000
-   Copyright (C) Luke Kenneth Casson Leighton      2000
+
+   trivial database library
+
+   Copyright (C) Andrew Tridgell              1999-2004
    Copyright (C) Paul `Rusty' Russell             2000
-   Copyright (C) Jeremy Allison                           2000
+   Copyright (C) Jeremy Allison                           2000-2003
    
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 2 of the License, or
-   (at your option) any later version.
+     ** NOTE! The following LGPL license applies to the tdb
+     ** library. This does NOT imply that all of Samba is released
+     ** under the LGPL
    
-   This program is distributed in the hope that it will be useful,
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
    
-   You should have received a copy of the GNU General Public License
-   along with this program; if not, write to the Free Software
-   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 */
+
+
+/* NOTE: If you use tdbs under valgrind, and in particular if you run
+ * tdbtorture, you may get spurious "uninitialized value" warnings.  I
+ * think this is because valgrind doesn't understand that the mmap'd
+ * area may be written to by other processes.  Memory can, from the
+ * point of view of the grinded process, spontaneously become
+ * initialized.
+ *
+ * I can think of a few solutions.  [mbp 20030311]
+ *
+ * 1 - Write suppressions for Valgrind so that it doesn't complain
+ * about this.  Probably the most reasonable but people need to
+ * remember to use them.
+ *
+ * 2 - Use IO not mmap when running under valgrind.  Not so nice.
+ *
+ * 3 - Use the special valgrind macros to mark memory as valid at the
+ * right time.  Probably too hard -- the process just doesn't know.
+ */ 
+
 #ifdef STANDALONE
 #if HAVE_CONFIG_H
 #include <config.h>
 #include <errno.h>
 #include <sys/mman.h>
 #include <sys/stat.h>
+#include <signal.h>
 #include "tdb.h"
 #include "spinlock.h"
 #else
 #include "includes.h"
+
+#if defined(PARANOID_MALLOC_CHECKER)
+#ifdef malloc
+#undef malloc
+#endif
+
+#ifdef realloc
+#undef realloc
+#endif
+
+#ifdef calloc
+#undef calloc
+#endif
+
+#ifdef strdup
+#undef strdup
+#endif
+
+#ifdef strndup
+#undef strndup
+#endif
+
+#endif
+
 #endif
 
 #define TDB_MAGIC_FOOD "TDB file\n"
 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
+#define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
+
 
 /* NB assumes there is a local variable called "tdb" that is the
  * current context, also takes doubly-parenthesized print-style
@@ -160,6 +213,18 @@ struct list_struct {
        */
 };
 
+/***************************************************************
+ Allow a caller to set a "alarm" flag that tdb can check to abort
+ a blocking lock on SIGALRM.
+***************************************************************/
+
+static sig_atomic_t *palarm_fired;
+
+void tdb_set_lock_alarm(sig_atomic_t *palarm)
+{
+       palarm_fired = palarm;
+}
+
 /* a byte range locking function - return 0 on success
    this functions locks/unlocks 1 byte at the specified offset.
 
@@ -169,10 +234,11 @@ static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset,
                      int rw_type, int lck_type, int probe)
 {
        struct flock fl;
+       int ret;
 
        if (tdb->flags & TDB_NOLOCK)
                return 0;
-       if (tdb->read_only) {
+       if ((rw_type == F_WRLCK) && (tdb->read_only)) {
                errno = EACCES;
                return -1;
        }
@@ -183,12 +249,36 @@ static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset,
        fl.l_len = 1;
        fl.l_pid = 0;
 
-       if (fcntl(tdb->fd,lck_type,&fl) == -1) {
-               if (!probe) {
+       do {
+               ret = fcntl(tdb->fd,lck_type,&fl);
+               if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
+                       break;
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               if (!probe && lck_type != F_SETLK) {
+                       /* Ensure error code is set for log fun to examine. */
+                       if (errno == EINTR && palarm_fired && *palarm_fired)
+                               tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
+                       else
+                               tdb->ecode = TDB_ERR_LOCK;
                        TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
                                 tdb->fd, offset, rw_type, lck_type));
                }
-               /* errno set by fcntl */
+               /* Was it an alarm timeout ? */
+               if (errno == EINTR && palarm_fired && *palarm_fired) {
+                       TDB_LOG((tdb, 5, "tdb_brlock timed out (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
+                                tdb->fd, offset, rw_type, lck_type));
+                       return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
+               }
+               /* Otherwise - generic lock error. errno set by fcntl.
+                * EAGAIN is an expected return from non-blocking
+                * locks. */
+               if (errno != EAGAIN) {
+                       TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
+                                tdb->fd, offset, rw_type, lck_type, 
+                                strerror(errno)));
+               }
                return TDB_ERRCODE(TDB_ERR_LOCK, -1);
        }
        return 0;
@@ -210,7 +300,7 @@ static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
        if (tdb->locked[list+1].count == 0) {
                if (!tdb->read_only && tdb->header.rwlocks) {
                        if (tdb_spinlock(tdb, list, ltype)) {
-                               TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
+                               TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list %d ltype=%d\n", 
                                           list, ltype));
                                return -1;
                        }
@@ -263,19 +353,6 @@ static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
        return ret;
 }
 
-/* This is based on the hash algorithm from gdbm */
-static u32 tdb_hash(TDB_DATA *key)
-{
-       u32 value;      /* Used to compute the hash value.  */
-       u32   i;        /* Used to cycle through random values. */
-
-       /* Set the initial value from the key size. */
-       for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
-               value = (value + (key->dptr[i] << (i*5 % 24)));
-
-       return (1103515243 * value + 12345);  
-}
-
 /* check for an out of bounds access - if it is out of bounds then
    see if the database has been expanded by someone else and expand
    if necessary 
@@ -288,6 +365,8 @@ static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
                return 0;
        if (tdb->flags & TDB_INTERNAL) {
                if (!probe) {
+                       /* Ensure ecode is set for log fn. */
+                       tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
                                 (int)len, (int)tdb->map_size));
                }
@@ -299,6 +378,8 @@ static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
 
        if (st.st_size < (size_t)len) {
                if (!probe) {
+                       /* Ensure ecode is set for log fn. */
+                       tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
                                 (int)len, (int)st.st_size));
                }
@@ -327,6 +408,8 @@ static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
        else if (lseek(tdb->fd, off, SEEK_SET) != off
                 || write(tdb->fd, buf, len) != (ssize_t)len) {
 #endif
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_IO;
                TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
                           off, len, strerror(errno)));
                return TDB_ERRCODE(TDB_ERR_IO, -1);
@@ -348,6 +431,8 @@ static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
        else if (lseek(tdb->fd, off, SEEK_SET) != off
                 || read(tdb->fd, buf, len) != (ssize_t)len) {
 #endif
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_IO;
                TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
                           off, len, strerror(errno)));
                return TDB_ERRCODE(TDB_ERR_IO, -1);
@@ -363,6 +448,8 @@ static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
        char *buf;
 
        if (!(buf = malloc(len))) {
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_OOM;
                TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
                           len, strerror(errno)));
                return TDB_ERRCODE(TDB_ERR_OOM, buf);
@@ -391,6 +478,8 @@ static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
        if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
                return -1;
        if (TDB_BAD_MAGIC(rec)) {
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_CORRUPT;
                TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
                return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
        }
@@ -407,7 +496,20 @@ static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
 {
        if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
                return -1;
+
+       if (rec->magic == TDB_MAGIC) {
+               /* this happens when a app is showdown while deleting a record - we should
+                  not completely fail when this happens */
+               TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
+                        rec->magic, off));
+               rec->magic = TDB_FREE_MAGIC;
+               if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
+                       return -1;
+       }
+
        if (rec->magic != TDB_FREE_MAGIC) {
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_CORRUPT;
                TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
                           rec->magic, off));
                return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
@@ -501,17 +603,20 @@ int tdb_printfreelist(TDB_CONTEXT *tdb)
 
        /* read in the freelist top */
        if (ofs_read(tdb, offset, &rec_ptr) == -1) {
+               tdb_unlock(tdb, -1, F_WRLCK);
                return 0;
        }
 
        printf("freelist top=[0x%08x]\n", rec_ptr );
        while (rec_ptr) {
                if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
+                       tdb_unlock(tdb, -1, F_WRLCK);
                        return -1;
                }
 
                if (rec.magic != TDB_FREE_MAGIC) {
                        printf("bad magic 0x%08x in free list\n", rec.magic);
+                       tdb_unlock(tdb, -1, F_WRLCK);
                        return -1;
                }
 
@@ -585,10 +690,10 @@ static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
 left:
        /* Look left */
        left = offset - sizeof(tdb_off);
-       if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
+       if (left > TDB_DATA_START(tdb->header.hash_size)) {
                struct list_struct l;
                tdb_off leftsize;
-
+               
                /* Read in tailer and jump back to header */
                if (ofs_read(tdb, left, &leftsize) == -1) {
                        TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
@@ -768,6 +873,8 @@ static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
        tdb_off rec_ptr, last_ptr, newrec_ptr;
        struct list_struct newrec;
 
+       memset(&newrec, '\0', sizeof(newrec));
+
        if (tdb_lock(tdb, -1, F_WRLCK) == -1)
                return 0;
 
@@ -924,27 +1031,12 @@ static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
        return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
 }
 
-/* If they do lockkeys, check that this hash is one they locked */
-static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
-{
-       u32 i;
-       if (!tdb->lockedkeys)
-               return 1;
-       for (i = 0; i < tdb->lockedkeys[0]; i++)
-               if (tdb->lockedkeys[i+1] == hash)
-                       return 1;
-       return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
-}
-
 /* As tdb_find, but if you succeed, keep the lock */
-static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
+static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
                             struct list_struct *rec)
 {
-       u32 hash, rec_ptr;
+       u32 rec_ptr;
 
-       hash = tdb_hash(&key);
-       if (!tdb_keylocked(tdb, hash))
-               return 0;
        if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
                return 0;
        if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
@@ -980,52 +1072,61 @@ const char *tdb_errorstr(TDB_CONTEXT *tdb)
 
 /* update an entry in place - this only works if the new data size
    is <= the old data size and the key exists.
-   on failure return -1
+   on failure return -1.
 */
-static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
+
+static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
 {
        struct list_struct rec;
        tdb_off rec_ptr;
-       int ret = -1;
 
        /* find entry */
-       if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
+       if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
                return -1;
 
        /* must be long enough key, data and tailer */
        if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
                tdb->ecode = TDB_SUCCESS; /* Not really an error */
-               goto out;
+               return -1;
        }
 
        if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
                      dbuf.dptr, dbuf.dsize) == -1)
-               goto out;
+               return -1;
 
        if (dbuf.dsize != rec.data_len) {
                /* update size */
                rec.data_len = dbuf.dsize;
-               ret = rec_write(tdb, rec_ptr, &rec);
-       } else
-               ret = 0;
- out:
-       tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
-       return ret;
+               return rec_write(tdb, rec_ptr, &rec);
+       }
+       return 0;
 }
 
 /* find an entry in the database given a key */
+/* If an entry doesn't exist tdb_err will be set to
+ * TDB_ERR_NOEXIST. If a key has no data attached
+ * tdb_err will not be set. Both will return a
+ * zero pptr and zero dsize.
+ */
+
 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
 {
        tdb_off rec_ptr;
        struct list_struct rec;
        TDB_DATA ret;
+       u32 hash;
 
        /* find which hash bucket it is in */
-       if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
+       hash = tdb->hash_fn(&key);
+       if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
                return tdb_null;
 
-       ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
-                                 rec.data_len);
+       if (rec.data_len)
+               ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
+                                         rec.data_len);
+       else
+               ret.dptr = NULL;
        ret.dsize = rec.data_len;
        tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
        return ret;
@@ -1037,16 +1138,22 @@ TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
    this doesn't match the conventions in the rest of this module, but is
    compatible with gdbm
 */
-int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
+static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
 {
        struct list_struct rec;
        
-       if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
+       if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
                return 0;
        tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
        return 1;
 }
 
+int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
+{
+       u32 hash = tdb->hash_fn(&key);
+       return tdb_exists_hash(tdb, key, hash);
+}
+
 /* record lock stops delete underneath */
 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
 {
@@ -1131,10 +1238,6 @@ static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
 {
        int want_next = (tlock->off != 0);
 
-       /* No traversal allows if you've called tdb_lockkeys() */
-       if (tdb->lockedkeys)
-               return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
-
        /* Lock each chain from the start one. */
        for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
                if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
@@ -1172,7 +1275,8 @@ static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
                        /* Try to clean dead ones from old traverses */
                        current = tlock->off;
                        tlock->off = rec->next;
-                       if (do_delete(tdb, current, rec) != 0)
+                       if (!tdb->read_only && 
+                           do_delete(tdb, current, rec) != 0)
                                goto fail;
                }
                tdb_unlock(tdb, tlock->hash, F_WRLCK);
@@ -1193,7 +1297,7 @@ static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
    if fn is NULL then it is not called
    a non-zero return value from fn() indicates that the traversal should stop
   */
-int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
+int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private)
 {
        TDB_DATA key, dbuf;
        struct list_struct rec;
@@ -1231,7 +1335,7 @@ int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
                        ret = -1;
                        goto out;
                }
-               if (fn && fn(tdb, key, dbuf, state)) {
+               if (fn && fn(tdb, key, dbuf, private)) {
                        /* They want us to terminate traversal */
                        ret = count;
                        if (unlock_record(tdb, tl.off) != 0) {
@@ -1302,7 +1406,7 @@ TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
 
        if (!tdb->travlocks.off) {
                /* No previous element: do normal find, and lock record */
-               tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
+               tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
                if (!tdb->travlocks.off)
                        return tdb_null;
                tdb->travlocks.hash = BUCKET(rec.full_hash);
@@ -1330,13 +1434,13 @@ TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
 }
 
 /* delete an entry in the database given a key */
-int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
+static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
 {
        tdb_off rec_ptr;
        struct list_struct rec;
        int ret;
 
-       if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
+       if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
                return -1;
        ret = do_delete(tdb, rec_ptr, &rec);
        if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
@@ -1344,6 +1448,12 @@ int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
        return ret;
 }
 
+int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
+{
+       u32 hash = tdb->hash_fn(&key);
+       return tdb_delete_hash(tdb, key, hash);
+}
+
 /* store an element in the database, replacing any existing element
    with the same key 
 
@@ -1358,25 +1468,27 @@ int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        int ret = 0;
 
        /* find which hash bucket it is in */
-       hash = tdb_hash(&key);
-       if (!tdb_keylocked(tdb, hash))
-               return -1;
+       hash = tdb->hash_fn(&key);
        if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
                return -1;
 
        /* check for it existing, on insert. */
        if (flag == TDB_INSERT) {
-               if (tdb_exists(tdb, key)) {
+               if (tdb_exists_hash(tdb, key, hash)) {
                        tdb->ecode = TDB_ERR_EXISTS;
                        goto fail;
                }
        } else {
                /* first try in-place update, on modify or replace. */
-               if (tdb_update(tdb, key, dbuf) == 0)
+               if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
                        goto out;
-               if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
+               if (tdb->ecode == TDB_ERR_NOEXIST &&
+                   flag == TDB_MODIFY) {
+                       /* if the record doesn't exist and we are in TDB_MODIFY mode then
+                        we should fail the store */
                        goto fail;
        }
+       }
        /* reset the error code potentially set by the tdb_update() */
        tdb->ecode = TDB_SUCCESS;
 
@@ -1384,7 +1496,7 @@ int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
            care.  Doing this first reduces fragmentation, and avoids
            coalescing with `allocated' block before it's updated. */
        if (flag != TDB_INSERT)
-               tdb_delete(tdb, key);
+               tdb_delete_hash(tdb, key, hash);
 
        /* Copy key+value *before* allocating free space in case malloc
           fails and we are left with a dead spot in the tdb. */
@@ -1395,11 +1507,10 @@ int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        }
 
        memcpy(p, key.dptr, key.dsize);
-       memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
+       if (dbuf.dsize)
+               memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
 
-       /* now we're into insert / modify / replace of a record which
-        * we know could not be optimised by an in-place store (for
-        * various reasons).  */
+       /* we have to allocate some space */
        if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
                goto fail;
 
@@ -1428,6 +1539,131 @@ fail:
        goto out;
 }
 
+/* Attempt to append data to an entry in place - this only works if the new data size
+   is <= the old data size and the key exists.
+   on failure return -1. Record must be locked before calling.
+*/
+static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
+{
+       struct list_struct rec;
+       tdb_off rec_ptr;
+
+       /* find entry */
+       if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
+               return -1;
+
+       /* Append of 0 is always ok. */
+       if (new_dbuf.dsize == 0)
+               return 0;
+
+       /* must be long enough for key, old data + new data and tailer */
+       if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
+               /* No room. */
+               tdb->ecode = TDB_SUCCESS; /* Not really an error */
+               return -1;
+       }
+
+       if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
+                     new_dbuf.dptr, new_dbuf.dsize) == -1)
+               return -1;
+
+       /* update size */
+       rec.data_len += new_dbuf.dsize;
+       return rec_write(tdb, rec_ptr, &rec);
+}
+
+/* Append to an entry. Create if not exist. */
+
+int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
+{
+       struct list_struct rec;
+       u32 hash;
+       tdb_off rec_ptr;
+       char *p = NULL;
+       int ret = 0;
+       size_t new_data_size = 0;
+
+       /* find which hash bucket it is in */
+       hash = tdb->hash_fn(&key);
+       if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
+               return -1;
+
+       /* first try in-place. */
+       if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
+               goto out;
+
+       /* reset the error code potentially set by the tdb_append_inplace() */
+       tdb->ecode = TDB_SUCCESS;
+
+       /* find entry */
+       if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
+               if (tdb->ecode != TDB_ERR_NOEXIST)
+                       goto fail;
+
+               /* Not found - create. */
+
+               ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
+               goto out;
+       }
+
+       new_data_size = rec.data_len + new_dbuf.dsize;
+
+       /* Copy key+old_value+value *before* allocating free space in case malloc
+          fails and we are left with a dead spot in the tdb. */
+
+       if (!(p = (char *)malloc(key.dsize + new_data_size))) {
+               tdb->ecode = TDB_ERR_OOM;
+               goto fail;
+       }
+
+       /* Copy the key in place. */
+       memcpy(p, key.dptr, key.dsize);
+
+       /* Now read the old data into place. */
+       if (rec.data_len &&
+               tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
+                       goto fail;
+
+       /* Finally append the new data. */
+       if (new_dbuf.dsize)
+               memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
+
+       /* delete any existing record - if it doesn't exist we don't
+           care.  Doing this first reduces fragmentation, and avoids
+           coalescing with `allocated' block before it's updated. */
+
+       tdb_delete_hash(tdb, key, hash);
+
+       if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
+               goto fail;
+
+       /* Read hash top into next ptr */
+       if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
+               goto fail;
+
+       rec.key_len = key.dsize;
+       rec.data_len = new_data_size;
+       rec.full_hash = hash;
+       rec.magic = TDB_MAGIC;
+
+       /* write out and point the top of the hash chain at it */
+       if (rec_write(tdb, rec_ptr, &rec) == -1
+           || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
+           || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
+               /* Need to tdb_unallocate() here */
+               goto fail;
+       }
+
+ out:
+       SAFE_FREE(p); 
+       tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
+       return ret;
+
+fail:
+       ret = -1;
+       goto out;
+}
+
 static int tdb_already_open(dev_t device,
                            ino_t ino)
 {
@@ -1442,6 +1678,19 @@ static int tdb_already_open(dev_t device,
        return 0;
 }
 
+/* This is based on the hash algorithm from gdbm */
+static u32 default_tdb_hash(TDB_DATA *key)
+{
+       u32 value;      /* Used to compute the hash value.  */
+       u32   i;        /* Used to cycle through random values. */
+
+       /* Set the initial value from the key size. */
+       for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
+               value = (value + (key->dptr[i] << (i*5 % 24)));
+
+       return (1103515243 * value + 12345);  
+}
+
 /* open the database, creating it if necessary 
 
    The open_flags and mode are passed straight to the open call on the
@@ -1455,17 +1704,20 @@ static int tdb_already_open(dev_t device,
 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
                      int open_flags, mode_t mode)
 {
-       return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
+       return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
 }
 
 
 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
                         int open_flags, mode_t mode,
-                        tdb_log_func log_fn)
+                        tdb_log_func log_fn,
+                        tdb_hash_func hash_fn)
 {
        TDB_CONTEXT *tdb;
        struct stat st;
-       int rev = 0, locked;
+       int rev = 0, locked = 0;
+       unsigned char *vp;
+       u32 vertest;
 
        if (!(tdb = calloc(1, sizeof *tdb))) {
                /* Can't log this */
@@ -1475,11 +1727,11 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        tdb->fd = -1;
        tdb->name = NULL;
        tdb->map_ptr = NULL;
-       tdb->lockedkeys = NULL;
        tdb->flags = tdb_flags;
        tdb->open_flags = open_flags;
        tdb->log_fn = log_fn;
-       
+       tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
+
        if ((open_flags & O_ACCMODE) == O_WRONLY) {
                TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
                         name));
@@ -1521,8 +1773,8 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        }
 
        /* we need to zero database if we are the only one with it open */
-       if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
-           && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
+       if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
+               (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
                open_flags |= O_CREAT;
                if (ftruncate(tdb->fd, 0) == -1) {
                        TDB_LOG((tdb, 0, "tdb_open_ex: "
@@ -1543,6 +1795,10 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
                }
                rev = (tdb->flags & TDB_CONVERT);
        }
+       vp = (unsigned char *)&tdb->header.version;
+       vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
+                 (((u32)vp[2]) << 8) | (u32)vp[3];
+       tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
        if (!rev)
                tdb->flags &= ~TDB_CONVERT;
        else {
@@ -1556,7 +1812,7 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        if (tdb_already_open(st.st_dev, st.st_ino)) {
                TDB_LOG((tdb, 2, "tdb_open_ex: "
                         "%s (%d,%d) is already open in this process\n",
-                        name, st.st_dev, st.st_ino));
+                        name, (int)st.st_dev, (int)st.st_ino));
                errno = EBUSY;
                goto fail;
        }
@@ -1591,10 +1847,19 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
                                 name, strerror(errno)));
                        goto fail;
                }
+
+       }
+
+       /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
+          we didn't get the initial exclusive lock as we need to let all other
+          users know we're using it. */
+
+       if (tdb_flags & TDB_CLEAR_IF_FIRST) {
+               /* leave this lock in place to indicate it's in use */
+               if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
+                       goto fail;
        }
-       /* leave this lock in place to indicate it's in use */
-       if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
-               goto fail;
+
 
  internal:
        /* Internal (memory-only) databases skip all the code above to
@@ -1629,7 +1894,11 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        }
 }
 
-/* close a database */
+/**
+ * Close a database.
+ *
+ * @returns -1 for error; 0 for success.
+ **/
 int tdb_close(TDB_CONTEXT *tdb)
 {
        TDB_CONTEXT **i;
@@ -1645,7 +1914,6 @@ int tdb_close(TDB_CONTEXT *tdb)
        if (tdb->fd != -1)
                ret = close(tdb->fd);
        SAFE_FREE(tdb->locked);
-       SAFE_FREE(tdb->lockedkeys);
 
        /* Remove from contexts list */
        for (i = &tdbs; *i; i = &(*i)->next) {
@@ -1669,8 +1937,6 @@ int tdb_lockall(TDB_CONTEXT *tdb)
        /* There are no locks on read-only dbs */
        if (tdb->read_only)
                return TDB_ERRCODE(TDB_ERR_LOCK, -1);
-       if (tdb->lockedkeys)
-               return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
        for (i = 0; i < tdb->header.hash_size; i++) 
                if (tdb_lock(tdb, i, F_WRLCK))
                        break;
@@ -1693,59 +1959,26 @@ void tdb_unlockall(TDB_CONTEXT *tdb)
                tdb_unlock(tdb, i, F_WRLCK);
 }
 
-int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
+/* lock/unlock one hash chain. This is meant to be used to reduce
+   contention - it cannot guarantee how many records will be locked */
+int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
 {
-       u32 i, j, hash;
-
-       /* Can't lock more keys if already locked */
-       if (tdb->lockedkeys)
-               return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
-       if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
-               return TDB_ERRCODE(TDB_ERR_OOM, -1);
-       /* First number in array is # keys */
-       tdb->lockedkeys[0] = number;
-
-       /* Insertion sort by bucket */
-       for (i = 0; i < number; i++) {
-               hash = tdb_hash(&keys[i]);
-               for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
-                       memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
-               tdb->lockedkeys[j+1] = hash;
-       }
-       /* Finally, lock in order */
-       for (i = 0; i < number; i++)
-               if (tdb_lock(tdb, i, F_WRLCK))
-                       break;
-
-       /* If error, release locks we have... */
-       if (i < number) {
-               for ( j = 0; j < i; j++)
-                       tdb_unlock(tdb, j, F_WRLCK);
-               SAFE_FREE(tdb->lockedkeys);
-               return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
-       }
-       return 0;
+       return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
 }
 
-/* Unlock the keys previously locked by tdb_lockkeys() */
-void tdb_unlockkeys(TDB_CONTEXT *tdb)
+int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
 {
-       u32 i;
-       for (i = 0; i < tdb->lockedkeys[0]; i++)
-               tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
-       SAFE_FREE(tdb->lockedkeys);
+       return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
 }
 
-/* lock/unlock one hash chain. This is meant to be used to reduce
-   contention - it cannot guarantee how many records will be locked */
-int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
+int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
 {
-       return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
+       return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
 }
 
-int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
+int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
 {
-       return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
+       return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
 }
 
 
@@ -1755,13 +1988,14 @@ void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , cons
        tdb->log_fn = fn;
 }
 
-
-/* reopen a tdb - this is used after a fork to ensure that we have an independent
+/* reopen a tdb - this can be used after a fork to ensure that we have an independent
    seek pointer from our parent and to re-establish locks */
 int tdb_reopen(TDB_CONTEXT *tdb)
 {
        struct stat st;
 
+       if (tdb->flags & TDB_INTERNAL)
+               return 0; /* Nothing to do. */
        if (tdb_munmap(tdb) != 0) {
                TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
                goto fail;
@@ -1782,7 +2016,7 @@ int tdb_reopen(TDB_CONTEXT *tdb)
                goto fail;
        }
        tdb_mmap(tdb);
-       if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
+       if ((tdb->flags & TDB_CLEAR_IF_FIRST) && (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
                TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
                goto fail;
        }
@@ -1800,7 +2034,10 @@ int tdb_reopen_all(void)
        TDB_CONTEXT *tdb;
 
        for (tdb=tdbs; tdb; tdb = tdb->next) {
-               if (tdb_reopen(tdb) != 0) return -1;
+               /* Ensure no clear-if-first. */
+               tdb->flags &= ~TDB_CLEAR_IF_FIRST;
+               if (tdb_reopen(tdb) != 0)
+                       return -1;
        }
 
        return 0;