2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 2 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, write to the Free Software
24 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #include "tdb_private.h"
32 - only allow a single transaction at a time per database. This makes
33 using the transaction API simpler, as otherwise the caller would
34 have to cope with temporary failures in transactions that conflict
35 with other current transactions
37 - keep the transaction recovery information in the same file as the
38 database, using a special 'transaction recovery' record pointed at
39 by the header. This removes the need for extra journal files as
40 used by some other databases
42 - dynamically allocate the transaction recovery record, re-using it
43 for subsequent transactions. If a larger record is needed then
44 tdb_free() the old record to place it on the normal tdb freelist
45 before allocating the new record
47 - during transactions, keep a linked list of all writes that have
48 been performed by intercepting all tdb_write() calls. The hooked
49 transaction versions of tdb_read() and tdb_write() check this
50 linked list and try to use the elements of the list in preference
53 - don't allow any locks to be held when a transaction starts,
54 otherwise we can end up with deadlock (plus lack of lock nesting
55 in posix locks would mean the lock is lost)
57 - if the caller gains a lock during the transaction but doesn't
58 release it then fail the commit
60 - allow for nested calls to tdb_transaction_start(), re-using the
61 existing transaction record. If the inner transaction is cancelled
62 then a subsequent commit will fail
64 - keep a mirrored copy of the tdb hash chain heads to allow for the
65 fast hash heads scan on traverse, updating the mirrored copy in
66 the transaction version of tdb_write
68 - allow callers to mix transaction and non-transaction use of tdb,
69 although once a transaction is started then an exclusive lock is
70 gained until the transaction is committed or cancelled
72 - the commit strategy involves first saving away all modified data
73 into a linearised buffer in the transaction recovery area, then
74 marking the transaction recovery area with a magic value to
75 indicate a valid recovery record. In total 4 fsync/msync calls are
76 needed per commit to prevent race conditions. It might be possible
77 to reduce this to 3 or even 2 with some more work.
79 - check for a valid recovery record on open of the tdb, while the
80 global lock is held. Automatically recover from the transaction
81 recovery area if needed, then continue with the open as
82 usual. This allows for smooth crash recovery with no administrator
85 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86 still available, but no transaction recovery area is used and no
87 fsync/msync calls are made.
93 hold the context of any current transaction
95 struct tdb_transaction {
96 /* we keep a mirrored copy of the tdb hash heads here so
97 tdb_next_hash_chain() can operate efficiently */
100 /* the original io methods - used to do IOs to the real db */
101 const struct tdb_methods *io_methods;
103 /* the list of transaction elements. We use a doubly linked
104 list with a last pointer to allow us to keep the list
105 ordered, with first element at the front of the list. It
106 needs to be doubly linked as the read/write traversals need
107 to be backwards, while the commit needs to be forwards */
108 struct tdb_transaction_el {
109 struct tdb_transaction_el *next, *prev;
113 } *elements, *elements_last;
115 /* non-zero when an internal transaction error has
116 occurred. All write operations will then fail until the
117 transaction is ended */
118 int transaction_error;
120 /* when inside a transaction we need to keep track of any
121 nested tdb_transaction_start() calls, as these are allowed,
122 but don't create a new transaction */
125 /* old file size before transaction */
126 tdb_len_t old_map_size;
131 read while in a transaction. We need to check first if the data is in our list
132 of transaction elements, then if not do a real read
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
135 tdb_len_t len, int cv)
137 struct tdb_transaction_el *el;
139 /* we need to walk the list backwards to get the most recent data */
140 for (el=tdb->transaction->elements_last;el;el=el->prev) {
143 if (off+len <= el->offset) {
146 if (off >= el->offset + el->length) {
150 /* an overlapping read - needs to be split into up to
151 2 reads and a memcpy */
152 if (off < el->offset) {
153 partial = el->offset - off;
154 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
159 buf = (void *)(partial + (char *)buf);
161 if (off + len <= el->offset + el->length) {
164 partial = el->offset + el->length - off;
166 memcpy(buf, el->data + (off - el->offset), partial);
168 tdb_convert(buf, len);
172 buf = (void *)(partial + (char *)buf);
174 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
181 /* it's not in the transaction elements - do a real read */
182 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
185 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
186 tdb->ecode = TDB_ERR_IO;
187 tdb->transaction->transaction_error = 1;
193 write while in a transaction
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
196 const void *buf, tdb_len_t len)
198 struct tdb_transaction_el *el, *best_el=NULL;
204 /* if the write is to a hash head, then update the transaction
206 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
212 /* first see if we can replace an existing entry */
213 for (el=tdb->transaction->elements_last;el;el=el->prev) {
216 if (best_el == NULL && off == el->offset+el->length) {
220 if (off+len <= el->offset) {
223 if (off >= el->offset + el->length) {
227 /* an overlapping write - needs to be split into up to
228 2 writes and a memcpy */
229 if (off < el->offset) {
230 partial = el->offset - off;
231 if (transaction_write(tdb, off, buf, partial) != 0) {
236 buf = (const void *)(partial + (const char *)buf);
238 if (off + len <= el->offset + el->length) {
241 partial = el->offset + el->length - off;
243 memcpy(el->data + (off - el->offset), buf, partial);
246 buf = (const void *)(partial + (const char *)buf);
248 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
255 /* see if we can append the new entry to an existing entry */
256 if (best_el && best_el->offset + best_el->length == off &&
257 (off+len < tdb->transaction->old_map_size ||
258 off > tdb->transaction->old_map_size)) {
259 unsigned char *data = best_el->data;
261 el->data = realloc(el->data, el->length + len);
262 if (el->data == NULL) {
263 tdb->ecode = TDB_ERR_OOM;
264 tdb->transaction->transaction_error = 1;
269 memcpy(el->data + el->length, buf, len);
271 memset(el->data + el->length, TDB_PAD_BYTE, len);
277 /* add a new entry at the end of the list */
278 el = malloc(sizeof(*el));
280 tdb->ecode = TDB_ERR_OOM;
281 tdb->transaction->transaction_error = 1;
285 el->prev = tdb->transaction->elements_last;
288 el->data = malloc(len);
289 if (el->data == NULL) {
291 tdb->ecode = TDB_ERR_OOM;
292 tdb->transaction->transaction_error = 1;
296 memcpy(el->data, buf, len);
298 memset(el->data, TDB_PAD_BYTE, len);
303 tdb->transaction->elements = el;
305 tdb->transaction->elements_last = el;
309 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
310 tdb->ecode = TDB_ERR_IO;
311 tdb->transaction->transaction_error = 1;
316 accelerated hash chain head search, using the cached hash heads
318 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
321 for (;h < tdb->header.hash_size;h++) {
322 /* the +1 takes account of the freelist */
323 if (0 != tdb->transaction->hash_heads[h+1]) {
331 out of bounds check during a transaction
333 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
335 if (len <= tdb->map_size) {
338 return TDB_ERRCODE(TDB_ERR_IO, -1);
342 transaction version of tdb_expand().
344 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
347 /* add a write to the transaction elements, so subsequent
348 reads see the zero data */
349 if (transaction_write(tdb, size, NULL, addition) != 0) {
357 brlock during a transaction - ignore them
359 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
360 int rw_type, int lck_type, int probe, size_t len)
365 static const struct tdb_methods transaction_methods = {
368 transaction_next_hash_chain,
370 transaction_expand_file,
376 start a tdb transaction. No token is returned, as only a single
377 transaction is allowed to be pending per tdb_context
379 int tdb_transaction_start(struct tdb_context *tdb)
381 /* some sanity checks */
382 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
383 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
384 tdb->ecode = TDB_ERR_EINVAL;
388 /* cope with nested tdb_transaction_start() calls */
389 if (tdb->transaction != NULL) {
390 tdb->transaction->nesting++;
391 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
392 tdb->transaction->nesting));
396 if (tdb->num_locks != 0 || tdb->global_lock.count) {
397 /* the caller must not have any locks when starting a
398 transaction as otherwise we'll be screwed by lack
399 of nested locks in posix */
400 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
401 tdb->ecode = TDB_ERR_LOCK;
405 if (tdb->travlocks.next != NULL) {
406 /* you cannot use transactions inside a traverse (although you can use
407 traverse inside a transaction) as otherwise you can end up with
409 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
410 tdb->ecode = TDB_ERR_LOCK;
414 tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
415 if (tdb->transaction == NULL) {
416 tdb->ecode = TDB_ERR_OOM;
420 /* get the transaction write lock. This is a blocking lock. As
421 discussed with Volker, there are a number of ways we could
422 make this async, which we will probably do in the future */
423 if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
424 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
425 tdb->ecode = TDB_ERR_LOCK;
426 SAFE_FREE(tdb->transaction);
430 /* get a read lock from the freelist to the end of file. This
431 is upgraded to a write lock during the commit */
432 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
433 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
434 tdb->ecode = TDB_ERR_LOCK;
438 /* setup a copy of the hash table heads so the hash scan in
439 traverse can be fast */
440 tdb->transaction->hash_heads = calloc(tdb->header.hash_size+1, sizeof(tdb_off_t));
441 if (tdb->transaction->hash_heads == NULL) {
442 tdb->ecode = TDB_ERR_OOM;
445 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
446 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
447 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
448 tdb->ecode = TDB_ERR_IO;
452 /* make sure we know about any file expansions already done by
454 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
455 tdb->transaction->old_map_size = tdb->map_size;
457 /* finally hook the io methods, replacing them with
458 transaction specific methods */
459 tdb->transaction->io_methods = tdb->methods;
460 tdb->methods = &transaction_methods;
462 /* by calling this transaction write here, we ensure that we don't grow the
463 transaction linked list due to hash table updates */
464 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
465 TDB_HASHTABLE_SIZE(tdb)) != 0) {
466 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
467 tdb->ecode = TDB_ERR_IO;
474 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
475 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
476 SAFE_FREE(tdb->transaction->hash_heads);
477 SAFE_FREE(tdb->transaction);
483 cancel the current transaction
485 int tdb_transaction_cancel(struct tdb_context *tdb)
487 if (tdb->transaction == NULL) {
488 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
492 if (tdb->transaction->nesting != 0) {
493 tdb->transaction->transaction_error = 1;
494 tdb->transaction->nesting--;
498 tdb->map_size = tdb->transaction->old_map_size;
500 /* free all the transaction elements */
501 while (tdb->transaction->elements) {
502 struct tdb_transaction_el *el = tdb->transaction->elements;
503 tdb->transaction->elements = el->next;
508 /* remove any global lock created during the transaction */
509 if (tdb->global_lock.count != 0) {
510 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
511 tdb->global_lock.count = 0;
514 /* remove any locks created during the transaction */
515 if (tdb->num_locks != 0) {
517 for (h=0;h<tdb->header.hash_size+1;h++) {
518 if (tdb->locked[h].count != 0) {
519 tdb_brlock(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
520 tdb->locked[h].count = 0;
526 /* restore the normal io methods */
527 tdb->methods = tdb->transaction->io_methods;
529 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
530 tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
531 SAFE_FREE(tdb->transaction->hash_heads);
532 SAFE_FREE(tdb->transaction);
540 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
542 if (fsync(tdb->fd) != 0) {
543 tdb->ecode = TDB_ERR_IO;
544 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
549 tdb_off_t moffset = offset & ~(tdb->page_size-1);
550 if (msync(moffset + (char *)tdb->map_ptr,
551 length + (offset - moffset), MS_SYNC) != 0) {
552 tdb->ecode = TDB_ERR_IO;
553 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
564 work out how much space the linearised recovery data will consume
566 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
568 struct tdb_transaction_el *el;
569 tdb_len_t recovery_size = 0;
571 recovery_size = sizeof(u32);
572 for (el=tdb->transaction->elements;el;el=el->next) {
573 if (el->offset >= tdb->transaction->old_map_size) {
576 recovery_size += 2*sizeof(tdb_off_t) + el->length;
579 return recovery_size;
583 allocate the recovery area, or use an existing recovery area if it is
586 static int tdb_recovery_allocate(struct tdb_context *tdb,
587 tdb_len_t *recovery_size,
588 tdb_off_t *recovery_offset,
589 tdb_len_t *recovery_max_size)
591 struct list_struct rec;
592 const struct tdb_methods *methods = tdb->transaction->io_methods;
593 tdb_off_t recovery_head;
595 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
596 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
602 if (recovery_head != 0 &&
603 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
604 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
608 *recovery_size = tdb_recovery_size(tdb);
610 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
611 /* it fits in the existing area */
612 *recovery_max_size = rec.rec_len;
613 *recovery_offset = recovery_head;
617 /* we need to free up the old recovery area, then allocate a
618 new one at the end of the file. Note that we cannot use
619 tdb_allocate() to allocate the new one as that might return
620 us an area that is being currently used (as of the start of
622 if (recovery_head != 0) {
623 if (tdb_free(tdb, recovery_head, &rec) == -1) {
624 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
629 /* the tdb_free() call might have increased the recovery size */
630 *recovery_size = tdb_recovery_size(tdb);
632 /* round up to a multiple of page size */
633 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
634 *recovery_offset = tdb->map_size;
635 recovery_head = *recovery_offset;
637 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
638 (tdb->map_size - tdb->transaction->old_map_size) +
639 sizeof(rec) + *recovery_max_size) == -1) {
640 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
644 /* remap the file (if using mmap) */
645 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
647 /* we have to reset the old map size so that we don't try to expand the file
648 again in the transaction commit, which would destroy the recovery area */
649 tdb->transaction->old_map_size = tdb->map_size;
651 /* write the recovery header offset and sync - we can sync without a race here
652 as the magic ptr in the recovery record has not been set */
653 CONVERT(recovery_head);
654 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
655 &recovery_head, sizeof(tdb_off_t)) == -1) {
656 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
665 setup the recovery data that will be used on a crash during commit
667 static int transaction_setup_recovery(struct tdb_context *tdb,
668 tdb_off_t *magic_offset)
670 struct tdb_transaction_el *el;
671 tdb_len_t recovery_size;
672 unsigned char *data, *p;
673 const struct tdb_methods *methods = tdb->transaction->io_methods;
674 struct list_struct *rec;
675 tdb_off_t recovery_offset, recovery_max_size;
676 tdb_off_t old_map_size = tdb->transaction->old_map_size;
680 check that the recovery area has enough space
682 if (tdb_recovery_allocate(tdb, &recovery_size,
683 &recovery_offset, &recovery_max_size) == -1) {
687 data = malloc(recovery_size + sizeof(*rec));
689 tdb->ecode = TDB_ERR_OOM;
693 rec = (struct list_struct *)data;
694 memset(rec, 0, sizeof(*rec));
697 rec->data_len = recovery_size;
698 rec->rec_len = recovery_max_size;
699 rec->key_len = old_map_size;
702 /* build the recovery data into a single blob to allow us to do a single
703 large write, which should be more efficient */
704 p = data + sizeof(*rec);
705 for (el=tdb->transaction->elements;el;el=el->next) {
706 if (el->offset >= old_map_size) {
709 if (el->offset + el->length > tdb->transaction->old_map_size) {
710 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
712 tdb->ecode = TDB_ERR_CORRUPT;
715 memcpy(p, &el->offset, 4);
716 memcpy(p+4, &el->length, 4);
720 /* the recovery area contains the old data, not the
721 new data, so we have to call the original tdb_read
723 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
725 tdb->ecode = TDB_ERR_IO;
732 tailer = sizeof(*rec) + recovery_max_size;
733 memcpy(p, &tailer, 4);
736 /* write the recovery data to the recovery area */
737 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
738 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
740 tdb->ecode = TDB_ERR_IO;
744 /* as we don't have ordered writes, we have to sync the recovery
745 data before we update the magic to indicate that the recovery
747 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
754 magic = TDB_RECOVERY_MAGIC;
757 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
759 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
760 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
761 tdb->ecode = TDB_ERR_IO;
765 /* ensure the recovery magic marker is on disk */
766 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
774 commit the current transaction
776 int tdb_transaction_commit(struct tdb_context *tdb)
778 const struct tdb_methods *methods;
779 tdb_off_t magic_offset = 0;
782 if (tdb->transaction == NULL) {
783 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
787 if (tdb->transaction->transaction_error) {
788 tdb->ecode = TDB_ERR_IO;
789 tdb_transaction_cancel(tdb);
790 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
794 if (tdb->transaction->nesting != 0) {
795 tdb->transaction->nesting--;
799 /* check for a null transaction */
800 if (tdb->transaction->elements == NULL) {
801 tdb_transaction_cancel(tdb);
805 methods = tdb->transaction->io_methods;
807 /* if there are any locks pending then the caller has not
808 nested their locks properly, so fail the transaction */
809 if (tdb->num_locks || tdb->global_lock.count) {
810 tdb->ecode = TDB_ERR_LOCK;
811 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
812 tdb_transaction_cancel(tdb);
816 /* upgrade the main transaction lock region to a write lock */
817 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
818 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
819 tdb->ecode = TDB_ERR_LOCK;
820 tdb_transaction_cancel(tdb);
824 /* get the global lock - this prevents new users attaching to the database
826 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
827 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
828 tdb->ecode = TDB_ERR_LOCK;
829 tdb_transaction_cancel(tdb);
833 if (!(tdb->flags & TDB_NOSYNC)) {
834 /* write the recovery data to the end of the file */
835 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
836 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
837 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
838 tdb_transaction_cancel(tdb);
843 /* expand the file to the new size if needed */
844 if (tdb->map_size != tdb->transaction->old_map_size) {
845 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
847 tdb->transaction->old_map_size) == -1) {
848 tdb->ecode = TDB_ERR_IO;
849 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
850 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
851 tdb_transaction_cancel(tdb);
854 tdb->map_size = tdb->transaction->old_map_size;
855 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
858 /* perform all the writes */
859 while (tdb->transaction->elements) {
860 struct tdb_transaction_el *el = tdb->transaction->elements;
862 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
863 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
865 /* we've overwritten part of the data and
866 possibly expanded the file, so we need to
867 run the crash recovery code */
868 tdb->methods = methods;
869 tdb_transaction_recover(tdb);
871 tdb_transaction_cancel(tdb);
872 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
874 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
877 tdb->transaction->elements = el->next;
882 if (!(tdb->flags & TDB_NOSYNC)) {
883 /* ensure the new data is on disk */
884 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
888 /* remove the recovery marker */
889 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
890 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
894 /* ensure the recovery marker has been removed on disk */
895 if (transaction_sync(tdb, magic_offset, 4) == -1) {
900 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
903 TODO: maybe write to some dummy hdr field, or write to magic
904 offset without mmap, before the last sync, instead of the
908 /* on some systems (like Linux 2.6.x) changes via mmap/msync
909 don't change the mtime of the file, this means the file may
910 not be backed up (as tdb rounding to block sizes means that
911 file size changes are quite rare too). The following forces
912 mtime changes when a transaction completes */
914 utime(tdb->name, NULL);
917 /* use a transaction cancel to free memory and remove the
919 tdb_transaction_cancel(tdb);
925 recover from an aborted transaction. Must be called with exclusive
926 database write access already established (including the global
927 lock to prevent new processes attaching)
929 int tdb_transaction_recover(struct tdb_context *tdb)
931 tdb_off_t recovery_head, recovery_eof;
932 unsigned char *data, *p;
934 struct list_struct rec;
936 /* find the recovery area */
937 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
938 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
939 tdb->ecode = TDB_ERR_IO;
943 if (recovery_head == 0) {
944 /* we have never allocated a recovery record */
948 /* read the recovery record */
949 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
950 sizeof(rec), DOCONV()) == -1) {
951 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
952 tdb->ecode = TDB_ERR_IO;
956 if (rec.magic != TDB_RECOVERY_MAGIC) {
957 /* there is no valid recovery data */
961 if (tdb->read_only) {
962 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
963 tdb->ecode = TDB_ERR_CORRUPT;
967 recovery_eof = rec.key_len;
969 data = malloc(rec.data_len);
971 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
972 tdb->ecode = TDB_ERR_OOM;
976 /* read the full recovery data */
977 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
978 rec.data_len, 0) == -1) {
979 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
980 tdb->ecode = TDB_ERR_IO;
984 /* recover the file data */
986 while (p+8 < data + rec.data_len) {
992 memcpy(&len, p+4, 4);
994 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
996 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
997 tdb->ecode = TDB_ERR_IO;
1005 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1006 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1007 tdb->ecode = TDB_ERR_IO;
1011 /* if the recovery area is after the recovered eof then remove it */
1012 if (recovery_eof <= recovery_head) {
1013 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1014 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1015 tdb->ecode = TDB_ERR_IO;
1020 /* remove the recovery magic */
1021 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1023 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1024 tdb->ecode = TDB_ERR_IO;
1028 /* reduce the file size to the old size */
1030 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1031 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1032 tdb->ecode = TDB_ERR_IO;
1035 tdb->map_size = recovery_eof;
1038 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1039 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1040 tdb->ecode = TDB_ERR_IO;
1044 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",