trying to get HEAD building again. If you want the code
[gd/samba-autobuild/.git] / source3 / tdb / tdb.c
index e5f1b0a19b8f401af8d7ed42449b98911bbf6ba1..34681ea78f28fea90617ad1699fae9e37abd7eef 100644 (file)
@@ -4,7 +4,7 @@
    Copyright (C) Andrew Tridgell              1999-2000
    Copyright (C) Luke Kenneth Casson Leighton      2000
    Copyright (C) Paul `Rusty' Russell             2000
-   Copyright (C) Jeremy Allison                           2000
+   Copyright (C) Jeremy Allison                           2000-2003
    
    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    along with this program; if not, write to the Free Software
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
+
+
+/* NOTE: If you use tdbs under valgrind, and in particular if you run
+ * tdbtorture, you may get spurious "uninitialized value" warnings.  I
+ * think this is because valgrind doesn't understand that the mmap'd
+ * area may be written to by other processes.  Memory can, from the
+ * point of view of the grinded process, spontaneously become
+ * initialized.
+ *
+ * I can think of a few solutions.  [mbp 20030311]
+ *
+ * 1 - Write suppressions for Valgrind so that it doesn't complain
+ * about this.  Probably the most reasonable but people need to
+ * remember to use them.
+ *
+ * 2 - Use IO not mmap when running under valgrind.  Not so nice.
+ *
+ * 3 - Use the special valgrind macros to mark memory as valid at the
+ * right time.  Probably too hard -- the process just doesn't know.
+ */ 
+
 #ifdef STANDALONE
 #if HAVE_CONFIG_H
 #include <config.h>
@@ -34,6 +55,7 @@
 #include <errno.h>
 #include <sys/mman.h>
 #include <sys/stat.h>
+#include <signal.h>
 #include "tdb.h"
 #include "spinlock.h"
 #else
@@ -84,16 +106,20 @@ TDB_DATA tdb_null;
 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
 static TDB_CONTEXT *tdbs = NULL;
 
-static void tdb_munmap(TDB_CONTEXT *tdb)
+static int tdb_munmap(TDB_CONTEXT *tdb)
 {
        if (tdb->flags & TDB_INTERNAL)
-               return;
+               return 0;
 
 #ifdef HAVE_MMAP
-       if (tdb->map_ptr)
-               munmap(tdb->map_ptr, tdb->map_size);
+       if (tdb->map_ptr) {
+               int ret = munmap(tdb->map_ptr, tdb->map_size);
+               if (ret != 0)
+                       return ret;
+       }
 #endif
        tdb->map_ptr = NULL;
+       return 0;
 }
 
 static void tdb_mmap(TDB_CONTEXT *tdb)
@@ -156,6 +182,18 @@ struct list_struct {
        */
 };
 
+/***************************************************************
+ Allow a caller to set a "alarm" flag that tdb can check to abort
+ a blocking lock on SIGALRM.
+***************************************************************/
+
+static sig_atomic_t *palarm_fired;
+
+void tdb_set_lock_alarm(sig_atomic_t *palarm)
+{
+       palarm_fired = palarm;
+}
+
 /* a byte range locking function - return 0 on success
    this functions locks/unlocks 1 byte at the specified offset.
 
@@ -165,10 +203,11 @@ static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset,
                      int rw_type, int lck_type, int probe)
 {
        struct flock fl;
+       int ret;
 
        if (tdb->flags & TDB_NOLOCK)
                return 0;
-       if (tdb->read_only) {
+       if ((rw_type == F_WRLCK) && (tdb->read_only)) {
                errno = EACCES;
                return -1;
        }
@@ -179,11 +218,26 @@ static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset,
        fl.l_len = 1;
        fl.l_pid = 0;
 
-       if (fcntl(tdb->fd,lck_type,&fl) == -1) {
-               if (!probe) {
+       do {
+               ret = fcntl(tdb->fd,lck_type,&fl);
+               if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
+                       break;
+       } while (ret == -1 && errno == EINTR);
+
+       if (ret == -1) {
+               if (!probe && lck_type != F_SETLK) {
+                       /* Ensure error code is set for log fun to examine. */
+                       if (errno == EINTR && palarm_fired && *palarm_fired)
+                               tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
+                       else
+                               tdb->ecode = TDB_ERR_LOCK;
                        TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
                                 tdb->fd, offset, rw_type, lck_type));
                }
+               /* Was it an alarm timeout ? */
+               if (errno == EINTR && palarm_fired && *palarm_fired)
+                       return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
+               /* Otherwise - generic lock error. */
                /* errno set by fcntl */
                return TDB_ERRCODE(TDB_ERR_LOCK, -1);
        }
@@ -222,25 +276,41 @@ static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
 }
 
 /* unlock the database: returns void because it's too late for errors. */
-static void tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
+       /* changed to return int it may be interesting to know there
+          has been an error  --simo */
+static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
 {
+       int ret = -1;
+
        if (tdb->flags & TDB_NOLOCK)
-               return;
+               return 0;
 
        /* Sanity checks */
-       if (list < -1 || list >= (int)tdb->header.hash_size)
-               return;
-       if (tdb->locked[list+1].count==0)
-               return;
+       if (list < -1 || list >= (int)tdb->header.hash_size) {
+               TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
+               return ret;
+       }
+
+       if (tdb->locked[list+1].count==0) {
+               TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
+               return ret;
+       }
 
        if (tdb->locked[list+1].count == 1) {
                /* Down to last nested lock: unlock underneath */
-               if (!tdb->read_only && tdb->header.rwlocks)
-                       tdb_spinunlock(tdb, list, ltype);
-               else
-                       tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
+               if (!tdb->read_only && tdb->header.rwlocks) {
+                       ret = tdb_spinunlock(tdb, list, ltype);
+               } else {
+                       ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
+               }
+       } else {
+               ret = 0;
        }
        tdb->locked[list+1].count--;
+
+       if (ret)
+               TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
+       return ret;
 }
 
 /* This is based on the hash algorithm from gdbm */
@@ -268,6 +338,8 @@ static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
                return 0;
        if (tdb->flags & TDB_INTERNAL) {
                if (!probe) {
+                       /* Ensure ecode is set for log fn. */
+                       tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
                                 (int)len, (int)tdb->map_size));
                }
@@ -279,6 +351,8 @@ static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
 
        if (st.st_size < (size_t)len) {
                if (!probe) {
+                       /* Ensure ecode is set for log fn. */
+                       tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
                                 (int)len, (int)st.st_size));
                }
@@ -286,7 +360,8 @@ static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
        }
 
        /* Unmap, update size, remap */
-       tdb_munmap(tdb);
+       if (tdb_munmap(tdb) == -1)
+               return TDB_ERRCODE(TDB_ERR_IO, -1);
        tdb->map_size = st.st_size;
        tdb_mmap(tdb);
        return 0;
@@ -306,6 +381,8 @@ static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
        else if (lseek(tdb->fd, off, SEEK_SET) != off
                 || write(tdb->fd, buf, len) != (ssize_t)len) {
 #endif
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_IO;
                TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
                           off, len, strerror(errno)));
                return TDB_ERRCODE(TDB_ERR_IO, -1);
@@ -327,6 +404,8 @@ static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
        else if (lseek(tdb->fd, off, SEEK_SET) != off
                 || read(tdb->fd, buf, len) != (ssize_t)len) {
 #endif
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_IO;
                TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
                           off, len, strerror(errno)));
                return TDB_ERRCODE(TDB_ERR_IO, -1);
@@ -342,6 +421,8 @@ static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
        char *buf;
 
        if (!(buf = malloc(len))) {
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_OOM;
                TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
                           len, strerror(errno)));
                return TDB_ERRCODE(TDB_ERR_OOM, buf);
@@ -370,6 +451,8 @@ static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
        if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
                return -1;
        if (TDB_BAD_MAGIC(rec)) {
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_CORRUPT;
                TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
                return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
        }
@@ -386,7 +469,20 @@ static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
 {
        if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
                return -1;
+
+       if (rec->magic == TDB_MAGIC) {
+               /* this happens when a app is showdown while deleting a record - we should
+                  not completely fail when this happens */
+               TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
+                        rec->magic, off));
+               rec->magic = TDB_FREE_MAGIC;
+               if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
+                       return -1;
+       }
+
        if (rec->magic != TDB_FREE_MAGIC) {
+               /* Ensure ecode is set for log fn. */
+               tdb->ecode = TDB_ERR_CORRUPT;
                TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
                           rec->magic, off));
                return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
@@ -434,18 +530,17 @@ static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
        return rec.next;
 }
 
-static void tdb_dump_chain(TDB_CONTEXT *tdb, int i)
+static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
 {
        tdb_off rec_ptr, top;
 
        top = TDB_HASH_TOP(i);
 
-       tdb_lock(tdb, i, F_WRLCK);
+       if (tdb_lock(tdb, i, F_WRLCK) != 0)
+               return -1;
 
-       if (ofs_read(tdb, top, &rec_ptr) == -1) {
-               tdb_unlock(tdb, i, F_WRLCK);
-               return;
-       }
+       if (ofs_read(tdb, top, &rec_ptr) == -1)
+               return tdb_unlock(tdb, i, F_WRLCK);
 
        if (rec_ptr)
                printf("hash=%d\n", i);
@@ -453,7 +548,8 @@ static void tdb_dump_chain(TDB_CONTEXT *tdb, int i)
        while (rec_ptr) {
                rec_ptr = tdb_dump_record(tdb, rec_ptr);
        }
-       tdb_unlock(tdb, i, F_WRLCK);
+
+       return tdb_unlock(tdb, i, F_WRLCK);
 }
 
 void tdb_dump_all(TDB_CONTEXT *tdb)
@@ -466,30 +562,35 @@ void tdb_dump_all(TDB_CONTEXT *tdb)
        tdb_dump_chain(tdb, -1);
 }
 
-void tdb_printfreelist(TDB_CONTEXT *tdb)
+int tdb_printfreelist(TDB_CONTEXT *tdb)
 {
+       int ret;
        long total_free = 0;
        tdb_off offset, rec_ptr;
        struct list_struct rec;
 
-       tdb_lock(tdb, -1, F_WRLCK);
+       if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
+               return ret;
 
        offset = FREELIST_TOP;
 
        /* read in the freelist top */
        if (ofs_read(tdb, offset, &rec_ptr) == -1) {
-               return;
+               tdb_unlock(tdb, -1, F_WRLCK);
+               return 0;
        }
 
        printf("freelist top=[0x%08x]\n", rec_ptr );
        while (rec_ptr) {
                if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
-                       return;
+                       tdb_unlock(tdb, -1, F_WRLCK);
+                       return -1;
                }
 
                if (rec.magic != TDB_FREE_MAGIC) {
                        printf("bad magic 0x%08x in free list\n", rec.magic);
-                       return;
+                       tdb_unlock(tdb, -1, F_WRLCK);
+                       return -1;
                }
 
                printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
@@ -501,7 +602,7 @@ void tdb_printfreelist(TDB_CONTEXT *tdb)
        printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
                (int)total_free);
 
-       tdb_unlock(tdb, -1, F_WRLCK);
+       return tdb_unlock(tdb, -1, F_WRLCK);
 }
 
 /* Remove an element from the freelist.  Must have alloc lock. */
@@ -534,7 +635,10 @@ static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
                return -1;
 
        /* set an initial tailer, so if we fail we don't leave a bogus record */
-       update_tailer(tdb, offset, rec);
+       if (update_tailer(tdb, offset, rec) != 0) {
+               TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
+               goto fail;
+       }
 
        /* Look right first (I'm an Australian, dammit) */
        right = offset + sizeof(*rec) + rec->rec_len;
@@ -954,40 +1058,44 @@ const char *tdb_errorstr(TDB_CONTEXT *tdb)
 
 /* update an entry in place - this only works if the new data size
    is <= the old data size and the key exists.
-   on failure return -1
+   on failure return -1.
 */
+
 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
 {
        struct list_struct rec;
        tdb_off rec_ptr;
-       int ret = -1;
 
        /* find entry */
-       if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
+       if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
                return -1;
 
        /* must be long enough key, data and tailer */
        if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
                tdb->ecode = TDB_SUCCESS; /* Not really an error */
-               goto out;
+               return -1;
        }
 
        if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
                      dbuf.dptr, dbuf.dsize) == -1)
-               goto out;
+               return -1;
 
        if (dbuf.dsize != rec.data_len) {
                /* update size */
                rec.data_len = dbuf.dsize;
-               ret = rec_write(tdb, rec_ptr, &rec);
-       } else
-               ret = 0;
- out:
-       tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
-       return ret;
+               return rec_write(tdb, rec_ptr, &rec);
+       }
+       return 0;
 }
 
 /* find an entry in the database given a key */
+/* If an entry doesn't exist tdb_err will be set to
+ * TDB_ERR_NOEXIST. If a key has no data attached
+ * tdb_err will not be set. Both will return a
+ * zero pptr and zero dsize.
+ */
+
 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
 {
        tdb_off rec_ptr;
@@ -998,8 +1106,11 @@ TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
        if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
                return tdb_null;
 
-       ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
-                                 rec.data_len);
+       if (rec.data_len)
+               ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
+                                         rec.data_len);
+       else
+               ret.dptr = NULL;
        ret.dsize = rec.data_len;
        tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
        return ret;
@@ -1077,7 +1188,8 @@ static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
                rec->magic = TDB_DEAD_MAGIC;
                return rec_write(tdb, rec_ptr, rec);
        }
-       write_unlock_record(tdb, rec_ptr);
+       if (write_unlock_record(tdb, rec_ptr) != 0)
+               return -1;
 
        /* find previous record in hash chain */
        if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
@@ -1120,7 +1232,8 @@ static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
                                goto fail;
                } else {
                        /* Otherwise unlock the previous record. */
-                       unlock_record(tdb, tlock->off);
+                       if (unlock_record(tdb, tlock->off) != 0)
+                               goto fail;
                }
 
                if (want_next) {
@@ -1137,13 +1250,16 @@ static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
                                goto fail;
                        if (!TDB_DEAD(rec)) {
                                /* Woohoo: we found one! */
-                               lock_record(tdb, tlock->off);
+                               if (lock_record(tdb, tlock->off) != 0)
+                                       goto fail;
                                return tlock->off;
                        }
                        /* Try to clean dead ones from old traverses */
                        current = tlock->off;
                        tlock->off = rec->next;
-                       do_delete(tdb, current, rec);
+                       if (!tdb->read_only && 
+                           do_delete(tdb, current, rec) != 0)
+                               goto fail;
                }
                tdb_unlock(tdb, tlock->hash, F_WRLCK);
                want_next = 0;
@@ -1153,7 +1269,8 @@ static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
 
  fail:
        tlock->off = 0;
-       tdb_unlock(tdb, tlock->hash, F_WRLCK);
+       if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
+               TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
        return -1;
 }
 
@@ -1184,26 +1301,36 @@ int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
                key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
                                          rec.key_len + rec.data_len);
                if (!key.dptr) {
-                       tdb_unlock(tdb, tl.hash, F_WRLCK);
-                       unlock_record(tdb, tl.off);
-                       tdb->travlocks.next = tl.next;
-                       return -1;
+                       ret = -1;
+                       if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
+                               goto out;
+                       if (unlock_record(tdb, tl.off) != 0)
+                               TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
+                       goto out;
                }
                key.dsize = rec.key_len;
                dbuf.dptr = key.dptr + rec.key_len;
                dbuf.dsize = rec.data_len;
 
                /* Drop chain lock, call out */
-               tdb_unlock(tdb, tl.hash, F_WRLCK);
+               if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
+                       ret = -1;
+                       goto out;
+               }
                if (fn && fn(tdb, key, dbuf, state)) {
                        /* They want us to terminate traversal */
-                       unlock_record(tdb, tl.off);
+                       ret = count;
+                       if (unlock_record(tdb, tl.off) != 0) {
+                               TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
+                               ret = -1;
+                       }
                        tdb->travlocks.next = tl.next;
                        SAFE_FREE(key.dptr);
                        return count;
                }
                SAFE_FREE(key.dptr);
        }
+out:
        tdb->travlocks.next = tl.next;
        if (ret < 0)
                return -1;
@@ -1218,7 +1345,8 @@ TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
        struct list_struct rec;
 
        /* release any old lock */
-       unlock_record(tdb, tdb->travlocks.off);
+       if (unlock_record(tdb, tdb->travlocks.off) != 0)
+               return tdb_null;
        tdb->travlocks.off = tdb->travlocks.hash = 0;
 
        if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
@@ -1226,7 +1354,8 @@ TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
        /* now read the key */
        key.dsize = rec.key_len;
        key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
-       tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK);
+       if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
+               TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
        return key;
 }
 
@@ -1247,8 +1376,10 @@ TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
                                            rec.key_len))
                    || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
                        /* No, it wasn't: unlock it and start from scratch */
-                       unlock_record(tdb, tdb->travlocks.off);
-                       tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
+                       if (unlock_record(tdb, tdb->travlocks.off) != 0)
+                               return tdb_null;
+                       if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
+                               return tdb_null;
                        tdb->travlocks.off = 0;
                }
 
@@ -1261,7 +1392,10 @@ TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
                if (!tdb->travlocks.off)
                        return tdb_null;
                tdb->travlocks.hash = BUCKET(rec.full_hash);
-               lock_record(tdb, tdb->travlocks.off);
+               if (lock_record(tdb, tdb->travlocks.off) != 0) {
+                       TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
+                       return tdb_null;
+               }
        }
        oldhash = tdb->travlocks.hash;
 
@@ -1272,10 +1406,12 @@ TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
                key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
                                          key.dsize);
                /* Unlock the chain of this new record */
-               tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
+               if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
+                       TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
        }
        /* Unlock the chain of old record */
-       tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK);
+       if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
+               TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
        return key;
 }
 
@@ -1289,7 +1425,8 @@ int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
        if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
                return -1;
        ret = do_delete(tdb, rec_ptr, &rec);
-       tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
+       if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
+               TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
        return ret;
 }
 
@@ -1344,7 +1481,8 @@ int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        }
 
        memcpy(p, key.dptr, key.dsize);
-       memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
+       if (dbuf.dsize)
+               memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
 
        /* now we're into insert / modify / replace of a record which
         * we know could not be optimised by an in-place store (for
@@ -1365,14 +1503,143 @@ int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
        if (rec_write(tdb, rec_ptr, &rec) == -1
            || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
            || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
-       fail:
                /* Need to tdb_unallocate() here */
-               ret = -1;
+               goto fail;
+       }
+ out:
+       SAFE_FREE(p); 
+       tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
+       return ret;
+fail:
+       ret = -1;
+       goto out;
+}
+
+/* Attempt to append data to an entry in place - this only works if the new data size
+   is <= the old data size and the key exists.
+   on failure return -1. Record must be locked before calling.
+*/
+static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
+{
+       struct list_struct rec;
+       tdb_off rec_ptr;
+
+       /* find entry */
+       if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
+               return -1;
+
+       /* Append of 0 is always ok. */
+       if (new_dbuf.dsize == 0)
+               return 0;
+
+       /* must be long enough for key, old data + new data and tailer */
+       if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
+               /* No room. */
+               tdb->ecode = TDB_SUCCESS; /* Not really an error */
+               return -1;
+       }
+
+       if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
+                     new_dbuf.dptr, new_dbuf.dsize) == -1)
+               return -1;
+
+       /* update size */
+       rec.data_len += new_dbuf.dsize;
+       return rec_write(tdb, rec_ptr, &rec);
+}
+
+/* Append to an entry. Create if not exist. */
+
+int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
+{
+       struct list_struct rec;
+       u32 hash;
+       tdb_off rec_ptr;
+       char *p = NULL;
+       int ret = 0;
+       size_t new_data_size = 0;
+
+       /* find which hash bucket it is in */
+       hash = tdb_hash(&key);
+       if (!tdb_keylocked(tdb, hash))
+               return -1;
+       if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
+               return -1;
+
+       /* first try in-place. */
+       if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
+               goto out;
+
+       /* reset the error code potentially set by the tdb_append_inplace() */
+       tdb->ecode = TDB_SUCCESS;
+
+       /* find entry */
+       if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
+               if (tdb->ecode != TDB_ERR_NOEXIST)
+                       goto fail;
+
+               /* Not found - create. */
+
+               ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
+               goto out;
+       }
+
+       new_data_size = rec.data_len + new_dbuf.dsize;
+
+       /* Copy key+old_value+value *before* allocating free space in case malloc
+          fails and we are left with a dead spot in the tdb. */
+
+       if (!(p = (char *)malloc(key.dsize + new_data_size))) {
+               tdb->ecode = TDB_ERR_OOM;
+               goto fail;
+       }
+
+       /* Copy the key in place. */
+       memcpy(p, key.dptr, key.dsize);
+
+       /* Now read the old data into place. */
+       if (rec.data_len &&
+               tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
+                       goto fail;
+
+       /* Finally append the new data. */
+       if (new_dbuf.dsize)
+               memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
+
+       /* delete any existing record - if it doesn't exist we don't
+           care.  Doing this first reduces fragmentation, and avoids
+           coalescing with `allocated' block before it's updated. */
+
+       tdb_delete(tdb, key);
+
+       if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
+               goto fail;
+
+       /* Read hash top into next ptr */
+       if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
+               goto fail;
+
+       rec.key_len = key.dsize;
+       rec.data_len = new_data_size;
+       rec.full_hash = hash;
+       rec.magic = TDB_MAGIC;
+
+       /* write out and point the top of the hash chain at it */
+       if (rec_write(tdb, rec_ptr, &rec) == -1
+           || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
+           || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
+               /* Need to tdb_unallocate() here */
+               goto fail;
        }
+
  out:
        SAFE_FREE(p); 
        tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
        return ret;
+
+fail:
+       ret = -1;
+       goto out;
 }
 
 static int tdb_already_open(dev_t device,
@@ -1413,6 +1680,8 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        TDB_CONTEXT *tdb;
        struct stat st;
        int rev = 0, locked;
+       unsigned char *vp;
+       u32 vertest;
 
        if (!(tdb = calloc(1, sizeof *tdb))) {
                /* Can't log this */
@@ -1447,7 +1716,10 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        if (tdb->flags & TDB_INTERNAL) {
                tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
                tdb->flags &= ~TDB_CLEAR_IF_FIRST;
-               tdb_new_database(tdb, hash_size);
+               if (tdb_new_database(tdb, hash_size) != 0) {
+                       TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
+                       goto fail;
+               }
                goto internal;
        }
 
@@ -1487,6 +1759,10 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
                }
                rev = (tdb->flags & TDB_CONVERT);
        }
+       vp = (unsigned char *)&tdb->header.version;
+       vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
+                 (((u32)vp[2]) << 8) | (u32)vp[3];
+       tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
        if (!rev)
                tdb->flags &= ~TDB_CONVERT;
        else {
@@ -1524,7 +1800,11 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        tdb_mmap(tdb);
        if (locked) {
                if (!tdb->read_only)
-                       tdb_clear_spinlocks(tdb);
+                       if (tdb_clear_spinlocks(tdb) != 0) {
+                               TDB_LOG((tdb, 0, "tdb_open_ex: "
+                               "failed to clear spinlock\n"));
+                               goto fail;
+                       }
                if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
                        TDB_LOG((tdb, 0, "tdb_open_ex: "
                                 "failed to take ACTIVE_LOCK on %s: %s\n",
@@ -1560,7 +1840,8 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        }
        SAFE_FREE(tdb->name);
        if (tdb->fd != -1)
-               close(tdb->fd);
+               if (close(tdb->fd) != 0)
+                       TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
        SAFE_FREE(tdb->locked);
        SAFE_FREE(tdb);
        errno = save_errno;
@@ -1568,7 +1849,11 @@ TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
        }
 }
 
-/* close a database */
+/**
+ * Close a database.
+ *
+ * @returns -1 for error; 0 for success.
+ **/
 int tdb_close(TDB_CONTEXT *tdb)
 {
        TDB_CONTEXT **i;
@@ -1670,6 +1955,8 @@ int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
 void tdb_unlockkeys(TDB_CONTEXT *tdb)
 {
        u32 i;
+       if (!tdb->lockedkeys)
+               return;
        for (i = 0; i < tdb->lockedkeys[0]; i++)
                tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
        SAFE_FREE(tdb->lockedkeys);
@@ -1681,9 +1968,20 @@ int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
 {
        return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
 }
-void tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
+
+int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
+{
+       return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
+}
+
+int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
+{
+       return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
+}
+
+int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
 {
-       tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
+       return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
 }
 
 
@@ -1700,14 +1998,21 @@ int tdb_reopen(TDB_CONTEXT *tdb)
 {
        struct stat st;
 
-       tdb_munmap(tdb);
-       close(tdb->fd);
+       if (tdb_munmap(tdb) != 0) {
+               TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
+               goto fail;
+       }
+       if (close(tdb->fd) != 0)
+               TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
        tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
        if (tdb->fd == -1) {
                TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
                goto fail;
        }
-       fstat(tdb->fd, &st);
+       if (fstat(tdb->fd, &st) != 0) {
+               TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
+               goto fail;
+       }
        if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
                TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
                goto fail;