2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 3 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, write to the Free Software
24 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #include "tdb_private.h"
32 - only allow a single transaction at a time per database. This makes
33 using the transaction API simpler, as otherwise the caller would
34 have to cope with temporary failures in transactions that conflict
35 with other current transactions
37 - keep the transaction recovery information in the same file as the
38 database, using a special 'transaction recovery' record pointed at
39 by the header. This removes the need for extra journal files as
40 used by some other databases
42 - dynamically allocate the transaction recovery record, re-using it
43 for subsequent transactions. If a larger record is needed then
44 tdb_free() the old record to place it on the normal tdb freelist
45 before allocating the new record
47 - during transactions, keep a linked list of all writes that have
48 been performed by intercepting all tdb_write() calls. The hooked
49 transaction versions of tdb_read() and tdb_write() check this
50 linked list and try to use the elements of the list in preference
53 - don't allow any locks to be held when a transaction starts,
54 otherwise we can end up with deadlock (plus lack of lock nesting
55 in posix locks would mean the lock is lost)
57 - if the caller gains a lock during the transaction but doesn't
58 release it then fail the commit
60 - allow for nested calls to tdb_transaction_start(), re-using the
61 existing transaction record. If the inner transaction is cancelled
62 then a subsequent commit will fail
64 - keep a mirrored copy of the tdb hash chain heads to allow for the
65 fast hash heads scan on traverse, updating the mirrored copy in
66 the transaction version of tdb_write
68 - allow callers to mix transaction and non-transaction use of tdb,
69 although once a transaction is started then an exclusive lock is
70 gained until the transaction is committed or cancelled
72 - the commit strategy involves first saving away all modified data
73 into a linearised buffer in the transaction recovery area, then
74 marking the transaction recovery area with a magic value to
75 indicate a valid recovery record. In total 4 fsync/msync calls are
76 needed per commit to prevent race conditions. It might be possible
77 to reduce this to 3 or even 2 with some more work.
79 - check for a valid recovery record on open of the tdb, while the
80 global lock is held. Automatically recover from the transaction
81 recovery area if needed, then continue with the open as
82 usual. This allows for smooth crash recovery with no administrator
85 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86 still available, but no transaction recovery area is used and no
87 fsync/msync calls are made.
91 struct tdb_transaction_el {
92 struct tdb_transaction_el *next, *prev;
99 hold the context of any current transaction
101 struct tdb_transaction {
102 /* we keep a mirrored copy of the tdb hash heads here so
103 tdb_next_hash_chain() can operate efficiently */
106 /* the original io methods - used to do IOs to the real db */
107 const struct tdb_methods *io_methods;
109 /* the list of transaction elements. We use a doubly linked
110 list with a last pointer to allow us to keep the list
111 ordered, with first element at the front of the list. It
112 needs to be doubly linked as the read/write traversals need
113 to be backwards, while the commit needs to be forwards */
114 struct tdb_transaction_el *elements, *elements_last;
116 /* non-zero when an internal transaction error has
117 occurred. All write operations will then fail until the
118 transaction is ended */
119 int transaction_error;
121 /* when inside a transaction we need to keep track of any
122 nested tdb_transaction_start() calls, as these are allowed,
123 but don't create a new transaction */
126 /* old file size before transaction */
127 tdb_len_t old_map_size;
132 read while in a transaction. We need to check first if the data is in our list
133 of transaction elements, then if not do a real read
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
136 tdb_len_t len, int cv)
138 struct tdb_transaction_el *el;
140 /* we need to walk the list backwards to get the most recent data */
141 for (el=tdb->transaction->elements_last;el;el=el->prev) {
144 if (off+len <= el->offset) {
147 if (off >= el->offset + el->length) {
151 /* an overlapping read - needs to be split into up to
152 2 reads and a memcpy */
153 if (off < el->offset) {
154 partial = el->offset - off;
155 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
160 buf = (void *)(partial + (char *)buf);
162 if (off + len <= el->offset + el->length) {
165 partial = el->offset + el->length - off;
167 memcpy(buf, el->data + (off - el->offset), partial);
169 tdb_convert(buf, len);
173 buf = (void *)(partial + (char *)buf);
175 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
182 /* it's not in the transaction elements - do a real read */
183 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
186 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
187 tdb->ecode = TDB_ERR_IO;
188 tdb->transaction->transaction_error = 1;
194 write while in a transaction
196 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
197 const void *buf, tdb_len_t len)
199 struct tdb_transaction_el *el, *best_el=NULL;
205 /* if the write is to a hash head, then update the transaction
207 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
208 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
209 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
210 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
213 /* first see if we can replace an existing entry */
214 for (el=tdb->transaction->elements_last;el;el=el->prev) {
217 if (best_el == NULL && off == el->offset+el->length) {
221 if (off+len <= el->offset) {
224 if (off >= el->offset + el->length) {
228 /* an overlapping write - needs to be split into up to
229 2 writes and a memcpy */
230 if (off < el->offset) {
231 partial = el->offset - off;
232 if (transaction_write(tdb, off, buf, partial) != 0) {
237 buf = (const void *)(partial + (const char *)buf);
239 if (off + len <= el->offset + el->length) {
242 partial = el->offset + el->length - off;
244 memcpy(el->data + (off - el->offset), buf, partial);
247 buf = (const void *)(partial + (const char *)buf);
249 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
256 /* see if we can append the new entry to an existing entry */
257 if (best_el && best_el->offset + best_el->length == off &&
258 (off+len < tdb->transaction->old_map_size ||
259 off > tdb->transaction->old_map_size)) {
260 unsigned char *data = best_el->data;
262 el->data = (unsigned char *)realloc(el->data,
264 if (el->data == NULL) {
265 tdb->ecode = TDB_ERR_OOM;
266 tdb->transaction->transaction_error = 1;
271 memcpy(el->data + el->length, buf, len);
273 memset(el->data + el->length, TDB_PAD_BYTE, len);
279 /* add a new entry at the end of the list */
280 el = (struct tdb_transaction_el *)malloc(sizeof(*el));
282 tdb->ecode = TDB_ERR_OOM;
283 tdb->transaction->transaction_error = 1;
287 el->prev = tdb->transaction->elements_last;
290 el->data = (unsigned char *)malloc(len);
291 if (el->data == NULL) {
293 tdb->ecode = TDB_ERR_OOM;
294 tdb->transaction->transaction_error = 1;
298 memcpy(el->data, buf, len);
300 memset(el->data, TDB_PAD_BYTE, len);
305 tdb->transaction->elements = el;
307 tdb->transaction->elements_last = el;
311 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
312 tdb->ecode = TDB_ERR_IO;
313 tdb->transaction->transaction_error = 1;
318 accelerated hash chain head search, using the cached hash heads
320 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
323 for (;h < tdb->header.hash_size;h++) {
324 /* the +1 takes account of the freelist */
325 if (0 != tdb->transaction->hash_heads[h+1]) {
333 out of bounds check during a transaction
335 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
337 if (len <= tdb->map_size) {
340 return TDB_ERRCODE(TDB_ERR_IO, -1);
344 transaction version of tdb_expand().
346 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
349 /* add a write to the transaction elements, so subsequent
350 reads see the zero data */
351 if (transaction_write(tdb, size, NULL, addition) != 0) {
359 brlock during a transaction - ignore them
361 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
362 int rw_type, int lck_type, int probe, size_t len)
367 static const struct tdb_methods transaction_methods = {
370 transaction_next_hash_chain,
372 transaction_expand_file,
378 start a tdb transaction. No token is returned, as only a single
379 transaction is allowed to be pending per tdb_context
381 int tdb_transaction_start(struct tdb_context *tdb)
383 /* some sanity checks */
384 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
385 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
386 tdb->ecode = TDB_ERR_EINVAL;
390 /* cope with nested tdb_transaction_start() calls */
391 if (tdb->transaction != NULL) {
392 tdb->transaction->nesting++;
393 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
394 tdb->transaction->nesting));
398 if (tdb->num_locks != 0 || tdb->global_lock.count) {
399 /* the caller must not have any locks when starting a
400 transaction as otherwise we'll be screwed by lack
401 of nested locks in posix */
402 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
403 tdb->ecode = TDB_ERR_LOCK;
407 if (tdb->travlocks.next != NULL) {
408 /* you cannot use transactions inside a traverse (although you can use
409 traverse inside a transaction) as otherwise you can end up with
411 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
412 tdb->ecode = TDB_ERR_LOCK;
416 tdb->transaction = (struct tdb_transaction *)
417 calloc(sizeof(struct tdb_transaction), 1);
418 if (tdb->transaction == NULL) {
419 tdb->ecode = TDB_ERR_OOM;
423 /* get the transaction write lock. This is a blocking lock. As
424 discussed with Volker, there are a number of ways we could
425 make this async, which we will probably do in the future */
426 if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
427 SAFE_FREE(tdb->transaction);
431 /* get a read lock from the freelist to the end of file. This
432 is upgraded to a write lock during the commit */
433 if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
434 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
435 tdb->ecode = TDB_ERR_LOCK;
439 /* setup a copy of the hash table heads so the hash scan in
440 traverse can be fast */
441 tdb->transaction->hash_heads = (u32 *)
442 calloc(tdb->header.hash_size+1, sizeof(u32));
443 if (tdb->transaction->hash_heads == NULL) {
444 tdb->ecode = TDB_ERR_OOM;
447 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
448 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
449 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
450 tdb->ecode = TDB_ERR_IO;
454 /* make sure we know about any file expansions already done by
456 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
457 tdb->transaction->old_map_size = tdb->map_size;
459 /* finally hook the io methods, replacing them with
460 transaction specific methods */
461 tdb->transaction->io_methods = tdb->methods;
462 tdb->methods = &transaction_methods;
464 /* by calling this transaction write here, we ensure that we don't grow the
465 transaction linked list due to hash table updates */
466 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
467 TDB_HASHTABLE_SIZE(tdb)) != 0) {
468 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
469 tdb->ecode = TDB_ERR_IO;
470 tdb->methods = tdb->transaction->io_methods;
477 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
478 tdb_transaction_unlock(tdb);
479 SAFE_FREE(tdb->transaction->hash_heads);
480 SAFE_FREE(tdb->transaction);
486 cancel the current transaction
488 int tdb_transaction_cancel(struct tdb_context *tdb)
490 if (tdb->transaction == NULL) {
491 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
495 if (tdb->transaction->nesting != 0) {
496 tdb->transaction->transaction_error = 1;
497 tdb->transaction->nesting--;
501 tdb->map_size = tdb->transaction->old_map_size;
503 /* free all the transaction elements */
504 while (tdb->transaction->elements) {
505 struct tdb_transaction_el *el = tdb->transaction->elements;
506 tdb->transaction->elements = el->next;
511 /* remove any global lock created during the transaction */
512 if (tdb->global_lock.count != 0) {
513 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
514 tdb->global_lock.count = 0;
517 /* remove any locks created during the transaction */
518 if (tdb->num_locks != 0) {
520 for (i=0;i<tdb->num_lockrecs;i++) {
521 tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
522 F_UNLCK,F_SETLKW, 0, 1);
525 tdb->num_lockrecs = 0;
526 SAFE_FREE(tdb->lockrecs);
529 /* restore the normal io methods */
530 tdb->methods = tdb->transaction->io_methods;
532 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
533 tdb_transaction_unlock(tdb);
534 SAFE_FREE(tdb->transaction->hash_heads);
535 SAFE_FREE(tdb->transaction);
543 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
545 if (fsync(tdb->fd) != 0) {
546 tdb->ecode = TDB_ERR_IO;
547 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
552 tdb_off_t moffset = offset & ~(tdb->page_size-1);
553 if (msync(moffset + (char *)tdb->map_ptr,
554 length + (offset - moffset), MS_SYNC) != 0) {
555 tdb->ecode = TDB_ERR_IO;
556 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
567 work out how much space the linearised recovery data will consume
569 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
571 struct tdb_transaction_el *el;
572 tdb_len_t recovery_size = 0;
574 recovery_size = sizeof(u32);
575 for (el=tdb->transaction->elements;el;el=el->next) {
576 if (el->offset >= tdb->transaction->old_map_size) {
579 recovery_size += 2*sizeof(tdb_off_t) + el->length;
582 return recovery_size;
586 allocate the recovery area, or use an existing recovery area if it is
589 static int tdb_recovery_allocate(struct tdb_context *tdb,
590 tdb_len_t *recovery_size,
591 tdb_off_t *recovery_offset,
592 tdb_len_t *recovery_max_size)
594 struct list_struct rec;
595 const struct tdb_methods *methods = tdb->transaction->io_methods;
596 tdb_off_t recovery_head;
598 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
599 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
605 if (recovery_head != 0 &&
606 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
607 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
611 *recovery_size = tdb_recovery_size(tdb);
613 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
614 /* it fits in the existing area */
615 *recovery_max_size = rec.rec_len;
616 *recovery_offset = recovery_head;
620 /* we need to free up the old recovery area, then allocate a
621 new one at the end of the file. Note that we cannot use
622 tdb_allocate() to allocate the new one as that might return
623 us an area that is being currently used (as of the start of
625 if (recovery_head != 0) {
626 if (tdb_free(tdb, recovery_head, &rec) == -1) {
627 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
632 /* the tdb_free() call might have increased the recovery size */
633 *recovery_size = tdb_recovery_size(tdb);
635 /* round up to a multiple of page size */
636 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
637 *recovery_offset = tdb->map_size;
638 recovery_head = *recovery_offset;
640 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
641 (tdb->map_size - tdb->transaction->old_map_size) +
642 sizeof(rec) + *recovery_max_size) == -1) {
643 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
647 /* remap the file (if using mmap) */
648 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
650 /* we have to reset the old map size so that we don't try to expand the file
651 again in the transaction commit, which would destroy the recovery area */
652 tdb->transaction->old_map_size = tdb->map_size;
654 /* write the recovery header offset and sync - we can sync without a race here
655 as the magic ptr in the recovery record has not been set */
656 CONVERT(recovery_head);
657 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
658 &recovery_head, sizeof(tdb_off_t)) == -1) {
659 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
668 setup the recovery data that will be used on a crash during commit
670 static int transaction_setup_recovery(struct tdb_context *tdb,
671 tdb_off_t *magic_offset)
673 struct tdb_transaction_el *el;
674 tdb_len_t recovery_size;
675 unsigned char *data, *p;
676 const struct tdb_methods *methods = tdb->transaction->io_methods;
677 struct list_struct *rec;
678 tdb_off_t recovery_offset, recovery_max_size;
679 tdb_off_t old_map_size = tdb->transaction->old_map_size;
683 check that the recovery area has enough space
685 if (tdb_recovery_allocate(tdb, &recovery_size,
686 &recovery_offset, &recovery_max_size) == -1) {
690 data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
692 tdb->ecode = TDB_ERR_OOM;
696 rec = (struct list_struct *)data;
697 memset(rec, 0, sizeof(*rec));
700 rec->data_len = recovery_size;
701 rec->rec_len = recovery_max_size;
702 rec->key_len = old_map_size;
705 /* build the recovery data into a single blob to allow us to do a single
706 large write, which should be more efficient */
707 p = data + sizeof(*rec);
708 for (el=tdb->transaction->elements;el;el=el->next) {
709 if (el->offset >= old_map_size) {
712 if (el->offset + el->length > tdb->transaction->old_map_size) {
713 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
715 tdb->ecode = TDB_ERR_CORRUPT;
718 memcpy(p, &el->offset, 4);
719 memcpy(p+4, &el->length, 4);
723 /* the recovery area contains the old data, not the
724 new data, so we have to call the original tdb_read
726 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
728 tdb->ecode = TDB_ERR_IO;
735 tailer = sizeof(*rec) + recovery_max_size;
736 memcpy(p, &tailer, 4);
739 /* write the recovery data to the recovery area */
740 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
741 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
743 tdb->ecode = TDB_ERR_IO;
747 /* as we don't have ordered writes, we have to sync the recovery
748 data before we update the magic to indicate that the recovery
750 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
757 magic = TDB_RECOVERY_MAGIC;
760 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
762 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
763 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
764 tdb->ecode = TDB_ERR_IO;
768 /* ensure the recovery magic marker is on disk */
769 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
777 commit the current transaction
779 int tdb_transaction_commit(struct tdb_context *tdb)
781 const struct tdb_methods *methods;
782 tdb_off_t magic_offset = 0;
785 if (tdb->transaction == NULL) {
786 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
790 if (tdb->transaction->transaction_error) {
791 tdb->ecode = TDB_ERR_IO;
792 tdb_transaction_cancel(tdb);
793 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
797 if (tdb->transaction->nesting != 0) {
798 tdb->transaction->nesting--;
802 /* check for a null transaction */
803 if (tdb->transaction->elements == NULL) {
804 tdb_transaction_cancel(tdb);
808 methods = tdb->transaction->io_methods;
810 /* if there are any locks pending then the caller has not
811 nested their locks properly, so fail the transaction */
812 if (tdb->num_locks || tdb->global_lock.count) {
813 tdb->ecode = TDB_ERR_LOCK;
814 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
815 tdb_transaction_cancel(tdb);
819 /* upgrade the main transaction lock region to a write lock */
820 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
821 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
822 tdb->ecode = TDB_ERR_LOCK;
823 tdb_transaction_cancel(tdb);
827 /* get the global lock - this prevents new users attaching to the database
829 if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
830 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
831 tdb->ecode = TDB_ERR_LOCK;
832 tdb_transaction_cancel(tdb);
836 if (!(tdb->flags & TDB_NOSYNC)) {
837 /* write the recovery data to the end of the file */
838 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
839 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
840 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
841 tdb_transaction_cancel(tdb);
846 /* expand the file to the new size if needed */
847 if (tdb->map_size != tdb->transaction->old_map_size) {
848 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
850 tdb->transaction->old_map_size) == -1) {
851 tdb->ecode = TDB_ERR_IO;
852 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
853 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
854 tdb_transaction_cancel(tdb);
857 tdb->map_size = tdb->transaction->old_map_size;
858 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
861 /* perform all the writes */
862 while (tdb->transaction->elements) {
863 struct tdb_transaction_el *el = tdb->transaction->elements;
865 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
866 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
868 /* we've overwritten part of the data and
869 possibly expanded the file, so we need to
870 run the crash recovery code */
871 tdb->methods = methods;
872 tdb_transaction_recover(tdb);
874 tdb_transaction_cancel(tdb);
875 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
877 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
880 tdb->transaction->elements = el->next;
885 if (!(tdb->flags & TDB_NOSYNC)) {
886 /* ensure the new data is on disk */
887 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
891 /* remove the recovery marker */
892 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
893 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
897 /* ensure the recovery marker has been removed on disk */
898 if (transaction_sync(tdb, magic_offset, 4) == -1) {
903 tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
906 TODO: maybe write to some dummy hdr field, or write to magic
907 offset without mmap, before the last sync, instead of the
911 /* on some systems (like Linux 2.6.x) changes via mmap/msync
912 don't change the mtime of the file, this means the file may
913 not be backed up (as tdb rounding to block sizes means that
914 file size changes are quite rare too). The following forces
915 mtime changes when a transaction completes */
917 utime(tdb->name, NULL);
920 /* use a transaction cancel to free memory and remove the
922 tdb_transaction_cancel(tdb);
928 recover from an aborted transaction. Must be called with exclusive
929 database write access already established (including the global
930 lock to prevent new processes attaching)
932 int tdb_transaction_recover(struct tdb_context *tdb)
934 tdb_off_t recovery_head, recovery_eof;
935 unsigned char *data, *p;
937 struct list_struct rec;
939 /* find the recovery area */
940 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
941 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
942 tdb->ecode = TDB_ERR_IO;
946 if (recovery_head == 0) {
947 /* we have never allocated a recovery record */
951 /* read the recovery record */
952 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
953 sizeof(rec), DOCONV()) == -1) {
954 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
955 tdb->ecode = TDB_ERR_IO;
959 if (rec.magic != TDB_RECOVERY_MAGIC) {
960 /* there is no valid recovery data */
964 if (tdb->read_only) {
965 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
966 tdb->ecode = TDB_ERR_CORRUPT;
970 recovery_eof = rec.key_len;
972 data = (unsigned char *)malloc(rec.data_len);
974 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
975 tdb->ecode = TDB_ERR_OOM;
979 /* read the full recovery data */
980 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
981 rec.data_len, 0) == -1) {
982 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
983 tdb->ecode = TDB_ERR_IO;
987 /* recover the file data */
989 while (p+8 < data + rec.data_len) {
995 memcpy(&len, p+4, 4);
997 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
999 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1000 tdb->ecode = TDB_ERR_IO;
1008 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1009 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1010 tdb->ecode = TDB_ERR_IO;
1014 /* if the recovery area is after the recovered eof then remove it */
1015 if (recovery_eof <= recovery_head) {
1016 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1017 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1018 tdb->ecode = TDB_ERR_IO;
1023 /* remove the recovery magic */
1024 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1026 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1027 tdb->ecode = TDB_ERR_IO;
1031 /* reduce the file size to the old size */
1033 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1034 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1035 tdb->ecode = TDB_ERR_IO;
1038 tdb->map_size = recovery_eof;
1041 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1042 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1043 tdb->ecode = TDB_ERR_IO;
1047 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n",