78bbd7ad23e16090db7aa1545c71880aee6cfc21
[samba.git] / lib / tdb / common / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no fsync/msync calls are made.  This means we
86     are still proof against a process dying during transaction commit,
87     but not against machine reboot.
88
89   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90     tdb_add_flags() transaction nesting is enabled.
91     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92     The default is that transaction nesting is allowed.
93     Note: this default may change in future versions of tdb.
94
95     Beware: when transactions are nested, a transaction successfully
96     completed with tdb_transaction_commit() can be silently unrolled later.
97
98   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99     tdb_add_flags() transaction nesting is disabled.
100     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101     An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102     The default is that transaction nesting is allowed.
103     Note: this default may change in future versions of tdb.
104 */
105
106
107 /*
108   hold the context of any current transaction
109 */
110 struct tdb_transaction {
111         /* we keep a mirrored copy of the tdb hash heads here so
112            tdb_next_hash_chain() can operate efficiently */
113         uint32_t *hash_heads;
114
115         /* the original io methods - used to do IOs to the real db */
116         const struct tdb_methods *io_methods;
117
118         /* the list of transaction blocks. When a block is first
119            written to, it gets created in this list */
120         uint8_t **blocks;
121         uint32_t num_blocks;
122         uint32_t block_size;      /* bytes in each block */
123         uint32_t last_block_size; /* number of valid bytes in the last block */
124
125         /* non-zero when an internal transaction error has
126            occurred. All write operations will then fail until the
127            transaction is ended */
128         int transaction_error;
129
130         /* when inside a transaction we need to keep track of any
131            nested tdb_transaction_start() calls, as these are allowed,
132            but don't create a new transaction */
133         int nesting;
134
135         /* set when a prepare has already occurred */
136         bool prepared;
137         tdb_off_t magic_offset;
138
139         /* old file size before transaction */
140         tdb_len_t old_map_size;
141
142         /* did we expand in this transaction */
143         bool expanded;
144 };
145
146
147 /*
148   read while in a transaction. We need to check first if the data is in our list
149   of transaction elements, then if not do a real read
150 */
151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152                             tdb_len_t len, int cv)
153 {
154         uint32_t blk;
155
156         /* break it down into block sized ops */
157         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160                         return -1;
161                 }
162                 len -= len2;
163                 off += len2;
164                 buf = (void *)(len2 + (char *)buf);
165         }
166
167         if (len == 0) {
168                 return 0;
169         }
170
171         blk = off / tdb->transaction->block_size;
172
173         /* see if we have it in the block list */
174         if (tdb->transaction->num_blocks <= blk ||
175             tdb->transaction->blocks[blk] == NULL) {
176                 /* nope, do a real read */
177                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178                         goto fail;
179                 }
180                 return 0;
181         }
182
183         /* it is in the block list. Now check for the last block */
184         if (blk == tdb->transaction->num_blocks-1) {
185                 if (len > tdb->transaction->last_block_size) {
186                         goto fail;
187                 }
188         }
189
190         /* now copy it out of this block */
191         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192         if (cv) {
193                 tdb_convert(buf, len);
194         }
195         return 0;
196
197 fail:
198         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
199         tdb->ecode = TDB_ERR_IO;
200         tdb->transaction->transaction_error = 1;
201         return -1;
202 }
203
204
205 /*
206   write while in a transaction
207 */
208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209                              const void *buf, tdb_len_t len)
210 {
211         uint32_t blk;
212
213         if (buf == NULL) {
214                 return -1;
215         }
216
217         /* Only a commit is allowed on a prepared transaction */
218         if (tdb->transaction->prepared) {
219                 tdb->ecode = TDB_ERR_EINVAL;
220                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
221                 tdb->transaction->transaction_error = 1;
222                 return -1;
223         }
224
225         /* if the write is to a hash head, then update the transaction
226            hash heads */
227         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
228             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
229                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
230                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
231         }
232
233         /* break it up into block sized chunks */
234         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
235                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
236                 if (transaction_write(tdb, off, buf, len2) != 0) {
237                         return -1;
238                 }
239                 len -= len2;
240                 off += len2;
241                 buf = (const void *)(len2 + (const char *)buf);
242         }
243
244         if (len == 0) {
245                 return 0;
246         }
247
248         blk = off / tdb->transaction->block_size;
249         off = off % tdb->transaction->block_size;
250
251         if (tdb->transaction->num_blocks <= blk) {
252                 uint8_t **new_blocks;
253                 /* expand the blocks array */
254                 new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
255                                                  (blk+1)*sizeof(uint8_t *));
256                 if (new_blocks == NULL) {
257                         tdb->ecode = TDB_ERR_OOM;
258                         goto fail;
259                 }
260                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
261                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
262                 tdb->transaction->blocks = new_blocks;
263                 tdb->transaction->num_blocks = blk+1;
264                 tdb->transaction->last_block_size = 0;
265         }
266
267         /* allocate and fill a block? */
268         if (tdb->transaction->blocks[blk] == NULL) {
269                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
270                 if (tdb->transaction->blocks[blk] == NULL) {
271                         tdb->ecode = TDB_ERR_OOM;
272                         tdb->transaction->transaction_error = 1;
273                         return -1;
274                 }
275                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
276                         tdb_len_t len2 = tdb->transaction->block_size;
277                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
278                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
279                         }
280                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
281                                                                    tdb->transaction->blocks[blk],
282                                                                    len2, 0) != 0) {
283                                 SAFE_FREE(tdb->transaction->blocks[blk]);
284                                 tdb->ecode = TDB_ERR_IO;
285                                 goto fail;
286                         }
287                         if (blk == tdb->transaction->num_blocks-1) {
288                                 tdb->transaction->last_block_size = len2;
289                         }
290                 }
291         }
292
293         /* overwrite part of an existing block */
294         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
295         if (blk == tdb->transaction->num_blocks-1) {
296                 if (len + off > tdb->transaction->last_block_size) {
297                         tdb->transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
305                  (blk*tdb->transaction->block_size) + off, len));
306         tdb->transaction->transaction_error = 1;
307         return -1;
308 }
309
310
311 /*
312   write while in a transaction - this variant never expands the transaction blocks, it only
313   updates existing blocks. This means it cannot change the recovery size
314 */
315 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
316                                       const void *buf, tdb_len_t len)
317 {
318         uint32_t blk;
319
320         /* break it up into block sized chunks */
321         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
322                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
323                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
324                         return -1;
325                 }
326                 len -= len2;
327                 off += len2;
328                 if (buf != NULL) {
329                         buf = (const void *)(len2 + (const char *)buf);
330                 }
331         }
332
333         if (len == 0 || buf == NULL) {
334                 return 0;
335         }
336
337         blk = off / tdb->transaction->block_size;
338         off = off % tdb->transaction->block_size;
339
340         if (tdb->transaction->num_blocks <= blk ||
341             tdb->transaction->blocks[blk] == NULL) {
342                 return 0;
343         }
344
345         if (blk == tdb->transaction->num_blocks-1 &&
346             off + len > tdb->transaction->last_block_size) {
347                 if (off >= tdb->transaction->last_block_size) {
348                         return 0;
349                 }
350                 len = tdb->transaction->last_block_size - off;
351         }
352
353         /* overwrite part of an existing block */
354         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
355
356         return 0;
357 }
358
359
360 /*
361   accelerated hash chain head search, using the cached hash heads
362 */
363 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
364 {
365         uint32_t h = *chain;
366         for (;h < tdb->hash_size;h++) {
367                 /* the +1 takes account of the freelist */
368                 if (0 != tdb->transaction->hash_heads[h+1]) {
369                         break;
370                 }
371         }
372         (*chain) = h;
373 }
374
375 /*
376   out of bounds check during a transaction
377 */
378 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
379                            tdb_len_t len, int probe)
380 {
381         /*
382          * This duplicates functionality from tdb_oob(). Don't remove:
383          * we still have direct callers of tdb->methods->tdb_oob()
384          * inside transaction.c.
385          */
386         if (off + len >= off && off + len <= tdb->map_size) {
387                 return 0;
388         }
389         tdb->ecode = TDB_ERR_IO;
390         return -1;
391 }
392
393 /*
394   transaction version of tdb_expand().
395 */
396 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
397                                    tdb_off_t addition)
398 {
399         const char buf_zero[8192] = {0};
400         size_t buf_len = sizeof(buf_zero);
401
402         while (addition > 0) {
403                 size_t n = MIN(addition, buf_len);
404                 int ret;
405
406                 ret = transaction_write(tdb, size, buf_zero, n);
407                 if (ret != 0) {
408                         return ret;
409                 }
410
411                 addition -= n;
412                 size += n;
413         }
414
415         tdb->transaction->expanded = true;
416
417         return 0;
418 }
419
420 static const struct tdb_methods transaction_methods = {
421         transaction_read,
422         transaction_write,
423         transaction_next_hash_chain,
424         transaction_oob,
425         transaction_expand_file,
426 };
427
428 /*
429  * Is a transaction currently active on this context?
430  *
431  */
432 _PUBLIC_ bool tdb_transaction_active(struct tdb_context *tdb)
433 {
434         return (tdb->transaction != NULL);
435 }
436
437 /*
438   start a tdb transaction. No token is returned, as only a single
439   transaction is allowed to be pending per tdb_context
440 */
441 static int _tdb_transaction_start(struct tdb_context *tdb,
442                                   enum tdb_lock_flags lockflags)
443 {
444         /* some sanity checks */
445         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
446             || tdb->traverse_read) {
447                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
448                 tdb->ecode = TDB_ERR_EINVAL;
449                 return -1;
450         }
451
452         /* cope with nested tdb_transaction_start() calls */
453         if (tdb->transaction != NULL) {
454                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
455                         tdb->ecode = TDB_ERR_NESTING;
456                         return -1;
457                 }
458                 tdb->transaction->nesting++;
459                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
460                          tdb->transaction->nesting));
461                 return 0;
462         }
463
464         if (tdb_have_extra_locks(tdb)) {
465                 /* the caller must not have any locks when starting a
466                    transaction as otherwise we'll be screwed by lack
467                    of nested locks in posix */
468                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
469                 tdb->ecode = TDB_ERR_LOCK;
470                 return -1;
471         }
472
473         if (tdb->travlocks.next != NULL) {
474                 /* you cannot use transactions inside a traverse (although you can use
475                    traverse inside a transaction) as otherwise you can end up with
476                    deadlock */
477                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
478                 tdb->ecode = TDB_ERR_LOCK;
479                 return -1;
480         }
481
482         tdb->transaction = (struct tdb_transaction *)
483                 calloc(sizeof(struct tdb_transaction), 1);
484         if (tdb->transaction == NULL) {
485                 tdb->ecode = TDB_ERR_OOM;
486                 return -1;
487         }
488
489         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
490         tdb->transaction->block_size = tdb->page_size;
491
492         /* get the transaction write lock. This is a blocking lock. As
493            discussed with Volker, there are a number of ways we could
494            make this async, which we will probably do in the future */
495         if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
496                 SAFE_FREE(tdb->transaction->blocks);
497                 SAFE_FREE(tdb->transaction);
498                 if ((lockflags & TDB_LOCK_WAIT) == 0) {
499                         tdb->ecode = TDB_ERR_NOLOCK;
500                 } else {
501                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
502                                  "tdb_transaction_start: "
503                                  "failed to get transaction lock\n"));
504                 }
505                 return -1;
506         }
507
508         /* get a read lock from the freelist to the end of file. This
509            is upgraded to a write lock during the commit */
510         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
511                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
512                 goto fail_allrecord_lock;
513         }
514
515         /* setup a copy of the hash table heads so the hash scan in
516            traverse can be fast */
517         tdb->transaction->hash_heads = (uint32_t *)
518                 calloc(tdb->hash_size+1, sizeof(uint32_t));
519         if (tdb->transaction->hash_heads == NULL) {
520                 tdb->ecode = TDB_ERR_OOM;
521                 goto fail;
522         }
523         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
524                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
525                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
526                 tdb->ecode = TDB_ERR_IO;
527                 goto fail;
528         }
529
530         /* make sure we know about any file expansions already done by
531            anyone else */
532         tdb_oob(tdb, tdb->map_size, 1, 1);
533         tdb->transaction->old_map_size = tdb->map_size;
534
535         /* finally hook the io methods, replacing them with
536            transaction specific methods */
537         tdb->transaction->io_methods = tdb->methods;
538         tdb->methods = &transaction_methods;
539
540         /* Trace at the end, so we get sequence number correct. */
541         tdb_trace(tdb, "tdb_transaction_start");
542         return 0;
543
544 fail:
545         tdb_allrecord_unlock(tdb, F_RDLCK, false);
546 fail_allrecord_lock:
547         tdb_transaction_unlock(tdb, F_WRLCK);
548         SAFE_FREE(tdb->transaction->blocks);
549         SAFE_FREE(tdb->transaction->hash_heads);
550         SAFE_FREE(tdb->transaction);
551         return -1;
552 }
553
554 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
555 {
556         return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
557 }
558
559 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
560 {
561         return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
562 }
563
564 /*
565   sync to disk
566 */
567 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
568 {
569         if (tdb->flags & TDB_NOSYNC) {
570                 return 0;
571         }
572
573 #ifdef HAVE_FDATASYNC
574         if (fdatasync(tdb->fd) != 0) {
575 #else
576         if (fsync(tdb->fd) != 0) {
577 #endif
578                 tdb->ecode = TDB_ERR_IO;
579                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
580                 return -1;
581         }
582 #ifdef HAVE_MMAP
583         if (tdb->map_ptr) {
584                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
585                 if (msync(moffset + (char *)tdb->map_ptr,
586                           length + (offset - moffset), MS_SYNC) != 0) {
587                         tdb->ecode = TDB_ERR_IO;
588                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
589                                  strerror(errno)));
590                         return -1;
591                 }
592         }
593 #endif
594         return 0;
595 }
596
597
598 static int _tdb_transaction_cancel(struct tdb_context *tdb)
599 {
600         uint32_t i;
601         int ret = 0;
602
603         if (tdb->transaction == NULL) {
604                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
605                 return -1;
606         }
607
608         if (tdb->transaction->nesting != 0) {
609                 tdb->transaction->transaction_error = 1;
610                 tdb->transaction->nesting--;
611                 return 0;
612         }
613
614         tdb->map_size = tdb->transaction->old_map_size;
615
616         /* free all the transaction blocks */
617         for (i=0;i<tdb->transaction->num_blocks;i++) {
618                 if ((tdb->transaction->blocks != NULL) &&
619                     tdb->transaction->blocks[i] != NULL) {
620                         free(tdb->transaction->blocks[i]);
621                 }
622         }
623         SAFE_FREE(tdb->transaction->blocks);
624
625         if (tdb->transaction->magic_offset) {
626                 const struct tdb_methods *methods = tdb->transaction->io_methods;
627                 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
628
629                 /* remove the recovery marker */
630                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
631                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
632                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
633                         ret = -1;
634                 }
635         }
636
637         /* This also removes the OPEN_LOCK, if we have it. */
638         tdb_release_transaction_locks(tdb);
639
640         /* restore the normal io methods */
641         tdb->methods = tdb->transaction->io_methods;
642
643         SAFE_FREE(tdb->transaction->hash_heads);
644         SAFE_FREE(tdb->transaction);
645
646         return ret;
647 }
648
649 /*
650   cancel the current transaction
651 */
652 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
653 {
654         tdb_trace(tdb, "tdb_transaction_cancel");
655         return _tdb_transaction_cancel(tdb);
656 }
657
658 /*
659   work out how much space the linearised recovery data will consume
660 */
661 static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
662 {
663         tdb_len_t recovery_size = 0;
664         uint32_t i;
665
666         recovery_size = sizeof(uint32_t);
667         for (i=0;i<tdb->transaction->num_blocks;i++) {
668                 tdb_len_t block_size;
669                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
670                         break;
671                 }
672                 if (tdb->transaction->blocks[i] == NULL) {
673                         continue;
674                 }
675                 if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
676                                    &recovery_size)) {
677                         return false;
678                 }
679                 if (i == tdb->transaction->num_blocks-1) {
680                         block_size = tdb->transaction->last_block_size;
681                 } else {
682                         block_size =  tdb->transaction->block_size;
683                 }
684                 if (!tdb_add_len_t(recovery_size, block_size,
685                                    &recovery_size)) {
686                         return false;
687                 }
688         }
689
690         *result = recovery_size;
691         return true;
692 }
693
694 int tdb_recovery_area(struct tdb_context *tdb,
695                       const struct tdb_methods *methods,
696                       tdb_off_t *recovery_offset,
697                       struct tdb_record *rec)
698 {
699         int ret;
700
701         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
702                 return -1;
703         }
704
705         if (*recovery_offset == 0) {
706                 rec->rec_len = 0;
707                 return 0;
708         }
709
710         if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
711                               DOCONV()) == -1) {
712                 return -1;
713         }
714
715         /* ignore invalid recovery regions: can happen in crash */
716         if (rec->magic != TDB_RECOVERY_MAGIC &&
717             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
718                 *recovery_offset = 0;
719                 rec->rec_len = 0;
720         }
721
722         ret = methods->tdb_oob(tdb, *recovery_offset, rec->rec_len, 1);
723         if (ret == -1) {
724                 *recovery_offset = 0;
725                 rec->rec_len = 0;
726         }
727
728         return 0;
729 }
730
731 /*
732   allocate the recovery area, or use an existing recovery area if it is
733   large enough
734 */
735 static int tdb_recovery_allocate(struct tdb_context *tdb,
736                                  tdb_len_t *recovery_size,
737                                  tdb_off_t *recovery_offset,
738                                  tdb_len_t *recovery_max_size)
739 {
740         struct tdb_record rec;
741         const struct tdb_methods *methods = tdb->transaction->io_methods;
742         tdb_off_t recovery_head, new_end;
743
744         if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
745                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
746                 return -1;
747         }
748
749         if (!tdb_recovery_size(tdb, recovery_size)) {
750                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
751                          "overflow recovery size\n"));
752                 return -1;
753         }
754
755         /* Existing recovery area? */
756         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
757                 /* it fits in the existing area */
758                 *recovery_max_size = rec.rec_len;
759                 *recovery_offset = recovery_head;
760                 return 0;
761         }
762
763         /* If recovery area in middle of file, we need a new one. */
764         if (recovery_head == 0
765             || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
766                 /* we need to free up the old recovery area, then allocate a
767                    new one at the end of the file. Note that we cannot use
768                    tdb_allocate() to allocate the new one as that might return
769                    us an area that is being currently used (as of the start of
770                    the transaction) */
771                 if (recovery_head) {
772                         if (tdb_free(tdb, recovery_head, &rec) == -1) {
773                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
774                                          "tdb_recovery_allocate: failed to"
775                                          " free previous recovery area\n"));
776                                 return -1;
777                         }
778
779                         /* the tdb_free() call might have increased
780                          * the recovery size */
781                         if (!tdb_recovery_size(tdb, recovery_size)) {
782                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
783                                          "tdb_recovery_allocate: "
784                                          "overflow recovery size\n"));
785                                 return -1;
786                         }
787                 }
788
789                 /* New head will be at end of file. */
790                 recovery_head = tdb->map_size;
791         }
792
793         /* Now we know where it will be. */
794         *recovery_offset = recovery_head;
795
796         /* Expand by more than we need, so we don't do it often. */
797         *recovery_max_size = tdb_expand_adjust(tdb->map_size,
798                                                *recovery_size,
799                                                tdb->page_size)
800                 - sizeof(rec);
801
802         if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
803             !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
804                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
805                          "overflow recovery area\n"));
806                 return -1;
807         }
808
809         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
810                                      new_end - tdb->transaction->old_map_size)
811             == -1) {
812                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
813                 return -1;
814         }
815
816         /* remap the file (if using mmap) */
817         methods->tdb_oob(tdb, tdb->map_size, 1, 1);
818
819         /* we have to reset the old map size so that we don't try to expand the file
820            again in the transaction commit, which would destroy the recovery area */
821         tdb->transaction->old_map_size = tdb->map_size;
822
823         /* write the recovery header offset and sync - we can sync without a race here
824            as the magic ptr in the recovery record has not been set */
825         CONVERT(recovery_head);
826         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
827                                &recovery_head, sizeof(tdb_off_t)) == -1) {
828                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
829                 return -1;
830         }
831         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
832                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
833                 return -1;
834         }
835
836         return 0;
837 }
838
839
840 /*
841   setup the recovery data that will be used on a crash during commit

  The blob built here is a struct tdb_record header followed, for each
  saved transaction block, by a 4-byte offset, a 4-byte length and that
  many bytes read through the transaction's io_methods, and finally a
  4-byte tailer. The record is first written with
  TDB_RECOVERY_INVALID_MAGIC and synced; only afterwards is the magic
  field overwritten with TDB_RECOVERY_MAGIC and synced again.

  On success returns 0 and *magic_offset holds the file offset of the
  recovery record's magic field. Returns -1 on failure. The malloc,
  boundary-check, tdb_read and tdb_write/transaction_write_existing
  failure paths set tdb->ecode here; the tdb_recovery_allocate() and
  transaction_sync() failure paths return without setting it in this
  function.
842 */
843 static int transaction_setup_recovery(struct tdb_context *tdb,
844                                       tdb_off_t *magic_offset)
845 {
846         tdb_len_t recovery_size;
847         unsigned char *data, *p;
848         const struct tdb_methods *methods = tdb->transaction->io_methods;
849         struct tdb_record *rec;
850         tdb_off_t recovery_offset, recovery_max_size;
            /* snapshot taken before tdb_recovery_allocate() runs; that call
               can reset tdb->transaction->old_map_size after growing the
               file, so the local and the field may differ below */
851         tdb_off_t old_map_size = tdb->transaction->old_map_size;
852         uint32_t magic, tailer;
853         uint32_t i;
854
855         /*
856           check that the recovery area has enough space
857         */
858         if (tdb_recovery_allocate(tdb, &recovery_size,
859                                   &recovery_offset, &recovery_max_size) == -1) {
860                 return -1;
861         }
862
            /* one allocation holds the record header plus recovery_size
               bytes of payload; "data" below aliases the same buffer */
863         rec = malloc(recovery_size + sizeof(*rec));
864         if (rec == NULL) {
865                 tdb->ecode = TDB_ERR_OOM;
866                 return -1;
867         }
868
869         memset(rec, 0, sizeof(*rec));
870
            /* the header starts out marked invalid; the valid magic is only
               written after the payload has been synced (see end of
               function) */
871         rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
872         rec->data_len = recovery_size;
873         rec->rec_len  = recovery_max_size;
            /* key_len carries the file size snapshot taken at entry;
               tdb_transaction_recover() reads it back as recovery_eof */
874         rec->key_len  = old_map_size;
875         CONVERT(*rec);
876
877         data = (unsigned char *)rec;
878
879         /* build the recovery data into a single blob to allow us to do a single
880            large write, which should be more efficient */
881         p = data + sizeof(*rec);
882         for (i=0;i<tdb->transaction->num_blocks;i++) {
883                 tdb_off_t offset;
884                 tdb_len_t length;
885
                    /* NULL slots are skipped - presumably blocks the
                       transaction never touched */
886                 if (tdb->transaction->blocks[i] == NULL) {
887                         continue;
888                 }
889
                    /* every block is block_size long except the final one,
                       which uses last_block_size */
890                 offset = i * tdb->transaction->block_size;
891                 length = tdb->transaction->block_size;
892                 if (i == tdb->transaction->num_blocks-1) {
893                         length = tdb->transaction->last_block_size;
894                 }
895
                    /* blocks starting at or beyond the entry-time file size
                       are not saved */
896                 if (offset >= old_map_size) {
897                         continue;
898                 }
                    /* NOTE(review): this bound uses the current
                       tdb->transaction->old_map_size field, not the local
                       snapshot used just above - confirm that is intended */
899                 if (offset + length > tdb->transaction->old_map_size) {
900                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
901                         free(data);
902                         tdb->ecode = TDB_ERR_CORRUPT;
903                         return -1;
904                 }
                    /* entry header: 4 bytes of offset then 4 bytes of length,
                       converted together when DOCONV() is true */
905                 memcpy(p, &offset, 4);
906                 memcpy(p+4, &length, 4);
907                 if (DOCONV()) {
908                         tdb_convert(p, 8);
909                 }
910                 /* the recovery area contains the old data, not the
911                    new data, so we have to call the original tdb_read
912                    method to get it */
913                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
914                         free(data);
915                         tdb->ecode = TDB_ERR_IO;
916                         return -1;
917                 }
918                 p += 8 + length;
919         }
920
921         /* and the tailer */
            /* the tailer value is the header size plus the maximum size of
               the recovery area, stored in 4 bytes after the last entry */
922         tailer = sizeof(*rec) + recovery_max_size;
923         memcpy(p, &tailer, 4);
924         if (DOCONV()) {
925                 tdb_convert(p, 4);
926         }
927
928         /* write the recovery data to the recovery area */
929         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
930                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
931                 free(data);
932                 tdb->ecode = TDB_ERR_IO;
933                 return -1;
934         }
            /* the same bytes are also pushed through
               transaction_write_existing() - presumably to keep any
               in-memory transaction copy of that range in step; verify
               against its definition */
935         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
936                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
937                 free(data);
938                 tdb->ecode = TDB_ERR_IO;
939                 return -1;
940         }
941
942         /* as we don't have ordered writes, we have to sync the recovery
943            data before we update the magic to indicate that the recovery
944            data is present */
945         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
946                 free(data);
947                 return -1;
948         }
949
950         free(data);
951
952         magic = TDB_RECOVERY_MAGIC;
953         CONVERT(magic);
954
            /* report where the magic lives so the caller can refer to it
               later; note this is assigned before the magic write is
               attempted */
955         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
956
957         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
958                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
959                 tdb->ecode = TDB_ERR_IO;
960                 return -1;
961         }
962         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
963                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
964                 tdb->ecode = TDB_ERR_IO;
965                 return -1;
966         }
967
968         /* ensure the recovery magic marker is on disk */
969         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
970                 return -1;
971         }
972
973         return 0;
974 }
975
976 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
977 {
978         const struct tdb_methods *methods;
979
980         if (tdb->transaction == NULL) {
981                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
982                 return -1;
983         }
984
985         if (tdb->transaction->prepared) {
986                 tdb->ecode = TDB_ERR_EINVAL;
987                 _tdb_transaction_cancel(tdb);
988                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
989                 return -1;
990         }
991
992         if (tdb->transaction->transaction_error) {
993                 tdb->ecode = TDB_ERR_IO;
994                 _tdb_transaction_cancel(tdb);
995                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
996                 return -1;
997         }
998
999
1000         if (tdb->transaction->nesting != 0) {
1001                 return 0;
1002         }
1003
1004         /* check for a null transaction */
1005         if (tdb->transaction->blocks == NULL) {
1006                 return 0;
1007         }
1008
1009         methods = tdb->transaction->io_methods;
1010
1011         /* if there are any locks pending then the caller has not
1012            nested their locks properly, so fail the transaction */
1013         if (tdb_have_extra_locks(tdb)) {
1014                 tdb->ecode = TDB_ERR_LOCK;
1015                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
1016                 _tdb_transaction_cancel(tdb);
1017                 return -1;
1018         }
1019
1020         /* upgrade the main transaction lock region to a write lock */
1021         if (tdb_allrecord_upgrade(tdb) == -1) {
1022                 if (tdb->ecode == TDB_ERR_RDONLY && tdb->read_only) {
1023                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1024                                  "tdb_transaction_prepare_commit: "
1025                                  "failed to upgrade hash locks: "
1026                                  "database is read only\n"));
1027                 } else if (tdb->ecode == TDB_ERR_RDONLY
1028                            && tdb->traverse_read) {
1029                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1030                                  "tdb_transaction_prepare_commit: "
1031                                  "failed to upgrade hash locks: "
1032                                  "a database traverse is in progress\n"));
1033                 } else {
1034                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1035                                  "tdb_transaction_prepare_commit: "
1036                                  "failed to upgrade hash locks: %s\n",
1037                                  tdb_errorstr(tdb)));
1038                 }
1039                 _tdb_transaction_cancel(tdb);
1040                 return -1;
1041         }
1042
1043         /* get the open lock - this prevents new users attaching to the database
1044            during the commit */
1045         if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
1046                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
1047                 _tdb_transaction_cancel(tdb);
1048                 return -1;
1049         }
1050
1051         /* write the recovery data to the end of the file */
1052         if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
1053                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
1054                 _tdb_transaction_cancel(tdb);
1055                 return -1;
1056         }
1057
1058         tdb->transaction->prepared = true;
1059
1060         /* expand the file to the new size if needed */
1061         if (tdb->map_size != tdb->transaction->old_map_size) {
1062                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1063                                              tdb->map_size -
1064                                              tdb->transaction->old_map_size) == -1) {
1065                         tdb->ecode = TDB_ERR_IO;
1066                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
1067                         _tdb_transaction_cancel(tdb);
1068                         return -1;
1069                 }
1070                 tdb->map_size = tdb->transaction->old_map_size;
1071                 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1072         }
1073
1074         /* Keep the open lock until the actual commit */
1075
1076         return 0;
1077 }
1078
1079 /*
1080    prepare to commit the current transaction
1081 */
1082 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1083 {
1084         tdb_trace(tdb, "tdb_transaction_prepare_commit");
1085         return _tdb_transaction_prepare_commit(tdb);
1086 }
1087
1088 /* A repack is worthwhile if the largest is less than half total free. */
1089 static bool repack_worthwhile(struct tdb_context *tdb)
1090 {
1091         tdb_off_t ptr;
1092         struct tdb_record rec;
1093         tdb_len_t total = 0, largest = 0;
1094
1095         if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1096                 return false;
1097         }
1098
1099         while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1100                 total += rec.rec_len;
1101                 if (rec.rec_len > largest) {
1102                         largest = rec.rec_len;
1103                 }
1104                 ptr = rec.next;
1105         }
1106
1107         return total > largest * 2;
1108 }
1109
1110 /*
1111   commit the current transaction

  Returns 0 when the transaction was committed, and also for the commit
  of a nested transaction (which only decrements the nesting count) or
  of a null transaction (no blocks; it is simply cancelled). Returns -1
  if there is no transaction, if an error is pending on it, if a block
  write fails (recovery is then run) or if the final sync fails; a
  non-zero result from _tdb_transaction_prepare_commit() is returned
  as-is. A failed repack after the commit is logged but not reported.
1112 */
1113 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1114 {
1115         const struct tdb_methods *methods;
1116         uint32_t i;
1117         bool need_repack = false;
1118
1119         if (tdb->transaction == NULL) {
1120                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1121                 return -1;
1122         }
1123
1124         tdb_trace(tdb, "tdb_transaction_commit");
1125
             /* a failure recorded earlier in the transaction aborts the
                commit and cancels the transaction */
1126         if (tdb->transaction->transaction_error) {
1127                 tdb->ecode = TDB_ERR_IO;
1128                 _tdb_transaction_cancel(tdb);
1129                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1130                 return -1;
1131         }
1132
1133
             /* committing a nested transaction only drops one nesting
                level; nothing is written here */
1134         if (tdb->transaction->nesting != 0) {
1135                 tdb->transaction->nesting--;
1136                 return 0;
1137         }
1138
1139         /* check for a null transaction */
1140         if (tdb->transaction->blocks == NULL) {
1141                 _tdb_transaction_cancel(tdb);
1142                 return 0;
1143         }
1144
             /* run the prepare step now unless the caller already did so
                via tdb_transaction_prepare_commit() */
1145         if (!tdb->transaction->prepared) {
1146                 int ret = _tdb_transaction_prepare_commit(tdb);
1147                 if (ret)
1148                         return ret;
1149         }
1150
1151         methods = tdb->transaction->io_methods;
1152
1153         /* perform all the writes */
1154         for (i=0;i<tdb->transaction->num_blocks;i++) {
1155                 tdb_off_t offset;
1156                 tdb_len_t length;
1157
1158                 if (tdb->transaction->blocks[i] == NULL) {
1159                         continue;
1160                 }
1161
                     /* block i lives at i * block_size; the final block is
                        last_block_size bytes long instead of block_size */
1162                 offset = i * tdb->transaction->block_size;
1163                 length = tdb->transaction->block_size;
1164                 if (i == tdb->transaction->num_blocks-1) {
1165                         length = tdb->transaction->last_block_size;
1166                 }
1167
1168                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1169                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1170
1171                         /* we've overwritten part of the data and
1172                            possibly expanded the file, so we need to
1173                            run the crash recovery code */
                             /* tdb_transaction_recover() does its I/O through
                                tdb->methods, so point that at the
                                transaction's io_methods first; its return
                                value is not checked here */
1174                         tdb->methods = methods;
1175                         tdb_transaction_recover(tdb);
1176
1177                         _tdb_transaction_cancel(tdb);
1178
1179                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1180                         return -1;
1181                 }
                     /* the block has been written, release its buffer */
1182                 SAFE_FREE(tdb->transaction->blocks[i]);
1183         }
1184
1185         /* Do this before we drop lock or blocks. */
1186         if (tdb->transaction->expanded) {
1187                 need_repack = repack_worthwhile(tdb);
1188         }
1189
1190         SAFE_FREE(tdb->transaction->blocks);
1191         tdb->transaction->num_blocks = 0;
1192
1193         /* ensure the new data is on disk */
             /* NOTE(review): this failure path returns without calling
                _tdb_transaction_cancel(), unlike the other error paths in
                this function - confirm callers are expected to cancel */
1194         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1195                 return -1;
1196         }
1197
1198         /*
1199           TODO: maybe write to some dummy hdr field, or write to magic
1200           offset without mmap, before the last sync, instead of the
1201           utime() call
1202         */
1203
1204         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1205            don't change the mtime of the file, this means the file may
1206            not be backed up (as tdb rounding to block sizes means that
1207            file size changes are quite rare too). The following forces
1208            mtime changes when a transaction completes */
1209 #ifdef HAVE_UTIME
1210         utime(tdb->name, NULL);
1211 #endif
1212
1213         /* use a transaction cancel to free memory and remove the
1214            transaction locks */
1215         _tdb_transaction_cancel(tdb);
1216
             /* need_repack was decided earlier, while the transaction was
                still in place; the repack itself runs after the cancel */
1217         if (need_repack) {
1218                 int ret = tdb_repack(tdb);
1219                 if (ret != 0) {
1220                         TDB_LOG((tdb, TDB_DEBUG_FATAL,
1221                                  __location__ " Failed to repack database (not fatal)\n"));
1222                 }
1223                 /*
1224                  * Ignore the error.
1225                  *
1226                  * Why?
1227                  *
1228                  * We just committed to the DB above, so anything
1229                  * written during the transaction is committed, the
1230                  * caller needs to know that the long-term state was
1231                  * successfully modified.
1232                  *
1233                  * tdb_repack is an optimization that can fail for
1234                  * reasons like lock ordering and we cannot recover
1235                  * the transaction lock at this point, having released
1236                  * it above.
1237                  *
1238                  * If we return a failure the caller thinks the
1239                  * transaction was rolled back.
1240                  */
1241         }
1242
1243         return 0;
1244 }
1245
1246
1247 /*
1248   recover from an aborted transaction. Must be called with exclusive
1249   database write access already established (including the open
1250   lock to prevent new processes attaching)
1251 */
1252 int tdb_transaction_recover(struct tdb_context *tdb)
1253 {
1254         tdb_off_t recovery_head, recovery_eof;
1255         unsigned char *data, *p;
1256         uint32_t zero = 0;
1257         struct tdb_record rec;
1258
1259         /* find the recovery area */
1260         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1261                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1262                 tdb->ecode = TDB_ERR_IO;
1263                 return -1;
1264         }
1265
1266         if (recovery_head == 0) {
1267                 /* we have never allocated a recovery record */
1268                 return 0;
1269         }
1270
1271         /* read the recovery record */
1272         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1273                                    sizeof(rec), DOCONV()) == -1) {
1274                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1275                 tdb->ecode = TDB_ERR_IO;
1276                 return -1;
1277         }
1278
1279         if (rec.magic != TDB_RECOVERY_MAGIC) {
1280                 /* there is no valid recovery data */
1281                 return 0;
1282         }
1283
1284         if (tdb->read_only) {
1285                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1286                 tdb->ecode = TDB_ERR_CORRUPT;
1287                 return -1;
1288         }
1289
1290         recovery_eof = rec.key_len;
1291
1292         data = (unsigned char *)malloc(rec.data_len);
1293         if (data == NULL) {
1294                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1295                 tdb->ecode = TDB_ERR_OOM;
1296                 return -1;
1297         }
1298
1299         /* read the full recovery data */
1300         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1301                                    rec.data_len, 0) == -1) {
1302                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1303                 tdb->ecode = TDB_ERR_IO;
1304                 free(data);
1305                 return -1;
1306         }
1307
1308         /* recover the file data */
1309         p = data;
1310         while (p+8 < data + rec.data_len) {
1311                 uint32_t ofs, len;
1312                 if (DOCONV()) {
1313                         tdb_convert(p, 8);
1314                 }
1315                 memcpy(&ofs, p, 4);
1316                 memcpy(&len, p+4, 4);
1317
1318                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1319                         free(data);
1320                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
1321                         tdb->ecode = TDB_ERR_IO;
1322                         return -1;
1323                 }
1324                 p += 8 + len;
1325         }
1326
1327         free(data);
1328
1329         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1330                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1331                 tdb->ecode = TDB_ERR_IO;
1332                 return -1;
1333         }
1334
1335         /* if the recovery area is after the recovered eof then remove it */
1336         if (recovery_eof <= recovery_head) {
1337                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1338                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1339                         tdb->ecode = TDB_ERR_IO;
1340                         return -1;
1341                 }
1342         }
1343
1344         /* remove the recovery magic */
1345         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1346                           &zero) == -1) {
1347                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1348                 tdb->ecode = TDB_ERR_IO;
1349                 return -1;
1350         }
1351
1352         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1353                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1354                 tdb->ecode = TDB_ERR_IO;
1355                 return -1;
1356         }
1357
1358         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
1359                  recovery_eof));
1360
1361         /* all done */
1362         return 0;
1363 }
1364
1365 /* Any I/O failures we say "needs recovery". */
1366 bool tdb_needs_recovery(struct tdb_context *tdb)
1367 {
1368         tdb_off_t recovery_head;
1369         struct tdb_record rec;
1370
1371         /* find the recovery area */
1372         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1373                 return true;
1374         }
1375
1376         if (recovery_head == 0) {
1377                 /* we have never allocated a recovery record */
1378                 return false;
1379         }
1380
1381         /* read the recovery record */
1382         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1383                                    sizeof(rec), DOCONV()) == -1) {
1384                 return true;
1385         }
1386
1387         return (rec.magic == TDB_RECOVERY_MAGIC);
1388 }