tdb: Improve debugging when the allrecord lock fails to upgrade
[ambi/samba-autobuild/.git] / lib / tdb / common / transaction.c
index e72eff5b1f94d7da96d4ee20bc21e31b93038737..4d08fee6c25e61c61c783d08fc64c611fab51119 100644 (file)
@@ -1,4 +1,4 @@
- /* 
+ /*
    Unix SMB/CIFS implementation.
 
    trivial database library
@@ -8,7 +8,7 @@
      ** NOTE! The following LGPL license applies to the tdb
      ** library. This does NOT imply that all of Samba is released
      ** under the LGPL
-   
+
    This library is free software; you can redistribute it and/or
    modify it under the terms of the GNU Lesser General Public
    License as published by the Free Software Foundation; either
@@ -59,7 +59,7 @@
   - allow for nested calls to tdb_transaction_start(), re-using the
     existing transaction record. If the inner transaction is cancelled
     then a subsequent commit will fail
+
   - keep a mirrored copy of the tdb hash chain heads to allow for the
     fast hash heads scan on traverse, updating the mirrored copy in
     the transaction version of tdb_write
@@ -82,8 +82,9 @@
     intervention.
 
   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
-    still available, but no transaction recovery area is used and no
-    fsync/msync calls are made.
+    still available, but no fsync/msync calls are made.  This means we
+    are still proof against a process dying during transaction commit,
+    but not against machine reboot.
 
   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
     tdb_add_flags() transaction nesting is enabled.
@@ -135,14 +136,11 @@ struct tdb_transaction {
        bool prepared;
        tdb_off_t magic_offset;
 
-       /* set when the OPEN_LOCK has been taken */
-       bool open_lock_taken;
-
        /* old file size before transaction */
        tdb_len_t old_map_size;
 
-       /* we should re-pack on commit */
-       bool need_repack;
+       /* did we expand in this transaction */
+       bool expanded;
 };
 
 
@@ -150,7 +148,7 @@ struct tdb_transaction {
   read while in a transaction. We need to check first if the data is in our list
   of transaction elements, then if not do a real read
 */
-static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
+static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
                            tdb_len_t len, int cv)
 {
        uint32_t blk;
@@ -188,7 +186,7 @@ static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
                        goto fail;
                }
        }
-       
+
        /* now copy it out of this block */
        memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
        if (cv) {
@@ -197,7 +195,7 @@ static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
        return 0;
 
 fail:
-       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
+       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
        tdb->ecode = TDB_ERR_IO;
        tdb->transaction->transaction_error = 1;
        return -1;
@@ -207,7 +205,7 @@ fail:
 /*
   write while in a transaction
 */
-static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
+static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
                             const void *buf, tdb_len_t len)
 {
        uint32_t blk;
@@ -251,19 +249,13 @@ static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
        if (tdb->transaction->num_blocks <= blk) {
                uint8_t **new_blocks;
                /* expand the blocks array */
-               if (tdb->transaction->blocks == NULL) {
-                       new_blocks = (uint8_t **)malloc(
-                               (blk+1)*sizeof(uint8_t *));
-               } else {
-                       new_blocks = (uint8_t **)realloc(
-                               tdb->transaction->blocks,
-                               (blk+1)*sizeof(uint8_t *));
-               }
+               new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
+                                                (blk+1)*sizeof(uint8_t *));
                if (new_blocks == NULL) {
                        tdb->ecode = TDB_ERR_OOM;
                        goto fail;
                }
-               memset(&new_blocks[tdb->transaction->num_blocks], 0, 
+               memset(&new_blocks[tdb->transaction->num_blocks], 0,
                       (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
                tdb->transaction->blocks = new_blocks;
                tdb->transaction->num_blocks = blk+1;
@@ -276,26 +268,26 @@ static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
                if (tdb->transaction->blocks[blk] == NULL) {
                        tdb->ecode = TDB_ERR_OOM;
                        tdb->transaction->transaction_error = 1;
-                       return -1;                      
+                       return -1;
                }
                if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
                        tdb_len_t len2 = tdb->transaction->block_size;
                        if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
                                len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
                        }
-                       if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
-                                                                  tdb->transaction->blocks[blk], 
+                       if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
+                                                                  tdb->transaction->blocks[blk],
                                                                   len2, 0) != 0) {
-                               SAFE_FREE(tdb->transaction->blocks[blk]);                               
+                               SAFE_FREE(tdb->transaction->blocks[blk]);
                                tdb->ecode = TDB_ERR_IO;
                                goto fail;
                        }
                        if (blk == tdb->transaction->num_blocks-1) {
                                tdb->transaction->last_block_size = len2;
-                       }                       
+                       }
                }
        }
-       
+
        /* overwrite part of an existing block */
        if (buf == NULL) {
                memset(tdb->transaction->blocks[blk] + off, 0, len);
@@ -311,7 +303,7 @@ static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
        return 0;
 
 fail:
-       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
+       TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
                 (blk*tdb->transaction->block_size) + off, len));
        tdb->transaction->transaction_error = 1;
        return -1;
@@ -319,10 +311,10 @@ fail:
 
 
 /*
-  write while in a transaction - this varient never expands the transaction blocks, it only
+  write while in a transaction - this variant never expands the transaction blocks, it only
   updates existing blocks. This means it cannot change the recovery size
 */
-static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
+static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
                                      const void *buf, tdb_len_t len)
 {
        uint32_t blk;
@@ -373,7 +365,7 @@ static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
 {
        uint32_t h = *chain;
-       for (;h < tdb->header.hash_size;h++) {
+       for (;h < tdb->hash_size;h++) {
                /* the +1 takes account of the freelist */
                if (0 != tdb->transaction->hash_heads[h+1]) {
                        break;
@@ -385,9 +377,10 @@ static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain
 /*
   out of bounds check during a transaction
 */
-static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
+static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
+                          tdb_len_t len, int probe)
 {
-       if (len <= tdb->map_size) {
+       if (off + len >= off && off + len <= tdb->map_size) {
                return 0;
        }
        tdb->ecode = TDB_ERR_IO;
@@ -397,7 +390,7 @@ static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
 /*
   transaction version of tdb_expand().
 */
-static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
+static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
                                   tdb_off_t addition)
 {
        /* add a write to the transaction elements, so subsequent
@@ -406,24 +399,8 @@ static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
                return -1;
        }
 
-       tdb->transaction->need_repack = true;
-
-       return 0;
-}
+       tdb->transaction->expanded = true;
 
-/*
-  brlock during a transaction - ignore them
-*/
-static int transaction_brlock(struct tdb_context *tdb,
-                             int rw_type, tdb_off_t offset, size_t len,
-                             enum tdb_lock_flags flags)
-{
-       return 0;
-}
-
-static int transaction_brunlock(struct tdb_context *tdb,
-                               int rw_type, tdb_off_t offset, size_t len)
-{
        return 0;
 }
 
@@ -433,8 +410,6 @@ static const struct tdb_methods transaction_methods = {
        transaction_next_hash_chain,
        transaction_oob,
        transaction_expand_file,
-       transaction_brlock,
-       transaction_brunlock
 };
 
 
@@ -442,10 +417,12 @@ static const struct tdb_methods transaction_methods = {
   start a tdb transaction. No token is returned, as only a single
   transaction is allowed to be pending per tdb_context
 */
-int tdb_transaction_start(struct tdb_context *tdb)
+static int _tdb_transaction_start(struct tdb_context *tdb,
+                                 enum tdb_lock_flags lockflags)
 {
        /* some sanity checks */
-       if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
+       if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
+           || tdb->traverse_read) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
                tdb->ecode = TDB_ERR_EINVAL;
                return -1;
@@ -458,12 +435,12 @@ int tdb_transaction_start(struct tdb_context *tdb)
                        return -1;
                }
                tdb->transaction->nesting++;
-               TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
+               TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
                         tdb->transaction->nesting));
                return 0;
        }
 
-       if (tdb->num_locks != 0 || tdb->allrecord_lock.count) {
+       if (tdb_have_extra_locks(tdb)) {
                /* the caller must not have any locks when starting a
                   transaction as otherwise we'll be screwed by lack
                   of nested locks in posix */
@@ -494,24 +471,26 @@ int tdb_transaction_start(struct tdb_context *tdb)
        /* get the transaction write lock. This is a blocking lock. As
           discussed with Volker, there are a number of ways we could
           make this async, which we will probably do in the future */
-       if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
+       if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
                SAFE_FREE(tdb->transaction->blocks);
                SAFE_FREE(tdb->transaction);
+               if ((lockflags & TDB_LOCK_WAIT) == 0) {
+                       tdb->ecode = TDB_ERR_NOLOCK;
+               }
                return -1;
        }
-       
+
        /* get a read lock from the freelist to the end of file. This
           is upgraded to a write lock during the commit */
-       if (tdb_brlock(tdb, F_RDLCK, FREELIST_TOP, 0, TDB_LOCK_WAIT) == -1) {
+       if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
-               tdb->ecode = TDB_ERR_LOCK;
-               goto fail;
+               goto fail_allrecord_lock;
        }
 
        /* setup a copy of the hash table heads so the hash scan in
           traverse can be fast */
        tdb->transaction->hash_heads = (uint32_t *)
-               calloc(tdb->header.hash_size+1, sizeof(uint32_t));
+               calloc(tdb->hash_size+1, sizeof(uint32_t));
        if (tdb->transaction->hash_heads == NULL) {
                tdb->ecode = TDB_ERR_OOM;
                goto fail;
@@ -525,7 +504,7 @@ int tdb_transaction_start(struct tdb_context *tdb)
 
        /* make sure we know about any file expansions already done by
           anyone else */
-       tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+       tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
        tdb->transaction->old_map_size = tdb->map_size;
 
        /* finally hook the io methods, replacing them with
@@ -536,9 +515,10 @@ int tdb_transaction_start(struct tdb_context *tdb)
        /* Trace at the end, so we get sequence number correct. */
        tdb_trace(tdb, "tdb_transaction_start");
        return 0;
-       
+
 fail:
-       tdb_brunlock(tdb, F_RDLCK, FREELIST_TOP, 0);
+       tdb_allrecord_unlock(tdb, F_RDLCK, false);
+fail_allrecord_lock:
        tdb_transaction_unlock(tdb, F_WRLCK);
        SAFE_FREE(tdb->transaction->blocks);
        SAFE_FREE(tdb->transaction->hash_heads);
@@ -546,17 +526,30 @@ fail:
        return -1;
 }
 
+_PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
+{
+       return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
+}
+
+_PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
+{
+       return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
+}
 
 /*
   sync to disk
 */
 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
-{      
+{
        if (tdb->flags & TDB_NOSYNC) {
                return 0;
        }
 
+#ifdef HAVE_FDATASYNC
        if (fdatasync(tdb->fd) != 0) {
+#else
+       if (fsync(tdb->fd) != 0) {
+#endif
                tdb->ecode = TDB_ERR_IO;
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
                return -1;
@@ -564,7 +557,7 @@ static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t
 #ifdef HAVE_MMAP
        if (tdb->map_ptr) {
                tdb_off_t moffset = offset & ~(tdb->page_size-1);
-               if (msync(moffset + (char *)tdb->map_ptr, 
+               if (msync(moffset + (char *)tdb->map_ptr,
                          length + (offset - moffset), MS_SYNC) != 0) {
                        tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
@@ -577,9 +570,8 @@ static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t
 }
 
 
-/* ltype is F_WRLCK after prepare. */
-static int _tdb_transaction_cancel(struct tdb_context *tdb, int ltype)
-{      
+static int _tdb_transaction_cancel(struct tdb_context *tdb)
+{
        int i, ret = 0;
 
        if (tdb->transaction == NULL) {
@@ -591,7 +583,7 @@ static int _tdb_transaction_cancel(struct tdb_context *tdb, int ltype)
                tdb->transaction->transaction_error = 1;
                tdb->transaction->nesting--;
                return 0;
-       }               
+       }
 
        tdb->map_size = tdb->transaction->old_map_size;
 
@@ -615,113 +607,116 @@ static int _tdb_transaction_cancel(struct tdb_context *tdb, int ltype)
                }
        }
 
-       if (tdb->transaction->open_lock_taken) {
-               tdb_brunlock(tdb, F_WRLCK, OPEN_LOCK, 1);
-               tdb->transaction->open_lock_taken = false;
-       }
-
-       /* remove any global lock created during the transaction */
-       if (tdb->allrecord_lock.count != 0) {
-               tdb_brunlock(tdb, tdb->allrecord_lock.ltype,
-                            FREELIST_TOP, 4*tdb->header.hash_size);
-               tdb->allrecord_lock.count = 0;
-       }
-
-       /* remove any locks created during the transaction */
-       if (tdb->num_locks != 0) {
-               for (i=0;i<tdb->num_lockrecs;i++) {
-                       tdb_brunlock(tdb, tdb->lockrecs[i].ltype,
-                                    FREELIST_TOP+4*tdb->lockrecs[i].list, 1);
-               }
-               tdb->num_locks = 0;
-               tdb->num_lockrecs = 0;
-               SAFE_FREE(tdb->lockrecs);
-       }
+       /* This also removes the OPEN_LOCK, if we have it. */
+       tdb_release_transaction_locks(tdb);
 
        /* restore the normal io methods */
        tdb->methods = tdb->transaction->io_methods;
 
-       tdb_brunlock(tdb, ltype, FREELIST_TOP, 0);
-       tdb_transaction_unlock(tdb, F_WRLCK);
        SAFE_FREE(tdb->transaction->hash_heads);
        SAFE_FREE(tdb->transaction);
-       
+
        return ret;
 }
 
 /*
   cancel the current transaction
 */
-int tdb_transaction_cancel(struct tdb_context *tdb)
+_PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
 {
-       int ltype = F_RDLCK;
        tdb_trace(tdb, "tdb_transaction_cancel");
-       if (tdb->transaction && tdb->transaction->prepared)
-               ltype = F_WRLCK;
-       return _tdb_transaction_cancel(tdb, ltype);
+       return _tdb_transaction_cancel(tdb);
 }
 
 /*
   work out how much space the linearised recovery data will consume
 */
-static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
+static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
 {
        tdb_len_t recovery_size = 0;
        int i;
 
        recovery_size = sizeof(uint32_t);
        for (i=0;i<tdb->transaction->num_blocks;i++) {
+               tdb_len_t block_size;
                if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
                        break;
                }
                if (tdb->transaction->blocks[i] == NULL) {
                        continue;
                }
-               recovery_size += 2*sizeof(tdb_off_t);
+               if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
+                                  &recovery_size)) {
+                       return false;
+               }
                if (i == tdb->transaction->num_blocks-1) {
-                       recovery_size += tdb->transaction->last_block_size;
+                       block_size = tdb->transaction->last_block_size;
                } else {
-                       recovery_size += tdb->transaction->block_size;
+                       block_size =  tdb->transaction->block_size;
+               }
+               if (!tdb_add_len_t(recovery_size, block_size,
+                                  &recovery_size)) {
+                       return false;
                }
-       }       
+       }
 
-       return recovery_size;
+       *result = recovery_size;
+       return true;
+}
+
+int tdb_recovery_area(struct tdb_context *tdb,
+                     const struct tdb_methods *methods,
+                     tdb_off_t *recovery_offset,
+                     struct tdb_record *rec)
+{
+       if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
+               return -1;
+       }
+
+       if (*recovery_offset == 0) {
+               rec->rec_len = 0;
+               return 0;
+       }
+
+       if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
+                             DOCONV()) == -1) {
+               return -1;
+       }
+
+       /* ignore invalid recovery regions: can happen in crash */
+       if (rec->magic != TDB_RECOVERY_MAGIC &&
+           rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
+               *recovery_offset = 0;
+               rec->rec_len = 0;
+       }
+       return 0;
 }
 
 /*
   allocate the recovery area, or use an existing recovery area if it is
   large enough
 */
-static int tdb_recovery_allocate(struct tdb_context *tdb, 
+static int tdb_recovery_allocate(struct tdb_context *tdb,
                                 tdb_len_t *recovery_size,
                                 tdb_off_t *recovery_offset,
                                 tdb_len_t *recovery_max_size)
 {
        struct tdb_record rec;
        const struct tdb_methods *methods = tdb->transaction->io_methods;
-       tdb_off_t recovery_head;
+       tdb_off_t recovery_head, new_end;
 
-       if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+       if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
                return -1;
        }
 
-       rec.rec_len = 0;
-
-       if (recovery_head != 0) {
-               if (methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
-                       return -1;
-               }
-               /* ignore invalid recovery regions: can happen in crash */
-               if (rec.magic != TDB_RECOVERY_MAGIC &&
-                   rec.magic != TDB_RECOVERY_INVALID_MAGIC) {
-                       recovery_head = 0;
-               }
+       if (!tdb_recovery_size(tdb, recovery_size)) {
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
+                        "overflow recovery size\n"));
+               return -1;
        }
 
-       *recovery_size = tdb_recovery_size(tdb);
-
+       /* Existing recovery area? */
        if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
                /* it fits in the existing area */
                *recovery_max_size = rec.rec_len;
@@ -729,35 +724,61 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
                return 0;
        }
 
-       /* we need to free up the old recovery area, then allocate a
-          new one at the end of the file. Note that we cannot use
-          tdb_allocate() to allocate the new one as that might return
-          us an area that is being currently used (as of the start of
-          the transaction) */
-       if (recovery_head != 0) {
-               if (tdb_free(tdb, recovery_head, &rec) == -1) {
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
-                       return -1;
+       /* If recovery area in middle of file, we need a new one. */
+       if (recovery_head == 0
+           || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
+               /* we need to free up the old recovery area, then allocate a
+                  new one at the end of the file. Note that we cannot use
+                  tdb_allocate() to allocate the new one as that might return
+                  us an area that is being currently used (as of the start of
+                  the transaction) */
+               if (recovery_head) {
+                       if (tdb_free(tdb, recovery_head, &rec) == -1) {
+                               TDB_LOG((tdb, TDB_DEBUG_FATAL,
+                                        "tdb_recovery_allocate: failed to"
+                                        " free previous recovery area\n"));
+                               return -1;
+                       }
+
+                       /* the tdb_free() call might have increased
+                        * the recovery size */
+                       if (!tdb_recovery_size(tdb, recovery_size)) {
+                               TDB_LOG((tdb, TDB_DEBUG_FATAL,
+                                        "tdb_recovery_allocate: "
+                                        "overflow recovery size\n"));
+                               return -1;
+                       }
                }
+
+               /* New head will be at end of file. */
+               recovery_head = tdb->map_size;
        }
 
-       /* the tdb_free() call might have increased the recovery size */
-       *recovery_size = tdb_recovery_size(tdb);
+       /* Now we know where it will be. */
+       *recovery_offset = recovery_head;
+
+       /* Expand by more than we need, so we don't do it often. */
+       *recovery_max_size = tdb_expand_adjust(tdb->map_size,
+                                              *recovery_size,
+                                              tdb->page_size)
+               - sizeof(rec);
 
-       /* round up to a multiple of page size */
-       *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
-       *recovery_offset = tdb->map_size;
-       recovery_head = *recovery_offset;
+       if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
+           !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
+                        "overflow recovery area\n"));
+               return -1;
+       }
 
-       if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
-                                    (tdb->map_size - tdb->transaction->old_map_size) +
-                                    sizeof(rec) + *recovery_max_size) == -1) {
+       if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+                                    new_end - tdb->transaction->old_map_size)
+           == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
                return -1;
        }
 
        /* remap the file (if using mmap) */
-       methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+       methods->tdb_oob(tdb, tdb->map_size, 1, 1);
 
        /* we have to reset the old map size so that we don't try to expand the file
           again in the transaction commit, which would destroy the recovery area */
@@ -766,7 +787,7 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
        /* write the recovery header offset and sync - we can sync without a race here
           as the magic ptr in the recovery record has not been set */
        CONVERT(recovery_head);
-       if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
+       if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
                               &recovery_head, sizeof(tdb_off_t)) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
                return -1;
@@ -783,7 +804,7 @@ static int tdb_recovery_allocate(struct tdb_context *tdb,
 /*
   setup the recovery data that will be used on a crash during commit
 */
-static int transaction_setup_recovery(struct tdb_context *tdb, 
+static int transaction_setup_recovery(struct tdb_context *tdb,
                                      tdb_off_t *magic_offset)
 {
        tdb_len_t recovery_size;
@@ -798,7 +819,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        /*
          check that the recovery area has enough space
        */
-       if (tdb_recovery_allocate(tdb, &recovery_size, 
+       if (tdb_recovery_allocate(tdb, &recovery_size,
                                  &recovery_offset, &recovery_max_size) == -1) {
                return -1;
        }
@@ -816,7 +837,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        rec->data_len = recovery_size;
        rec->rec_len  = recovery_max_size;
        rec->key_len  = old_map_size;
-       CONVERT(rec);
+       CONVERT(*rec);
 
        /* build the recovery data into a single blob to allow us to do a single
           large write, which should be more efficient */
@@ -834,7 +855,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
                if (i == tdb->transaction->num_blocks-1) {
                        length = tdb->transaction->last_block_size;
                }
-               
+
                if (offset >= old_map_size) {
                        continue;
                }
@@ -863,7 +884,9 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
        /* and the tailer */
        tailer = sizeof(*rec) + recovery_max_size;
        memcpy(p, &tailer, 4);
-       CONVERT(p);
+       if (DOCONV()) {
+               tdb_convert(p, 4);
+       }
 
        /* write the recovery data to the recovery area */
        if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
@@ -914,7 +937,7 @@ static int transaction_setup_recovery(struct tdb_context *tdb,
 }
 
 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
-{      
+{
        const struct tdb_methods *methods;
 
        if (tdb->transaction == NULL) {
@@ -924,14 +947,14 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 
        if (tdb->transaction->prepared) {
                tdb->ecode = TDB_ERR_EINVAL;
-               _tdb_transaction_cancel(tdb, F_WRLCK);
+               _tdb_transaction_cancel(tdb);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
                return -1;
        }
 
        if (tdb->transaction->transaction_error) {
                tdb->ecode = TDB_ERR_IO;
-               _tdb_transaction_cancel(tdb, F_RDLCK);
+               _tdb_transaction_cancel(tdb);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
                return -1;
        }
@@ -939,7 +962,7 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 
        if (tdb->transaction->nesting != 0) {
                return 0;
-       }               
+       }
 
        /* check for a null transaction */
        if (tdb->transaction->blocks == NULL) {
@@ -947,58 +970,55 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
        }
 
        methods = tdb->transaction->io_methods;
-       
+
        /* if there are any locks pending then the caller has not
           nested their locks properly, so fail the transaction */
-       if (tdb->num_locks || tdb->allrecord_lock.count) {
+       if (tdb_have_extra_locks(tdb)) {
                tdb->ecode = TDB_ERR_LOCK;
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
-               _tdb_transaction_cancel(tdb, F_RDLCK);
+               _tdb_transaction_cancel(tdb);
                return -1;
        }
 
        /* upgrade the main transaction lock region to a write lock */
-       if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
-               tdb->ecode = TDB_ERR_LOCK;
-               _tdb_transaction_cancel(tdb, F_RDLCK);
+       if (tdb_allrecord_upgrade(tdb) == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_ERROR,
+                       "tdb_transaction_prepare_commit: "
+                       "failed to upgrade hash locks: %s\n",
+                        tdb_errorstr(tdb)));
+               _tdb_transaction_cancel(tdb);
                return -1;
        }
 
        /* get the open lock - this prevents new users attaching to the database
           during the commit */
-       if (tdb_brlock(tdb, F_WRLCK, OPEN_LOCK, 1, TDB_LOCK_WAIT) == -1) {
+       if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
-               tdb->ecode = TDB_ERR_LOCK;
-               _tdb_transaction_cancel(tdb, F_WRLCK);
+               _tdb_transaction_cancel(tdb);
                return -1;
        }
 
-       tdb->transaction->open_lock_taken = true;
-
-       if (!(tdb->flags & TDB_NOSYNC)) {
-               /* write the recovery data to the end of the file */
-               if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
-                       _tdb_transaction_cancel(tdb, F_WRLCK);
-                       return -1;
-               }
+       /* write the recovery data to the end of the file */
+       if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
+               _tdb_transaction_cancel(tdb);
+               return -1;
        }
 
        tdb->transaction->prepared = true;
 
        /* expand the file to the new size if needed */
        if (tdb->map_size != tdb->transaction->old_map_size) {
-               if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
-                                            tdb->map_size - 
+               if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+                                            tdb->map_size -
                                             tdb->transaction->old_map_size) == -1) {
                        tdb->ecode = TDB_ERR_IO;
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
-                       _tdb_transaction_cancel(tdb, F_WRLCK);
+                       _tdb_transaction_cancel(tdb);
                        return -1;
                }
                tdb->map_size = tdb->transaction->old_map_size;
-               methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+               methods->tdb_oob(tdb, tdb->map_size, 1, 1);
        }
 
        /* Keep the open lock until the actual commit */
@@ -1009,20 +1029,42 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
 /*
    prepare to commit the current transaction
 */
-int tdb_transaction_prepare_commit(struct tdb_context *tdb)
-{      
+_PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
+{
        tdb_trace(tdb, "tdb_transaction_prepare_commit");
        return _tdb_transaction_prepare_commit(tdb);
 }
 
+/* A repack is worthwhile if the largest is less than half total free. */
+static bool repack_worthwhile(struct tdb_context *tdb)
+{
+       tdb_off_t ptr;
+       struct tdb_record rec;
+       tdb_len_t total = 0, largest = 0;
+
+       if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
+               return false;
+       }
+
+       while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
+               total += rec.rec_len;
+               if (rec.rec_len > largest) {
+                       largest = rec.rec_len;
+               }
+               ptr = rec.next;
+       }
+
+       return total > largest * 2;
+}
+
 /*
   commit the current transaction
 */
-int tdb_transaction_commit(struct tdb_context *tdb)
-{      
+_PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
+{
        const struct tdb_methods *methods;
        int i;
-       bool need_repack;
+       bool need_repack = false;
 
        if (tdb->transaction == NULL) {
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
@@ -1033,7 +1075,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
        if (tdb->transaction->transaction_error) {
                tdb->ecode = TDB_ERR_IO;
-               _tdb_transaction_cancel(tdb, F_RDLCK);
+               _tdb_transaction_cancel(tdb);
                TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
                return -1;
        }
@@ -1046,7 +1088,7 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
        /* check for a null transaction */
        if (tdb->transaction->blocks == NULL) {
-               _tdb_transaction_cancel(tdb, F_RDLCK);
+               _tdb_transaction_cancel(tdb);
                return 0;
        }
 
@@ -1075,20 +1117,25 @@ int tdb_transaction_commit(struct tdb_context *tdb)
 
                if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
-                       
+
                        /* we've overwritten part of the data and
                           possibly expanded the file, so we need to
                           run the crash recovery code */
                        tdb->methods = methods;
-                       tdb_transaction_recover(tdb); 
+                       tdb_transaction_recover(tdb);
 
-                       _tdb_transaction_cancel(tdb, F_WRLCK);
+                       _tdb_transaction_cancel(tdb);
 
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
                        return -1;
                }
                SAFE_FREE(tdb->transaction->blocks[i]);
-       } 
+       }
+
+       /* Do this before we drop lock or blocks. */
+       if (tdb->transaction->expanded) {
+               need_repack = repack_worthwhile(tdb);
+       }
 
        SAFE_FREE(tdb->transaction->blocks);
        tdb->transaction->num_blocks = 0;
@@ -1113,11 +1160,9 @@ int tdb_transaction_commit(struct tdb_context *tdb)
        utime(tdb->name, NULL);
 #endif
 
-       need_repack = tdb->transaction->need_repack;
-
        /* use a transaction cancel to free memory and remove the
           transaction locks */
-       _tdb_transaction_cancel(tdb, F_WRLCK);
+       _tdb_transaction_cancel(tdb);
 
        if (need_repack) {
                return tdb_repack(tdb);
@@ -1152,9 +1197,9 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        }
 
        /* read the recovery record */
-       if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
+       if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
                                   sizeof(rec), DOCONV()) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
                tdb->ecode = TDB_ERR_IO;
                return -1;
        }
@@ -1174,7 +1219,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
 
        data = (unsigned char *)malloc(rec.data_len);
        if (data == NULL) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
                tdb->ecode = TDB_ERR_OOM;
                return -1;
        }
@@ -1182,7 +1227,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
        /* read the full recovery data */
        if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
                                   rec.data_len, 0) == -1) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
+               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
                tdb->ecode = TDB_ERR_IO;
                return -1;
        }
@@ -1199,7 +1244,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
 
                if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
                        free(data);
-                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
+                       TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
                        tdb->ecode = TDB_ERR_IO;
                        return -1;
                }
@@ -1219,7 +1264,7 @@ int tdb_transaction_recover(struct tdb_context *tdb)
                if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
                        TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
                        tdb->ecode = TDB_ERR_IO;
-                       return -1;                      
+                       return -1;
                }
        }
 
@@ -1228,18 +1273,8 @@ int tdb_transaction_recover(struct tdb_context *tdb)
                          &zero) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
                tdb->ecode = TDB_ERR_IO;
-               return -1;                      
-       }
-       
-       /* reduce the file size to the old size */
-       tdb_munmap(tdb);
-       if (ftruncate(tdb->fd, recovery_eof) != 0) {
-               TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
-               tdb->ecode = TDB_ERR_IO;
-               return -1;                      
+               return -1;
        }
-       tdb->map_size = recovery_eof;
-       tdb_mmap(tdb);
 
        if (transaction_sync(tdb, 0, recovery_eof) == -1) {
                TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
@@ -1247,9 +1282,34 @@ int tdb_transaction_recover(struct tdb_context *tdb)
                return -1;
        }
 
-       TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
+       TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
                 recovery_eof));
 
        /* all done */
        return 0;
 }
+
+/* Any I/O failures we say "needs recovery". */
+bool tdb_needs_recovery(struct tdb_context *tdb)
+{
+       tdb_off_t recovery_head;
+       struct tdb_record rec;
+
+       /* find the recovery area */
+       if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+               return true;
+       }
+
+       if (recovery_head == 0) {
+               /* we have never allocated a recovery record */
+               return false;
+       }
+
+       /* read the recovery record */
+       if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
+                                  sizeof(rec), DOCONV()) == -1) {
+               return true;
+       }
+
+       return (rec.magic == TDB_RECOVERY_MAGIC);
+}