2 Unix SMB/CIFS implementation.
4 trivial database library
6 Copyright (C) Andrew Tridgell 2005
8 ** NOTE! The following LGPL license applies to the tdb
9 ** library. This does NOT imply that all of Samba is released
12 This library is free software; you can redistribute it and/or
13 modify it under the terms of the GNU Lesser General Public
14 License as published by the Free Software Foundation; either
15 version 2 of the License, or (at your option) any later version.
17 This library is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
20 Lesser General Public License for more details.
22 You should have received a copy of the GNU Lesser General Public
23 License along with this library; if not, write to the Free Software
24 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
27 #include "tdb_private.h"
32 - only allow a single transaction at a time per database. This makes
33 using the transaction API simpler, as otherwise the caller would
34 have to cope with temporary failures in transactions that conflict
35 with other current transactions
37 - keep the transaction recovery information in the same file as the
38 database, using a special 'transaction recovery' record pointed at
39 by the header. This removes the need for extra journal files as
40 used by some other databases
42 - dynamically allocate the transaction recovery record, re-using it
43 for subsequent transactions. If a larger record is needed then
44 tdb_free() the old record to place it on the normal tdb freelist
45 before allocating the new record
47 - during transactions, keep a linked list of all writes that have
48 been performed by intercepting all tdb_write() calls. The hooked
49 transaction versions of tdb_read() and tdb_write() check this
50 linked list and try to use the elements of the list in preference
53 - don't allow any locks to be held when a transaction starts,
54 otherwise we can end up with deadlock (plus lack of lock nesting
55 in posix locks would mean the lock is lost)
57 - if the caller gains a lock during the transaction but doesn't
58 release it then fail the commit
60 - allow for nested calls to tdb_transaction_start(), re-using the
61 existing transaction record. If the inner transaction is cancelled
62 then a subsequent commit will fail
64 - keep a mirrored copy of the tdb hash chain heads to allow for the
65 fast hash heads scan on traverse, updating the mirrored copy in
66 the transaction version of tdb_write
68 - allow callers to mix transaction and non-transaction use of tdb,
69 although once a transaction is started then an exclusive lock is
70 gained until the transaction is committed or cancelled
72 - the commit strategy involves first saving away all modified data
73 into a linearised buffer in the transaction recovery area, then
74 marking the transaction recovery area with a magic value to
75 indicate a valid recovery record. In total 4 fsync/msync calls are
76 needed per commit to prevent race conditions. It might be possible
77 to reduce this to 3 or even 2 with some more work.
79 - check for a valid recovery record on open of the tdb, while the
80 global lock is held. Automatically recover from the transaction
81 recovery area if needed, then continue with the open as
82 usual. This allows for smooth crash recovery with no administrator
85 - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86 still available, but no transaction recovery area is used and no
87 fsync/msync calls are made.
93 hold the context of any current transaction
/*
  State of the transaction currently open on a tdb_context; other code in
  this file reaches it through tdb->transaction.
  NOTE(review): this listing omits short lines, so some member declarations
  described by the comments below are not visible here.
*/
95 struct tdb_transaction {
96 /* we keep a mirrored copy of the tdb hash heads here so
97 tdb_next_hash_chain() can operate efficiently */
/* NOTE(review): member declaration not visible in this listing; other code
   here uses tdb->transaction->hash_heads */
100 /* the original io methods - used to do IOs to the real db */
101 const struct tdb_methods *io_methods;
103 /* the list of transaction elements. We use a doubly linked
104 list with a last pointer to allow us to keep the list
105 ordered, with first element at the front of the list. It
106 needs to be doubly linked as the read/write traversals need
107 to be backwards, while the commit needs to be forwards */
108 struct tdb_transaction_el {
109 struct tdb_transaction_el *next, *prev;
/* NOTE(review): payload members not visible in this listing; other code
   here uses el->offset, el->length and el->data */
113 } *elements, *elements_last;
115 /* non-zero when an internal transaction error has
116 occurred. All write operations will then fail until the
117 transaction is ended */
118 int transaction_error;
120 /* when inside a transaction we need to keep track of any
121 nested tdb_transaction_start() calls, as these are allowed,
122 but don't create a new transaction */
/* NOTE(review): counter declaration not visible in this listing; other code
   here uses tdb->transaction->nesting */
125 /* old file size before transaction */
126 tdb_len_t old_map_size;
131 read while in a transaction. We need to check first if the data is in our list
132 of transaction elements, then if not do a real read
/*
  Read hook used while a transaction is active.

  The pending-write list is walked from elements_last backwards via prev,
  looking for an element that overlaps [off, off+len). An overlapping read is
  served from el->data, and any part that lies outside the element is handled
  by recursive transaction_read() calls. When no element overlaps, the read
  is forwarded to the saved io_methods->tdb_read.

  tdb - database context with an active transaction
  off - file offset to read from
  buf - destination buffer
  len - number of bytes to read
  cv  - passed unchanged to the underlying read. NOTE(review): presumably a
        byte-order conversion flag, given the tdb_convert() call - confirm.

  The failure path at the end logs the offset and length, sets
  tdb->ecode = TDB_ERR_IO and sets transaction_error.
  NOTE(review): return statements, the skip statements inside the two
  non-overlap tests and some braces are not visible in this listing.
*/
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
135 tdb_len_t len, int cv)
137 struct tdb_transaction_el *el;
139 /* we need to walk the list backwards to get the most recent data */
140 for (el=tdb->transaction->elements_last;el;el=el->prev) {
/* element lies entirely after the requested range */
143 if (off+len <= el->offset) {
/* element lies entirely before the requested range */
146 if (off >= el->offset + el->length) {
150 /* an overlapping read - needs to be split into up to
151 2 reads and a memcpy */
152 if (off < el->offset) {
153 partial = el->offset - off;
154 if (transaction_read(tdb, off, buf, partial, cv) != 0) {
159 buf = (void *)(partial + (char *)buf);
161 if (off + len <= el->offset + el->length) {
164 partial = el->offset + el->length - off;
166 memcpy(buf, el->data + (off - el->offset), partial);
/* NOTE(review): this converts len bytes although the memcpy above copied
   partial bytes; the guarding condition is not visible here - confirm that
   len rather than partial is intended */
168 tdb_convert(buf, len);
172 buf = (void *)(partial + (char *)buf);
/* whatever extends past the end of this element is read recursively */
174 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
181 /* its not in the transaction elements - do a real read */
182 return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
/* NOTE(review): off and len are printed with %d; confirm that matches the
   signedness of tdb_off_t and tdb_len_t */
185 TDB_LOG((tdb, 0, "transaction_read: failed at off=%d len=%d\n", off, len));
186 tdb->ecode = TDB_ERR_IO;
187 tdb->transaction->transaction_error = 1;
193 write while in a transaction
/*
  Write hook used while a transaction is active. The data is recorded in the
  transaction element list; no call to the underlying write method is
  visible in this function.

  Visible steps:
   - a write of exactly sizeof(tdb_off_t) bytes inside the hash table region
     also updates the mirrored hash_heads entry for that chain
   - the element list is scanned from elements_last backwards; a write that
     overlaps an element is copied into el->data, with the parts outside the
     element handled by recursive transaction_write() calls
   - during that scan the first element that ends exactly at off is
     remembered as best_el
   - if best_el is adjacent and the new range satisfies
     off+len < old_map_size or off > old_map_size, the data is appended to
     best_el by growing its buffer with realloc
   - otherwise a new element is allocated and linked after elements_last

  tdb - database context with an active transaction
  off - file offset of the write
  buf - source data. transaction_expand_file() passes NULL here; the
        memset(TDB_PAD_BYTE) calls presumably cover that case, but the
        condition selecting memcpy versus memset is not visible - confirm.
  len - number of bytes

  Allocation failures set tdb->ecode = TDB_ERR_OOM; the failure path at the
  end logs and sets TDB_ERR_IO. Both set transaction_error.
  NOTE(review): return statements and several short lines are not visible
  in this listing.
*/
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
196 const void *buf, tdb_len_t len)
198 struct tdb_transaction_el *el, *best_el=NULL;
204 /* if the write is to a hash head, then update the transaction
206 if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207 off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
212 /* first see if we can replace an existing entry */
213 for (el=tdb->transaction->elements_last;el;el=el->prev) {
216 if (best_el == NULL && off == el->offset+el->length) {
220 if (off+len <= el->offset) {
223 if (off >= el->offset + el->length) {
227 /* an overlapping write - needs to be split into up to
228 2 writes and a memcpy */
229 if (off < el->offset) {
230 partial = el->offset - off;
231 if (transaction_write(tdb, off, buf, partial) != 0) {
236 buf = (const void *)(partial + (const char *)buf);
238 if (off + len <= el->offset + el->length) {
241 partial = el->offset + el->length - off;
243 memcpy(el->data + (off - el->offset), buf, partial);
246 buf = (const void *)(partial + (const char *)buf);
248 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
255 /* see if we can append the new entry to an existing entry */
256 if (best_el && best_el->offset + best_el->length == off &&
257 (off+len < tdb->transaction->old_map_size ||
258 off > tdb->transaction->old_map_size)) {
/* the old buffer pointer is kept in data before the realloc below.
   NOTE(review): the line that points el at best_el, and any restore of
   el->data from data on failure, are not visible in this listing - confirm */
259 unsigned char *data = best_el->data;
261 el->data = realloc(el->data, el->length + len);
262 if (el->data == NULL) {
263 tdb->ecode = TDB_ERR_OOM;
264 tdb->transaction->transaction_error = 1;
269 memcpy(el->data + el->length, buf, len);
271 memset(el->data + el->length, TDB_PAD_BYTE, len);
277 /* add a new entry at the end of the list */
278 el = malloc(sizeof(*el));
280 tdb->ecode = TDB_ERR_OOM;
281 tdb->transaction->transaction_error = 1;
285 el->prev = tdb->transaction->elements_last;
288 el->data = malloc(len);
289 if (el->data == NULL) {
291 tdb->ecode = TDB_ERR_OOM;
292 tdb->transaction->transaction_error = 1;
296 memcpy(el->data, buf, len);
298 memset(el->data, TDB_PAD_BYTE, len);
/* link the new element in; both the head and the tail pointer are assigned
   below (the conditions around these assignments are not visible) */
303 tdb->transaction->elements = el;
305 tdb->transaction->elements_last = el;
309 TDB_LOG((tdb, 0, "transaction_write: failed at off=%d len=%d\n", off, len));
310 tdb->ecode = TDB_ERR_IO;
311 tdb->transaction->transaction_error = 1;
316 accelerated hash chain head search, using the cached hash heads
/*
  Hash chain scan hook used while a transaction is active. It walks chain
  indexes up to tdb->header.hash_size and tests the in-memory hash_heads
  copy for a non-zero head instead of reading the file.

  chain - pointer to the chain index.
  NOTE(review): the initialisation of h, what happens once a non-zero head is
  found and how the result is stored through chain are not visible in this
  listing.
*/
318 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
321 for (;h < tdb->header.hash_size;h++) {
322 /* the +1 takes account of the freelist */
323 if (0 != tdb->transaction->hash_heads[h+1]) {
331 out of bounds check during a transaction
/*
  Bounds check hook used while a transaction is active. len is compared
  against tdb->map_size only; no file access is visible. When the check does
  not pass, the result is TDB_ERRCODE(TDB_ERR_IO, -1).

  probe is not referenced in the visible lines.
  NOTE(review): the statement taken when len <= map_size is not visible.
*/
333 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
335 if (len <= tdb->map_size) {
338 return TDB_ERRCODE(TDB_ERR_IO, -1);
342 transaction version of tdb_expand().
/*
  File expansion hook used while a transaction is active. The expansion is
  recorded as a transaction_write() of addition bytes at offset size with a
  NULL buffer, so nothing is written to the file here.

  NOTE(review): the remainder of the parameter list (the declaration of
  addition) and the return statements are not visible in this listing.
*/
344 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
347 /* add a write to the transaction elements, so subsequent
348 reads see the zero data */
349 if (transaction_write(tdb, size, NULL, addition) != 0) {
357 brlock during a transaction - ignore them
/*
  Byte range lock hook used while a transaction is active.
  NOTE(review): the body is not visible in this listing; per the description
  preceding this function, lock requests are ignored during a transaction.
  Unlike the other hooks in this file it is not declared static - confirm
  whether external linkage is intended.
*/
359 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset,
360 int rw_type, int lck_type, int probe)
/*
  Method table that tdb_transaction_start() installs in tdb->methods for the
  duration of a transaction.
  NOTE(review): only two initialisers are visible in this listing; the order
  and the remaining entries must be checked against struct tdb_methods.
*/
365 static const struct tdb_methods transaction_methods = {
368 transaction_next_hash_chain,
370 transaction_expand_file,
376 start a tdb transaction. No token is returned, as only a single
377 transaction is allowed to be pending per tdb_context
/*
  Begin a transaction on tdb.

  Visible steps, in order:
   - reject contexts that are read_only, TDB_INTERNAL or traverse_read
     (ecode TDB_ERR_EINVAL)
   - if a transaction already exists, only increment its nesting count
   - refuse when tdb->num_locks is non-zero or a traverse is in progress
     (ecode TDB_ERR_LOCK)
   - calloc the transaction state (ecode TDB_ERR_OOM on failure)
   - take a blocking write lock on TRANSACTION_LOCK, then a blocking read
     lock starting at FREELIST_TOP
   - allocate hash_heads with hash_size+1 entries and fill it with a
     tdb_read from FREELIST_TOP
   - call tdb_oob with probe set, then record old_map_size
   - save tdb->methods in io_methods and install transaction_methods
   - issue one transaction_write covering the whole hash table

  The last four statements release both locks and free the transaction
  state; they form the failure cleanup.
  NOTE(review): return statements and jump statements are not visible in
  this listing, so which failures reach that cleanup cannot be confirmed
  from here.
*/
379 int tdb_transaction_start(struct tdb_context *tdb)
381 /* some sanity checks */
/* NOTE(review): the condition also rejects traverse_read, which the log
   text below does not mention */
382 if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
383 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
384 tdb->ecode = TDB_ERR_EINVAL;
388 /* cope with nested tdb_transaction_start() calls */
389 if (tdb->transaction != NULL) {
390 tdb->transaction->nesting++;
391 TDB_LOG((tdb, 0, "tdb_transaction_start: nesting %d\n",
392 tdb->transaction->nesting));
396 if (tdb->num_locks != 0) {
397 /* the caller must not have any locks when starting a
398 transaction as otherwise we'll be screwed by lack
399 of nested locks in posix */
400 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction with locks held\n"));
401 tdb->ecode = TDB_ERR_LOCK;
405 if (tdb->travlocks.next != NULL) {
406 /* you cannot use transactions inside a traverse (although you can use
407 traverse inside a transaction) as otherwise you can end up with
409 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
410 tdb->ecode = TDB_ERR_LOCK;
414 tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
415 if (tdb->transaction == NULL) {
416 tdb->ecode = TDB_ERR_OOM;
420 /* get the transaction write lock. This is a blocking lock. As
421 discussed with Volker, there are a number of ways we could
422 make this async, which we will probably do in the future */
423 if (tdb_brlock_len(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
424 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get transaction lock\n"));
425 tdb->ecode = TDB_ERR_LOCK;
/* no lock is held yet at this point, so only the state is freed */
426 SAFE_FREE(tdb->transaction);
430 /* get a read lock from the freelist to the end of file. This
431 is upgraded to a write lock during the commit */
432 if (tdb_brlock_len(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
433 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get hash locks\n"));
434 tdb->ecode = TDB_ERR_LOCK;
438 /* setup a copy of the hash table heads so the hash scan in
439 traverse can be fast */
440 tdb->transaction->hash_heads = calloc(tdb->header.hash_size+1, sizeof(tdb_off_t));
441 if (tdb->transaction->hash_heads == NULL) {
442 tdb->ecode = TDB_ERR_OOM;
445 if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
446 TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
447 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to read hash heads\n"));
448 tdb->ecode = TDB_ERR_IO;
452 /* make sure we know about any file expansions already done by
454 tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
455 tdb->transaction->old_map_size = tdb->map_size;
457 /* finally hook the io methods, replacing them with
458 transaction specific methods */
459 tdb->transaction->io_methods = tdb->methods;
460 tdb->methods = &transaction_methods;
462 /* by calling this transaction write here, we ensure that we don't grow the
463 transaction linked list due to hash table updates */
464 if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
465 TDB_HASHTABLE_SIZE(tdb)) != 0) {
466 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to prime hash table\n"));
467 tdb->ecode = TDB_ERR_IO;
/* failure cleanup: drop both locks, then free the mirrored hash heads and
   the transaction state */
474 tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
475 tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
476 SAFE_FREE(tdb->transaction->hash_heads);
477 SAFE_FREE(tdb->transaction);
483 cancel the current transaction
/*
  Cancel the current transaction and discard its pending writes.

  Visible steps:
   - log when there is no transaction
   - for a nested transaction (nesting != 0) set transaction_error and
     decrement nesting; the outer transaction stays open
   - restore tdb->map_size from old_map_size
   - walk the element list from the head, unlinking each element
     (NOTE(review): the free calls are not visible in this listing)
   - if tdb->num_locks is non-zero, unlock every chain whose
     tdb->locked[h].count is non-zero and reset that count
   - restore tdb->methods from io_methods
   - release the FREELIST_TOP and TRANSACTION_LOCK locks and free the state

  tdb_transaction_commit() also calls this on its error paths and at the end
  of a successful commit.
  NOTE(review): return statements are not visible in this listing.
*/
485 int tdb_transaction_cancel(struct tdb_context *tdb)
487 if (tdb->transaction == NULL) {
488 TDB_LOG((tdb, 0, "tdb_transaction_cancel: no transaction\n"));
492 if (tdb->transaction->nesting != 0) {
493 tdb->transaction->transaction_error = 1;
494 tdb->transaction->nesting--;
498 tdb->map_size = tdb->transaction->old_map_size;
500 /* free all the transaction elements */
501 while (tdb->transaction->elements) {
502 struct tdb_transaction_el *el = tdb->transaction->elements;
503 tdb->transaction->elements = el->next;
508 /* remove any locks created during the transaction */
509 if (tdb->num_locks != 0) {
511 for (h=0;h<tdb->header.hash_size+1;h++) {
512 if (tdb->locked[h].count != 0) {
/* NOTE(review): the literal 4 is presumably sizeof(tdb_off_t) - confirm */
513 tdb_brlock_len(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
514 tdb->locked[h].count = 0;
520 /* restore the normal io methods */
521 tdb->methods = tdb->transaction->io_methods;
523 tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
524 tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
525 SAFE_FREE(tdb->transaction->hash_heads);
526 SAFE_FREE(tdb->transaction);
/*
  Push data to stable storage for the range [offset, offset+length).

  fsync is called on tdb->fd. msync with MS_SYNC is then called on the
  mapping, with the start moved down to moffset (offset masked with
  page_size-1, which is a page boundary when page_size is a power of two)
  and the length extended by the same amount. Either failure sets
  tdb->ecode = TDB_ERR_IO and logs.

  NOTE(review): the condition guarding the msync call and the return
  statements are not visible in this listing.
*/
534 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
536 if (fsync(tdb->fd) != 0) {
537 tdb->ecode = TDB_ERR_IO;
538 TDB_LOG((tdb, 0, "tdb_transaction: fsync failed\n"));
543 tdb_off_t moffset = offset & ~(tdb->page_size-1);
544 if (msync(moffset + (char *)tdb->map_ptr,
545 length + (offset - moffset), MS_SYNC) != 0) {
546 tdb->ecode = TDB_ERR_IO;
547 TDB_LOG((tdb, 0, "tdb_transaction: msync failed - %s\n",
558 work out how much space the linearised recovery data will consume
/*
  Compute the number of bytes of linearised recovery data for the current
  element list.

  The total starts at sizeof(u32) and grows by
  2*sizeof(tdb_off_t) + el->length per element counted. Elements whose offset
  is at or beyond old_map_size are tested separately.
  NOTE(review): the statement inside that test is not visible in this
  listing; presumably such elements are skipped - confirm.

  Returns the computed size.
*/
560 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
562 struct tdb_transaction_el *el;
563 tdb_len_t recovery_size = 0;
565 recovery_size = sizeof(u32);
566 for (el=tdb->transaction->elements;el;el=el->next) {
567 if (el->offset >= tdb->transaction->old_map_size) {
570 recovery_size += 2*sizeof(tdb_off_t) + el->length;
573 return recovery_size;
577 allocate the recovery area, or use an existing recovery area if it is
/*
  Find or create the recovery area for the current transaction.

  Outputs:
   recovery_size     - bytes of recovery data needed (tdb_recovery_size)
   recovery_offset   - file offset of the recovery record
   recovery_max_size - capacity of the recovery area

  Visible steps:
   - read the current head offset from TDB_RECOVERY_HEAD and, when non-zero,
     read the record header there with the original io methods
   - when a record exists and rec.rec_len is large enough, reuse it
   - otherwise free the old record with tdb_free, recompute the size, round
     sizeof(rec) + size up to tdb->page_size with TDB_ALIGN, and place the
     new area at the current tdb->map_size
   - expand the file with the original tdb_expand_file, refresh the mapping
     through tdb_oob, and reset old_map_size to the new map_size
   - write the new head offset to TDB_RECOVERY_HEAD with the original write

  NOTE(review): return statements are not visible in this listing.
*/
580 static int tdb_recovery_allocate(struct tdb_context *tdb,
581 tdb_len_t *recovery_size,
582 tdb_off_t *recovery_offset,
583 tdb_len_t *recovery_max_size)
585 struct list_struct rec;
586 const struct tdb_methods *methods = tdb->transaction->io_methods;
587 tdb_off_t recovery_head;
589 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
590 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery head\n"));
/* rec is only filled in when a recovery record already exists; the later
   uses of rec.rec_len are guarded by the same recovery_head != 0 test */
596 if (recovery_head != 0 &&
597 methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
598 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery record\n"));
602 *recovery_size = tdb_recovery_size(tdb);
604 if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
605 /* it fits in the existing area */
606 *recovery_max_size = rec.rec_len;
607 *recovery_offset = recovery_head;
611 /* we need to free up the old recovery area, then allocate a
612 new one at the end of the file. Note that we cannot use
613 tdb_allocate() to allocate the new one as that might return
614 us an area that is being currently used (as of the start of
616 if (recovery_head != 0) {
617 if (tdb_free(tdb, recovery_head, &rec) == -1) {
618 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to free previous recovery area\n"));
623 /* the tdb_free() call might have increased the recovery size */
624 *recovery_size = tdb_recovery_size(tdb);
626 /* round up to a multiple of page size */
627 *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
628 *recovery_offset = tdb->map_size;
629 recovery_head = *recovery_offset;
/* the requested growth covers both the expansion already pending in the
   transaction (map_size - old_map_size) and the new recovery area */
631 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
632 (tdb->map_size - tdb->transaction->old_map_size) +
633 sizeof(rec) + *recovery_max_size) == -1) {
634 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to create recovery area\n"));
638 /* remap the file (if using mmap) */
639 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
641 /* we have to reset the old map size so that we don't try to expand the file
642 again in the transaction commit, which would destroy the recovery area */
643 tdb->transaction->old_map_size = tdb->map_size;
645 /* write the recovery header offset and sync - we can sync without a race here
646 as the magic ptr in the recovery record has not been set */
647 CONVERT(recovery_head);
648 if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
649 &recovery_head, sizeof(tdb_off_t)) == -1) {
650 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to write recovery head\n"));
659 setup the recovery data that will be used on a crash during commit
/*
  Build and store the recovery record for the pending commit.

  Visible steps:
   - obtain the recovery area through tdb_recovery_allocate()
   - allocate one buffer holding a struct list_struct header followed by the
     recovery data; the header gets data_len = recovery_size,
     rec_len = recovery_max_size and key_len = old_map_size
   - for each element, store its offset, its length and then the current
     file contents of that range, read with the original tdb_read (old
     data, not the pending new data)
   - an element that starts below old_map_size but ends beyond
     tdb->transaction->old_map_size is reported as TDB_ERR_CORRUPT
   - append a tailer holding sizeof(*rec) + recovery_max_size
   - write the buffer at recovery_offset and sync it
   - write TDB_RECOVERY_MAGIC into the magic field of the record and sync

  magic_offset - output: file offset of the magic field, set just before the
                 magic is written.

  NOTE(review): return statements, pointer advances of p, frees of data and
  the statement inside the el->offset >= old_map_size test are not visible
  in this listing.
*/
661 static int transaction_setup_recovery(struct tdb_context *tdb,
662 tdb_off_t *magic_offset)
664 struct tdb_transaction_el *el;
665 tdb_len_t recovery_size;
666 unsigned char *data, *p;
667 const struct tdb_methods *methods = tdb->transaction->io_methods;
668 struct list_struct *rec;
669 tdb_off_t recovery_offset, recovery_max_size;
/* snapshot taken before tdb_recovery_allocate(), which may reset
   tdb->transaction->old_map_size */
670 tdb_off_t old_map_size = tdb->transaction->old_map_size;
674 check that the recovery area has enough space
676 if (tdb_recovery_allocate(tdb, &recovery_size,
677 &recovery_offset, &recovery_max_size) == -1) {
681 data = malloc(recovery_size + sizeof(*rec));
683 tdb->ecode = TDB_ERR_OOM;
687 rec = (struct list_struct *)data;
688 memset(rec, 0, sizeof(*rec));
691 rec->data_len = recovery_size;
692 rec->rec_len = recovery_max_size;
693 rec->key_len = old_map_size;
696 /* build the recovery data into a single blob to allow us to do a single
697 large write, which should be more efficient */
698 p = data + sizeof(*rec);
699 for (el=tdb->transaction->elements;el;el=el->next) {
700 if (el->offset >= old_map_size) {
703 if (el->offset + el->length > tdb->transaction->old_map_size) {
704 TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction data over new region boundary\n"));
706 tdb->ecode = TDB_ERR_CORRUPT;
/* NOTE(review): the literal 4 assumes 4-byte offset and length fields -
   confirm against tdb_off_t and tdb_len_t */
709 memcpy(p, &el->offset, 4);
710 memcpy(p+4, &el->length, 4);
714 /* the recovery area contains the old data, not the
715 new data, so we have to call the original tdb_read
717 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
719 tdb->ecode = TDB_ERR_IO;
726 tailer = sizeof(*rec) + recovery_max_size;
727 memcpy(p, &tailer, 4);
730 /* write the recovery data to the recovery area */
731 if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
732 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery data\n"));
734 tdb->ecode = TDB_ERR_IO;
738 /* as we don't have ordered writes, we have to sync the recovery
739 data before we update the magic to indicate that the recovery
741 if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
748 magic = TDB_RECOVERY_MAGIC;
751 *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
753 if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
754 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery magic\n"));
755 tdb->ecode = TDB_ERR_IO;
759 /* ensure the recovery magic marker is on disk */
760 if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
768 commit the current transaction
/*
  Commit the current transaction.

  Visible steps:
   - log when there is no transaction
   - if transaction_error is set, set ecode TDB_ERR_IO and cancel
   - for a nested transaction only decrement nesting
   - an empty element list is handled by cancelling
   - fail with TDB_ERR_LOCK and cancel when tdb->num_locks is non-zero
   - upgrade the FREELIST_TOP lock with tdb_brlock_upgrade, then take a
     blocking write lock on GLOBAL_LOCK
   - unless TDB_NOSYNC is set, store the recovery record through
     transaction_setup_recovery()
   - if map_size differs from old_map_size, expand the file with the
     original tdb_expand_file and refresh the mapping
   - write every element with the original tdb_write; on a write failure
     the original methods are restored, tdb_transaction_recover() is run,
     the transaction is cancelled and GLOBAL_LOCK is released
   - unless TDB_NOSYNC is set, sync the data, overwrite the recovery magic
     with zero and sync again
   - release GLOBAL_LOCK, call utime on tdb->name, and finish through
     tdb_transaction_cancel() to free memory

  NOTE(review): return statements, frees of written elements and some
  declarations (for example zero) are not visible in this listing.
*/
770 int tdb_transaction_commit(struct tdb_context *tdb)
772 const struct tdb_methods *methods;
773 tdb_off_t magic_offset = 0;
776 if (tdb->transaction == NULL) {
777 TDB_LOG((tdb, 0, "tdb_transaction_commit: no transaction\n"));
781 if (tdb->transaction->transaction_error) {
782 tdb->ecode = TDB_ERR_IO;
783 tdb_transaction_cancel(tdb);
784 TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction error pending\n"));
788 if (tdb->transaction->nesting != 0) {
789 tdb->transaction->nesting--;
793 /* check for a null transaction */
794 if (tdb->transaction->elements == NULL) {
795 tdb_transaction_cancel(tdb);
799 methods = tdb->transaction->io_methods;
801 /* if there are any locks pending then the caller has not
802 nested their locks properly, so fail the transaction */
803 if (tdb->num_locks) {
804 tdb->ecode = TDB_ERR_LOCK;
805 TDB_LOG((tdb, 0, "tdb_transaction_commit: locks pending on commit\n"));
806 tdb_transaction_cancel(tdb);
810 /* upgrade the main transaction lock region to a write lock */
/* NOTE(review): the log message below is prefixed tdb_transaction_start
   although it is emitted from tdb_transaction_commit */
811 if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
812 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to upgrade hash locks\n"));
813 tdb->ecode = TDB_ERR_LOCK;
814 tdb_transaction_cancel(tdb);
818 /* get the global lock - this prevents new users attaching to the database
820 if (tdb_brlock_len(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
821 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to get global lock\n"));
822 tdb->ecode = TDB_ERR_LOCK;
823 tdb_transaction_cancel(tdb);
827 if (!(tdb->flags & TDB_NOSYNC)) {
828 /* write the recovery data to the end of the file */
829 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
830 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to setup recovery data\n"));
831 tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
832 tdb_transaction_cancel(tdb);
837 /* expand the file to the new size if needed */
838 if (tdb->map_size != tdb->transaction->old_map_size) {
839 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
841 tdb->transaction->old_map_size) == -1) {
842 tdb->ecode = TDB_ERR_IO;
843 TDB_LOG((tdb, 0, "tdb_transaction_commit: expansion failed\n"));
844 tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
845 tdb_transaction_cancel(tdb);
848 tdb->map_size = tdb->transaction->old_map_size;
849 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
852 /* perform all the writes */
853 while (tdb->transaction->elements) {
854 struct tdb_transaction_el *el = tdb->transaction->elements;
856 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
857 TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed during commit\n"));
859 /* we've overwritten part of the data and
860 possibly expanded the file, so we need to
861 run the crash recovery code */
862 tdb->methods = methods;
863 tdb_transaction_recover(tdb);
865 tdb_transaction_cancel(tdb);
866 tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
868 TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed\n"));
871 tdb->transaction->elements = el->next;
876 if (!(tdb->flags & TDB_NOSYNC)) {
877 /* ensure the new data is on disk */
878 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
882 /* remove the recovery marker */
883 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
884 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to remove recovery magic\n"));
888 /* ensure the recovery marker has been removed on disk */
889 if (transaction_sync(tdb, magic_offset, 4) == -1) {
894 tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
897 TODO: maybe write to some dummy hdr field, or write to magic
898 offset without mmap, before the last sync, instead of the
902 /* on some systems (like Linux 2.6.x) changes via mmap/msync
903 don't change the mtime of the file, this means the file may
904 not be backed up (as tdb rounding to block sizes means that
905 file size changes are quite rare too). The following forces
906 mtime changes when a transaction completes */
908 utime(tdb->name, NULL);
911 /* use a transaction cancel to free memory and remove the
913 tdb_transaction_cancel(tdb);
919 recover from an aborted transaction. Must be called with exclusive
920 database write access already established (including the global
921 lock to prevent new processes attaching)
923 int tdb_transaction_recover(struct tdb_context *tdb)
925 tdb_off_t recovery_head, recovery_eof;
926 unsigned char *data, *p;
928 struct list_struct rec;
930 /* find the recovery area */
931 if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
932 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery head\n"));
933 tdb->ecode = TDB_ERR_IO;
937 if (recovery_head == 0) {
938 /* we have never allocated a recovery record */
942 /* read the recovery record */
943 if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
944 sizeof(rec), DOCONV()) == -1) {
945 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery record\n"));
946 tdb->ecode = TDB_ERR_IO;
950 if (rec.magic != TDB_RECOVERY_MAGIC) {
951 /* there is no valid recovery data */
955 if (tdb->read_only) {
956 TDB_LOG((tdb, 0, "tdb_transaction_recover: attempt to recover read only database\n"));
957 tdb->ecode = TDB_ERR_CORRUPT;
961 recovery_eof = rec.key_len;
963 data = malloc(rec.data_len);
965 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to allocate recovery data\n"));
966 tdb->ecode = TDB_ERR_OOM;
970 /* read the full recovery data */
971 if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
972 rec.data_len, 0) == -1) {
973 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery data\n"));
974 tdb->ecode = TDB_ERR_IO;
978 /* recover the file data */
980 while (p+8 < data + rec.data_len) {
986 memcpy(&len, p+4, 4);
988 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
990 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
991 tdb->ecode = TDB_ERR_IO;
999 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1000 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync recovery\n"));
1001 tdb->ecode = TDB_ERR_IO;
1005 /* if the recovery area is after the recovered eof then remove it */
1006 if (recovery_eof <= recovery_head) {
1007 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1008 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery head\n"));
1009 tdb->ecode = TDB_ERR_IO;
1014 /* remove the recovery magic */
1015 if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic),
1017 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery magic\n"));
1018 tdb->ecode = TDB_ERR_IO;
1022 /* reduce the file size to the old size */
1024 if (ftruncate(tdb->fd, recovery_eof) != 0) {
1025 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1026 tdb->ecode = TDB_ERR_IO;
1029 tdb->map_size = recovery_eof;
1032 if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1033 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync2 recovery\n"));
1034 tdb->ecode = TDB_ERR_IO;
1038 TDB_LOG((tdb, 0, "tdb_transaction_recover: recovered %d byte database\n",