tdb: Make tdb_recovery_size overflow-safe
[samba.git] / lib / tdb / common / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocated the transaction recover record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of writes all that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit stategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no fsync/msync calls are made.  This means we
86     are still proof against a process dying during transaction commit,
87     but not against machine reboot.
88
89   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90     tdb_add_flags() transaction nesting is enabled.
91     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92     The default is that transaction nesting is allowed.
93     Note: this default may change in future versions of tdb.
94
95     Beware. when transactions are nested a transaction successfully
96     completed with tdb_transaction_commit() can be silently unrolled later.
97
98   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99     tdb_add_flags() transaction nesting is disabled.
100     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101     An attempt create a nested transaction will fail with TDB_ERR_NESTING.
102     The default is that transaction nesting is allowed.
103     Note: this default may change in future versions of tdb.
104 */
105
106
107 /*
108   hold the context of any current transaction
109 */
110 struct tdb_transaction {
111         /* we keep a mirrored copy of the tdb hash heads here so
112            tdb_next_hash_chain() can operate efficiently */
113         uint32_t *hash_heads;
114
115         /* the original io methods - used to do IOs to the real db */
116         const struct tdb_methods *io_methods;
117
118         /* the list of transaction blocks. When a block is first
119            written to, it gets created in this list */
120         uint8_t **blocks;
121         uint32_t num_blocks;
122         uint32_t block_size;      /* bytes in each block */
123         uint32_t last_block_size; /* number of valid bytes in the last block */
124
125         /* non-zero when an internal transaction error has
126            occurred. All write operations will then fail until the
127            transaction is ended */
128         int transaction_error;
129
130         /* when inside a transaction we need to keep track of any
131            nested tdb_transaction_start() calls, as these are allowed,
132            but don't create a new transaction */
133         int nesting;
134
135         /* set when a prepare has already occurred */
136         bool prepared;
137         tdb_off_t magic_offset;
138
139         /* old file size before transaction */
140         tdb_len_t old_map_size;
141
142         /* did we expand in this transaction */
143         bool expanded;
144 };
145
146
147 /*
148   read while in a transaction. We need to check first if the data is in our list
149   of transaction elements, then if not do a real read
150 */
151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152                             tdb_len_t len, int cv)
153 {
154         uint32_t blk;
155
156         /* break it down into block sized ops */
157         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160                         return -1;
161                 }
162                 len -= len2;
163                 off += len2;
164                 buf = (void *)(len2 + (char *)buf);
165         }
166
167         if (len == 0) {
168                 return 0;
169         }
170
171         blk = off / tdb->transaction->block_size;
172
173         /* see if we have it in the block list */
174         if (tdb->transaction->num_blocks <= blk ||
175             tdb->transaction->blocks[blk] == NULL) {
176                 /* nope, do a real read */
177                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178                         goto fail;
179                 }
180                 return 0;
181         }
182
183         /* it is in the block list. Now check for the last block */
184         if (blk == tdb->transaction->num_blocks-1) {
185                 if (len > tdb->transaction->last_block_size) {
186                         goto fail;
187                 }
188         }
189
190         /* now copy it out of this block */
191         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192         if (cv) {
193                 tdb_convert(buf, len);
194         }
195         return 0;
196
197 fail:
198         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
199         tdb->ecode = TDB_ERR_IO;
200         tdb->transaction->transaction_error = 1;
201         return -1;
202 }
203
204
205 /*
206   write while in a transaction
207 */
208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209                              const void *buf, tdb_len_t len)
210 {
211         uint32_t blk;
212
213         /* Only a commit is allowed on a prepared transaction */
214         if (tdb->transaction->prepared) {
215                 tdb->ecode = TDB_ERR_EINVAL;
216                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
217                 tdb->transaction->transaction_error = 1;
218                 return -1;
219         }
220
221         /* if the write is to a hash head, then update the transaction
222            hash heads */
223         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
224             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
225                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
226                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
227         }
228
229         /* break it up into block sized chunks */
230         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
231                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
232                 if (transaction_write(tdb, off, buf, len2) != 0) {
233                         return -1;
234                 }
235                 len -= len2;
236                 off += len2;
237                 if (buf != NULL) {
238                         buf = (const void *)(len2 + (const char *)buf);
239                 }
240         }
241
242         if (len == 0) {
243                 return 0;
244         }
245
246         blk = off / tdb->transaction->block_size;
247         off = off % tdb->transaction->block_size;
248
249         if (tdb->transaction->num_blocks <= blk) {
250                 uint8_t **new_blocks;
251                 /* expand the blocks array */
252                 new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
253                                                  (blk+1)*sizeof(uint8_t *));
254                 if (new_blocks == NULL) {
255                         tdb->ecode = TDB_ERR_OOM;
256                         goto fail;
257                 }
258                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
259                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
260                 tdb->transaction->blocks = new_blocks;
261                 tdb->transaction->num_blocks = blk+1;
262                 tdb->transaction->last_block_size = 0;
263         }
264
265         /* allocate and fill a block? */
266         if (tdb->transaction->blocks[blk] == NULL) {
267                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
268                 if (tdb->transaction->blocks[blk] == NULL) {
269                         tdb->ecode = TDB_ERR_OOM;
270                         tdb->transaction->transaction_error = 1;
271                         return -1;
272                 }
273                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
274                         tdb_len_t len2 = tdb->transaction->block_size;
275                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
276                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
277                         }
278                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
279                                                                    tdb->transaction->blocks[blk],
280                                                                    len2, 0) != 0) {
281                                 SAFE_FREE(tdb->transaction->blocks[blk]);
282                                 tdb->ecode = TDB_ERR_IO;
283                                 goto fail;
284                         }
285                         if (blk == tdb->transaction->num_blocks-1) {
286                                 tdb->transaction->last_block_size = len2;
287                         }
288                 }
289         }
290
291         /* overwrite part of an existing block */
292         if (buf == NULL) {
293                 memset(tdb->transaction->blocks[blk] + off, 0, len);
294         } else {
295                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
296         }
297         if (blk == tdb->transaction->num_blocks-1) {
298                 if (len + off > tdb->transaction->last_block_size) {
299                         tdb->transaction->last_block_size = len + off;
300                 }
301         }
302
303         return 0;
304
305 fail:
306         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
307                  (blk*tdb->transaction->block_size) + off, len));
308         tdb->transaction->transaction_error = 1;
309         return -1;
310 }
311
312
313 /*
314   write while in a transaction - this variant never expands the transaction blocks, it only
315   updates existing blocks. This means it cannot change the recovery size
316 */
317 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
318                                       const void *buf, tdb_len_t len)
319 {
320         uint32_t blk;
321
322         /* break it up into block sized chunks */
323         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
324                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
325                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
326                         return -1;
327                 }
328                 len -= len2;
329                 off += len2;
330                 if (buf != NULL) {
331                         buf = (const void *)(len2 + (const char *)buf);
332                 }
333         }
334
335         if (len == 0) {
336                 return 0;
337         }
338
339         blk = off / tdb->transaction->block_size;
340         off = off % tdb->transaction->block_size;
341
342         if (tdb->transaction->num_blocks <= blk ||
343             tdb->transaction->blocks[blk] == NULL) {
344                 return 0;
345         }
346
347         if (blk == tdb->transaction->num_blocks-1 &&
348             off + len > tdb->transaction->last_block_size) {
349                 if (off >= tdb->transaction->last_block_size) {
350                         return 0;
351                 }
352                 len = tdb->transaction->last_block_size - off;
353         }
354
355         /* overwrite part of an existing block */
356         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
357
358         return 0;
359 }
360
361
362 /*
363   accelerated hash chain head search, using the cached hash heads
364 */
365 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
366 {
367         uint32_t h = *chain;
368         for (;h < tdb->hash_size;h++) {
369                 /* the +1 takes account of the freelist */
370                 if (0 != tdb->transaction->hash_heads[h+1]) {
371                         break;
372                 }
373         }
374         (*chain) = h;
375 }
376
377 /*
378   out of bounds check during a transaction
379 */
380 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
381                            tdb_len_t len, int probe)
382 {
383         if (off + len >= off && off + len <= tdb->map_size) {
384                 return 0;
385         }
386         tdb->ecode = TDB_ERR_IO;
387         return -1;
388 }
389
390 /*
391   transaction version of tdb_expand().
392 */
393 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
394                                    tdb_off_t addition)
395 {
396         /* add a write to the transaction elements, so subsequent
397            reads see the zero data */
398         if (transaction_write(tdb, size, NULL, addition) != 0) {
399                 return -1;
400         }
401
402         tdb->transaction->expanded = true;
403
404         return 0;
405 }
406
407 static const struct tdb_methods transaction_methods = {
408         transaction_read,
409         transaction_write,
410         transaction_next_hash_chain,
411         transaction_oob,
412         transaction_expand_file,
413 };
414
415
416 /*
417   start a tdb transaction. No token is returned, as only a single
418   transaction is allowed to be pending per tdb_context
419 */
420 static int _tdb_transaction_start(struct tdb_context *tdb,
421                                   enum tdb_lock_flags lockflags)
422 {
423         /* some sanity checks */
424         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
425                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
426                 tdb->ecode = TDB_ERR_EINVAL;
427                 return -1;
428         }
429
430         /* cope with nested tdb_transaction_start() calls */
431         if (tdb->transaction != NULL) {
432                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
433                         tdb->ecode = TDB_ERR_NESTING;
434                         return -1;
435                 }
436                 tdb->transaction->nesting++;
437                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
438                          tdb->transaction->nesting));
439                 return 0;
440         }
441
442         if (tdb_have_extra_locks(tdb)) {
443                 /* the caller must not have any locks when starting a
444                    transaction as otherwise we'll be screwed by lack
445                    of nested locks in posix */
446                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
447                 tdb->ecode = TDB_ERR_LOCK;
448                 return -1;
449         }
450
451         if (tdb->travlocks.next != NULL) {
452                 /* you cannot use transactions inside a traverse (although you can use
453                    traverse inside a transaction) as otherwise you can end up with
454                    deadlock */
455                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
456                 tdb->ecode = TDB_ERR_LOCK;
457                 return -1;
458         }
459
460         tdb->transaction = (struct tdb_transaction *)
461                 calloc(sizeof(struct tdb_transaction), 1);
462         if (tdb->transaction == NULL) {
463                 tdb->ecode = TDB_ERR_OOM;
464                 return -1;
465         }
466
467         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
468         tdb->transaction->block_size = tdb->page_size;
469
470         /* get the transaction write lock. This is a blocking lock. As
471            discussed with Volker, there are a number of ways we could
472            make this async, which we will probably do in the future */
473         if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
474                 SAFE_FREE(tdb->transaction->blocks);
475                 SAFE_FREE(tdb->transaction);
476                 if ((lockflags & TDB_LOCK_WAIT) == 0) {
477                         tdb->ecode = TDB_ERR_NOLOCK;
478                 }
479                 return -1;
480         }
481
482         /* get a read lock from the freelist to the end of file. This
483            is upgraded to a write lock during the commit */
484         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
485                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
486                 goto fail_allrecord_lock;
487         }
488
489         /* setup a copy of the hash table heads so the hash scan in
490            traverse can be fast */
491         tdb->transaction->hash_heads = (uint32_t *)
492                 calloc(tdb->hash_size+1, sizeof(uint32_t));
493         if (tdb->transaction->hash_heads == NULL) {
494                 tdb->ecode = TDB_ERR_OOM;
495                 goto fail;
496         }
497         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
498                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
499                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
500                 tdb->ecode = TDB_ERR_IO;
501                 goto fail;
502         }
503
504         /* make sure we know about any file expansions already done by
505            anyone else */
506         tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
507         tdb->transaction->old_map_size = tdb->map_size;
508
509         /* finally hook the io methods, replacing them with
510            transaction specific methods */
511         tdb->transaction->io_methods = tdb->methods;
512         tdb->methods = &transaction_methods;
513
514         /* Trace at the end, so we get sequence number correct. */
515         tdb_trace(tdb, "tdb_transaction_start");
516         return 0;
517
518 fail:
519         tdb_allrecord_unlock(tdb, F_RDLCK, false);
520 fail_allrecord_lock:
521         tdb_transaction_unlock(tdb, F_WRLCK);
522         SAFE_FREE(tdb->transaction->blocks);
523         SAFE_FREE(tdb->transaction->hash_heads);
524         SAFE_FREE(tdb->transaction);
525         return -1;
526 }
527
528 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
529 {
530         return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
531 }
532
533 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
534 {
535         return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
536 }
537
538 /*
539   sync to disk
540 */
541 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
542 {
543         if (tdb->flags & TDB_NOSYNC) {
544                 return 0;
545         }
546
547 #ifdef HAVE_FDATASYNC
548         if (fdatasync(tdb->fd) != 0) {
549 #else
550         if (fsync(tdb->fd) != 0) {
551 #endif
552                 tdb->ecode = TDB_ERR_IO;
553                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
554                 return -1;
555         }
556 #ifdef HAVE_MMAP
557         if (tdb->map_ptr) {
558                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
559                 if (msync(moffset + (char *)tdb->map_ptr,
560                           length + (offset - moffset), MS_SYNC) != 0) {
561                         tdb->ecode = TDB_ERR_IO;
562                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
563                                  strerror(errno)));
564                         return -1;
565                 }
566         }
567 #endif
568         return 0;
569 }
570
571
572 static int _tdb_transaction_cancel(struct tdb_context *tdb)
573 {
574         int i, ret = 0;
575
576         if (tdb->transaction == NULL) {
577                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
578                 return -1;
579         }
580
581         if (tdb->transaction->nesting != 0) {
582                 tdb->transaction->transaction_error = 1;
583                 tdb->transaction->nesting--;
584                 return 0;
585         }
586
587         tdb->map_size = tdb->transaction->old_map_size;
588
589         /* free all the transaction blocks */
590         for (i=0;i<tdb->transaction->num_blocks;i++) {
591                 if (tdb->transaction->blocks[i] != NULL) {
592                         free(tdb->transaction->blocks[i]);
593                 }
594         }
595         SAFE_FREE(tdb->transaction->blocks);
596
597         if (tdb->transaction->magic_offset) {
598                 const struct tdb_methods *methods = tdb->transaction->io_methods;
599                 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
600
601                 /* remove the recovery marker */
602                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
603                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
604                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
605                         ret = -1;
606                 }
607         }
608
609         /* This also removes the OPEN_LOCK, if we have it. */
610         tdb_release_transaction_locks(tdb);
611
612         /* restore the normal io methods */
613         tdb->methods = tdb->transaction->io_methods;
614
615         SAFE_FREE(tdb->transaction->hash_heads);
616         SAFE_FREE(tdb->transaction);
617
618         return ret;
619 }
620
621 /*
622   cancel the current transaction
623 */
624 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
625 {
626         tdb_trace(tdb, "tdb_transaction_cancel");
627         return _tdb_transaction_cancel(tdb);
628 }
629
630 /*
631   work out how much space the linearised recovery data will consume
632 */
633 static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
634 {
635         tdb_len_t recovery_size = 0;
636         int i;
637
638         recovery_size = sizeof(uint32_t);
639         for (i=0;i<tdb->transaction->num_blocks;i++) {
640                 tdb_len_t block_size;
641                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
642                         break;
643                 }
644                 if (tdb->transaction->blocks[i] == NULL) {
645                         continue;
646                 }
647                 if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
648                                    &recovery_size)) {
649                         return false;
650                 }
651                 if (i == tdb->transaction->num_blocks-1) {
652                         block_size = tdb->transaction->last_block_size;
653                 } else {
654                         block_size =  tdb->transaction->block_size;
655                 }
656                 if (!tdb_add_len_t(recovery_size, block_size,
657                                    &recovery_size)) {
658                         return false;
659                 }
660         }
661
662         *result = recovery_size;
663         return true;
664 }
665
666 int tdb_recovery_area(struct tdb_context *tdb,
667                       const struct tdb_methods *methods,
668                       tdb_off_t *recovery_offset,
669                       struct tdb_record *rec)
670 {
671         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
672                 return -1;
673         }
674
675         if (*recovery_offset == 0) {
676                 rec->rec_len = 0;
677                 return 0;
678         }
679
680         if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
681                               DOCONV()) == -1) {
682                 return -1;
683         }
684
685         /* ignore invalid recovery regions: can happen in crash */
686         if (rec->magic != TDB_RECOVERY_MAGIC &&
687             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
688                 *recovery_offset = 0;
689                 rec->rec_len = 0;
690         }
691         return 0;
692 }
693
694 /*
695   allocate the recovery area, or use an existing recovery area if it is
696   large enough
697 */
698 static int tdb_recovery_allocate(struct tdb_context *tdb,
699                                  tdb_len_t *recovery_size,
700                                  tdb_off_t *recovery_offset,
701                                  tdb_len_t *recovery_max_size)
702 {
703         struct tdb_record rec;
704         const struct tdb_methods *methods = tdb->transaction->io_methods;
705         tdb_off_t recovery_head, new_end;
706
707         if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
708                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
709                 return -1;
710         }
711
712         if (!tdb_recovery_size(tdb, recovery_size)) {
713                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
714                          "overflow recovery size\n"));
715                 return -1;
716         }
717
718         /* Existing recovery area? */
719         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
720                 /* it fits in the existing area */
721                 *recovery_max_size = rec.rec_len;
722                 *recovery_offset = recovery_head;
723                 return 0;
724         }
725
726         /* If recovery area in middle of file, we need a new one. */
727         if (recovery_head == 0
728             || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
729                 /* we need to free up the old recovery area, then allocate a
730                    new one at the end of the file. Note that we cannot use
731                    tdb_allocate() to allocate the new one as that might return
732                    us an area that is being currently used (as of the start of
733                    the transaction) */
734                 if (recovery_head) {
735                         if (tdb_free(tdb, recovery_head, &rec) == -1) {
736                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
737                                          "tdb_recovery_allocate: failed to"
738                                          " free previous recovery area\n"));
739                                 return -1;
740                         }
741
742                         /* the tdb_free() call might have increased
743                          * the recovery size */
744                         if (!tdb_recovery_size(tdb, recovery_size)) {
745                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
746                                          "tdb_recovery_allocate: "
747                                          "overflow recovery size\n"));
748                                 return -1;
749                         }
750                 }
751
752                 /* New head will be at end of file. */
753                 recovery_head = tdb->map_size;
754         }
755
756         /* Now we know where it will be. */
757         *recovery_offset = recovery_head;
758
759         /* Expand by more than we need, so we don't do it often. */
760         *recovery_max_size = tdb_expand_adjust(tdb->map_size,
761                                                *recovery_size,
762                                                tdb->page_size)
763                 - sizeof(rec);
764
765         new_end = recovery_head + sizeof(rec) + *recovery_max_size;
766
767         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
768                                      new_end - tdb->transaction->old_map_size)
769             == -1) {
770                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
771                 return -1;
772         }
773
774         /* remap the file (if using mmap) */
775         methods->tdb_oob(tdb, tdb->map_size, 1, 1);
776
777         /* we have to reset the old map size so that we don't try to expand the file
778            again in the transaction commit, which would destroy the recovery area */
779         tdb->transaction->old_map_size = tdb->map_size;
780
781         /* write the recovery header offset and sync - we can sync without a race here
782            as the magic ptr in the recovery record has not been set */
783         CONVERT(recovery_head);
784         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
785                                &recovery_head, sizeof(tdb_off_t)) == -1) {
786                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
787                 return -1;
788         }
789         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
790                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
791                 return -1;
792         }
793
794         return 0;
795 }
796
797
798 /*
799   setup the recovery data that will be used on a crash during commit
800 */
801 static int transaction_setup_recovery(struct tdb_context *tdb,
802                                       tdb_off_t *magic_offset)
803 {
804         tdb_len_t recovery_size;
805         unsigned char *data, *p;
806         const struct tdb_methods *methods = tdb->transaction->io_methods;
807         struct tdb_record *rec;
808         tdb_off_t recovery_offset, recovery_max_size;
809         tdb_off_t old_map_size = tdb->transaction->old_map_size;
810         uint32_t magic, tailer;
811         int i;
812
813         /*
814           check that the recovery area has enough space
815         */
816         if (tdb_recovery_allocate(tdb, &recovery_size,
817                                   &recovery_offset, &recovery_max_size) == -1) {
818                 return -1;
819         }
820
821         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
822         if (data == NULL) {
823                 tdb->ecode = TDB_ERR_OOM;
824                 return -1;
825         }
826
827         rec = (struct tdb_record *)data;
828         memset(rec, 0, sizeof(*rec));
829
830         rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
831         rec->data_len = recovery_size;
832         rec->rec_len  = recovery_max_size;
833         rec->key_len  = old_map_size;
834         CONVERT(*rec);
835
836         /* build the recovery data into a single blob to allow us to do a single
837            large write, which should be more efficient */
838         p = data + sizeof(*rec);
839         for (i=0;i<tdb->transaction->num_blocks;i++) {
840                 tdb_off_t offset;
841                 tdb_len_t length;
842
843                 if (tdb->transaction->blocks[i] == NULL) {
844                         continue;
845                 }
846
847                 offset = i * tdb->transaction->block_size;
848                 length = tdb->transaction->block_size;
849                 if (i == tdb->transaction->num_blocks-1) {
850                         length = tdb->transaction->last_block_size;
851                 }
852
853                 if (offset >= old_map_size) {
854                         continue;
855                 }
856                 if (offset + length > tdb->transaction->old_map_size) {
857                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
858                         free(data);
859                         tdb->ecode = TDB_ERR_CORRUPT;
860                         return -1;
861                 }
862                 memcpy(p, &offset, 4);
863                 memcpy(p+4, &length, 4);
864                 if (DOCONV()) {
865                         tdb_convert(p, 8);
866                 }
867                 /* the recovery area contains the old data, not the
868                    new data, so we have to call the original tdb_read
869                    method to get it */
870                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
871                         free(data);
872                         tdb->ecode = TDB_ERR_IO;
873                         return -1;
874                 }
875                 p += 8 + length;
876         }
877
878         /* and the tailer */
879         tailer = sizeof(*rec) + recovery_max_size;
880         memcpy(p, &tailer, 4);
881         if (DOCONV()) {
882                 tdb_convert(p, 4);
883         }
884
885         /* write the recovery data to the recovery area */
886         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
887                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
888                 free(data);
889                 tdb->ecode = TDB_ERR_IO;
890                 return -1;
891         }
892         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
893                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
894                 free(data);
895                 tdb->ecode = TDB_ERR_IO;
896                 return -1;
897         }
898
899         /* as we don't have ordered writes, we have to sync the recovery
900            data before we update the magic to indicate that the recovery
901            data is present */
902         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
903                 free(data);
904                 return -1;
905         }
906
907         free(data);
908
909         magic = TDB_RECOVERY_MAGIC;
910         CONVERT(magic);
911
912         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
913
914         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
915                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
916                 tdb->ecode = TDB_ERR_IO;
917                 return -1;
918         }
919         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
920                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
921                 tdb->ecode = TDB_ERR_IO;
922                 return -1;
923         }
924
925         /* ensure the recovery magic marker is on disk */
926         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
927                 return -1;
928         }
929
930         return 0;
931 }
932
933 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
934 {
935         const struct tdb_methods *methods;
936
937         if (tdb->transaction == NULL) {
938                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
939                 return -1;
940         }
941
942         if (tdb->transaction->prepared) {
943                 tdb->ecode = TDB_ERR_EINVAL;
944                 _tdb_transaction_cancel(tdb);
945                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
946                 return -1;
947         }
948
949         if (tdb->transaction->transaction_error) {
950                 tdb->ecode = TDB_ERR_IO;
951                 _tdb_transaction_cancel(tdb);
952                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
953                 return -1;
954         }
955
956
957         if (tdb->transaction->nesting != 0) {
958                 return 0;
959         }
960
961         /* check for a null transaction */
962         if (tdb->transaction->blocks == NULL) {
963                 return 0;
964         }
965
966         methods = tdb->transaction->io_methods;
967
968         /* if there are any locks pending then the caller has not
969            nested their locks properly, so fail the transaction */
970         if (tdb_have_extra_locks(tdb)) {
971                 tdb->ecode = TDB_ERR_LOCK;
972                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
973                 _tdb_transaction_cancel(tdb);
974                 return -1;
975         }
976
977         /* upgrade the main transaction lock region to a write lock */
978         if (tdb_allrecord_upgrade(tdb) == -1) {
979                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
980                 _tdb_transaction_cancel(tdb);
981                 return -1;
982         }
983
984         /* get the open lock - this prevents new users attaching to the database
985            during the commit */
986         if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
987                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
988                 _tdb_transaction_cancel(tdb);
989                 return -1;
990         }
991
992         /* write the recovery data to the end of the file */
993         if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
994                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
995                 _tdb_transaction_cancel(tdb);
996                 return -1;
997         }
998
999         tdb->transaction->prepared = true;
1000
1001         /* expand the file to the new size if needed */
1002         if (tdb->map_size != tdb->transaction->old_map_size) {
1003                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1004                                              tdb->map_size -
1005                                              tdb->transaction->old_map_size) == -1) {
1006                         tdb->ecode = TDB_ERR_IO;
1007                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
1008                         _tdb_transaction_cancel(tdb);
1009                         return -1;
1010                 }
1011                 tdb->map_size = tdb->transaction->old_map_size;
1012                 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1013         }
1014
1015         /* Keep the open lock until the actual commit */
1016
1017         return 0;
1018 }
1019
1020 /*
1021    prepare to commit the current transaction
1022 */
1023 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1024 {
1025         tdb_trace(tdb, "tdb_transaction_prepare_commit");
1026         return _tdb_transaction_prepare_commit(tdb);
1027 }
1028
1029 /* A repack is worthwhile if the largest is less than half total free. */
1030 static bool repack_worthwhile(struct tdb_context *tdb)
1031 {
1032         tdb_off_t ptr;
1033         struct tdb_record rec;
1034         tdb_len_t total = 0, largest = 0;
1035
1036         if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1037                 return false;
1038         }
1039
1040         while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1041                 total += rec.rec_len;
1042                 if (rec.rec_len > largest) {
1043                         largest = rec.rec_len;
1044                 }
1045                 ptr = rec.next;
1046         }
1047
1048         return total > largest * 2;
1049 }
1050
1051 /*
1052   commit the current transaction
1053 */
1054 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1055 {
1056         const struct tdb_methods *methods;
1057         int i;
1058         bool need_repack = false;
1059
1060         if (tdb->transaction == NULL) {
1061                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1062                 return -1;
1063         }
1064
1065         tdb_trace(tdb, "tdb_transaction_commit");
1066
1067         if (tdb->transaction->transaction_error) {
1068                 tdb->ecode = TDB_ERR_IO;
1069                 _tdb_transaction_cancel(tdb);
1070                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1071                 return -1;
1072         }
1073
1074
1075         if (tdb->transaction->nesting != 0) {
1076                 tdb->transaction->nesting--;
1077                 return 0;
1078         }
1079
1080         /* check for a null transaction */
1081         if (tdb->transaction->blocks == NULL) {
1082                 _tdb_transaction_cancel(tdb);
1083                 return 0;
1084         }
1085
1086         if (!tdb->transaction->prepared) {
1087                 int ret = _tdb_transaction_prepare_commit(tdb);
1088                 if (ret)
1089                         return ret;
1090         }
1091
1092         methods = tdb->transaction->io_methods;
1093
1094         /* perform all the writes */
1095         for (i=0;i<tdb->transaction->num_blocks;i++) {
1096                 tdb_off_t offset;
1097                 tdb_len_t length;
1098
1099                 if (tdb->transaction->blocks[i] == NULL) {
1100                         continue;
1101                 }
1102
1103                 offset = i * tdb->transaction->block_size;
1104                 length = tdb->transaction->block_size;
1105                 if (i == tdb->transaction->num_blocks-1) {
1106                         length = tdb->transaction->last_block_size;
1107                 }
1108
1109                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1110                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1111
1112                         /* we've overwritten part of the data and
1113                            possibly expanded the file, so we need to
1114                            run the crash recovery code */
1115                         tdb->methods = methods;
1116                         tdb_transaction_recover(tdb);
1117
1118                         _tdb_transaction_cancel(tdb);
1119
1120                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1121                         return -1;
1122                 }
1123                 SAFE_FREE(tdb->transaction->blocks[i]);
1124         }
1125
1126         /* Do this before we drop lock or blocks. */
1127         if (tdb->transaction->expanded) {
1128                 need_repack = repack_worthwhile(tdb);
1129         }
1130
1131         SAFE_FREE(tdb->transaction->blocks);
1132         tdb->transaction->num_blocks = 0;
1133
1134         /* ensure the new data is on disk */
1135         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1136                 return -1;
1137         }
1138
1139         /*
1140           TODO: maybe write to some dummy hdr field, or write to magic
1141           offset without mmap, before the last sync, instead of the
1142           utime() call
1143         */
1144
1145         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1146            don't change the mtime of the file, this means the file may
1147            not be backed up (as tdb rounding to block sizes means that
1148            file size changes are quite rare too). The following forces
1149            mtime changes when a transaction completes */
1150 #ifdef HAVE_UTIME
1151         utime(tdb->name, NULL);
1152 #endif
1153
1154         /* use a transaction cancel to free memory and remove the
1155            transaction locks */
1156         _tdb_transaction_cancel(tdb);
1157
1158         if (need_repack) {
1159                 return tdb_repack(tdb);
1160         }
1161
1162         return 0;
1163 }
1164
1165
1166 /*
1167   recover from an aborted transaction. Must be called with exclusive
1168   database write access already established (including the open
1169   lock to prevent new processes attaching)
1170 */
1171 int tdb_transaction_recover(struct tdb_context *tdb)
1172 {
1173         tdb_off_t recovery_head, recovery_eof;
1174         unsigned char *data, *p;
1175         uint32_t zero = 0;
1176         struct tdb_record rec;
1177
1178         /* find the recovery area */
1179         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1180                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1181                 tdb->ecode = TDB_ERR_IO;
1182                 return -1;
1183         }
1184
1185         if (recovery_head == 0) {
1186                 /* we have never allocated a recovery record */
1187                 return 0;
1188         }
1189
1190         /* read the recovery record */
1191         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1192                                    sizeof(rec), DOCONV()) == -1) {
1193                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1194                 tdb->ecode = TDB_ERR_IO;
1195                 return -1;
1196         }
1197
1198         if (rec.magic != TDB_RECOVERY_MAGIC) {
1199                 /* there is no valid recovery data */
1200                 return 0;
1201         }
1202
1203         if (tdb->read_only) {
1204                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1205                 tdb->ecode = TDB_ERR_CORRUPT;
1206                 return -1;
1207         }
1208
1209         recovery_eof = rec.key_len;
1210
1211         data = (unsigned char *)malloc(rec.data_len);
1212         if (data == NULL) {
1213                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1214                 tdb->ecode = TDB_ERR_OOM;
1215                 return -1;
1216         }
1217
1218         /* read the full recovery data */
1219         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1220                                    rec.data_len, 0) == -1) {
1221                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1222                 tdb->ecode = TDB_ERR_IO;
1223                 return -1;
1224         }
1225
1226         /* recover the file data */
1227         p = data;
1228         while (p+8 < data + rec.data_len) {
1229                 uint32_t ofs, len;
1230                 if (DOCONV()) {
1231                         tdb_convert(p, 8);
1232                 }
1233                 memcpy(&ofs, p, 4);
1234                 memcpy(&len, p+4, 4);
1235
1236                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1237                         free(data);
1238                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
1239                         tdb->ecode = TDB_ERR_IO;
1240                         return -1;
1241                 }
1242                 p += 8 + len;
1243         }
1244
1245         free(data);
1246
1247         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1248                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1249                 tdb->ecode = TDB_ERR_IO;
1250                 return -1;
1251         }
1252
1253         /* if the recovery area is after the recovered eof then remove it */
1254         if (recovery_eof <= recovery_head) {
1255                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1256                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1257                         tdb->ecode = TDB_ERR_IO;
1258                         return -1;
1259                 }
1260         }
1261
1262         /* remove the recovery magic */
1263         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1264                           &zero) == -1) {
1265                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1266                 tdb->ecode = TDB_ERR_IO;
1267                 return -1;
1268         }
1269
1270         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1271                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1272                 tdb->ecode = TDB_ERR_IO;
1273                 return -1;
1274         }
1275
1276         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
1277                  recovery_eof));
1278
1279         /* all done */
1280         return 0;
1281 }
1282
1283 /* Any I/O failures we say "needs recovery". */
1284 bool tdb_needs_recovery(struct tdb_context *tdb)
1285 {
1286         tdb_off_t recovery_head;
1287         struct tdb_record rec;
1288
1289         /* find the recovery area */
1290         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1291                 return true;
1292         }
1293
1294         if (recovery_head == 0) {
1295                 /* we have never allocated a recovery record */
1296                 return false;
1297         }
1298
1299         /* read the recovery record */
1300         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1301                                    sizeof(rec), DOCONV()) == -1) {
1302                 return true;
1303         }
1304
1305         return (rec.magic == TDB_RECOVERY_MAGIC);
1306 }