- /*
+ /*
Unix SMB/CIFS implementation.
trivial database library
** NOTE! The following LGPL license applies to the tdb
** library. This does NOT imply that all of Samba is released
** under the LGPL
-
+
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
- allow for nested calls to tdb_transaction_start(), re-using the
existing transaction record. If the inner transaction is cancelled
then a subsequent commit will fail
-
+
- keep a mirrored copy of the tdb hash chain heads to allow for the
fast hash heads scan on traverse, updating the mirrored copy in
the transaction version of tdb_write
to reduce this to 3 or even 2 with some more work.
- check for a valid recovery record on open of the tdb, while the
- global lock is held. Automatically recover from the transaction
+ open lock is held. Automatically recover from the transaction
recovery area if needed, then continue with the open as
usual. This allows for smooth crash recovery with no administrator
intervention.
- if TDB_NOSYNC is passed to flags in tdb_open then transactions are
- still available, but no transaction recovery area is used and no
- fsync/msync calls are made.
-
+ still available, but no fsync/msync calls are made. This means we
+ are still proof against a process dying during transaction commit,
+ but not against machine reboot.
+
+ - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
+ tdb_add_flags() transaction nesting is enabled.
+ It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
+ The default is that transaction nesting is allowed.
+ Note: this default may change in future versions of tdb.
+
+ Beware. when transactions are nested a transaction successfully
+ completed with tdb_transaction_commit() can be silently unrolled later.
+
+ - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
+ tdb_add_flags() transaction nesting is disabled.
+ It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
+ An attempt create a nested transaction will fail with TDB_ERR_NESTING.
+ The default is that transaction nesting is allowed.
+ Note: this default may change in future versions of tdb.
*/
/* old file size before transaction */
tdb_len_t old_map_size;
- /* we should re-pack on commit */
- bool need_repack;
+ /* did we expand in this transaction */
+ bool expanded;
};
read while in a transaction. We need to check first if the data is in our list
of transaction elements, then if not do a real read
*/
-static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
+static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
tdb_len_t len, int cv)
{
uint32_t blk;
goto fail;
}
}
-
+
/* now copy it out of this block */
memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
if (cv) {
return 0;
fail:
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
tdb->ecode = TDB_ERR_IO;
tdb->transaction->transaction_error = 1;
return -1;
/*
write while in a transaction
*/
-static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
+static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
const void *buf, tdb_len_t len)
{
uint32_t blk;
if (tdb->transaction->num_blocks <= blk) {
uint8_t **new_blocks;
/* expand the blocks array */
- if (tdb->transaction->blocks == NULL) {
- new_blocks = (uint8_t **)malloc(
- (blk+1)*sizeof(uint8_t *));
- } else {
- new_blocks = (uint8_t **)realloc(
- tdb->transaction->blocks,
- (blk+1)*sizeof(uint8_t *));
- }
+ new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
+ (blk+1)*sizeof(uint8_t *));
if (new_blocks == NULL) {
tdb->ecode = TDB_ERR_OOM;
goto fail;
}
- memset(&new_blocks[tdb->transaction->num_blocks], 0,
+ memset(&new_blocks[tdb->transaction->num_blocks], 0,
(1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
tdb->transaction->blocks = new_blocks;
tdb->transaction->num_blocks = blk+1;
if (tdb->transaction->blocks[blk] == NULL) {
tdb->ecode = TDB_ERR_OOM;
tdb->transaction->transaction_error = 1;
- return -1;
+ return -1;
}
if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
tdb_len_t len2 = tdb->transaction->block_size;
if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
}
- if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
- tdb->transaction->blocks[blk],
+ if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
+ tdb->transaction->blocks[blk],
len2, 0) != 0) {
- SAFE_FREE(tdb->transaction->blocks[blk]);
+ SAFE_FREE(tdb->transaction->blocks[blk]);
tdb->ecode = TDB_ERR_IO;
goto fail;
}
if (blk == tdb->transaction->num_blocks-1) {
tdb->transaction->last_block_size = len2;
- }
+ }
}
}
-
+
/* overwrite part of an existing block */
if (buf == NULL) {
memset(tdb->transaction->blocks[blk] + off, 0, len);
return 0;
fail:
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n",
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
(blk*tdb->transaction->block_size) + off, len));
tdb->transaction->transaction_error = 1;
return -1;
/*
- write while in a transaction - this varient never expands the transaction blocks, it only
+ write while in a transaction - this variant never expands the transaction blocks, it only
updates existing blocks. This means it cannot change the recovery size
*/
-static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
+static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
const void *buf, tdb_len_t len)
{
uint32_t blk;
static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
{
uint32_t h = *chain;
- for (;h < tdb->header.hash_size;h++) {
+ for (;h < tdb->hash_size;h++) {
/* the +1 takes account of the freelist */
if (0 != tdb->transaction->hash_heads[h+1]) {
break;
/*
out of bounds check during a transaction
*/
-static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
+static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
+ tdb_len_t len, int probe)
{
- if (len <= tdb->map_size) {
+ if (off + len >= off && off + len <= tdb->map_size) {
return 0;
}
- return TDB_ERRCODE(TDB_ERR_IO, -1);
+ tdb->ecode = TDB_ERR_IO;
+ return -1;
}
/*
transaction version of tdb_expand().
*/
-static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
+static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
tdb_off_t addition)
{
/* add a write to the transaction elements, so subsequent
return -1;
}
- tdb->transaction->need_repack = true;
-
- return 0;
-}
+ tdb->transaction->expanded = true;
-/*
- brlock during a transaction - ignore them
-*/
-static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
- int rw_type, int lck_type, int probe, size_t len)
-{
return 0;
}
transaction_next_hash_chain,
transaction_oob,
transaction_expand_file,
- transaction_brlock
};
start a tdb transaction. No token is returned, as only a single
transaction is allowed to be pending per tdb_context
*/
-int tdb_transaction_start(struct tdb_context *tdb)
+static int _tdb_transaction_start(struct tdb_context *tdb,
+ enum tdb_lock_flags lockflags)
{
/* some sanity checks */
- if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
+ if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
+ || tdb->traverse_read) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
tdb->ecode = TDB_ERR_EINVAL;
return -1;
/* cope with nested tdb_transaction_start() calls */
if (tdb->transaction != NULL) {
+ if (!(tdb->flags & TDB_ALLOW_NESTING)) {
+ tdb->ecode = TDB_ERR_NESTING;
+ return -1;
+ }
tdb->transaction->nesting++;
- TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
+ TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
tdb->transaction->nesting));
return 0;
}
- if (tdb->num_locks != 0 || tdb->global_lock.count) {
+ if (tdb_have_extra_locks(tdb)) {
/* the caller must not have any locks when starting a
transaction as otherwise we'll be screwed by lack
of nested locks in posix */
/* get the transaction write lock. This is a blocking lock. As
discussed with Volker, there are a number of ways we could
make this async, which we will probably do in the future */
- if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
+ if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
SAFE_FREE(tdb->transaction->blocks);
SAFE_FREE(tdb->transaction);
+ if ((lockflags & TDB_LOCK_WAIT) == 0) {
+ tdb->ecode = TDB_ERR_NOLOCK;
+ }
return -1;
}
-
+
/* get a read lock from the freelist to the end of file. This
is upgraded to a write lock during the commit */
- if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
+ if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
- tdb->ecode = TDB_ERR_LOCK;
- goto fail;
+ goto fail_allrecord_lock;
}
/* setup a copy of the hash table heads so the hash scan in
traverse can be fast */
tdb->transaction->hash_heads = (uint32_t *)
- calloc(tdb->header.hash_size+1, sizeof(uint32_t));
+ calloc(tdb->hash_size+1, sizeof(uint32_t));
if (tdb->transaction->hash_heads == NULL) {
tdb->ecode = TDB_ERR_OOM;
goto fail;
/* make sure we know about any file expansions already done by
anyone else */
- tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+ tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
tdb->transaction->old_map_size = tdb->map_size;
/* finally hook the io methods, replacing them with
tdb->transaction->io_methods = tdb->methods;
tdb->methods = &transaction_methods;
+ /* Trace at the end, so we get sequence number correct. */
+ tdb_trace(tdb, "tdb_transaction_start");
return 0;
-
+
fail:
- tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
- tdb_transaction_unlock(tdb);
+ tdb_allrecord_unlock(tdb, F_RDLCK, false);
+fail_allrecord_lock:
+ tdb_transaction_unlock(tdb, F_WRLCK);
SAFE_FREE(tdb->transaction->blocks);
SAFE_FREE(tdb->transaction->hash_heads);
SAFE_FREE(tdb->transaction);
return -1;
}
+_PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
+{
+ return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
+}
+
+_PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
+{
+ return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
+}
/*
sync to disk
*/
static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
-{
+{
if (tdb->flags & TDB_NOSYNC) {
return 0;
}
+#ifdef HAVE_FDATASYNC
+ if (fdatasync(tdb->fd) != 0) {
+#else
if (fsync(tdb->fd) != 0) {
+#endif
tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
return -1;
#ifdef HAVE_MMAP
if (tdb->map_ptr) {
tdb_off_t moffset = offset & ~(tdb->page_size-1);
- if (msync(moffset + (char *)tdb->map_ptr,
+ if (msync(moffset + (char *)tdb->map_ptr,
length + (offset - moffset), MS_SYNC) != 0) {
tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
}
-/*
- cancel the current transaction
-*/
-int tdb_transaction_cancel(struct tdb_context *tdb)
-{
+static int _tdb_transaction_cancel(struct tdb_context *tdb)
+{
int i, ret = 0;
if (tdb->transaction == NULL) {
tdb->transaction->transaction_error = 1;
tdb->transaction->nesting--;
return 0;
- }
+ }
tdb->map_size = tdb->transaction->old_map_size;
if (tdb->transaction->magic_offset) {
const struct tdb_methods *methods = tdb->transaction->io_methods;
- uint32_t zero = 0;
+ const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
/* remove the recovery marker */
- if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
+ if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
ret = -1;
}
}
- /* remove any global lock created during the transaction */
- if (tdb->global_lock.count != 0) {
- tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
- tdb->global_lock.count = 0;
- }
-
- /* remove any locks created during the transaction */
- if (tdb->num_locks != 0) {
- for (i=0;i<tdb->num_lockrecs;i++) {
- tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
- F_UNLCK,F_SETLKW, 0, 1);
- }
- tdb->num_locks = 0;
- tdb->num_lockrecs = 0;
- SAFE_FREE(tdb->lockrecs);
- }
+ /* This also removes the OPEN_LOCK, if we have it. */
+ tdb_release_transaction_locks(tdb);
/* restore the normal io methods */
tdb->methods = tdb->transaction->io_methods;
- tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
- tdb_transaction_unlock(tdb);
SAFE_FREE(tdb->transaction->hash_heads);
SAFE_FREE(tdb->transaction);
-
+
return ret;
}
+/*
+ cancel the current transaction
+*/
+_PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
+{
+ tdb_trace(tdb, "tdb_transaction_cancel");
+ return _tdb_transaction_cancel(tdb);
+}
/*
work out how much space the linearised recovery data will consume
*/
-static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
+static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
{
tdb_len_t recovery_size = 0;
int i;
recovery_size = sizeof(uint32_t);
for (i=0;i<tdb->transaction->num_blocks;i++) {
+ tdb_len_t block_size;
if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
break;
}
if (tdb->transaction->blocks[i] == NULL) {
continue;
}
- recovery_size += 2*sizeof(tdb_off_t);
+ if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
+ &recovery_size)) {
+ return false;
+ }
if (i == tdb->transaction->num_blocks-1) {
- recovery_size += tdb->transaction->last_block_size;
+ block_size = tdb->transaction->last_block_size;
} else {
- recovery_size += tdb->transaction->block_size;
+ block_size = tdb->transaction->block_size;
}
- }
+ if (!tdb_add_len_t(recovery_size, block_size,
+ &recovery_size)) {
+ return false;
+ }
+ }
- return recovery_size;
+ *result = recovery_size;
+ return true;
+}
+
+int tdb_recovery_area(struct tdb_context *tdb,
+ const struct tdb_methods *methods,
+ tdb_off_t *recovery_offset,
+ struct tdb_record *rec)
+{
+ if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
+ return -1;
+ }
+
+ if (*recovery_offset == 0) {
+ rec->rec_len = 0;
+ return 0;
+ }
+
+ if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
+ DOCONV()) == -1) {
+ return -1;
+ }
+
+ /* ignore invalid recovery regions: can happen in crash */
+ if (rec->magic != TDB_RECOVERY_MAGIC &&
+ rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
+ *recovery_offset = 0;
+ rec->rec_len = 0;
+ }
+ return 0;
}
/*
allocate the recovery area, or use an existing recovery area if it is
large enough
*/
-static int tdb_recovery_allocate(struct tdb_context *tdb,
+static int tdb_recovery_allocate(struct tdb_context *tdb,
tdb_len_t *recovery_size,
tdb_off_t *recovery_offset,
tdb_len_t *recovery_max_size)
{
- struct list_struct rec;
+ struct tdb_record rec;
const struct tdb_methods *methods = tdb->transaction->io_methods;
- tdb_off_t recovery_head;
+ tdb_off_t recovery_head, new_end;
- if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+ if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
return -1;
}
- rec.rec_len = 0;
-
- if (recovery_head != 0 &&
- methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
+ if (!tdb_recovery_size(tdb, recovery_size)) {
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
+ "overflow recovery size\n"));
return -1;
}
- *recovery_size = tdb_recovery_size(tdb);
-
+ /* Existing recovery area? */
if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
/* it fits in the existing area */
*recovery_max_size = rec.rec_len;
return 0;
}
- /* we need to free up the old recovery area, then allocate a
- new one at the end of the file. Note that we cannot use
- tdb_allocate() to allocate the new one as that might return
- us an area that is being currently used (as of the start of
- the transaction) */
- if (recovery_head != 0) {
- if (tdb_free(tdb, recovery_head, &rec) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
- return -1;
+ /* If recovery area in middle of file, we need a new one. */
+ if (recovery_head == 0
+ || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
+ /* we need to free up the old recovery area, then allocate a
+ new one at the end of the file. Note that we cannot use
+ tdb_allocate() to allocate the new one as that might return
+ us an area that is being currently used (as of the start of
+ the transaction) */
+ if (recovery_head) {
+ if (tdb_free(tdb, recovery_head, &rec) == -1) {
+ TDB_LOG((tdb, TDB_DEBUG_FATAL,
+ "tdb_recovery_allocate: failed to"
+ " free previous recovery area\n"));
+ return -1;
+ }
+
+ /* the tdb_free() call might have increased
+ * the recovery size */
+ if (!tdb_recovery_size(tdb, recovery_size)) {
+ TDB_LOG((tdb, TDB_DEBUG_FATAL,
+ "tdb_recovery_allocate: "
+ "overflow recovery size\n"));
+ return -1;
+ }
}
+
+ /* New head will be at end of file. */
+ recovery_head = tdb->map_size;
}
- /* the tdb_free() call might have increased the recovery size */
- *recovery_size = tdb_recovery_size(tdb);
+ /* Now we know where it will be. */
+ *recovery_offset = recovery_head;
- /* round up to a multiple of page size */
- *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
- *recovery_offset = tdb->map_size;
- recovery_head = *recovery_offset;
+ /* Expand by more than we need, so we don't do it often. */
+ *recovery_max_size = tdb_expand_adjust(tdb->map_size,
+ *recovery_size,
+ tdb->page_size)
+ - sizeof(rec);
- if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
- (tdb->map_size - tdb->transaction->old_map_size) +
- sizeof(rec) + *recovery_max_size) == -1) {
+ if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
+ !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
+ "overflow recovery area\n"));
+ return -1;
+ }
+
+ if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+ new_end - tdb->transaction->old_map_size)
+ == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
return -1;
}
/* remap the file (if using mmap) */
- methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+ methods->tdb_oob(tdb, tdb->map_size, 1, 1);
/* we have to reset the old map size so that we don't try to expand the file
again in the transaction commit, which would destroy the recovery area */
/* write the recovery header offset and sync - we can sync without a race here
as the magic ptr in the recovery record has not been set */
CONVERT(recovery_head);
- if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
+ if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
&recovery_head, sizeof(tdb_off_t)) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
return -1;
/*
setup the recovery data that will be used on a crash during commit
*/
-static int transaction_setup_recovery(struct tdb_context *tdb,
+static int transaction_setup_recovery(struct tdb_context *tdb,
tdb_off_t *magic_offset)
{
tdb_len_t recovery_size;
unsigned char *data, *p;
const struct tdb_methods *methods = tdb->transaction->io_methods;
- struct list_struct *rec;
+ struct tdb_record *rec;
tdb_off_t recovery_offset, recovery_max_size;
tdb_off_t old_map_size = tdb->transaction->old_map_size;
uint32_t magic, tailer;
/*
check that the recovery area has enough space
*/
- if (tdb_recovery_allocate(tdb, &recovery_size,
+ if (tdb_recovery_allocate(tdb, &recovery_size,
&recovery_offset, &recovery_max_size) == -1) {
return -1;
}
return -1;
}
- rec = (struct list_struct *)data;
+ rec = (struct tdb_record *)data;
memset(rec, 0, sizeof(*rec));
- rec->magic = 0;
+ rec->magic = TDB_RECOVERY_INVALID_MAGIC;
rec->data_len = recovery_size;
rec->rec_len = recovery_max_size;
rec->key_len = old_map_size;
- CONVERT(rec);
+ CONVERT(*rec);
/* build the recovery data into a single blob to allow us to do a single
large write, which should be more efficient */
if (i == tdb->transaction->num_blocks-1) {
length = tdb->transaction->last_block_size;
}
-
+
if (offset >= old_map_size) {
continue;
}
/* and the tailer */
tailer = sizeof(*rec) + recovery_max_size;
memcpy(p, &tailer, 4);
- CONVERT(p);
+ if (DOCONV()) {
+ tdb_convert(p, 4);
+ }
/* write the recovery data to the recovery area */
if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
magic = TDB_RECOVERY_MAGIC;
CONVERT(magic);
- *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
+ *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
return 0;
}
-/*
- prepare to commit the current transaction
-*/
-int tdb_transaction_prepare_commit(struct tdb_context *tdb)
-{
+static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
+{
const struct tdb_methods *methods;
if (tdb->transaction == NULL) {
if (tdb->transaction->prepared) {
tdb->ecode = TDB_ERR_EINVAL;
- tdb_transaction_cancel(tdb);
+ _tdb_transaction_cancel(tdb);
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
return -1;
}
if (tdb->transaction->transaction_error) {
tdb->ecode = TDB_ERR_IO;
- tdb_transaction_cancel(tdb);
+ _tdb_transaction_cancel(tdb);
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
return -1;
}
if (tdb->transaction->nesting != 0) {
return 0;
- }
+ }
/* check for a null transaction */
if (tdb->transaction->blocks == NULL) {
}
methods = tdb->transaction->io_methods;
-
+
/* if there are any locks pending then the caller has not
nested their locks properly, so fail the transaction */
- if (tdb->num_locks || tdb->global_lock.count) {
+ if (tdb_have_extra_locks(tdb)) {
tdb->ecode = TDB_ERR_LOCK;
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
- tdb_transaction_cancel(tdb);
+ _tdb_transaction_cancel(tdb);
return -1;
}
/* upgrade the main transaction lock region to a write lock */
- if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
- tdb->ecode = TDB_ERR_LOCK;
- tdb_transaction_cancel(tdb);
+ if (tdb_allrecord_upgrade(tdb) == -1) {
+ TDB_LOG((tdb, TDB_DEBUG_ERROR,
+ "tdb_transaction_prepare_commit: "
+ "failed to upgrade hash locks: %s\n",
+ tdb_errorstr(tdb)));
+ _tdb_transaction_cancel(tdb);
return -1;
}
- /* get the global lock - this prevents new users attaching to the database
+ /* get the open lock - this prevents new users attaching to the database
during the commit */
- if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
- tdb->ecode = TDB_ERR_LOCK;
- tdb_transaction_cancel(tdb);
+ if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
+ TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
+ _tdb_transaction_cancel(tdb);
return -1;
}
- if (!(tdb->flags & TDB_NOSYNC)) {
- /* write the recovery data to the end of the file */
- if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
- tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
- tdb_transaction_cancel(tdb);
- return -1;
- }
+ /* write the recovery data to the end of the file */
+ if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
+ _tdb_transaction_cancel(tdb);
+ return -1;
}
tdb->transaction->prepared = true;
/* expand the file to the new size if needed */
if (tdb->map_size != tdb->transaction->old_map_size) {
- if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
- tdb->map_size -
+ if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
+ tdb->map_size -
tdb->transaction->old_map_size) == -1) {
tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
- tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
- tdb_transaction_cancel(tdb);
+ _tdb_transaction_cancel(tdb);
return -1;
}
tdb->map_size = tdb->transaction->old_map_size;
- methods->tdb_oob(tdb, tdb->map_size + 1, 1);
+ methods->tdb_oob(tdb, tdb->map_size, 1, 1);
}
- /* Keep the global lock until the actual commit */
+ /* Keep the open lock until the actual commit */
return 0;
}
+/*
+ prepare to commit the current transaction
+*/
+_PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
+{
+ tdb_trace(tdb, "tdb_transaction_prepare_commit");
+ return _tdb_transaction_prepare_commit(tdb);
+}
+
+/* A repack is worthwhile if the largest is less than half total free. */
+static bool repack_worthwhile(struct tdb_context *tdb)
+{
+ tdb_off_t ptr;
+ struct tdb_record rec;
+ tdb_len_t total = 0, largest = 0;
+
+ if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
+ return false;
+ }
+
+ while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
+ total += rec.rec_len;
+ if (rec.rec_len > largest) {
+ largest = rec.rec_len;
+ }
+ ptr = rec.next;
+ }
+
+ return total > largest * 2;
+}
+
/*
commit the current transaction
*/
-int tdb_transaction_commit(struct tdb_context *tdb)
-{
+_PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
+{
const struct tdb_methods *methods;
int i;
- bool need_repack;
+ bool need_repack = false;
if (tdb->transaction == NULL) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
return -1;
}
+ tdb_trace(tdb, "tdb_transaction_commit");
+
if (tdb->transaction->transaction_error) {
tdb->ecode = TDB_ERR_IO;
- tdb_transaction_cancel(tdb);
+ _tdb_transaction_cancel(tdb);
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
return -1;
}
/* check for a null transaction */
if (tdb->transaction->blocks == NULL) {
- tdb_transaction_cancel(tdb);
+ _tdb_transaction_cancel(tdb);
return 0;
}
if (!tdb->transaction->prepared) {
- int ret = tdb_transaction_prepare_commit(tdb);
+ int ret = _tdb_transaction_prepare_commit(tdb);
if (ret)
return ret;
}
if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
-
+
/* we've overwritten part of the data and
possibly expanded the file, so we need to
run the crash recovery code */
tdb->methods = methods;
- tdb_transaction_recover(tdb);
+ tdb_transaction_recover(tdb);
- tdb_transaction_cancel(tdb);
- tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
+ _tdb_transaction_cancel(tdb);
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
return -1;
}
SAFE_FREE(tdb->transaction->blocks[i]);
- }
+ }
+
+ /* Do this before we drop lock or blocks. */
+ if (tdb->transaction->expanded) {
+ need_repack = repack_worthwhile(tdb);
+ }
SAFE_FREE(tdb->transaction->blocks);
tdb->transaction->num_blocks = 0;
return -1;
}
- tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
-
/*
TODO: maybe write to some dummy hdr field, or write to magic
offset without mmap, before the last sync, instead of the
utime(tdb->name, NULL);
#endif
- need_repack = tdb->transaction->need_repack;
-
/* use a transaction cancel to free memory and remove the
transaction locks */
- tdb_transaction_cancel(tdb);
+ _tdb_transaction_cancel(tdb);
if (need_repack) {
return tdb_repack(tdb);
/*
recover from an aborted transaction. Must be called with exclusive
- database write access already established (including the global
+ database write access already established (including the open
lock to prevent new processes attaching)
*/
int tdb_transaction_recover(struct tdb_context *tdb)
tdb_off_t recovery_head, recovery_eof;
unsigned char *data, *p;
uint32_t zero = 0;
- struct list_struct rec;
+ struct tdb_record rec;
/* find the recovery area */
if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
}
/* read the recovery record */
- if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
+ if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
sizeof(rec), DOCONV()) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
tdb->ecode = TDB_ERR_IO;
return -1;
}
data = (unsigned char *)malloc(rec.data_len);
if (data == NULL) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
tdb->ecode = TDB_ERR_OOM;
return -1;
}
/* read the full recovery data */
if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
rec.data_len, 0) == -1) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
tdb->ecode = TDB_ERR_IO;
return -1;
}
if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
free(data);
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
+ TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
tdb->ecode = TDB_ERR_IO;
return -1;
}
if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
tdb->ecode = TDB_ERR_IO;
- return -1;
+ return -1;
}
}
/* remove the recovery magic */
- if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
+ if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
&zero) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
tdb->ecode = TDB_ERR_IO;
- return -1;
- }
-
- /* reduce the file size to the old size */
- tdb_munmap(tdb);
- if (ftruncate(tdb->fd, recovery_eof) != 0) {
- TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
- tdb->ecode = TDB_ERR_IO;
- return -1;
+ return -1;
}
- tdb->map_size = recovery_eof;
- tdb_mmap(tdb);
if (transaction_sync(tdb, 0, recovery_eof) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
return -1;
}
- TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",
+ TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
recovery_eof));
/* all done */
return 0;
}
+
+/* Any I/O failures we say "needs recovery". */
+bool tdb_needs_recovery(struct tdb_context *tdb)
+{
+ tdb_off_t recovery_head;
+ struct tdb_record rec;
+
+ /* find the recovery area */
+ if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
+ return true;
+ }
+
+ if (recovery_head == 0) {
+ /* we have never allocated a recovery record */
+ return false;
+ }
+
+ /* read the recovery record */
+ if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
+ sizeof(rec), DOCONV()) == -1) {
+ return true;
+ }
+
+ return (rec.magic == TDB_RECOVERY_MAGIC);
+}