tdb: Vectorize tdb_update_hash
[samba.git] / lib / tdb / common / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocated the transaction recover record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of writes all that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit stategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no fsync/msync calls are made.  This means we
86     are still proof against a process dying during transaction commit,
87     but not against machine reboot.
88
89   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90     tdb_add_flags() transaction nesting is enabled.
91     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92     The default is that transaction nesting is allowed.
93     Note: this default may change in future versions of tdb.
94
95     Beware. when transactions are nested a transaction successfully
96     completed with tdb_transaction_commit() can be silently unrolled later.
97
98   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99     tdb_add_flags() transaction nesting is disabled.
100     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101     An attempt create a nested transaction will fail with TDB_ERR_NESTING.
102     The default is that transaction nesting is allowed.
103     Note: this default may change in future versions of tdb.
104 */
105
106
107 /*
108   hold the context of any current transaction
109 */
110 struct tdb_transaction {
111         /* we keep a mirrored copy of the tdb hash heads here so
112            tdb_next_hash_chain() can operate efficiently */
113         uint32_t *hash_heads;
114
115         /* the original io methods - used to do IOs to the real db */
116         const struct tdb_methods *io_methods;
117
118         /* the list of transaction blocks. When a block is first
119            written to, it gets created in this list */
120         uint8_t **blocks;
121         uint32_t num_blocks;
122         uint32_t block_size;      /* bytes in each block */
123         uint32_t last_block_size; /* number of valid bytes in the last block */
124
125         /* non-zero when an internal transaction error has
126            occurred. All write operations will then fail until the
127            transaction is ended */
128         int transaction_error;
129
130         /* when inside a transaction we need to keep track of any
131            nested tdb_transaction_start() calls, as these are allowed,
132            but don't create a new transaction */
133         int nesting;
134
135         /* set when a prepare has already occurred */
136         bool prepared;
137         tdb_off_t magic_offset;
138
139         /* old file size before transaction */
140         tdb_len_t old_map_size;
141
142         /* did we expand in this transaction */
143         bool expanded;
144 };
145
146
147 /*
148   read while in a transaction. We need to check first if the data is in our list
149   of transaction elements, then if not do a real read
150 */
151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152                             tdb_len_t len, int cv)
153 {
154         uint32_t blk;
155
156         /* break it down into block sized ops */
157         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160                         return -1;
161                 }
162                 len -= len2;
163                 off += len2;
164                 buf = (void *)(len2 + (char *)buf);
165         }
166
167         if (len == 0) {
168                 return 0;
169         }
170
171         blk = off / tdb->transaction->block_size;
172
173         /* see if we have it in the block list */
174         if (tdb->transaction->num_blocks <= blk ||
175             tdb->transaction->blocks[blk] == NULL) {
176                 /* nope, do a real read */
177                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178                         goto fail;
179                 }
180                 return 0;
181         }
182
183         /* it is in the block list. Now check for the last block */
184         if (blk == tdb->transaction->num_blocks-1) {
185                 if (len > tdb->transaction->last_block_size) {
186                         goto fail;
187                 }
188         }
189
190         /* now copy it out of this block */
191         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192         if (cv) {
193                 tdb_convert(buf, len);
194         }
195         return 0;
196
197 fail:
198         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
199         tdb->ecode = TDB_ERR_IO;
200         tdb->transaction->transaction_error = 1;
201         return -1;
202 }
203
204
205 /*
206   write while in a transaction
207 */
208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209                              const void *buf, tdb_len_t len)
210 {
211         uint32_t blk;
212
213         /* Only a commit is allowed on a prepared transaction */
214         if (tdb->transaction->prepared) {
215                 tdb->ecode = TDB_ERR_EINVAL;
216                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
217                 tdb->transaction->transaction_error = 1;
218                 return -1;
219         }
220
221         /* if the write is to a hash head, then update the transaction
222            hash heads */
223         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
224             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
225                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
226                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
227         }
228
229         /* break it up into block sized chunks */
230         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
231                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
232                 if (transaction_write(tdb, off, buf, len2) != 0) {
233                         return -1;
234                 }
235                 len -= len2;
236                 off += len2;
237                 if (buf != NULL) {
238                         buf = (const void *)(len2 + (const char *)buf);
239                 }
240         }
241
242         if (len == 0) {
243                 return 0;
244         }
245
246         blk = off / tdb->transaction->block_size;
247         off = off % tdb->transaction->block_size;
248
249         if (tdb->transaction->num_blocks <= blk) {
250                 uint8_t **new_blocks;
251                 /* expand the blocks array */
252                 new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
253                                                  (blk+1)*sizeof(uint8_t *));
254                 if (new_blocks == NULL) {
255                         tdb->ecode = TDB_ERR_OOM;
256                         goto fail;
257                 }
258                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
259                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
260                 tdb->transaction->blocks = new_blocks;
261                 tdb->transaction->num_blocks = blk+1;
262                 tdb->transaction->last_block_size = 0;
263         }
264
265         /* allocate and fill a block? */
266         if (tdb->transaction->blocks[blk] == NULL) {
267                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
268                 if (tdb->transaction->blocks[blk] == NULL) {
269                         tdb->ecode = TDB_ERR_OOM;
270                         tdb->transaction->transaction_error = 1;
271                         return -1;
272                 }
273                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
274                         tdb_len_t len2 = tdb->transaction->block_size;
275                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
276                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
277                         }
278                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
279                                                                    tdb->transaction->blocks[blk],
280                                                                    len2, 0) != 0) {
281                                 SAFE_FREE(tdb->transaction->blocks[blk]);
282                                 tdb->ecode = TDB_ERR_IO;
283                                 goto fail;
284                         }
285                         if (blk == tdb->transaction->num_blocks-1) {
286                                 tdb->transaction->last_block_size = len2;
287                         }
288                 }
289         }
290
291         /* overwrite part of an existing block */
292         if (buf == NULL) {
293                 memset(tdb->transaction->blocks[blk] + off, 0, len);
294         } else {
295                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
296         }
297         if (blk == tdb->transaction->num_blocks-1) {
298                 if (len + off > tdb->transaction->last_block_size) {
299                         tdb->transaction->last_block_size = len + off;
300                 }
301         }
302
303         return 0;
304
305 fail:
306         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
307                  (blk*tdb->transaction->block_size) + off, len));
308         tdb->transaction->transaction_error = 1;
309         return -1;
310 }
311
312
313 /*
314   write while in a transaction - this variant never expands the transaction blocks, it only
315   updates existing blocks. This means it cannot change the recovery size
316 */
317 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
318                                       const void *buf, tdb_len_t len)
319 {
320         uint32_t blk;
321
322         /* break it up into block sized chunks */
323         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
324                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
325                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
326                         return -1;
327                 }
328                 len -= len2;
329                 off += len2;
330                 if (buf != NULL) {
331                         buf = (const void *)(len2 + (const char *)buf);
332                 }
333         }
334
335         if (len == 0) {
336                 return 0;
337         }
338
339         blk = off / tdb->transaction->block_size;
340         off = off % tdb->transaction->block_size;
341
342         if (tdb->transaction->num_blocks <= blk ||
343             tdb->transaction->blocks[blk] == NULL) {
344                 return 0;
345         }
346
347         if (blk == tdb->transaction->num_blocks-1 &&
348             off + len > tdb->transaction->last_block_size) {
349                 if (off >= tdb->transaction->last_block_size) {
350                         return 0;
351                 }
352                 len = tdb->transaction->last_block_size - off;
353         }
354
355         /* overwrite part of an existing block */
356         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
357
358         return 0;
359 }
360
361
362 /*
363   accelerated hash chain head search, using the cached hash heads
364 */
365 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
366 {
367         uint32_t h = *chain;
368         for (;h < tdb->hash_size;h++) {
369                 /* the +1 takes account of the freelist */
370                 if (0 != tdb->transaction->hash_heads[h+1]) {
371                         break;
372                 }
373         }
374         (*chain) = h;
375 }
376
377 /*
378   out of bounds check during a transaction
379 */
380 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
381                            tdb_len_t len, int probe)
382 {
383         if (off + len >= off && off + len <= tdb->map_size) {
384                 return 0;
385         }
386         tdb->ecode = TDB_ERR_IO;
387         return -1;
388 }
389
390 /*
391   transaction version of tdb_expand().
392 */
393 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
394                                    tdb_off_t addition)
395 {
396         /* add a write to the transaction elements, so subsequent
397            reads see the zero data */
398         if (transaction_write(tdb, size, NULL, addition) != 0) {
399                 return -1;
400         }
401
402         tdb->transaction->expanded = true;
403
404         return 0;
405 }
406
407 static const struct tdb_methods transaction_methods = {
408         transaction_read,
409         transaction_write,
410         transaction_next_hash_chain,
411         transaction_oob,
412         transaction_expand_file,
413 };
414
415
416 /*
417   start a tdb transaction. No token is returned, as only a single
418   transaction is allowed to be pending per tdb_context
419 */
420 static int _tdb_transaction_start(struct tdb_context *tdb,
421                                   enum tdb_lock_flags lockflags)
422 {
423         /* some sanity checks */
424         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
425             || tdb->traverse_read) {
426                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
427                 tdb->ecode = TDB_ERR_EINVAL;
428                 return -1;
429         }
430
431         /* cope with nested tdb_transaction_start() calls */
432         if (tdb->transaction != NULL) {
433                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
434                         tdb->ecode = TDB_ERR_NESTING;
435                         return -1;
436                 }
437                 tdb->transaction->nesting++;
438                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
439                          tdb->transaction->nesting));
440                 return 0;
441         }
442
443         if (tdb_have_extra_locks(tdb)) {
444                 /* the caller must not have any locks when starting a
445                    transaction as otherwise we'll be screwed by lack
446                    of nested locks in posix */
447                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
448                 tdb->ecode = TDB_ERR_LOCK;
449                 return -1;
450         }
451
452         if (tdb->travlocks.next != NULL) {
453                 /* you cannot use transactions inside a traverse (although you can use
454                    traverse inside a transaction) as otherwise you can end up with
455                    deadlock */
456                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
457                 tdb->ecode = TDB_ERR_LOCK;
458                 return -1;
459         }
460
461         tdb->transaction = (struct tdb_transaction *)
462                 calloc(sizeof(struct tdb_transaction), 1);
463         if (tdb->transaction == NULL) {
464                 tdb->ecode = TDB_ERR_OOM;
465                 return -1;
466         }
467
468         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
469         tdb->transaction->block_size = tdb->page_size;
470
471         /* get the transaction write lock. This is a blocking lock. As
472            discussed with Volker, there are a number of ways we could
473            make this async, which we will probably do in the future */
474         if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
475                 SAFE_FREE(tdb->transaction->blocks);
476                 SAFE_FREE(tdb->transaction);
477                 if ((lockflags & TDB_LOCK_WAIT) == 0) {
478                         tdb->ecode = TDB_ERR_NOLOCK;
479                 }
480                 return -1;
481         }
482
483         /* get a read lock from the freelist to the end of file. This
484            is upgraded to a write lock during the commit */
485         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
486                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
487                 goto fail_allrecord_lock;
488         }
489
490         /* setup a copy of the hash table heads so the hash scan in
491            traverse can be fast */
492         tdb->transaction->hash_heads = (uint32_t *)
493                 calloc(tdb->hash_size+1, sizeof(uint32_t));
494         if (tdb->transaction->hash_heads == NULL) {
495                 tdb->ecode = TDB_ERR_OOM;
496                 goto fail;
497         }
498         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
499                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
500                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
501                 tdb->ecode = TDB_ERR_IO;
502                 goto fail;
503         }
504
505         /* make sure we know about any file expansions already done by
506            anyone else */
507         tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
508         tdb->transaction->old_map_size = tdb->map_size;
509
510         /* finally hook the io methods, replacing them with
511            transaction specific methods */
512         tdb->transaction->io_methods = tdb->methods;
513         tdb->methods = &transaction_methods;
514
515         /* Trace at the end, so we get sequence number correct. */
516         tdb_trace(tdb, "tdb_transaction_start");
517         return 0;
518
519 fail:
520         tdb_allrecord_unlock(tdb, F_RDLCK, false);
521 fail_allrecord_lock:
522         tdb_transaction_unlock(tdb, F_WRLCK);
523         SAFE_FREE(tdb->transaction->blocks);
524         SAFE_FREE(tdb->transaction->hash_heads);
525         SAFE_FREE(tdb->transaction);
526         return -1;
527 }
528
529 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
530 {
531         return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
532 }
533
534 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
535 {
536         return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
537 }
538
539 /*
540   sync to disk
541 */
542 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
543 {
544         if (tdb->flags & TDB_NOSYNC) {
545                 return 0;
546         }
547
548 #ifdef HAVE_FDATASYNC
549         if (fdatasync(tdb->fd) != 0) {
550 #else
551         if (fsync(tdb->fd) != 0) {
552 #endif
553                 tdb->ecode = TDB_ERR_IO;
554                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
555                 return -1;
556         }
557 #ifdef HAVE_MMAP
558         if (tdb->map_ptr) {
559                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
560                 if (msync(moffset + (char *)tdb->map_ptr,
561                           length + (offset - moffset), MS_SYNC) != 0) {
562                         tdb->ecode = TDB_ERR_IO;
563                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
564                                  strerror(errno)));
565                         return -1;
566                 }
567         }
568 #endif
569         return 0;
570 }
571
572
573 static int _tdb_transaction_cancel(struct tdb_context *tdb)
574 {
575         int i, ret = 0;
576
577         if (tdb->transaction == NULL) {
578                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
579                 return -1;
580         }
581
582         if (tdb->transaction->nesting != 0) {
583                 tdb->transaction->transaction_error = 1;
584                 tdb->transaction->nesting--;
585                 return 0;
586         }
587
588         tdb->map_size = tdb->transaction->old_map_size;
589
590         /* free all the transaction blocks */
591         for (i=0;i<tdb->transaction->num_blocks;i++) {
592                 if (tdb->transaction->blocks[i] != NULL) {
593                         free(tdb->transaction->blocks[i]);
594                 }
595         }
596         SAFE_FREE(tdb->transaction->blocks);
597
598         if (tdb->transaction->magic_offset) {
599                 const struct tdb_methods *methods = tdb->transaction->io_methods;
600                 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
601
602                 /* remove the recovery marker */
603                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
604                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
605                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
606                         ret = -1;
607                 }
608         }
609
610         /* This also removes the OPEN_LOCK, if we have it. */
611         tdb_release_transaction_locks(tdb);
612
613         /* restore the normal io methods */
614         tdb->methods = tdb->transaction->io_methods;
615
616         SAFE_FREE(tdb->transaction->hash_heads);
617         SAFE_FREE(tdb->transaction);
618
619         return ret;
620 }
621
622 /*
623   cancel the current transaction
624 */
625 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
626 {
627         tdb_trace(tdb, "tdb_transaction_cancel");
628         return _tdb_transaction_cancel(tdb);
629 }
630
631 /*
632   work out how much space the linearised recovery data will consume
633 */
634 static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
635 {
636         tdb_len_t recovery_size = 0;
637         int i;
638
639         recovery_size = sizeof(uint32_t);
640         for (i=0;i<tdb->transaction->num_blocks;i++) {
641                 tdb_len_t block_size;
642                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
643                         break;
644                 }
645                 if (tdb->transaction->blocks[i] == NULL) {
646                         continue;
647                 }
648                 if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
649                                    &recovery_size)) {
650                         return false;
651                 }
652                 if (i == tdb->transaction->num_blocks-1) {
653                         block_size = tdb->transaction->last_block_size;
654                 } else {
655                         block_size =  tdb->transaction->block_size;
656                 }
657                 if (!tdb_add_len_t(recovery_size, block_size,
658                                    &recovery_size)) {
659                         return false;
660                 }
661         }
662
663         *result = recovery_size;
664         return true;
665 }
666
667 int tdb_recovery_area(struct tdb_context *tdb,
668                       const struct tdb_methods *methods,
669                       tdb_off_t *recovery_offset,
670                       struct tdb_record *rec)
671 {
672         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
673                 return -1;
674         }
675
676         if (*recovery_offset == 0) {
677                 rec->rec_len = 0;
678                 return 0;
679         }
680
681         if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
682                               DOCONV()) == -1) {
683                 return -1;
684         }
685
686         /* ignore invalid recovery regions: can happen in crash */
687         if (rec->magic != TDB_RECOVERY_MAGIC &&
688             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
689                 *recovery_offset = 0;
690                 rec->rec_len = 0;
691         }
692         return 0;
693 }
694
695 /*
696   allocate the recovery area, or use an existing recovery area if it is
697   large enough
698 */
699 static int tdb_recovery_allocate(struct tdb_context *tdb,
700                                  tdb_len_t *recovery_size,
701                                  tdb_off_t *recovery_offset,
702                                  tdb_len_t *recovery_max_size)
703 {
704         struct tdb_record rec;
705         const struct tdb_methods *methods = tdb->transaction->io_methods;
706         tdb_off_t recovery_head, new_end;
707
708         if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
709                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
710                 return -1;
711         }
712
713         if (!tdb_recovery_size(tdb, recovery_size)) {
714                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
715                          "overflow recovery size\n"));
716                 return -1;
717         }
718
719         /* Existing recovery area? */
720         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
721                 /* it fits in the existing area */
722                 *recovery_max_size = rec.rec_len;
723                 *recovery_offset = recovery_head;
724                 return 0;
725         }
726
727         /* If recovery area in middle of file, we need a new one. */
728         if (recovery_head == 0
729             || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
730                 /* we need to free up the old recovery area, then allocate a
731                    new one at the end of the file. Note that we cannot use
732                    tdb_allocate() to allocate the new one as that might return
733                    us an area that is being currently used (as of the start of
734                    the transaction) */
735                 if (recovery_head) {
736                         if (tdb_free(tdb, recovery_head, &rec) == -1) {
737                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
738                                          "tdb_recovery_allocate: failed to"
739                                          " free previous recovery area\n"));
740                                 return -1;
741                         }
742
743                         /* the tdb_free() call might have increased
744                          * the recovery size */
745                         if (!tdb_recovery_size(tdb, recovery_size)) {
746                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
747                                          "tdb_recovery_allocate: "
748                                          "overflow recovery size\n"));
749                                 return -1;
750                         }
751                 }
752
753                 /* New head will be at end of file. */
754                 recovery_head = tdb->map_size;
755         }
756
757         /* Now we know where it will be. */
758         *recovery_offset = recovery_head;
759
760         /* Expand by more than we need, so we don't do it often. */
761         *recovery_max_size = tdb_expand_adjust(tdb->map_size,
762                                                *recovery_size,
763                                                tdb->page_size)
764                 - sizeof(rec);
765
766         if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
767             !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
768                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
769                          "overflow recovery area\n"));
770                 return -1;
771         }
772
773         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
774                                      new_end - tdb->transaction->old_map_size)
775             == -1) {
776                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
777                 return -1;
778         }
779
780         /* remap the file (if using mmap) */
781         methods->tdb_oob(tdb, tdb->map_size, 1, 1);
782
783         /* we have to reset the old map size so that we don't try to expand the file
784            again in the transaction commit, which would destroy the recovery area */
785         tdb->transaction->old_map_size = tdb->map_size;
786
787         /* write the recovery header offset and sync - we can sync without a race here
788            as the magic ptr in the recovery record has not been set */
789         CONVERT(recovery_head);
790         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
791                                &recovery_head, sizeof(tdb_off_t)) == -1) {
792                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
793                 return -1;
794         }
795         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
796                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
797                 return -1;
798         }
799
800         return 0;
801 }
802
803
804 /*
805   setup the recovery data that will be used on a crash during commit
806 */
807 static int transaction_setup_recovery(struct tdb_context *tdb,
808                                       tdb_off_t *magic_offset)
809 {
810         tdb_len_t recovery_size;
811         unsigned char *data, *p;
812         const struct tdb_methods *methods = tdb->transaction->io_methods;
813         struct tdb_record *rec;
814         tdb_off_t recovery_offset, recovery_max_size;
815         tdb_off_t old_map_size = tdb->transaction->old_map_size;
816         uint32_t magic, tailer;
817         int i;
818
819         /*
820           check that the recovery area has enough space
821         */
822         if (tdb_recovery_allocate(tdb, &recovery_size,
823                                   &recovery_offset, &recovery_max_size) == -1) {
824                 return -1;
825         }
826
827         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
828         if (data == NULL) {
829                 tdb->ecode = TDB_ERR_OOM;
830                 return -1;
831         }
832
833         rec = (struct tdb_record *)data;
834         memset(rec, 0, sizeof(*rec));
835
836         rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
837         rec->data_len = recovery_size;
838         rec->rec_len  = recovery_max_size;
839         rec->key_len  = old_map_size;
840         CONVERT(*rec);
841
842         /* build the recovery data into a single blob to allow us to do a single
843            large write, which should be more efficient */
844         p = data + sizeof(*rec);
845         for (i=0;i<tdb->transaction->num_blocks;i++) {
846                 tdb_off_t offset;
847                 tdb_len_t length;
848
849                 if (tdb->transaction->blocks[i] == NULL) {
850                         continue;
851                 }
852
853                 offset = i * tdb->transaction->block_size;
854                 length = tdb->transaction->block_size;
855                 if (i == tdb->transaction->num_blocks-1) {
856                         length = tdb->transaction->last_block_size;
857                 }
858
859                 if (offset >= old_map_size) {
860                         continue;
861                 }
862                 if (offset + length > tdb->transaction->old_map_size) {
863                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
864                         free(data);
865                         tdb->ecode = TDB_ERR_CORRUPT;
866                         return -1;
867                 }
868                 memcpy(p, &offset, 4);
869                 memcpy(p+4, &length, 4);
870                 if (DOCONV()) {
871                         tdb_convert(p, 8);
872                 }
873                 /* the recovery area contains the old data, not the
874                    new data, so we have to call the original tdb_read
875                    method to get it */
876                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
877                         free(data);
878                         tdb->ecode = TDB_ERR_IO;
879                         return -1;
880                 }
881                 p += 8 + length;
882         }
883
884         /* and the tailer */
885         tailer = sizeof(*rec) + recovery_max_size;
886         memcpy(p, &tailer, 4);
887         if (DOCONV()) {
888                 tdb_convert(p, 4);
889         }
890
891         /* write the recovery data to the recovery area */
892         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
893                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
894                 free(data);
895                 tdb->ecode = TDB_ERR_IO;
896                 return -1;
897         }
898         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
899                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
900                 free(data);
901                 tdb->ecode = TDB_ERR_IO;
902                 return -1;
903         }
904
905         /* as we don't have ordered writes, we have to sync the recovery
906            data before we update the magic to indicate that the recovery
907            data is present */
908         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
909                 free(data);
910                 return -1;
911         }
912
913         free(data);
914
915         magic = TDB_RECOVERY_MAGIC;
916         CONVERT(magic);
917
918         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
919
920         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
921                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
922                 tdb->ecode = TDB_ERR_IO;
923                 return -1;
924         }
925         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
926                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
927                 tdb->ecode = TDB_ERR_IO;
928                 return -1;
929         }
930
931         /* ensure the recovery magic marker is on disk */
932         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
933                 return -1;
934         }
935
936         return 0;
937 }
938
939 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
940 {
941         const struct tdb_methods *methods;
942
943         if (tdb->transaction == NULL) {
944                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
945                 return -1;
946         }
947
948         if (tdb->transaction->prepared) {
949                 tdb->ecode = TDB_ERR_EINVAL;
950                 _tdb_transaction_cancel(tdb);
951                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
952                 return -1;
953         }
954
955         if (tdb->transaction->transaction_error) {
956                 tdb->ecode = TDB_ERR_IO;
957                 _tdb_transaction_cancel(tdb);
958                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
959                 return -1;
960         }
961
962
963         if (tdb->transaction->nesting != 0) {
964                 return 0;
965         }
966
967         /* check for a null transaction */
968         if (tdb->transaction->blocks == NULL) {
969                 return 0;
970         }
971
972         methods = tdb->transaction->io_methods;
973
974         /* if there are any locks pending then the caller has not
975            nested their locks properly, so fail the transaction */
976         if (tdb_have_extra_locks(tdb)) {
977                 tdb->ecode = TDB_ERR_LOCK;
978                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
979                 _tdb_transaction_cancel(tdb);
980                 return -1;
981         }
982
983         /* upgrade the main transaction lock region to a write lock */
984         if (tdb_allrecord_upgrade(tdb) == -1) {
985                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
986                 _tdb_transaction_cancel(tdb);
987                 return -1;
988         }
989
990         /* get the open lock - this prevents new users attaching to the database
991            during the commit */
992         if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
993                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
994                 _tdb_transaction_cancel(tdb);
995                 return -1;
996         }
997
998         /* write the recovery data to the end of the file */
999         if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
1000                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
1001                 _tdb_transaction_cancel(tdb);
1002                 return -1;
1003         }
1004
1005         tdb->transaction->prepared = true;
1006
1007         /* expand the file to the new size if needed */
1008         if (tdb->map_size != tdb->transaction->old_map_size) {
1009                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1010                                              tdb->map_size -
1011                                              tdb->transaction->old_map_size) == -1) {
1012                         tdb->ecode = TDB_ERR_IO;
1013                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
1014                         _tdb_transaction_cancel(tdb);
1015                         return -1;
1016                 }
1017                 tdb->map_size = tdb->transaction->old_map_size;
1018                 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1019         }
1020
1021         /* Keep the open lock until the actual commit */
1022
1023         return 0;
1024 }
1025
1026 /*
1027    prepare to commit the current transaction
1028 */
1029 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1030 {
1031         tdb_trace(tdb, "tdb_transaction_prepare_commit");
1032         return _tdb_transaction_prepare_commit(tdb);
1033 }
1034
1035 /* A repack is worthwhile if the largest is less than half total free. */
1036 static bool repack_worthwhile(struct tdb_context *tdb)
1037 {
1038         tdb_off_t ptr;
1039         struct tdb_record rec;
1040         tdb_len_t total = 0, largest = 0;
1041
1042         if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1043                 return false;
1044         }
1045
1046         while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1047                 total += rec.rec_len;
1048                 if (rec.rec_len > largest) {
1049                         largest = rec.rec_len;
1050                 }
1051                 ptr = rec.next;
1052         }
1053
1054         return total > largest * 2;
1055 }
1056
1057 /*
1058   commit the current transaction
1059 */
1060 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1061 {
1062         const struct tdb_methods *methods;
1063         int i;
1064         bool need_repack = false;
1065
1066         if (tdb->transaction == NULL) {
1067                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1068                 return -1;
1069         }
1070
1071         tdb_trace(tdb, "tdb_transaction_commit");
1072
1073         if (tdb->transaction->transaction_error) {
1074                 tdb->ecode = TDB_ERR_IO;
1075                 _tdb_transaction_cancel(tdb);
1076                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1077                 return -1;
1078         }
1079
1080
1081         if (tdb->transaction->nesting != 0) {
1082                 tdb->transaction->nesting--;
1083                 return 0;
1084         }
1085
1086         /* check for a null transaction */
1087         if (tdb->transaction->blocks == NULL) {
1088                 _tdb_transaction_cancel(tdb);
1089                 return 0;
1090         }
1091
1092         if (!tdb->transaction->prepared) {
1093                 int ret = _tdb_transaction_prepare_commit(tdb);
1094                 if (ret)
1095                         return ret;
1096         }
1097
1098         methods = tdb->transaction->io_methods;
1099
1100         /* perform all the writes */
1101         for (i=0;i<tdb->transaction->num_blocks;i++) {
1102                 tdb_off_t offset;
1103                 tdb_len_t length;
1104
1105                 if (tdb->transaction->blocks[i] == NULL) {
1106                         continue;
1107                 }
1108
1109                 offset = i * tdb->transaction->block_size;
1110                 length = tdb->transaction->block_size;
1111                 if (i == tdb->transaction->num_blocks-1) {
1112                         length = tdb->transaction->last_block_size;
1113                 }
1114
1115                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1116                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1117
1118                         /* we've overwritten part of the data and
1119                            possibly expanded the file, so we need to
1120                            run the crash recovery code */
1121                         tdb->methods = methods;
1122                         tdb_transaction_recover(tdb);
1123
1124                         _tdb_transaction_cancel(tdb);
1125
1126                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1127                         return -1;
1128                 }
1129                 SAFE_FREE(tdb->transaction->blocks[i]);
1130         }
1131
1132         /* Do this before we drop lock or blocks. */
1133         if (tdb->transaction->expanded) {
1134                 need_repack = repack_worthwhile(tdb);
1135         }
1136
1137         SAFE_FREE(tdb->transaction->blocks);
1138         tdb->transaction->num_blocks = 0;
1139
1140         /* ensure the new data is on disk */
1141         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1142                 return -1;
1143         }
1144
1145         /*
1146           TODO: maybe write to some dummy hdr field, or write to magic
1147           offset without mmap, before the last sync, instead of the
1148           utime() call
1149         */
1150
1151         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1152            don't change the mtime of the file, this means the file may
1153            not be backed up (as tdb rounding to block sizes means that
1154            file size changes are quite rare too). The following forces
1155            mtime changes when a transaction completes */
1156 #ifdef HAVE_UTIME
1157         utime(tdb->name, NULL);
1158 #endif
1159
1160         /* use a transaction cancel to free memory and remove the
1161            transaction locks */
1162         _tdb_transaction_cancel(tdb);
1163
1164         if (need_repack) {
1165                 return tdb_repack(tdb);
1166         }
1167
1168         return 0;
1169 }
1170
1171
1172 /*
1173   recover from an aborted transaction. Must be called with exclusive
1174   database write access already established (including the open
1175   lock to prevent new processes attaching)
1176 */
1177 int tdb_transaction_recover(struct tdb_context *tdb)
1178 {
1179         tdb_off_t recovery_head, recovery_eof;
1180         unsigned char *data, *p;
1181         uint32_t zero = 0;
1182         struct tdb_record rec;
1183
1184         /* find the recovery area */
1185         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1186                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1187                 tdb->ecode = TDB_ERR_IO;
1188                 return -1;
1189         }
1190
1191         if (recovery_head == 0) {
1192                 /* we have never allocated a recovery record */
1193                 return 0;
1194         }
1195
1196         /* read the recovery record */
1197         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1198                                    sizeof(rec), DOCONV()) == -1) {
1199                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1200                 tdb->ecode = TDB_ERR_IO;
1201                 return -1;
1202         }
1203
1204         if (rec.magic != TDB_RECOVERY_MAGIC) {
1205                 /* there is no valid recovery data */
1206                 return 0;
1207         }
1208
1209         if (tdb->read_only) {
1210                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1211                 tdb->ecode = TDB_ERR_CORRUPT;
1212                 return -1;
1213         }
1214
1215         recovery_eof = rec.key_len;
1216
1217         data = (unsigned char *)malloc(rec.data_len);
1218         if (data == NULL) {
1219                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1220                 tdb->ecode = TDB_ERR_OOM;
1221                 return -1;
1222         }
1223
1224         /* read the full recovery data */
1225         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1226                                    rec.data_len, 0) == -1) {
1227                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1228                 tdb->ecode = TDB_ERR_IO;
1229                 return -1;
1230         }
1231
1232         /* recover the file data */
1233         p = data;
1234         while (p+8 < data + rec.data_len) {
1235                 uint32_t ofs, len;
1236                 if (DOCONV()) {
1237                         tdb_convert(p, 8);
1238                 }
1239                 memcpy(&ofs, p, 4);
1240                 memcpy(&len, p+4, 4);
1241
1242                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1243                         free(data);
1244                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
1245                         tdb->ecode = TDB_ERR_IO;
1246                         return -1;
1247                 }
1248                 p += 8 + len;
1249         }
1250
1251         free(data);
1252
1253         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1254                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1255                 tdb->ecode = TDB_ERR_IO;
1256                 return -1;
1257         }
1258
1259         /* if the recovery area is after the recovered eof then remove it */
1260         if (recovery_eof <= recovery_head) {
1261                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1262                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1263                         tdb->ecode = TDB_ERR_IO;
1264                         return -1;
1265                 }
1266         }
1267
1268         /* remove the recovery magic */
1269         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1270                           &zero) == -1) {
1271                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1272                 tdb->ecode = TDB_ERR_IO;
1273                 return -1;
1274         }
1275
1276         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1277                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1278                 tdb->ecode = TDB_ERR_IO;
1279                 return -1;
1280         }
1281
1282         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
1283                  recovery_eof));
1284
1285         /* all done */
1286         return 0;
1287 }
1288
1289 /* Any I/O failures we say "needs recovery". */
1290 bool tdb_needs_recovery(struct tdb_context *tdb)
1291 {
1292         tdb_off_t recovery_head;
1293         struct tdb_record rec;
1294
1295         /* find the recovery area */
1296         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1297                 return true;
1298         }
1299
1300         if (recovery_head == 0) {
1301                 /* we have never allocated a recovery record */
1302                 return false;
1303         }
1304
1305         /* read the recovery record */
1306         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1307                                    sizeof(rec), DOCONV()) == -1) {
1308                 return true;
1309         }
1310
1311         return (rec.magic == TDB_RECOVERY_MAGIC);
1312 }