tdb: Make tdb_find circular-safe
[samba.git] / lib / tdb / common / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     open lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no fsync/msync calls are made.  This means we
86     are still proof against a process dying during transaction commit,
87     but not against machine reboot.
88
89   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
90     tdb_add_flags() transaction nesting is enabled.
91     It resets the TDB_DISALLOW_NESTING flag, as both cannot be used together.
92     The default is that transaction nesting is allowed.
93     Note: this default may change in future versions of tdb.
94
95     Beware. when transactions are nested a transaction successfully
96     completed with tdb_transaction_commit() can be silently unrolled later.
97
98   - if TDB_DISALLOW_NESTING is passed to flags in tdb open, or added using
99     tdb_add_flags() transaction nesting is disabled.
100     It resets the TDB_ALLOW_NESTING flag, as both cannot be used together.
101     An attempt to create a nested transaction will fail with TDB_ERR_NESTING.
102     The default is that transaction nesting is allowed.
103     Note: this default may change in future versions of tdb.
104 */
105
106
107 /*
108   hold the context of any current transaction
109 */
110 struct tdb_transaction {
111         /* we keep a mirrored copy of the tdb hash heads here so
112            tdb_next_hash_chain() can operate efficiently */
113         uint32_t *hash_heads;
114
115         /* the original io methods - used to do IOs to the real db */
116         const struct tdb_methods *io_methods;
117
118         /* the list of transaction blocks. When a block is first
119            written to, it gets created in this list */
120         uint8_t **blocks;
121         uint32_t num_blocks;
122         uint32_t block_size;      /* bytes in each block */
123         uint32_t last_block_size; /* number of valid bytes in the last block */
124
125         /* non-zero when an internal transaction error has
126            occurred. All write operations will then fail until the
127            transaction is ended */
128         int transaction_error;
129
130         /* when inside a transaction we need to keep track of any
131            nested tdb_transaction_start() calls, as these are allowed,
132            but don't create a new transaction */
133         int nesting;
134
135         /* set when a prepare has already occurred */
136         bool prepared;
137         tdb_off_t magic_offset;
138
139         /* old file size before transaction */
140         tdb_len_t old_map_size;
141
142         /* did we expand in this transaction */
143         bool expanded;
144 };
145
146
147 /*
148   read while in a transaction. We need to check first if the data is in our list
149   of transaction elements, then if not do a real read
150 */
151 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf,
152                             tdb_len_t len, int cv)
153 {
154         uint32_t blk;
155
156         /* break it down into block sized ops */
157         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
158                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
159                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
160                         return -1;
161                 }
162                 len -= len2;
163                 off += len2;
164                 buf = (void *)(len2 + (char *)buf);
165         }
166
167         if (len == 0) {
168                 return 0;
169         }
170
171         blk = off / tdb->transaction->block_size;
172
173         /* see if we have it in the block list */
174         if (tdb->transaction->num_blocks <= blk ||
175             tdb->transaction->blocks[blk] == NULL) {
176                 /* nope, do a real read */
177                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
178                         goto fail;
179                 }
180                 return 0;
181         }
182
183         /* it is in the block list. Now check for the last block */
184         if (blk == tdb->transaction->num_blocks-1) {
185                 if (len > tdb->transaction->last_block_size) {
186                         goto fail;
187                 }
188         }
189
190         /* now copy it out of this block */
191         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
192         if (cv) {
193                 tdb_convert(buf, len);
194         }
195         return 0;
196
197 fail:
198         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%u len=%u\n", off, len));
199         tdb->ecode = TDB_ERR_IO;
200         tdb->transaction->transaction_error = 1;
201         return -1;
202 }
203
204
205 /*
206   write while in a transaction
207 */
208 static int transaction_write(struct tdb_context *tdb, tdb_off_t off,
209                              const void *buf, tdb_len_t len)
210 {
211         uint32_t blk;
212
213         if (buf == NULL) {
214                 return -1;
215         }
216
217         /* Only a commit is allowed on a prepared transaction */
218         if (tdb->transaction->prepared) {
219                 tdb->ecode = TDB_ERR_EINVAL;
220                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
221                 tdb->transaction->transaction_error = 1;
222                 return -1;
223         }
224
225         /* if the write is to a hash head, then update the transaction
226            hash heads */
227         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
228             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
229                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
230                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
231         }
232
233         /* break it up into block sized chunks */
234         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
235                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
236                 if (transaction_write(tdb, off, buf, len2) != 0) {
237                         return -1;
238                 }
239                 len -= len2;
240                 off += len2;
241                 buf = (const void *)(len2 + (const char *)buf);
242         }
243
244         if (len == 0) {
245                 return 0;
246         }
247
248         blk = off / tdb->transaction->block_size;
249         off = off % tdb->transaction->block_size;
250
251         if (tdb->transaction->num_blocks <= blk) {
252                 uint8_t **new_blocks;
253                 /* expand the blocks array */
254                 new_blocks = (uint8_t **)realloc(tdb->transaction->blocks,
255                                                  (blk+1)*sizeof(uint8_t *));
256                 if (new_blocks == NULL) {
257                         tdb->ecode = TDB_ERR_OOM;
258                         goto fail;
259                 }
260                 memset(&new_blocks[tdb->transaction->num_blocks], 0,
261                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
262                 tdb->transaction->blocks = new_blocks;
263                 tdb->transaction->num_blocks = blk+1;
264                 tdb->transaction->last_block_size = 0;
265         }
266
267         /* allocate and fill a block? */
268         if (tdb->transaction->blocks[blk] == NULL) {
269                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
270                 if (tdb->transaction->blocks[blk] == NULL) {
271                         tdb->ecode = TDB_ERR_OOM;
272                         tdb->transaction->transaction_error = 1;
273                         return -1;
274                 }
275                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
276                         tdb_len_t len2 = tdb->transaction->block_size;
277                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
278                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
279                         }
280                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size,
281                                                                    tdb->transaction->blocks[blk],
282                                                                    len2, 0) != 0) {
283                                 SAFE_FREE(tdb->transaction->blocks[blk]);
284                                 tdb->ecode = TDB_ERR_IO;
285                                 goto fail;
286                         }
287                         if (blk == tdb->transaction->num_blocks-1) {
288                                 tdb->transaction->last_block_size = len2;
289                         }
290                 }
291         }
292
293         /* overwrite part of an existing block */
294         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
295         if (blk == tdb->transaction->num_blocks-1) {
296                 if (len + off > tdb->transaction->last_block_size) {
297                         tdb->transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%u len=%u\n",
305                  (blk*tdb->transaction->block_size) + off, len));
306         tdb->transaction->transaction_error = 1;
307         return -1;
308 }
309
310
311 /*
312   write while in a transaction - this variant never expands the transaction blocks, it only
313   updates existing blocks. This means it cannot change the recovery size
314 */
315 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off,
316                                       const void *buf, tdb_len_t len)
317 {
318         uint32_t blk;
319
320         /* break it up into block sized chunks */
321         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
322                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
323                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
324                         return -1;
325                 }
326                 len -= len2;
327                 off += len2;
328                 if (buf != NULL) {
329                         buf = (const void *)(len2 + (const char *)buf);
330                 }
331         }
332
333         if (len == 0) {
334                 return 0;
335         }
336
337         blk = off / tdb->transaction->block_size;
338         off = off % tdb->transaction->block_size;
339
340         if (tdb->transaction->num_blocks <= blk ||
341             tdb->transaction->blocks[blk] == NULL) {
342                 return 0;
343         }
344
345         if (blk == tdb->transaction->num_blocks-1 &&
346             off + len > tdb->transaction->last_block_size) {
347                 if (off >= tdb->transaction->last_block_size) {
348                         return 0;
349                 }
350                 len = tdb->transaction->last_block_size - off;
351         }
352
353         /* overwrite part of an existing block */
354         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
355
356         return 0;
357 }
358
359
360 /*
361   accelerated hash chain head search, using the cached hash heads
362 */
363 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
364 {
365         uint32_t h = *chain;
366         for (;h < tdb->hash_size;h++) {
367                 /* the +1 takes account of the freelist */
368                 if (0 != tdb->transaction->hash_heads[h+1]) {
369                         break;
370                 }
371         }
372         (*chain) = h;
373 }
374
375 /*
376   out of bounds check during a transaction
377 */
378 static int transaction_oob(struct tdb_context *tdb, tdb_off_t off,
379                            tdb_len_t len, int probe)
380 {
381         if (off + len >= off && off + len <= tdb->map_size) {
382                 return 0;
383         }
384         tdb->ecode = TDB_ERR_IO;
385         return -1;
386 }
387
388 /*
389   transaction version of tdb_expand().
390 */
391 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size,
392                                    tdb_off_t addition)
393 {
394         const char buf_zero[8192] = {0};
395         size_t buf_len = sizeof(buf_zero);
396
397         while (addition > 0) {
398                 size_t n = MIN(addition, buf_len);
399                 int ret;
400
401                 ret = transaction_write(tdb, size, buf_zero, n);
402                 if (ret != 0) {
403                         return ret;
404                 }
405
406                 addition -= n;
407                 size += n;
408         }
409
410         tdb->transaction->expanded = true;
411
412         return 0;
413 }
414
415 static const struct tdb_methods transaction_methods = {
416         transaction_read,
417         transaction_write,
418         transaction_next_hash_chain,
419         transaction_oob,
420         transaction_expand_file,
421 };
422
423 /*
424  * Is a transaction currently active on this context?
425  *
426  */
427 _PUBLIC_ bool tdb_transaction_active(struct tdb_context *tdb)
428 {
429         return (tdb->transaction != NULL);
430 }
431
432 /*
433   start a tdb transaction. No token is returned, as only a single
434   transaction is allowed to be pending per tdb_context
435 */
436 static int _tdb_transaction_start(struct tdb_context *tdb,
437                                   enum tdb_lock_flags lockflags)
438 {
439         /* some sanity checks */
440         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)
441             || tdb->traverse_read) {
442                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
443                 tdb->ecode = TDB_ERR_EINVAL;
444                 return -1;
445         }
446
447         /* cope with nested tdb_transaction_start() calls */
448         if (tdb->transaction != NULL) {
449                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
450                         tdb->ecode = TDB_ERR_NESTING;
451                         return -1;
452                 }
453                 tdb->transaction->nesting++;
454                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n",
455                          tdb->transaction->nesting));
456                 return 0;
457         }
458
459         if (tdb_have_extra_locks(tdb)) {
460                 /* the caller must not have any locks when starting a
461                    transaction as otherwise we'll be screwed by lack
462                    of nested locks in posix */
463                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
464                 tdb->ecode = TDB_ERR_LOCK;
465                 return -1;
466         }
467
468         if (tdb->travlocks.next != NULL) {
469                 /* you cannot use transactions inside a traverse (although you can use
470                    traverse inside a transaction) as otherwise you can end up with
471                    deadlock */
472                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
473                 tdb->ecode = TDB_ERR_LOCK;
474                 return -1;
475         }
476
477         tdb->transaction = (struct tdb_transaction *)
478                 calloc(sizeof(struct tdb_transaction), 1);
479         if (tdb->transaction == NULL) {
480                 tdb->ecode = TDB_ERR_OOM;
481                 return -1;
482         }
483
484         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
485         tdb->transaction->block_size = tdb->page_size;
486
487         /* get the transaction write lock. This is a blocking lock. As
488            discussed with Volker, there are a number of ways we could
489            make this async, which we will probably do in the future */
490         if (tdb_transaction_lock(tdb, F_WRLCK, lockflags) == -1) {
491                 SAFE_FREE(tdb->transaction->blocks);
492                 SAFE_FREE(tdb->transaction);
493                 if ((lockflags & TDB_LOCK_WAIT) == 0) {
494                         tdb->ecode = TDB_ERR_NOLOCK;
495                 } else {
496                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
497                                  "tdb_transaction_start: "
498                                  "failed to get transaction lock\n"));
499                 }
500                 return -1;
501         }
502
503         /* get a read lock from the freelist to the end of file. This
504            is upgraded to a write lock during the commit */
505         if (tdb_allrecord_lock(tdb, F_RDLCK, TDB_LOCK_WAIT, true) == -1) {
506                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
507                 goto fail_allrecord_lock;
508         }
509
510         /* setup a copy of the hash table heads so the hash scan in
511            traverse can be fast */
512         tdb->transaction->hash_heads = (uint32_t *)
513                 calloc(tdb->hash_size+1, sizeof(uint32_t));
514         if (tdb->transaction->hash_heads == NULL) {
515                 tdb->ecode = TDB_ERR_OOM;
516                 goto fail;
517         }
518         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
519                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
520                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
521                 tdb->ecode = TDB_ERR_IO;
522                 goto fail;
523         }
524
525         /* make sure we know about any file expansions already done by
526            anyone else */
527         tdb->methods->tdb_oob(tdb, tdb->map_size, 1, 1);
528         tdb->transaction->old_map_size = tdb->map_size;
529
530         /* finally hook the io methods, replacing them with
531            transaction specific methods */
532         tdb->transaction->io_methods = tdb->methods;
533         tdb->methods = &transaction_methods;
534
535         /* Trace at the end, so we get sequence number correct. */
536         tdb_trace(tdb, "tdb_transaction_start");
537         return 0;
538
539 fail:
540         tdb_allrecord_unlock(tdb, F_RDLCK, false);
541 fail_allrecord_lock:
542         tdb_transaction_unlock(tdb, F_WRLCK);
543         SAFE_FREE(tdb->transaction->blocks);
544         SAFE_FREE(tdb->transaction->hash_heads);
545         SAFE_FREE(tdb->transaction);
546         return -1;
547 }
548
549 _PUBLIC_ int tdb_transaction_start(struct tdb_context *tdb)
550 {
551         return _tdb_transaction_start(tdb, TDB_LOCK_WAIT);
552 }
553
554 _PUBLIC_ int tdb_transaction_start_nonblock(struct tdb_context *tdb)
555 {
556         return _tdb_transaction_start(tdb, TDB_LOCK_NOWAIT|TDB_LOCK_PROBE);
557 }
558
559 /*
560   sync to disk
561 */
562 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
563 {
564         if (tdb->flags & TDB_NOSYNC) {
565                 return 0;
566         }
567
568 #ifdef HAVE_FDATASYNC
569         if (fdatasync(tdb->fd) != 0) {
570 #else
571         if (fsync(tdb->fd) != 0) {
572 #endif
573                 tdb->ecode = TDB_ERR_IO;
574                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
575                 return -1;
576         }
577 #ifdef HAVE_MMAP
578         if (tdb->map_ptr) {
579                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
580                 if (msync(moffset + (char *)tdb->map_ptr,
581                           length + (offset - moffset), MS_SYNC) != 0) {
582                         tdb->ecode = TDB_ERR_IO;
583                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
584                                  strerror(errno)));
585                         return -1;
586                 }
587         }
588 #endif
589         return 0;
590 }
591
592
593 static int _tdb_transaction_cancel(struct tdb_context *tdb)
594 {
595         uint32_t i;
596         int ret = 0;
597
598         if (tdb->transaction == NULL) {
599                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
600                 return -1;
601         }
602
603         if (tdb->transaction->nesting != 0) {
604                 tdb->transaction->transaction_error = 1;
605                 tdb->transaction->nesting--;
606                 return 0;
607         }
608
609         tdb->map_size = tdb->transaction->old_map_size;
610
611         /* free all the transaction blocks */
612         for (i=0;i<tdb->transaction->num_blocks;i++) {
613                 if (tdb->transaction->blocks[i] != NULL) {
614                         free(tdb->transaction->blocks[i]);
615                 }
616         }
617         SAFE_FREE(tdb->transaction->blocks);
618
619         if (tdb->transaction->magic_offset) {
620                 const struct tdb_methods *methods = tdb->transaction->io_methods;
621                 const uint32_t invalid = TDB_RECOVERY_INVALID_MAGIC;
622
623                 /* remove the recovery marker */
624                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &invalid, 4) == -1 ||
625                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
626                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
627                         ret = -1;
628                 }
629         }
630
631         /* This also removes the OPEN_LOCK, if we have it. */
632         tdb_release_transaction_locks(tdb);
633
634         /* restore the normal io methods */
635         tdb->methods = tdb->transaction->io_methods;
636
637         SAFE_FREE(tdb->transaction->hash_heads);
638         SAFE_FREE(tdb->transaction);
639
640         return ret;
641 }
642
643 /*
644   cancel the current transaction
645 */
646 _PUBLIC_ int tdb_transaction_cancel(struct tdb_context *tdb)
647 {
648         tdb_trace(tdb, "tdb_transaction_cancel");
649         return _tdb_transaction_cancel(tdb);
650 }
651
652 /*
653   work out how much space the linearised recovery data will consume
654 */
655 static bool tdb_recovery_size(struct tdb_context *tdb, tdb_len_t *result)
656 {
657         tdb_len_t recovery_size = 0;
658         uint32_t i;
659
660         recovery_size = sizeof(uint32_t);
661         for (i=0;i<tdb->transaction->num_blocks;i++) {
662                 tdb_len_t block_size;
663                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
664                         break;
665                 }
666                 if (tdb->transaction->blocks[i] == NULL) {
667                         continue;
668                 }
669                 if (!tdb_add_len_t(recovery_size, 2*sizeof(tdb_off_t),
670                                    &recovery_size)) {
671                         return false;
672                 }
673                 if (i == tdb->transaction->num_blocks-1) {
674                         block_size = tdb->transaction->last_block_size;
675                 } else {
676                         block_size =  tdb->transaction->block_size;
677                 }
678                 if (!tdb_add_len_t(recovery_size, block_size,
679                                    &recovery_size)) {
680                         return false;
681                 }
682         }
683
684         *result = recovery_size;
685         return true;
686 }
687
688 int tdb_recovery_area(struct tdb_context *tdb,
689                       const struct tdb_methods *methods,
690                       tdb_off_t *recovery_offset,
691                       struct tdb_record *rec)
692 {
693         int ret;
694
695         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, recovery_offset) == -1) {
696                 return -1;
697         }
698
699         if (*recovery_offset == 0) {
700                 rec->rec_len = 0;
701                 return 0;
702         }
703
704         if (methods->tdb_read(tdb, *recovery_offset, rec, sizeof(*rec),
705                               DOCONV()) == -1) {
706                 return -1;
707         }
708
709         /* ignore invalid recovery regions: can happen in crash */
710         if (rec->magic != TDB_RECOVERY_MAGIC &&
711             rec->magic != TDB_RECOVERY_INVALID_MAGIC) {
712                 *recovery_offset = 0;
713                 rec->rec_len = 0;
714         }
715
716         ret = methods->tdb_oob(tdb, *recovery_offset, rec->rec_len, 1);
717         if (ret == -1) {
718                 *recovery_offset = 0;
719                 rec->rec_len = 0;
720         }
721
722         return 0;
723 }
724
725 /*
726   allocate the recovery area, or use an existing recovery area if it is
727   large enough
728 */
729 static int tdb_recovery_allocate(struct tdb_context *tdb,
730                                  tdb_len_t *recovery_size,
731                                  tdb_off_t *recovery_offset,
732                                  tdb_len_t *recovery_max_size)
733 {
734         struct tdb_record rec;
735         const struct tdb_methods *methods = tdb->transaction->io_methods;
736         tdb_off_t recovery_head, new_end;
737
738         if (tdb_recovery_area(tdb, methods, &recovery_head, &rec) == -1) {
739                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
740                 return -1;
741         }
742
743         if (!tdb_recovery_size(tdb, recovery_size)) {
744                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
745                          "overflow recovery size\n"));
746                 return -1;
747         }
748
749         /* Existing recovery area? */
750         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
751                 /* it fits in the existing area */
752                 *recovery_max_size = rec.rec_len;
753                 *recovery_offset = recovery_head;
754                 return 0;
755         }
756
757         /* If recovery area in middle of file, we need a new one. */
758         if (recovery_head == 0
759             || recovery_head + sizeof(rec) + rec.rec_len != tdb->map_size) {
760                 /* we need to free up the old recovery area, then allocate a
761                    new one at the end of the file. Note that we cannot use
762                    tdb_allocate() to allocate the new one as that might return
763                    us an area that is being currently used (as of the start of
764                    the transaction) */
765                 if (recovery_head) {
766                         if (tdb_free(tdb, recovery_head, &rec) == -1) {
767                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
768                                          "tdb_recovery_allocate: failed to"
769                                          " free previous recovery area\n"));
770                                 return -1;
771                         }
772
773                         /* the tdb_free() call might have increased
774                          * the recovery size */
775                         if (!tdb_recovery_size(tdb, recovery_size)) {
776                                 TDB_LOG((tdb, TDB_DEBUG_FATAL,
777                                          "tdb_recovery_allocate: "
778                                          "overflow recovery size\n"));
779                                 return -1;
780                         }
781                 }
782
783                 /* New head will be at end of file. */
784                 recovery_head = tdb->map_size;
785         }
786
787         /* Now we know where it will be. */
788         *recovery_offset = recovery_head;
789
790         /* Expand by more than we need, so we don't do it often. */
791         *recovery_max_size = tdb_expand_adjust(tdb->map_size,
792                                                *recovery_size,
793                                                tdb->page_size)
794                 - sizeof(rec);
795
796         if (!tdb_add_off_t(recovery_head, sizeof(rec), &new_end) ||
797             !tdb_add_off_t(new_end, *recovery_max_size, &new_end)) {
798                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: "
799                          "overflow recovery area\n"));
800                 return -1;
801         }
802
803         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
804                                      new_end - tdb->transaction->old_map_size)
805             == -1) {
806                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
807                 return -1;
808         }
809
810         /* remap the file (if using mmap) */
811         methods->tdb_oob(tdb, tdb->map_size, 1, 1);
812
813         /* we have to reset the old map size so that we don't try to expand the file
814            again in the transaction commit, which would destroy the recovery area */
815         tdb->transaction->old_map_size = tdb->map_size;
816
817         /* write the recovery header offset and sync - we can sync without a race here
818            as the magic ptr in the recovery record has not been set */
819         CONVERT(recovery_head);
820         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD,
821                                &recovery_head, sizeof(tdb_off_t)) == -1) {
822                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
823                 return -1;
824         }
825         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
826                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
827                 return -1;
828         }
829
830         return 0;
831 }
832
833
834 /*
835   setup the recovery data that will be used on a crash during commit
836 */
837 static int transaction_setup_recovery(struct tdb_context *tdb,
838                                       tdb_off_t *magic_offset)
839 {
840         tdb_len_t recovery_size;
841         unsigned char *data, *p;
842         const struct tdb_methods *methods = tdb->transaction->io_methods;
843         struct tdb_record *rec;
844         tdb_off_t recovery_offset, recovery_max_size;
845         tdb_off_t old_map_size = tdb->transaction->old_map_size;
846         uint32_t magic, tailer;
847         uint32_t i;
848
849         /*
850           check that the recovery area has enough space
851         */
852         if (tdb_recovery_allocate(tdb, &recovery_size,
853                                   &recovery_offset, &recovery_max_size) == -1) {
854                 return -1;
855         }
856
857         rec = malloc(recovery_size + sizeof(*rec));
858         if (rec == NULL) {
859                 tdb->ecode = TDB_ERR_OOM;
860                 return -1;
861         }
862
863         memset(rec, 0, sizeof(*rec));
864
865         rec->magic    = TDB_RECOVERY_INVALID_MAGIC;
866         rec->data_len = recovery_size;
867         rec->rec_len  = recovery_max_size;
868         rec->key_len  = old_map_size;
869         CONVERT(*rec);
870
871         data = (unsigned char *)rec;
872
873         /* build the recovery data into a single blob to allow us to do a single
874            large write, which should be more efficient */
875         p = data + sizeof(*rec);
876         for (i=0;i<tdb->transaction->num_blocks;i++) {
877                 tdb_off_t offset;
878                 tdb_len_t length;
879
880                 if (tdb->transaction->blocks[i] == NULL) {
881                         continue;
882                 }
883
884                 offset = i * tdb->transaction->block_size;
885                 length = tdb->transaction->block_size;
886                 if (i == tdb->transaction->num_blocks-1) {
887                         length = tdb->transaction->last_block_size;
888                 }
889
890                 if (offset >= old_map_size) {
891                         continue;
892                 }
893                 if (offset + length > tdb->transaction->old_map_size) {
894                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
895                         free(data);
896                         tdb->ecode = TDB_ERR_CORRUPT;
897                         return -1;
898                 }
899                 memcpy(p, &offset, 4);
900                 memcpy(p+4, &length, 4);
901                 if (DOCONV()) {
902                         tdb_convert(p, 8);
903                 }
904                 /* the recovery area contains the old data, not the
905                    new data, so we have to call the original tdb_read
906                    method to get it */
907                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
908                         free(data);
909                         tdb->ecode = TDB_ERR_IO;
910                         return -1;
911                 }
912                 p += 8 + length;
913         }
914
915         /* and the tailer */
916         tailer = sizeof(*rec) + recovery_max_size;
917         memcpy(p, &tailer, 4);
918         if (DOCONV()) {
919                 tdb_convert(p, 4);
920         }
921
922         /* write the recovery data to the recovery area */
923         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
924                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
925                 free(data);
926                 tdb->ecode = TDB_ERR_IO;
927                 return -1;
928         }
929         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
930                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
931                 free(data);
932                 tdb->ecode = TDB_ERR_IO;
933                 return -1;
934         }
935
936         /* as we don't have ordered writes, we have to sync the recovery
937            data before we update the magic to indicate that the recovery
938            data is present */
939         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
940                 free(data);
941                 return -1;
942         }
943
944         free(data);
945
946         magic = TDB_RECOVERY_MAGIC;
947         CONVERT(magic);
948
949         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
950
951         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
952                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
953                 tdb->ecode = TDB_ERR_IO;
954                 return -1;
955         }
956         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
957                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
958                 tdb->ecode = TDB_ERR_IO;
959                 return -1;
960         }
961
962         /* ensure the recovery magic marker is on disk */
963         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
964                 return -1;
965         }
966
967         return 0;
968 }
969
970 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
971 {
972         const struct tdb_methods *methods;
973
974         if (tdb->transaction == NULL) {
975                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
976                 return -1;
977         }
978
979         if (tdb->transaction->prepared) {
980                 tdb->ecode = TDB_ERR_EINVAL;
981                 _tdb_transaction_cancel(tdb);
982                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
983                 return -1;
984         }
985
986         if (tdb->transaction->transaction_error) {
987                 tdb->ecode = TDB_ERR_IO;
988                 _tdb_transaction_cancel(tdb);
989                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
990                 return -1;
991         }
992
993
994         if (tdb->transaction->nesting != 0) {
995                 return 0;
996         }
997
998         /* check for a null transaction */
999         if (tdb->transaction->blocks == NULL) {
1000                 return 0;
1001         }
1002
1003         methods = tdb->transaction->io_methods;
1004
1005         /* if there are any locks pending then the caller has not
1006            nested their locks properly, so fail the transaction */
1007         if (tdb_have_extra_locks(tdb)) {
1008                 tdb->ecode = TDB_ERR_LOCK;
1009                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
1010                 _tdb_transaction_cancel(tdb);
1011                 return -1;
1012         }
1013
1014         /* upgrade the main transaction lock region to a write lock */
1015         if (tdb_allrecord_upgrade(tdb) == -1) {
1016                 if (tdb->ecode == TDB_ERR_RDONLY && tdb->read_only) {
1017                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1018                                  "tdb_transaction_prepare_commit: "
1019                                  "failed to upgrade hash locks: "
1020                                  "database is read only\n"));
1021                 } else if (tdb->ecode == TDB_ERR_RDONLY
1022                            && tdb->traverse_read) {
1023                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1024                                  "tdb_transaction_prepare_commit: "
1025                                  "failed to upgrade hash locks: "
1026                                  "a database traverse is in progress\n"));
1027                 } else {
1028                         TDB_LOG((tdb, TDB_DEBUG_ERROR,
1029                                  "tdb_transaction_prepare_commit: "
1030                                  "failed to upgrade hash locks: %s\n",
1031                                  tdb_errorstr(tdb)));
1032                 }
1033                 _tdb_transaction_cancel(tdb);
1034                 return -1;
1035         }
1036
1037         /* get the open lock - this prevents new users attaching to the database
1038            during the commit */
1039         if (tdb_nest_lock(tdb, OPEN_LOCK, F_WRLCK, TDB_LOCK_WAIT) == -1) {
1040                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get open lock\n"));
1041                 _tdb_transaction_cancel(tdb);
1042                 return -1;
1043         }
1044
1045         /* write the recovery data to the end of the file */
1046         if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
1047                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
1048                 _tdb_transaction_cancel(tdb);
1049                 return -1;
1050         }
1051
1052         tdb->transaction->prepared = true;
1053
1054         /* expand the file to the new size if needed */
1055         if (tdb->map_size != tdb->transaction->old_map_size) {
1056                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size,
1057                                              tdb->map_size -
1058                                              tdb->transaction->old_map_size) == -1) {
1059                         tdb->ecode = TDB_ERR_IO;
1060                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
1061                         _tdb_transaction_cancel(tdb);
1062                         return -1;
1063                 }
1064                 tdb->map_size = tdb->transaction->old_map_size;
1065                 methods->tdb_oob(tdb, tdb->map_size, 1, 1);
1066         }
1067
1068         /* Keep the open lock until the actual commit */
1069
1070         return 0;
1071 }
1072
1073 /*
1074    prepare to commit the current transaction
1075 */
1076 _PUBLIC_ int tdb_transaction_prepare_commit(struct tdb_context *tdb)
1077 {
1078         tdb_trace(tdb, "tdb_transaction_prepare_commit");
1079         return _tdb_transaction_prepare_commit(tdb);
1080 }
1081
1082 /* A repack is worthwhile if the largest is less than half total free. */
1083 static bool repack_worthwhile(struct tdb_context *tdb)
1084 {
1085         tdb_off_t ptr;
1086         struct tdb_record rec;
1087         tdb_len_t total = 0, largest = 0;
1088
1089         if (tdb_ofs_read(tdb, FREELIST_TOP, &ptr) == -1) {
1090                 return false;
1091         }
1092
1093         while (ptr != 0 && tdb_rec_free_read(tdb, ptr, &rec) == 0) {
1094                 total += rec.rec_len;
1095                 if (rec.rec_len > largest) {
1096                         largest = rec.rec_len;
1097                 }
1098                 ptr = rec.next;
1099         }
1100
1101         return total > largest * 2;
1102 }
1103
1104 /*
1105   commit the current transaction
1106 */
1107 _PUBLIC_ int tdb_transaction_commit(struct tdb_context *tdb)
1108 {
1109         const struct tdb_methods *methods;
1110         uint32_t i;
1111         bool need_repack = false;
1112
1113         if (tdb->transaction == NULL) {
1114                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
1115                 return -1;
1116         }
1117
1118         tdb_trace(tdb, "tdb_transaction_commit");
1119
1120         if (tdb->transaction->transaction_error) {
1121                 tdb->ecode = TDB_ERR_IO;
1122                 _tdb_transaction_cancel(tdb);
1123                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
1124                 return -1;
1125         }
1126
1127
1128         if (tdb->transaction->nesting != 0) {
1129                 tdb->transaction->nesting--;
1130                 return 0;
1131         }
1132
1133         /* check for a null transaction */
1134         if (tdb->transaction->blocks == NULL) {
1135                 _tdb_transaction_cancel(tdb);
1136                 return 0;
1137         }
1138
1139         if (!tdb->transaction->prepared) {
1140                 int ret = _tdb_transaction_prepare_commit(tdb);
1141                 if (ret)
1142                         return ret;
1143         }
1144
1145         methods = tdb->transaction->io_methods;
1146
1147         /* perform all the writes */
1148         for (i=0;i<tdb->transaction->num_blocks;i++) {
1149                 tdb_off_t offset;
1150                 tdb_len_t length;
1151
1152                 if (tdb->transaction->blocks[i] == NULL) {
1153                         continue;
1154                 }
1155
1156                 offset = i * tdb->transaction->block_size;
1157                 length = tdb->transaction->block_size;
1158                 if (i == tdb->transaction->num_blocks-1) {
1159                         length = tdb->transaction->last_block_size;
1160                 }
1161
1162                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1163                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1164
1165                         /* we've overwritten part of the data and
1166                            possibly expanded the file, so we need to
1167                            run the crash recovery code */
1168                         tdb->methods = methods;
1169                         tdb_transaction_recover(tdb);
1170
1171                         _tdb_transaction_cancel(tdb);
1172
1173                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1174                         return -1;
1175                 }
1176                 SAFE_FREE(tdb->transaction->blocks[i]);
1177         }
1178
1179         /* Do this before we drop lock or blocks. */
1180         if (tdb->transaction->expanded) {
1181                 need_repack = repack_worthwhile(tdb);
1182         }
1183
1184         SAFE_FREE(tdb->transaction->blocks);
1185         tdb->transaction->num_blocks = 0;
1186
1187         /* ensure the new data is on disk */
1188         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1189                 return -1;
1190         }
1191
1192         /*
1193           TODO: maybe write to some dummy hdr field, or write to magic
1194           offset without mmap, before the last sync, instead of the
1195           utime() call
1196         */
1197
1198         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1199            don't change the mtime of the file, this means the file may
1200            not be backed up (as tdb rounding to block sizes means that
1201            file size changes are quite rare too). The following forces
1202            mtime changes when a transaction completes */
1203 #ifdef HAVE_UTIME
1204         utime(tdb->name, NULL);
1205 #endif
1206
1207         /* use a transaction cancel to free memory and remove the
1208            transaction locks */
1209         _tdb_transaction_cancel(tdb);
1210
1211         if (need_repack) {
1212                 return tdb_repack(tdb);
1213         }
1214
1215         return 0;
1216 }
1217
1218
1219 /*
1220   recover from an aborted transaction. Must be called with exclusive
1221   database write access already established (including the open
1222   lock to prevent new processes attaching)
1223 */
1224 int tdb_transaction_recover(struct tdb_context *tdb)
1225 {
1226         tdb_off_t recovery_head, recovery_eof;
1227         unsigned char *data, *p;
1228         uint32_t zero = 0;
1229         struct tdb_record rec;
1230
1231         /* find the recovery area */
1232         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1233                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1234                 tdb->ecode = TDB_ERR_IO;
1235                 return -1;
1236         }
1237
1238         if (recovery_head == 0) {
1239                 /* we have never allocated a recovery record */
1240                 return 0;
1241         }
1242
1243         /* read the recovery record */
1244         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1245                                    sizeof(rec), DOCONV()) == -1) {
1246                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));
1247                 tdb->ecode = TDB_ERR_IO;
1248                 return -1;
1249         }
1250
1251         if (rec.magic != TDB_RECOVERY_MAGIC) {
1252                 /* there is no valid recovery data */
1253                 return 0;
1254         }
1255
1256         if (tdb->read_only) {
1257                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1258                 tdb->ecode = TDB_ERR_CORRUPT;
1259                 return -1;
1260         }
1261
1262         recovery_eof = rec.key_len;
1263
1264         data = (unsigned char *)malloc(rec.data_len);
1265         if (data == NULL) {
1266                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));
1267                 tdb->ecode = TDB_ERR_OOM;
1268                 return -1;
1269         }
1270
1271         /* read the full recovery data */
1272         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1273                                    rec.data_len, 0) == -1) {
1274                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));
1275                 tdb->ecode = TDB_ERR_IO;
1276                 return -1;
1277         }
1278
1279         /* recover the file data */
1280         p = data;
1281         while (p+8 < data + rec.data_len) {
1282                 uint32_t ofs, len;
1283                 if (DOCONV()) {
1284                         tdb_convert(p, 8);
1285                 }
1286                 memcpy(&ofs, p, 4);
1287                 memcpy(&len, p+4, 4);
1288
1289                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1290                         free(data);
1291                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %u bytes at offset %u\n", len, ofs));
1292                         tdb->ecode = TDB_ERR_IO;
1293                         return -1;
1294                 }
1295                 p += 8 + len;
1296         }
1297
1298         free(data);
1299
1300         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1301                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1302                 tdb->ecode = TDB_ERR_IO;
1303                 return -1;
1304         }
1305
1306         /* if the recovery area is after the recovered eof then remove it */
1307         if (recovery_eof <= recovery_head) {
1308                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1309                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1310                         tdb->ecode = TDB_ERR_IO;
1311                         return -1;
1312                 }
1313         }
1314
1315         /* remove the recovery magic */
1316         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1317                           &zero) == -1) {
1318                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1319                 tdb->ecode = TDB_ERR_IO;
1320                 return -1;
1321         }
1322
1323         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1324                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1325                 tdb->ecode = TDB_ERR_IO;
1326                 return -1;
1327         }
1328
1329         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %u byte database\n",
1330                  recovery_eof));
1331
1332         /* all done */
1333         return 0;
1334 }
1335
1336 /* Any I/O failures we say "needs recovery". */
1337 bool tdb_needs_recovery(struct tdb_context *tdb)
1338 {
1339         tdb_off_t recovery_head;
1340         struct tdb_record rec;
1341
1342         /* find the recovery area */
1343         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1344                 return true;
1345         }
1346
1347         if (recovery_head == 0) {
1348                 /* we have never allocated a recovery record */
1349                 return false;
1350         }
1351
1352         /* read the recovery record */
1353         if (tdb->methods->tdb_read(tdb, recovery_head, &rec,
1354                                    sizeof(rec), DOCONV()) == -1) {
1355                 return true;
1356         }
1357
1358         return (rec.magic == TDB_RECOVERY_MAGIC);
1359 }