tdb: Fix a typo
[samba.git] / lib / tdb / common / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no fsync/msync calls are made.  This means we
86     are still proof against a process dying during transaction commit,
87     but not against machine reboot.
88
89   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90     tdb_add_flags() transaction nesting is enabled.
91     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92     The default is that transaction nesting is allowed.
93     Note: this default may change in future versions of tdb.
94
95     Beware: when transactions are nested, a transaction successfully
96     completed with tdb_transaction_commit() can be silently unrolled later.
97
98   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99     tdb_add_flags() transaction nesting is disabled.
100     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101     An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102     The default is that transaction nesting is allowed.
103     Note: this default may change in future versions of tdb.
104 */
105
106
107 /*
108   hold the context of any current transaction
      One instance hangs off tdb->transaction: it is allocated zeroed by
      _tdb_transaction_start() and freed by _tdb_transaction_cancel().
109 */
110 struct tdb_transaction {
111         /* we keep a mirrored copy of the tdb hash heads here so
112            tdb_next_hash_chain() can operate efficiently */
        /* holds hash_size+1 entries; entry 0 is the freelist head, so
           hash chain h is found at index h+1 */
113         uint32_t *hash_heads;
114
115         /* the original io methods - used to do IOs to the real db */
        /* saved from tdb->methods when the transaction starts and put
           back when it is cancelled */
116         const struct tdb_methods *io_methods;
117
118         /* the list of transaction blocks. When a block is first
119            written to, it gets created in this list */
        /* blocks[] is indexed by (file offset / block_size); a NULL slot
           means that block has not been written in this transaction */
120         uint8_t **blocks;
121         uint32_t num_blocks;
122         uint32_t block_size;      /* bytes in each block */
123         uint32_t last_block_size; /* number of valid bytes in the last block */
124
125         /* non-zero when an internal transaction error has
126            occurred. All write operations will then fail until the
127            transaction is ended */
128         int transaction_error;
129
130         /* when inside a transaction we need to keep track of any
131            nested tdb_transaction_start() calls, as these are allowed,
132            but don't create a new transaction */
        /* incremented by each nested start; a cancel at non-zero nesting
           decrements it and sets transaction_error */
133         int nesting;
134
135         /* set when a prepare has already occurred */
        /* transaction_write() refuses any write once this is set */
136         bool prepared;
        /* when non-zero, cancelling the transaction writes
           TDB_RECOVERY_INVALID_MAGIC (4 bytes) at this file offset to
           remove the recovery marker */
137         tdb_off_t magic_offset;
138
139         /* old file size before transaction */
        /* taken from tdb->map_size at start and copied back to
           tdb->map_size on cancel */
140         tdb_len_t old_map_size;
141
142         /* did we expand in this transaction */
        /* set to true by transaction_expand_file() */
143         bool expanded;
144 };
145
146
147 /*
148   read while in a transaction. We need to check first if the data is in our list
149   of transaction elements, then if not do a real read
150 */
151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152                             tdb_len_t len, int cv)
153 {
154         uint32_t blk;
155
156         /* break it down into block sized ops */
157         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160                         return -1;
161                 }
162                 len -= len2;
163                 off += len2;
164                 buf = (void *)(len2 + (char *)buf);
165         }
166
167         if (len == 0) {
168                 return 0;
169         }
170
171         blk = off / tdb->transaction->block_size;
172
173         /* see if we have it in the block list */
174         if (tdb->transaction->num_blocks <= blk ||
175             tdb->transaction->blocks[blk] == NULL) {
176                 /* nope, do a real read */
177                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178                         goto fail;
179                 }
180                 return 0;
181         }
182
183         /* it is in the block list. Now check for the last block */
184         if (blk == tdb->transaction->num_blocks-1) {
185                 if (len > tdb->transaction->last_block_size) {
186                         goto fail;
187                 }
188         }
189
190         /* now copy it out of this block */
191         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192         if (cv) {
193                 tdb_convert(buf, len);
194         }
195         return 0;
196
197 fail:
198         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
199         tdb->ecode = TDB_ERR_IO;
200         tdb->transaction->transaction_error = 1;
201         return -1;
202 }
203
204
205 /*
206   write while in a transaction
207 */
208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209                              const void *buf, tdb_len_t len)
210 {
211         uint32_t blk;
212
213         if (buf == NULL) {
214                 return -1;
215         }
216
217         /* Only a commit is allowed on a prepared transaction */
218         if (tdb->transaction->prepared) {
219                 tdb->ecode = TDB_ERR_EINVAL;
220                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
221                 tdb->transaction->transaction_error = 1;
222                 return -1;
223         }
224
225         /* if the write is to a hash head, then update the transaction
226            hash heads */
227         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
228             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
229                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
230                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
231         }
232
233         /* break it up into block sized chunks */
234         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
235                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
236                 if (transaction_write(tdb, off, buf, len2) != 0) {
237                         return -1;
238                 }
239                 len -= len2;
240                 off += len2;
241                 buf = (const void *)(len2 + (const char *)buf);
242         }
243
244         if (len == 0) {
245                 return 0;
246         }
247
248         blk = off / tdb->transaction->block_size;
249         off = off % tdb->transaction->block_size;
250
251         if (tdb->transaction->num_blocks <= blk) {
252                 uint8_t **new_blocks;
253                 /* expand the blocks array */
254                 new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
255                                                  (blk+1)*sizeof(uint8_t *));
256                 if (new_blocks == NULL) {
257                         tdb->ecode = TDB_ERR_OOM;
258                         goto fail;
259                 }
260                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
261                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
262                 tdb->transaction->blocks = new_blocks;
263                 tdb->transaction->num_blocks = blk+1;
264                 tdb->transaction->last_block_size = 0;
265         }
266
267         /* allocate and fill a block? */
268         if (tdb->transaction->blocks[blk] == NULL) {
269                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
270                 if (tdb->transaction->blocks[blk] == NULL) {
271                         tdb->ecode = TDB_ERR_OOM;
272                         tdb->transaction->transaction_error = 1;
273                         return -1;
274                 }
275                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
276                         tdb_len_t len2 = tdb->transaction->block_size;
277                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
278                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
279                         }
280                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
281                                                                    tdb->transaction->blocks[blk],
282                                                                    len2, 0) != 0) {
283                                 SAFE_FREE(tdb->transaction->blocks[blk]);
284                                 tdb->ecode = TDB_ERR_IO;
285                                 goto fail;
286                         }
287                         if (blk == tdb->transaction->num_blocks-1) {
288                                 tdb->transaction->last_block_size = len2;
289                         }
290                 }
291         }
292
293         /* overwrite part of an existing block */
294         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
295         if (blk == tdb->transaction->num_blocks-1) {
296                 if (len + off > tdb->transaction->last_block_size) {
297                         tdb->transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
305                  (blk*tdb->transaction->block_size) + off, len));
306         tdb->transaction->transaction_error = 1;
307         return -1;
308 }
309
310
311 /*
312   write while in a transaction - this variant never expands the transaction blocks, it only
313   updates existing blocks. This means it cannot change the recovery size
314 */
315 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
316                                       const void *buf, tdb_len_t len)
317 {
318         uint32_t blk;
319
320         /* break it up into block sized chunks */
321         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
322                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
323                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
324                         return -1;
325                 }
326                 len -= len2;
327                 off += len2;
328                 if (buf != NULL) {
329                         buf = (const void *)(len2 + (const char *)buf);
330                 }
331         }
332
333         if (len == 0) {
334                 return 0;
335         }
336
337         blk = off / tdb->transaction->block_size;
338         off = off % tdb->transaction->block_size;
339
340         if (tdb->transaction->num_blocks <= blk ||
341             tdb->transaction->blocks[blk] == NULL) {
342                 return 0;
343         }
344
345         if (blk == tdb->transaction->num_blocks-1 &&
346             off + len > tdb->transaction->last_block_size) {
347                 if (off >= tdb->transaction->last_block_size) {
348                         return 0;
349                 }
350                 len = tdb->transaction->last_block_size - off;
351         }
352
353         /* overwrite part of an existing block */
354         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
355
356         return 0;
357 }
358
359
360 /*
361   accelerated hash chain head search, using the cached hash heads
362 */
363 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
364 {
365         uint32_t h = *chain;
366         for (;h < tdb->hash_size;h++) {
367                 /* the +1 takes account of the freelist */
368                 if (0 != tdb->transaction->hash_heads[h+1]) {
369                         break;
370                 }
371         }
372         (*chain) = h;
373 }
374
375 /*
376   out of bounds check during a transaction
377 */
378 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
379                            tdb_len_t len, int probe)
380 {
381         if (off + len >= off && off + len <= tdb->map_size) {
382                 return 0;
383         }
384         tdb->ecode = TDB_ERR_IO;
385         return -1;
386 }
387
388 /*
389   transaction version of tdb_expand().
390 */
391 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
392                                    tdb_off_t addition)
393 {
394         const char buf_zero[8192] = {0};
395         size_t buf_len = sizeof(buf_zero);
396
397         while (addition > 0) {
398                 size_t n = MIN(addition, buf_len);
399                 int ret;
400
401                 ret = transaction_write(tdb, size, buf_zero, n);
402                 if (ret != 0) {
403                         return ret;
404                 }
405
406                 addition -= n;
407                 size += n;
408         }
409
410         tdb->transaction->expanded = true;
411
412         return 0;
413 }
414
/*
  io method table used while a transaction is active:
  _tdb_transaction_start() points tdb->methods at it and
  _tdb_transaction_cancel() puts the saved original table back.
  The initialisers are positional, so their order has to match the
  member order of struct tdb_methods (declared outside this file).
*/
415 static const struct tdb_methods transaction_methods = {
416         transaction_read,
417         transaction_write,
418         transaction_next_hash_chain,
419         transaction_oob,
420         transaction_expand_file,
421 };
422
423 /*
424  * Is a transaction currently active on this context?
425  *
426  */
427 _PUBLIC_ bool tdb_transaction_active(struct tdb_context *tdb)
428 {
429         return (tdb->transaction != NULL);
430 }
431
432 /*
433   start a tdb transaction. No token is returned, as only a single
434   transaction is allowed to be pending per tdb_context
435 */
436 static int _tdb_transaction_start(struct tdb_context *tdb,
437                                   enum tdb_lock_flags lockflags)
438 {
439         /* some sanity checks */
440         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
441             || tdb->traverse_read) {
442                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
443                 tdb->ecode = TDB_ERR_EINVAL;
444                 return -1;
445         }
446
447         /* cope with nested tdb_transaction_start() calls */
448         if (tdb->transaction != NULL) {
449                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
450                         tdb->ecode = TDB_ERR_NESTING;
451                         return -1;
452                 }
453                 tdb->transaction->nesting++;
454                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
455                          tdb->transaction->nesting));
456                 return 0;
457         }
458
459         if (tdb_have_extra_locks(tdb)) {
460                 /* the caller must not have any locks when starting a
461                    transaction as otherwise we'll be screwed by lack
462                    of nested locks in posix */
463                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
464                 tdb->ecode = TDB_ERR_LOCK;
465                 return -1;
466         }
467
468         if (tdb->travlocks.next != NULL) {
469                 /* you cannot use transactions inside a traverse (although you can use
470                    traverse inside a transaction) as otherwise you can end up with
471                    deadlock */
472                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
473                 tdb->ecode = TDB_ERR_LOCK;
474                 return -1;
475         }
476
477         tdb->transaction = (struct tdb_transaction *)
478                 calloc(sizeof(struct tdb_transaction), 1);
479         if (tdb->transaction == NULL) {
480                 tdb->ecode = TDB_ERR_OOM;
481                 return -1;
482         }
483
484         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
485         tdb->transaction->block_size = tdb->page_size;
486
487         /* get the transaction write lock. This is a blocking lock. As
488            discussed with Volker, there are a number of ways we could
489            make this async, which we will probably do in the future */
490         if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
491                 SAFE_FREE(tdb->transaction->blocks);
492                 SAFE_FREE(tdb->transaction);
493                 if ((lockflags & TDB_LOCK_WAIT) == 0) {
494                         tdb->ecode = TDB_ERR_NOLOCK;
495                 } else {
496                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
497                                  "tdb_transaction_start: "
498                                  "failed to get transaction lock\n"));
499                 }
500                 return -1;
501         }
502
503         /* get a read lock from the freelist to the end of file. This
504            is upgraded to a write lock during the commit */
505         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
506                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
507                 goto fail_allrecord_lock;
508         }
509
510         /* setup a copy of the hash table heads so the hash scan in
511            traverse can be fast */
512         tdb->transaction->hash_heads = (uint32_t *)
513                 calloc(tdb->hash_size+1, sizeof(uint32_t));
514         if (tdb->transaction->hash_heads == NULL) {
515                 tdb->ecode = TDB_ERR_OOM;
516                 goto fail;
517         }
518         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
519                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
520                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
521                 tdb->ecode = TDB_ERR_IO;
522                 goto fail;
523         }
524
525         /* make sure we know about any file expansions already done by
526            anyone else */
527         tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
528         tdb->transaction->old_map_size = tdb->map_size;
529
530         /* finally hook the io methods, replacing them with
531            transaction specific methods */
532         tdb->transaction->io_methods = tdb->methods;
533         tdb->methods = &transaction_methods;
534
535         /* Trace at the end, so we get sequence number correct. */
536         tdb_trace(tdb, "tdb_transaction_start");
537         return 0;
538
539 fail:
540         tdb_allrecord_unlock(tdb, F_RDLCK, false);
541 fail_allrecord_lock:
542         tdb_transaction_unlock(tdb, F_WRLCK);
543         SAFE_FREE(tdb->transaction->blocks);
544         SAFE_FREE(tdb->transaction->hash_heads);
545         SAFE_FREE(tdb->transaction);
546         return -1;
547 }
548
549 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
550 {
551         return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
552 }
553
554 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
555 {
556         return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
557 }
558
559 /*
560   sync to disk
561 */
562 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
563 {
564         if (tdb->flags & TDB_NOSYNC) {
565                 return 0;
566         }
567
568 #ifdef HAVE_FDATASYNC
569         if (fdatasync(tdb->fd) != 0) {
570 #else
571         if (fsync(tdb->fd) != 0) {
572 #endif
573                 tdb->ecode = TDB_ERR_IO;
574                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
575                 return -1;
576         }
577 #ifdef HAVE_MMAP
578         if (tdb->map_ptr) {
579                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
580                 if (msync(moffset + (char *)tdb->map_ptr,
581                           length + (offset - moffset), MS_SYNC) != 0) {
582                         tdb->ecode = TDB_ERR_IO;
583                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
584                                  strerror(errno)));
585                         return -1;
586                 }
587         }
588 #endif
589         return 0;
590 }
591
592
593 static int _tdb_transaction_cancel(struct tdb_context *tdb)
594 {
595         int i, ret = 0;
596
597         if (tdb->transaction == NULL) {
598                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
599                 return -1;
600         }
601
602         if (tdb->transaction->nesting != 0) {
603                 tdb->transaction->transaction_error = 1;
604                 tdb->transaction->nesting--;
605                 return 0;
606         }
607
608         tdb->map_size = tdb->transaction->old_map_size;
609
610         /* free all the transaction blocks */
611         for (i=0;i<tdb->transaction->num_blocks;i++) {
612                 if (tdb->transaction->blocks[i] != NULL) {
613                         free(tdb->transaction->blocks[i]);
614                 }
615         }
616         SAFE_FREE(tdb->transaction->blocks);
617
618         if (tdb->transaction->magic_offset) {
619                 const struct tdb_methods *methods = tdb->transaction->io_methods;
620                 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
621
622                 /* remove the recovery marker */
623                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
624                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
625                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
626                         ret = -1;
627                 }
628         }
629
630         /* This also removes the OPEN_LOCK, if we have it. */
631         tdb_release_transaction_locks(tdb);
632
633         /* restore the normal io methods */
634         tdb->methods = tdb->transaction->io_methods;
635
636         SAFE_FREE(tdb->transaction->hash_heads);
637         SAFE_FREE(tdb->transaction);
638
639         return ret;
640 }
641
642 /*
643   cancel the current transaction
644 */
645 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
646 {
647         tdb_trace(tdb, "tdb_transaction_cancel");
648         return _tdb_transaction_cancel(tdb);
649 }
650
651 /*
652   work out how much space the linearised recovery data will consume
653 */
654 static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
655 {
656         tdb_len_t recovery_size = 0;
657         int i;
658
659         recovery_size = sizeof(uint32_t);
660         for (i=0;i<tdb->transaction->num_blocks;i++) {
661                 tdb_len_t block_size;
662                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
663                         break;
664                 }
665                 if (tdb->transaction->blocks[i] == NULL) {
666                         continue;
667                 }
668                 if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
669                                    &recovery_size)) {
670                         return false;
671                 }
672                 if (i == tdb->transaction->num_blocks-1) {
673                         block_size = tdb->transaction->last_block_size;
674                 } else {
675                         block_size =  tdb->transaction->block_size;
676                 }
677                 if (!tdb_add_len_t(recovery_size, block_size,
678                                    &recovery_size)) {
679                         return false;
680                 }
681         }
682
683         *result = recovery_size;
684         return true;
685 }
686
687 int tdb_recovery_area(struct tdb_context *tdb,
688                       const struct tdb_methods *methods,
689                       tdb_off_t *recovery_offset,
690                       struct tdb_record *rec)
691 {
692         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
693                 return -1;
694         }
695
696         if (*recovery_offset == 0) {
697                 rec->rec_len = 0;
698                 return 0;
699         }
700
701         if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
702                               DOCONV()) == -1) {
703                 return -1;
704         }
705
706         /* ignore invalid recovery regions: can happen in crash */
707         if (rec->magic != TDB_RECOVERY_MAGIC &&
708             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
709                 *recovery_offset = 0;
710                 rec->rec_len = 0;
711         }
712         return 0;
713 }
714
715 /*
716   allocate the recovery area, or use an existing recovery area if it is
717   large enough
718 */
719 static int tdb_recovery_allocate(struct tdb_context *tdb,
720                                  tdb_len_t *recovery_size,
721                                  tdb_off_t *recovery_offset,
722                                  tdb_len_t *recovery_max_size)
723 {
724         struct tdb_record rec;
725         const struct tdb_methods *methods = tdb->transaction->io_methods;
726         tdb_off_t recovery_head, new_end;
727
728         if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
729                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
730                 return -1;
731         }
732
733         if (!tdb_recovery_size(tdb, recovery_size)) {
734                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
735                          "overflow recovery size\n"));
736                 return -1;
737         }
738
739         /* Existing recovery area? */
740         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
741                 /* it fits in the existing area */
742                 *recovery_max_size = rec.rec_len;
743                 *recovery_offset = recovery_head;
744                 return 0;
745         }
746
747         /* If recovery area in middle of file, we need a new one. */
748         if (recovery_head == 0
749             || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
750                 /* we need to free up the old recovery area, then allocate a
751                    new one at the end of the file. Note that we cannot use
752                    tdb_allocate() to allocate the new one as that might return
753                    us an area that is being currently used (as of the start of
754                    the transaction) */
755                 if (recovery_head) {
756                         if (tdb_free(tdb, recovery_head, &rec) == -1) {
757                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
758                                          "tdb_recovery_allocate: failed to"
759                                          " free previous recovery area\n"));
760                                 return -1;
761                         }
762
763                         /* the tdb_free() call might have increased
764                          * the recovery size */
765                         if (!tdb_recovery_size(tdb, recovery_size)) {
766                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
767                                          "tdb_recovery_allocate: "
768                                          "overflow recovery size\n"));
769                                 return -1;
770                         }
771                 }
772
773                 /* New head will be at end of file. */
774                 recovery_head = tdb->map_size;
775         }
776
777         /* Now we know where it will be. */
778         *recovery_offset = recovery_head;
779
780         /* Expand by more than we need, so we don't do it often. */
781         *recovery_max_size = tdb_expand_adjust(tdb->map_size,
782                                                *recovery_size,
783                                                tdb->page_size)
784                 - sizeof(rec);
785
786         if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
787             !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
788                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
789                          "overflow recovery area\n"));
790                 return -1;
791         }
792
793         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
794                                      new_end - tdb->transaction->old_map_size)
795             == -1) {
796                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
797                 return -1;
798         }
799
800         /* remap the file (if using mmap) */
801         methods->tdb_oob(tdb, tdb->map_size, 1, 1);
802
803         /* we have to reset the old map size so that we don't try to expand the file
804            again in the transaction commit, which would destroy the recovery area */
805         tdb->transaction->old_map_size = tdb->map_size;
806
807         /* write the recovery header offset and sync - we can sync without a race here
808            as the magic ptr in the recovery record has not been set */
809         CONVERT(recovery_head);
810         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
811                                &recovery_head, sizeof(tdb_off_t)) == -1) {
812                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
813                 return -1;
814         }
815         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
816                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
817                 return -1;
818         }
819
820         return 0;
821 }
822
823
824 /*
825   setup the recovery data that will be used on a crash during commit
826 */
827 static int transaction_setup_recovery(struct tdb_context *tdb,
828                                       tdb_off_t *magic_offset)
829 {
830         tdb_len_t recovery_size;
831         unsigned char *data, *p;
832         const struct tdb_methods *methods = tdb->transaction->io_methods;
833         struct tdb_record *rec;
834         tdb_off_t recovery_offset, recovery_max_size;
835         tdb_off_t old_map_size = tdb->transaction->old_map_size;
836         uint32_t magic, tailer;
837         int i;
838
839         /*
840           check that the recovery area has enough space
841         */
842         if (tdb_recovery_allocate(tdb, &recovery_size,
843                                   &recovery_offset, &recovery_max_size) == -1) {
844                 return -1;
845         }
846
847         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
848         if (data == NULL) {
849                 tdb->ecode = TDB_ERR_OOM;
850                 return -1;
851         }
852
853         rec = (struct tdb_record *)data;
854         memset(rec, 0, sizeof(*rec));
855
856         rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
857         rec->data_len = recovery_size;
858         rec->rec_len  = recovery_max_size;
859         rec->key_len  = old_map_size;
860         CONVERT(*rec);
861
862         /* build the recovery data into a single blob to allow us to do a single
863            large write, which should be more efficient */
864         p = data + sizeof(*rec);
865         for (i=0;i<tdb->transaction->num_blocks;i++) {
866                 tdb_off_t offset;
867                 tdb_len_t length;
868
869                 if (tdb->transaction->blocks[i] == NULL) {
870                         continue;
871                 }
872
873                 offset = i * tdb->transaction->block_size;
874                 length = tdb->transaction->block_size;
875                 if (i == tdb->transaction->num_blocks-1) {
876                         length = tdb->transaction->last_block_size;
877                 }
878
879                 if (offset >= old_map_size) {
880                         continue;
881                 }
882                 if (offset + length > tdb->transaction->old_map_size) {
883                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
884                         free(data);
885                         tdb->ecode = TDB_ERR_CORRUPT;
886                         return -1;
887                 }
888                 memcpy(p, &offset, 4);
889                 memcpy(p+4, &length, 4);
890                 if (DOCONV()) {
891                         tdb_convert(p, 8);
892                 }
893                 /* the recovery area contains the old data, not the
894                    new data, so we have to call the original tdb_read
895                    method to get it */
896                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
897                         free(data);
898                         tdb->ecode = TDB_ERR_IO;
899                         return -1;
900                 }
901                 p += 8 + length;
902         }
903
904         /* and the tailer */
905         tailer = sizeof(*rec) + recovery_max_size;
906         memcpy(p, &tailer, 4);
907         if (DOCONV()) {
908                 tdb_convert(p, 4);
909         }
910
911         /* write the recovery data to the recovery area */
912         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
913                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
914                 free(data);
915                 tdb->ecode = TDB_ERR_IO;
916                 return -1;
917         }
918         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
919                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
920                 free(data);
921                 tdb->ecode = TDB_ERR_IO;
922                 return -1;
923         }
924
925         /* as we don't have ordered writes, we have to sync the recovery
926            data before we update the magic to indicate that the recovery
927            data is present */
928         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
929                 free(data);
930                 return -1;
931         }
932
933         free(data);
934
935         magic = TDB_RECOVERY_MAGIC;
936         CONVERT(magic);
937
938         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
939
940         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
941                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
942                 tdb->ecode = TDB_ERR_IO;
943                 return -1;
944         }
945         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
946                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
947                 tdb->ecode = TDB_ERR_IO;
948                 return -1;
949         }
950
951         /* ensure the recovery magic marker is on disk */
952         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
953                 return -1;
954         }
955
956         return 0;
957 }
958
959 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
960 {
961         const struct tdb_methods *methods;
962
963         if (tdb->transaction == NULL) {
964                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
965                 return -1;
966         }
967
968         if (tdb->transaction->prepared) {
969                 tdb->ecode = TDB_ERR_EINVAL;
970                 _tdb_transaction_cancel(tdb);
971                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
972                 return -1;
973         }
974
975         if (tdb->transaction->transaction_error) {
976                 tdb->ecode = TDB_ERR_IO;
977                 _tdb_transaction_cancel(tdb);
978                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
979                 return -1;
980         }
981
982
983         if (tdb->transaction->nesting != 0) {
984                 return 0;
985         }
986
987         /* check for a null transaction */
988         if (tdb->transaction->blocks == NULL) {
989                 return 0;
990         }
991
992         methods = tdb->transaction->io_methods;
993
994         /* if there are any locks pending then the caller has not
995            nested their locks properly, so fail the transaction */
996         if (tdb_have_extra_locks(tdb)) {
997                 tdb->ecode = TDB_ERR_LOCK;
998                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
999                 _tdb_transaction_cancel(tdb);
1000                 return -1;
1001         }
1002
1003         /* upgrade the main transaction lock region to a write lock */
1004         if (tdb_allrecord_upgrade(tdb) == -1) {
1005                 if (tdb->ecode == TDB_ERR_RDONLY && tdb->read_only) {
1006                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1007                                  "tdb_transaction_prepare_commit: "
1008                                  "failed to upgrade hash locks: "
1009                                  "database is read only\n"));
1010                 } else if (tdb->ecode == TDB_ERR_RDONLY
1011                            && tdb->traverse_read) {
1012                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1013                                  "tdb_transaction_prepare_commit: "
1014                                  "failed to upgrade hash locks: "
1015                                  "a database traverse is in progress\n"));
1016                 } else {
1017                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1018                                  "tdb_transaction_prepare_commit: "
1019                                  "failed to upgrade hash locks: %s\n",
1020                                  tdb_errorstr(tdb)));
1021                 }
1022                 _tdb_transaction_cancel(tdb);
1023                 return -1;
1024         }
1025
1026         /* get the open lock - this prevents new users attaching to the database
1027            during the commit */
1028         if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
1029                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
1030                 _tdb_transaction_cancel(tdb);
1031                 return -1;
1032         }
1033
1034         /* write the recovery data to the end of the file */
1035         if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
1036                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
1037                 _tdb_transaction_cancel(tdb);
1038                 return -1;
1039         }
1040
1041         tdb->transaction->prepared = true;
1042
1043         /* expand the file to the new size if needed */
1044         if (tdb->map_size != tdb->transaction->old_map_size) {
1045                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1046                                              tdb->map_size -
1047                                              tdb->transaction->old_map_size) == -1) {
1048                         tdb->ecode = TDB_ERR_IO;
1049                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
1050                         _tdb_transaction_cancel(tdb);
1051                         return -1;
1052                 }
1053                 tdb->map_size = tdb->transaction->old_map_size;
1054                 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1055         }
1056
1057         /* Keep the open lock until the actual commit */
1058
1059         return 0;
1060 }
1061
1062 /*
1063    prepare to commit the current transaction
1064 */
1065 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1066 {
1067         tdb_trace(tdb, "tdb_transaction_prepare_commit");
1068         return _tdb_transaction_prepare_commit(tdb);
1069 }
1070
1071 /* A repack is worthwhile if the largest is less than half total free. */
1072 static bool repack_worthwhile(struct tdb_context *tdb)
1073 {
1074         tdb_off_t ptr;
1075         struct tdb_record rec;
1076         tdb_len_t total = 0, largest = 0;
1077
1078         if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1079                 return false;
1080         }
1081
1082         while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1083                 total += rec.rec_len;
1084                 if (rec.rec_len > largest) {
1085                         largest = rec.rec_len;
1086                 }
1087                 ptr = rec.next;
1088         }
1089
1090         return total > largest * 2;
1091 }
1092
1093 /*
1094   commit the current transaction
1095 */
1096 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1097 {
1098         const struct tdb_methods *methods;
1099         int i;
1100         bool need_repack = false;
1101
1102         if (tdb->transaction == NULL) {
1103                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1104                 return -1;
1105         }
1106
1107         tdb_trace(tdb, "tdb_transaction_commit");
1108
1109         if (tdb->transaction->transaction_error) {
1110                 tdb->ecode = TDB_ERR_IO;
1111                 _tdb_transaction_cancel(tdb);
1112                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1113                 return -1;
1114         }
1115
1116
1117         if (tdb->transaction->nesting != 0) {
1118                 tdb->transaction->nesting--;
1119                 return 0;
1120         }
1121
1122         /* check for a null transaction */
1123         if (tdb->transaction->blocks == NULL) {
1124                 _tdb_transaction_cancel(tdb);
1125                 return 0;
1126         }
1127
1128         if (!tdb->transaction->prepared) {
1129                 int ret = _tdb_transaction_prepare_commit(tdb);
1130                 if (ret)
1131                         return ret;
1132         }
1133
1134         methods = tdb->transaction->io_methods;
1135
1136         /* perform all the writes */
1137         for (i=0;i<tdb->transaction->num_blocks;i++) {
1138                 tdb_off_t offset;
1139                 tdb_len_t length;
1140
1141                 if (tdb->transaction->blocks[i] == NULL) {
1142                         continue;
1143                 }
1144
1145                 offset = i * tdb->transaction->block_size;
1146                 length = tdb->transaction->block_size;
1147                 if (i == tdb->transaction->num_blocks-1) {
1148                         length = tdb->transaction->last_block_size;
1149                 }
1150
1151                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1152                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1153
1154                         /* we've overwritten part of the data and
1155                            possibly expanded the file, so we need to
1156                            run the crash recovery code */
1157                         tdb->methods = methods;
1158                         tdb_transaction_recover(tdb);
1159
1160                         _tdb_transaction_cancel(tdb);
1161
1162                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1163                         return -1;
1164                 }
1165                 SAFE_FREE(tdb->transaction->blocks[i]);
1166         }
1167
1168         /* Do this before we drop lock or blocks. */
1169         if (tdb->transaction->expanded) {
1170                 need_repack = repack_worthwhile(tdb);
1171         }
1172
1173         SAFE_FREE(tdb->transaction->blocks);
1174         tdb->transaction->num_blocks = 0;
1175
1176         /* ensure the new data is on disk */
1177         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1178                 return -1;
1179         }
1180
1181         /*
1182           TODO: maybe write to some dummy hdr field, or write to magic
1183           offset without mmap, before the last sync, instead of the
1184           utime() call
1185         */
1186
1187         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1188            don't change the mtime of the file, this means the file may
1189            not be backed up (as tdb rounding to block sizes means that
1190            file size changes are quite rare too). The following forces
1191            mtime changes when a transaction completes */
1192 #ifdef HAVE_UTIME
1193         utime(tdb->name, NULL);
1194 #endif
1195
1196         /* use a transaction cancel to free memory and remove the
1197            transaction locks */
1198         _tdb_transaction_cancel(tdb);
1199
1200         if (need_repack) {
1201                 return tdb_repack(tdb);
1202         }
1203
1204         return 0;
1205 }
1206
1207
1208 /*
1209   recover from an aborted transaction. Must be called with exclusive
1210   database write access already established (including the open
1211   lock to prevent new processes attaching)
1212 */
1213 int tdb_transaction_recover(struct tdb_context *tdb)
1214 {
1215         tdb_off_t recovery_head, recovery_eof;
1216         unsigned char *data, *p;
1217         uint32_t zero = 0;
1218         struct tdb_record rec;
1219
1220         /* find the recovery area */
1221         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1222                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1223                 tdb->ecode = TDB_ERR_IO;
1224                 return -1;
1225         }
1226
1227         if (recovery_head == 0) {
1228                 /* we have never allocated a recovery record */
1229                 return 0;
1230         }
1231
1232         /* read the recovery record */
1233         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1234                                    sizeof(rec), DOCONV()) == -1) {
1235                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1236                 tdb->ecode = TDB_ERR_IO;
1237                 return -1;
1238         }
1239
1240         if (rec.magic != TDB_RECOVERY_MAGIC) {
1241                 /* there is no valid recovery data */
1242                 return 0;
1243         }
1244
1245         if (tdb->read_only) {
1246                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1247                 tdb->ecode = TDB_ERR_CORRUPT;
1248                 return -1;
1249         }
1250
1251         recovery_eof = rec.key_len;
1252
1253         data = (unsigned char *)malloc(rec.data_len);
1254         if (data == NULL) {
1255                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1256                 tdb->ecode = TDB_ERR_OOM;
1257                 return -1;
1258         }
1259
1260         /* read the full recovery data */
1261         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1262                                    rec.data_len, 0) == -1) {
1263                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1264                 tdb->ecode = TDB_ERR_IO;
1265                 return -1;
1266         }
1267
1268         /* recover the file data */
1269         p = data;
1270         while (p+8 < data + rec.data_len) {
1271                 uint32_t ofs, len;
1272                 if (DOCONV()) {
1273                         tdb_convert(p, 8);
1274                 }
1275                 memcpy(&ofs, p, 4);
1276                 memcpy(&len, p+4, 4);
1277
1278                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1279                         free(data);
1280                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
1281                         tdb->ecode = TDB_ERR_IO;
1282                         return -1;
1283                 }
1284                 p += 8 + len;
1285         }
1286
1287         free(data);
1288
1289         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1290                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1291                 tdb->ecode = TDB_ERR_IO;
1292                 return -1;
1293         }
1294
1295         /* if the recovery area is after the recovered eof then remove it */
1296         if (recovery_eof <= recovery_head) {
1297                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1298                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1299                         tdb->ecode = TDB_ERR_IO;
1300                         return -1;
1301                 }
1302         }
1303
1304         /* remove the recovery magic */
1305         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1306                           &zero) == -1) {
1307                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1308                 tdb->ecode = TDB_ERR_IO;
1309                 return -1;
1310         }
1311
1312         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1313                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1314                 tdb->ecode = TDB_ERR_IO;
1315                 return -1;
1316         }
1317
1318         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
1319                  recovery_eof));
1320
1321         /* all done */
1322         return 0;
1323 }
1324
1325 /* Any I/O failures we say "needs recovery". */
1326 bool tdb_needs_recovery(struct tdb_context *tdb)
1327 {
1328         tdb_off_t recovery_head;
1329         struct tdb_record rec;
1330
1331         /* find the recovery area */
1332         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1333                 return true;
1334         }
1335
1336         if (recovery_head == 0) {
1337                 /* we have never allocated a recovery record */
1338                 return false;
1339         }
1340
1341         /* read the recovery record */
1342         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1343                                    sizeof(rec), DOCONV()) == -1) {
1344                 return true;
1345         }
1346
1347         return (rec.magic == TDB_RECOVERY_MAGIC);
1348 }