TDB2: Goodbye TDB2, Hello NTDB.
[obnox/samba/samba-obnox.git] / lib / ntdb / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the ntdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #define SAFE_FREE(x) do { if ((x) != NULL) {free((void *)x); (x)=NULL;} } while(0)
29
30 /*
31   transaction design:
32
33   - only allow a single transaction at a time per database. This makes
34     using the transaction API simpler, as otherwise the caller would
35     have to cope with temporary failures in transactions that conflict
36     with other current transactions
37
38   - keep the transaction recovery information in the same file as the
39     database, using a special 'transaction recovery' record pointed at
40     by the header. This removes the need for extra journal files as
41     used by some other databases
42
43   - dynamically allocated the transaction recover record, re-using it
44     for subsequent transactions. If a larger record is needed then
45     ntdb_free() the old record to place it on the normal ntdb freelist
46     before allocating the new record
47
48   - during transactions, keep a linked list of writes all that have
49     been performed by intercepting all ntdb_write() calls. The hooked
50     transaction versions of ntdb_read() and ntdb_write() check this
51     linked list and try to use the elements of the list in preference
52     to the real database.
53
54   - don't allow any locks to be held when a transaction starts,
55     otherwise we can end up with deadlock (plus lack of lock nesting
56     in POSIX locks would mean the lock is lost)
57
58   - if the caller gains a lock during the transaction but doesn't
59     release it then fail the commit
60
61   - allow for nested calls to ntdb_transaction_start(), re-using the
62     existing transaction record. If the inner transaction is canceled
63     then a subsequent commit will fail
64
65   - keep a mirrored copy of the ntdb hash chain heads to allow for the
66     fast hash heads scan on traverse, updating the mirrored copy in
67     the transaction version of ntdb_write
68
69   - allow callers to mix transaction and non-transaction use of ntdb,
70     although once a transaction is started then an exclusive lock is
71     gained until the transaction is committed or canceled
72
73   - the commit stategy involves first saving away all modified data
74     into a linearised buffer in the transaction recovery area, then
75     marking the transaction recovery area with a magic value to
76     indicate a valid recovery record. In total 4 fsync/msync calls are
77     needed per commit to prevent race conditions. It might be possible
78     to reduce this to 3 or even 2 with some more work.
79
80   - check for a valid recovery record on open of the ntdb, while the
81     open lock is held. Automatically recover from the transaction
82     recovery area if needed, then continue with the open as
83     usual. This allows for smooth crash recovery with no administrator
84     intervention.
85
86   - if NTDB_NOSYNC is passed to flags in ntdb_open then transactions are
87     still available, but no transaction recovery area is used and no
88     fsync/msync calls are made.
89 */
90
91 /*
92   hold the context of any current transaction
93 */
94 struct ntdb_transaction {
95         /* the original io methods - used to do IOs to the real db */
96         const struct ntdb_methods *io_methods;
97
98         /* the list of transaction blocks. When a block is first
99            written to, it gets created in this list */
100         uint8_t **blocks;
101         size_t num_blocks;
102         size_t last_block_size; /* number of valid bytes in the last block */
103
104         /* non-zero when an internal transaction error has
105            occurred. All write operations will then fail until the
106            transaction is ended */
107         int transaction_error;
108
109         /* when inside a transaction we need to keep track of any
110            nested ntdb_transaction_start() calls, as these are allowed,
111            but don't create a new transaction */
112         unsigned int nesting;
113
114         /* set when a prepare has already occurred */
115         bool prepared;
116         ntdb_off_t magic_offset;
117
118         /* old file size before transaction */
119         ntdb_len_t old_map_size;
120 };
121
122 /* This doesn't really need to be pagesize, but we use it for similar reasons. */
123 #define PAGESIZE 65536
124
125 /*
126   read while in a transaction. We need to check first if the data is in our list
127   of transaction elements, then if not do a real read
128 */
129 static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t off,
130                                        void *buf, ntdb_len_t len)
131 {
132         size_t blk;
133         enum NTDB_ERROR ecode;
134
135         /* break it down into block sized ops */
136         while (len + (off % PAGESIZE) > PAGESIZE) {
137                 ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
138                 ecode = transaction_read(ntdb, off, buf, len2);
139                 if (ecode != NTDB_SUCCESS) {
140                         return ecode;
141                 }
142                 len -= len2;
143                 off += len2;
144                 buf = (void *)(len2 + (char *)buf);
145         }
146
147         if (len == 0) {
148                 return NTDB_SUCCESS;
149         }
150
151         blk = off / PAGESIZE;
152
153         /* see if we have it in the block list */
154         if (ntdb->transaction->num_blocks <= blk ||
155             ntdb->transaction->blocks[blk] == NULL) {
156                 /* nope, do a real read */
157                 ecode = ntdb->transaction->io_methods->tread(ntdb, off, buf, len);
158                 if (ecode != NTDB_SUCCESS) {
159                         goto fail;
160                 }
161                 return 0;
162         }
163
164         /* it is in the block list. Now check for the last block */
165         if (blk == ntdb->transaction->num_blocks-1) {
166                 if (len > ntdb->transaction->last_block_size) {
167                         ecode = NTDB_ERR_IO;
168                         goto fail;
169                 }
170         }
171
172         /* now copy it out of this block */
173         memcpy(buf, ntdb->transaction->blocks[blk] + (off % PAGESIZE), len);
174         return NTDB_SUCCESS;
175
176 fail:
177         ntdb->transaction->transaction_error = 1;
178         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
179                           "transaction_read: failed at off=%zu len=%zu",
180                           (size_t)off, (size_t)len);
181 }
182
183
184 /*
185   write while in a transaction
186 */
187 static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t off,
188                                         const void *buf, ntdb_len_t len)
189 {
190         size_t blk;
191         enum NTDB_ERROR ecode;
192
193         /* Only a commit is allowed on a prepared transaction */
194         if (ntdb->transaction->prepared) {
195                 ecode = ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
196                                    "transaction_write: transaction already"
197                                    " prepared, write not allowed");
198                 goto fail;
199         }
200
201         /* break it up into block sized chunks */
202         while (len + (off % PAGESIZE) > PAGESIZE) {
203                 ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
204                 ecode = transaction_write(ntdb, off, buf, len2);
205                 if (ecode != NTDB_SUCCESS) {
206                         return ecode;
207                 }
208                 len -= len2;
209                 off += len2;
210                 if (buf != NULL) {
211                         buf = (const void *)(len2 + (const char *)buf);
212                 }
213         }
214
215         if (len == 0) {
216                 return NTDB_SUCCESS;
217         }
218
219         blk = off / PAGESIZE;
220         off = off % PAGESIZE;
221
222         if (ntdb->transaction->num_blocks <= blk) {
223                 uint8_t **new_blocks;
224                 /* expand the blocks array */
225                 if (ntdb->transaction->blocks == NULL) {
226                         new_blocks = (uint8_t **)malloc(
227                                 (blk+1)*sizeof(uint8_t *));
228                 } else {
229                         new_blocks = (uint8_t **)realloc(
230                                 ntdb->transaction->blocks,
231                                 (blk+1)*sizeof(uint8_t *));
232                 }
233                 if (new_blocks == NULL) {
234                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
235                                            "transaction_write:"
236                                            " failed to allocate");
237                         goto fail;
238                 }
239                 memset(&new_blocks[ntdb->transaction->num_blocks], 0,
240                        (1+(blk - ntdb->transaction->num_blocks))*sizeof(uint8_t *));
241                 ntdb->transaction->blocks = new_blocks;
242                 ntdb->transaction->num_blocks = blk+1;
243                 ntdb->transaction->last_block_size = 0;
244         }
245
246         /* allocate and fill a block? */
247         if (ntdb->transaction->blocks[blk] == NULL) {
248                 ntdb->transaction->blocks[blk] = (uint8_t *)calloc(PAGESIZE, 1);
249                 if (ntdb->transaction->blocks[blk] == NULL) {
250                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
251                                            "transaction_write:"
252                                            " failed to allocate");
253                         goto fail;
254                 }
255                 if (ntdb->transaction->old_map_size > blk * PAGESIZE) {
256                         ntdb_len_t len2 = PAGESIZE;
257                         if (len2 + (blk * PAGESIZE) > ntdb->transaction->old_map_size) {
258                                 len2 = ntdb->transaction->old_map_size - (blk * PAGESIZE);
259                         }
260                         ecode = ntdb->transaction->io_methods->tread(ntdb,
261                                         blk * PAGESIZE,
262                                         ntdb->transaction->blocks[blk],
263                                         len2);
264                         if (ecode != NTDB_SUCCESS) {
265                                 ecode = ntdb_logerr(ntdb, ecode,
266                                                    NTDB_LOG_ERROR,
267                                                    "transaction_write:"
268                                                    " failed to"
269                                                    " read old block: %s",
270                                                    strerror(errno));
271                                 SAFE_FREE(ntdb->transaction->blocks[blk]);
272                                 goto fail;
273                         }
274                         if (blk == ntdb->transaction->num_blocks-1) {
275                                 ntdb->transaction->last_block_size = len2;
276                         }
277                 }
278         }
279
280         /* overwrite part of an existing block */
281         if (buf == NULL) {
282                 memset(ntdb->transaction->blocks[blk] + off, 0, len);
283         } else {
284                 memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
285         }
286         if (blk == ntdb->transaction->num_blocks-1) {
287                 if (len + off > ntdb->transaction->last_block_size) {
288                         ntdb->transaction->last_block_size = len + off;
289                 }
290         }
291
292         return NTDB_SUCCESS;
293
294 fail:
295         ntdb->transaction->transaction_error = 1;
296         return ecode;
297 }
298
299
300 /*
301   write while in a transaction - this variant never expands the transaction blocks, it only
302   updates existing blocks. This means it cannot change the recovery size
303 */
304 static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off,
305                                        const void *buf, ntdb_len_t len)
306 {
307         size_t blk;
308
309         /* break it up into block sized chunks */
310         while (len + (off % PAGESIZE) > PAGESIZE) {
311                 ntdb_len_t len2 = PAGESIZE - (off % PAGESIZE);
312                 transaction_write_existing(ntdb, off, buf, len2);
313                 len -= len2;
314                 off += len2;
315                 if (buf != NULL) {
316                         buf = (const void *)(len2 + (const char *)buf);
317                 }
318         }
319
320         if (len == 0) {
321                 return;
322         }
323
324         blk = off / PAGESIZE;
325         off = off % PAGESIZE;
326
327         if (ntdb->transaction->num_blocks <= blk ||
328             ntdb->transaction->blocks[blk] == NULL) {
329                 return;
330         }
331
332         if (blk == ntdb->transaction->num_blocks-1 &&
333             off + len > ntdb->transaction->last_block_size) {
334                 if (off >= ntdb->transaction->last_block_size) {
335                         return;
336                 }
337                 len = ntdb->transaction->last_block_size - off;
338         }
339
340         /* overwrite part of an existing block */
341         memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
342 }
343
344
345 /*
346   out of bounds check during a transaction
347 */
348 static enum NTDB_ERROR transaction_oob(struct ntdb_context *ntdb,
349                                       ntdb_off_t off, ntdb_len_t len, bool probe)
350 {
351         if ((off + len >= off && off + len <= ntdb->file->map_size) || probe) {
352                 return NTDB_SUCCESS;
353         }
354
355         ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
356                    "ntdb_oob len %lld beyond transaction size %lld",
357                    (long long)(off + len),
358                    (long long)ntdb->file->map_size);
359         return NTDB_ERR_IO;
360 }
361
362 /*
363   transaction version of ntdb_expand().
364 */
365 static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
366                                               ntdb_off_t addition)
367 {
368         enum NTDB_ERROR ecode;
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(ntdb, ntdb->file->map_size, NULL, addition);
373         if (ecode == NTDB_SUCCESS) {
374                 ntdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / PAGESIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / PAGESIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 ntdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= ntdb->transaction->num_blocks
392                     || ntdb->transaction->blocks[blk] == NULL) {
393                         ntdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return ntdb->transaction->blocks[blk] + off % PAGESIZE;
397         }
398
399         ntdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < ntdb->transaction->num_blocks
403             && ntdb->transaction->blocks[blk])
404                 return ntdb->transaction->blocks[blk] + off % PAGESIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= ntdb->transaction->num_blocks)
409                         break;
410                 if (ntdb->transaction->blocks[blk]) {
411                         ntdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return ntdb->transaction->io_methods->direct(ntdb, off, len, false);
417 }
418
419 static const struct ntdb_methods transaction_methods = {
420         transaction_read,
421         transaction_write,
422         transaction_oob,
423         transaction_expand_file,
424         transaction_direct,
425 };
426
427 /*
428   sync to disk
429 */
430 static enum NTDB_ERROR transaction_sync(struct ntdb_context *ntdb,
431                                        ntdb_off_t offset, ntdb_len_t length)
432 {
433         if (ntdb->flags & NTDB_NOSYNC) {
434                 return NTDB_SUCCESS;
435         }
436
437         if (fsync(ntdb->file->fd) != 0) {
438                 return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
439                                   "ntdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (ntdb->file->map_ptr) {
444                 ntdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)ntdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
448                                           "ntdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return NTDB_SUCCESS;
454 }
455
456
457 static void _ntdb_transaction_cancel(struct ntdb_context *ntdb)
458 {
459         int i;
460         enum NTDB_ERROR ecode;
461
462         if (ntdb->transaction == NULL) {
463                 ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
464                            "ntdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (ntdb->transaction->nesting != 0) {
469                 ntdb->transaction->transaction_error = 1;
470                 ntdb->transaction->nesting--;
471                 return;
472         }
473
474         ntdb->file->map_size = ntdb->transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<ntdb->transaction->num_blocks;i++) {
478                 if (ntdb->transaction->blocks[i] != NULL) {
479                         free(ntdb->transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(ntdb->transaction->blocks);
483
484         if (ntdb->transaction->magic_offset) {
485                 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
486                 uint64_t invalid = NTDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == NTDB_SUCCESS)
492                         ecode = transaction_sync(ntdb,
493                                                  ntdb->transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != NTDB_SUCCESS) {
496                         ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
497                                    "ntdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (ntdb->file->allrecord_lock.count)
503                 ntdb_allrecord_unlock(ntdb, ntdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         ntdb->io = ntdb->transaction->io_methods;
507
508         ntdb_transaction_unlock(ntdb, F_WRLCK);
509
510         if (ntdb_has_open_lock(ntdb))
511                 ntdb_unlock_open(ntdb, F_WRLCK);
512
513         SAFE_FREE(ntdb->transaction);
514 }
515
516 /*
517   start a ntdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per ntdb_context
519 */
520 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_start(struct ntdb_context *ntdb)
521 {
522         enum NTDB_ERROR ecode;
523
524         ntdb->stats.transactions++;
525         /* some sanity checks */
526         if (ntdb->flags & NTDB_INTERNAL) {
527                 return ntdb->last_error = ntdb_logerr(ntdb, NTDB_ERR_EINVAL,
528                                                     NTDB_LOG_USE_ERROR,
529                                                     "ntdb_transaction_start:"
530                                                     " cannot start a"
531                                                     " transaction on an"
532                                                     " internal ntdb");
533         }
534
535         if (ntdb->flags & NTDB_RDONLY) {
536                 return ntdb->last_error = ntdb_logerr(ntdb, NTDB_ERR_RDONLY,
537                                                     NTDB_LOG_USE_ERROR,
538                                                     "ntdb_transaction_start:"
539                                                     " cannot start a"
540                                                     " transaction on a "
541                                                     " read-only ntdb");
542         }
543
544         /* cope with nested ntdb_transaction_start() calls */
545         if (ntdb->transaction != NULL) {
546                 if (!(ntdb->flags & NTDB_ALLOW_NESTING)) {
547                         return ntdb->last_error
548                                 = ntdb_logerr(ntdb, NTDB_ERR_IO,
549                                              NTDB_LOG_USE_ERROR,
550                                              "ntdb_transaction_start:"
551                                              " already inside transaction");
552                 }
553                 ntdb->transaction->nesting++;
554                 ntdb->stats.transaction_nest++;
555                 return 0;
556         }
557
558         if (ntdb_has_hash_locks(ntdb)) {
559                 /* the caller must not have any locks when starting a
560                    transaction as otherwise we'll be screwed by lack
561                    of nested locks in POSIX */
562                 return ntdb->last_error = ntdb_logerr(ntdb, NTDB_ERR_LOCK,
563                                                     NTDB_LOG_USE_ERROR,
564                                                     "ntdb_transaction_start:"
565                                                     " cannot start a"
566                                                     " transaction with locks"
567                                                     " held");
568         }
569
570         ntdb->transaction = (struct ntdb_transaction *)
571                 calloc(sizeof(struct ntdb_transaction), 1);
572         if (ntdb->transaction == NULL) {
573                 return ntdb->last_error = ntdb_logerr(ntdb, NTDB_ERR_OOM,
574                                                     NTDB_LOG_ERROR,
575                                                     "ntdb_transaction_start:"
576                                                     " cannot allocate");
577         }
578
579         /* get the transaction write lock. This is a blocking lock. As
580            discussed with Volker, there are a number of ways we could
581            make this async, which we will probably do in the future */
582         ecode = ntdb_transaction_lock(ntdb, F_WRLCK);
583         if (ecode != NTDB_SUCCESS) {
584                 SAFE_FREE(ntdb->transaction->blocks);
585                 SAFE_FREE(ntdb->transaction);
586                 return ntdb->last_error = ecode;
587         }
588
589         /* get a read lock over entire file. This is upgraded to a write
590            lock during the commit */
591         ecode = ntdb_allrecord_lock(ntdb, F_RDLCK, NTDB_LOCK_WAIT, true);
592         if (ecode != NTDB_SUCCESS) {
593                 goto fail_allrecord_lock;
594         }
595
596         /* make sure we know about any file expansions already done by
597            anyone else */
598         ntdb->io->oob(ntdb, ntdb->file->map_size, 1, true);
599         ntdb->transaction->old_map_size = ntdb->file->map_size;
600
601         /* finally hook the io methods, replacing them with
602            transaction specific methods */
603         ntdb->transaction->io_methods = ntdb->io;
604         ntdb->io = &transaction_methods;
605         return ntdb->last_error = NTDB_SUCCESS;
606
607 fail_allrecord_lock:
608         ntdb_transaction_unlock(ntdb, F_WRLCK);
609         SAFE_FREE(ntdb->transaction->blocks);
610         SAFE_FREE(ntdb->transaction);
611         return ntdb->last_error = ecode;
612 }
613
614
615 /*
616   cancel the current transaction
617 */
618 _PUBLIC_ void ntdb_transaction_cancel(struct ntdb_context *ntdb)
619 {
620         ntdb->stats.transaction_cancel++;
621         _ntdb_transaction_cancel(ntdb);
622 }
623
624 /*
625   work out how much space the linearised recovery data will consume (worst case)
626 */
627 static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
628 {
629         ntdb_len_t recovery_size = 0;
630         int i;
631
632         recovery_size = 0;
633         for (i=0;i<ntdb->transaction->num_blocks;i++) {
634                 if (i * PAGESIZE >= ntdb->transaction->old_map_size) {
635                         break;
636                 }
637                 if (ntdb->transaction->blocks[i] == NULL) {
638                         continue;
639                 }
640                 recovery_size += 2*sizeof(ntdb_off_t);
641                 if (i == ntdb->transaction->num_blocks-1) {
642                         recovery_size += ntdb->transaction->last_block_size;
643                 } else {
644                         recovery_size += PAGESIZE;
645                 }
646         }
647
648         return recovery_size;
649 }
650
651 static enum NTDB_ERROR ntdb_recovery_area(struct ntdb_context *ntdb,
652                                         const struct ntdb_methods *methods,
653                                         ntdb_off_t *recovery_offset,
654                                         struct ntdb_recovery_record *rec)
655 {
656         enum NTDB_ERROR ecode;
657
658         *recovery_offset = ntdb_read_off(ntdb,
659                                         offsetof(struct ntdb_header, recovery));
660         if (NTDB_OFF_IS_ERR(*recovery_offset)) {
661                 return NTDB_OFF_TO_ERR(*recovery_offset);
662         }
663
664         if (*recovery_offset == 0) {
665                 rec->max_len = 0;
666                 return NTDB_SUCCESS;
667         }
668
669         ecode = methods->tread(ntdb, *recovery_offset, rec, sizeof(*rec));
670         if (ecode != NTDB_SUCCESS)
671                 return ecode;
672
673         ntdb_convert(ntdb, rec, sizeof(*rec));
674         /* ignore invalid recovery regions: can happen in crash */
675         if (rec->magic != NTDB_RECOVERY_MAGIC &&
676             rec->magic != NTDB_RECOVERY_INVALID_MAGIC) {
677                 *recovery_offset = 0;
678                 rec->max_len = 0;
679         }
680         return NTDB_SUCCESS;
681 }
682
683 static unsigned int same(const unsigned char *new,
684                          const unsigned char *old,
685                          unsigned int length)
686 {
687         unsigned int i;
688
689         for (i = 0; i < length; i++) {
690                 if (new[i] != old[i])
691                         break;
692         }
693         return i;
694 }
695
696 static unsigned int different(const unsigned char *new,
697                               const unsigned char *old,
698                               unsigned int length,
699                               unsigned int min_same,
700                               unsigned int *samelen)
701 {
702         unsigned int i;
703
704         *samelen = 0;
705         for (i = 0; i < length; i++) {
706                 if (new[i] == old[i]) {
707                         (*samelen)++;
708                 } else {
709                         if (*samelen >= min_same) {
710                                 return i - *samelen;
711                         }
712                         *samelen = 0;
713                 }
714         }
715
716         if (*samelen < min_same)
717                 *samelen = 0;
718         return length - *samelen;
719 }
720
721 /* Allocates recovery blob, without ntdb_recovery_record at head set up. */
722 static struct ntdb_recovery_record *alloc_recovery(struct ntdb_context *ntdb,
723                                                   ntdb_len_t *len)
724 {
725         struct ntdb_recovery_record *rec;
726         size_t i;
727         enum NTDB_ERROR ecode;
728         unsigned char *p;
729         const struct ntdb_methods *old_methods = ntdb->io;
730
731         rec = malloc(sizeof(*rec) + ntdb_recovery_size(ntdb));
732         if (!rec) {
733                 ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
734                            "transaction_setup_recovery:"
735                            " cannot allocate");
736                 return NTDB_ERR_PTR(NTDB_ERR_OOM);
737         }
738
739         /* We temporarily revert to the old I/O methods, so we can use
740          * ntdb_access_read */
741         ntdb->io = ntdb->transaction->io_methods;
742
743         /* build the recovery data into a single blob to allow us to do a single
744            large write, which should be more efficient */
745         p = (unsigned char *)(rec + 1);
746         for (i=0;i<ntdb->transaction->num_blocks;i++) {
747                 ntdb_off_t offset;
748                 ntdb_len_t length;
749                 unsigned int off;
750                 const unsigned char *buffer;
751
752                 if (ntdb->transaction->blocks[i] == NULL) {
753                         continue;
754                 }
755
756                 offset = i * PAGESIZE;
757                 length = PAGESIZE;
758                 if (i == ntdb->transaction->num_blocks-1) {
759                         length = ntdb->transaction->last_block_size;
760                 }
761
762                 if (offset >= ntdb->transaction->old_map_size) {
763                         continue;
764                 }
765
766                 if (offset + length > ntdb->file->map_size) {
767                         ecode = ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
768                                            "ntdb_transaction_setup_recovery:"
769                                            " transaction data over new region"
770                                            " boundary");
771                         goto fail;
772                 }
773                 if (offset + length > ntdb->transaction->old_map_size) {
774                         /* Short read at EOF. */
775                         length = ntdb->transaction->old_map_size - offset;
776                 }
777                 buffer = ntdb_access_read(ntdb, offset, length, false);
778                 if (NTDB_PTR_IS_ERR(buffer)) {
779                         ecode = NTDB_PTR_ERR(buffer);
780                         goto fail;
781                 }
782
783                 /* Skip over anything the same at the start. */
784                 off = same(ntdb->transaction->blocks[i], buffer, length);
785                 offset += off;
786
787                 while (off < length) {
788                         ntdb_len_t len1;
789                         unsigned int samelen;
790
791                         len1 = different(ntdb->transaction->blocks[i] + off,
792                                         buffer + off, length - off,
793                                         sizeof(offset) + sizeof(len1) + 1,
794                                         &samelen);
795
796                         memcpy(p, &offset, sizeof(offset));
797                         memcpy(p + sizeof(offset), &len1, sizeof(len1));
798                         ntdb_convert(ntdb, p, sizeof(offset) + sizeof(len1));
799                         p += sizeof(offset) + sizeof(len1);
800                         memcpy(p, buffer + off, len1);
801                         p += len1;
802                         off += len1 + samelen;
803                         offset += len1 + samelen;
804                 }
805                 ntdb_access_release(ntdb, buffer);
806         }
807
808         *len = p - (unsigned char *)(rec + 1);
809         ntdb->io = old_methods;
810         return rec;
811
812 fail:
813         free(rec);
814         ntdb->io = old_methods;
815         return NTDB_ERR_PTR(ecode);
816 }
817
818 static ntdb_off_t create_recovery_area(struct ntdb_context *ntdb,
819                                       ntdb_len_t rec_length,
820                                       struct ntdb_recovery_record *rec)
821 {
822         ntdb_off_t off, recovery_off;
823         ntdb_len_t addition;
824         enum NTDB_ERROR ecode;
825         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
826
827         /* round up to a multiple of page size. Overallocate, since each
828          * such allocation forces us to expand the file. */
829         rec->max_len = ntdb_expand_adjust(ntdb->file->map_size, rec_length);
830
831         /* Round up to a page. */
832         rec->max_len = ((sizeof(*rec) + rec->max_len + PAGESIZE-1)
833                         & ~(PAGESIZE-1))
834                 - sizeof(*rec);
835
836         off = ntdb->file->map_size;
837
838         /* Restore ->map_size before calling underlying expand_file.
839            Also so that we don't try to expand the file again in the
840            transaction commit, which would destroy the recovery
841            area */
842         addition = (ntdb->file->map_size - ntdb->transaction->old_map_size) +
843                 sizeof(*rec) + rec->max_len;
844         ntdb->file->map_size = ntdb->transaction->old_map_size;
845         ntdb->stats.transaction_expand_file++;
846         ecode = methods->expand_file(ntdb, addition);
847         if (ecode != NTDB_SUCCESS) {
848                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
849                            "ntdb_recovery_allocate:"
850                            " failed to create recovery area");
851                 return NTDB_ERR_TO_OFF(ecode);
852         }
853
854         /* we have to reset the old map size so that we don't try to
855            expand the file again in the transaction commit, which
856            would destroy the recovery area */
857         ntdb->transaction->old_map_size = ntdb->file->map_size;
858
859         /* write the recovery header offset and sync - we can sync without a race here
860            as the magic ptr in the recovery record has not been set */
861         recovery_off = off;
862         ntdb_convert(ntdb, &recovery_off, sizeof(recovery_off));
863         ecode = methods->twrite(ntdb, offsetof(struct ntdb_header, recovery),
864                                 &recovery_off, sizeof(ntdb_off_t));
865         if (ecode != NTDB_SUCCESS) {
866                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
867                            "ntdb_recovery_allocate:"
868                            " failed to write recovery head");
869                 return NTDB_ERR_TO_OFF(ecode);
870         }
871         transaction_write_existing(ntdb, offsetof(struct ntdb_header, recovery),
872                                    &recovery_off,
873                                    sizeof(ntdb_off_t));
874         return off;
875 }
876
877 /*
878   setup the recovery data that will be used on a crash during commit
879 */
880 static enum NTDB_ERROR transaction_setup_recovery(struct ntdb_context *ntdb)
881 {
882         ntdb_len_t recovery_size = 0;
883         ntdb_off_t recovery_off = 0;
884         ntdb_off_t old_map_size = ntdb->transaction->old_map_size;
885         struct ntdb_recovery_record *recovery;
886         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
887         uint64_t magic;
888         enum NTDB_ERROR ecode;
889
890         recovery = alloc_recovery(ntdb, &recovery_size);
891         if (NTDB_PTR_IS_ERR(recovery))
892                 return NTDB_PTR_ERR(recovery);
893
894         ecode = ntdb_recovery_area(ntdb, methods, &recovery_off, recovery);
895         if (ecode) {
896                 free(recovery);
897                 return ecode;
898         }
899
900         if (recovery->max_len < recovery_size) {
901                 /* Not large enough. Free up old recovery area. */
902                 if (recovery_off) {
903                         ntdb->stats.frees++;
904                         ecode = add_free_record(ntdb, recovery_off,
905                                                 sizeof(*recovery)
906                                                 + recovery->max_len,
907                                                 NTDB_LOCK_WAIT, true);
908                         free(recovery);
909                         if (ecode != NTDB_SUCCESS) {
910                                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
911                                                   "ntdb_recovery_allocate:"
912                                                   " failed to free previous"
913                                                   " recovery area");
914                         }
915
916                         /* Refresh recovery after add_free_record above. */
917                         recovery = alloc_recovery(ntdb, &recovery_size);
918                         if (NTDB_PTR_IS_ERR(recovery))
919                                 return NTDB_PTR_ERR(recovery);
920                 }
921
922                 recovery_off = create_recovery_area(ntdb, recovery_size,
923                                                     recovery);
924                 if (NTDB_OFF_IS_ERR(recovery_off)) {
925                         free(recovery);
926                         return NTDB_OFF_TO_ERR(recovery_off);
927                 }
928         }
929
930         /* Now we know size, convert rec header. */
931         recovery->magic = NTDB_RECOVERY_INVALID_MAGIC;
932         recovery->len = recovery_size;
933         recovery->eof = old_map_size;
934         ntdb_convert(ntdb, recovery, sizeof(*recovery));
935
936         /* write the recovery data to the recovery area */
937         ecode = methods->twrite(ntdb, recovery_off, recovery, recovery_size);
938         if (ecode != NTDB_SUCCESS) {
939                 free(recovery);
940                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
941                                   "ntdb_transaction_setup_recovery:"
942                                   " failed to write recovery data");
943         }
944         transaction_write_existing(ntdb, recovery_off, recovery, recovery_size);
945
946         free(recovery);
947
948         /* as we don't have ordered writes, we have to sync the recovery
949            data before we update the magic to indicate that the recovery
950            data is present */
951         ecode = transaction_sync(ntdb, recovery_off, recovery_size);
952         if (ecode != NTDB_SUCCESS)
953                 return ecode;
954
955         magic = NTDB_RECOVERY_MAGIC;
956         ntdb_convert(ntdb, &magic, sizeof(magic));
957
958         ntdb->transaction->magic_offset
959                 = recovery_off + offsetof(struct ntdb_recovery_record, magic);
960
961         ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
962                                 &magic, sizeof(magic));
963         if (ecode != NTDB_SUCCESS) {
964                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
965                                   "ntdb_transaction_setup_recovery:"
966                                   " failed to write recovery magic");
967         }
968         transaction_write_existing(ntdb, ntdb->transaction->magic_offset,
969                                    &magic, sizeof(magic));
970
971         /* ensure the recovery magic marker is on disk */
972         return transaction_sync(ntdb, ntdb->transaction->magic_offset,
973                                 sizeof(magic));
974 }
975
976 static enum NTDB_ERROR _ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
977 {
978         const struct ntdb_methods *methods;
979         enum NTDB_ERROR ecode;
980
981         if (ntdb->transaction == NULL) {
982                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
983                                   "ntdb_transaction_prepare_commit:"
984                                   " no transaction");
985         }
986
987         if (ntdb->transaction->prepared) {
988                 _ntdb_transaction_cancel(ntdb);
989                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
990                                   "ntdb_transaction_prepare_commit:"
991                                   " transaction already prepared");
992         }
993
994         if (ntdb->transaction->transaction_error) {
995                 _ntdb_transaction_cancel(ntdb);
996                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
997                                   "ntdb_transaction_prepare_commit:"
998                                   " transaction error pending");
999         }
1000
1001
1002         if (ntdb->transaction->nesting != 0) {
1003                 return NTDB_SUCCESS;
1004         }
1005
1006         /* check for a null transaction */
1007         if (ntdb->transaction->blocks == NULL) {
1008                 return NTDB_SUCCESS;
1009         }
1010
1011         methods = ntdb->transaction->io_methods;
1012
1013         /* upgrade the main transaction lock region to a write lock */
1014         ecode = ntdb_allrecord_upgrade(ntdb, NTDB_HASH_LOCK_START);
1015         if (ecode != NTDB_SUCCESS) {
1016                 return ecode;
1017         }
1018
1019         /* get the open lock - this prevents new users attaching to the database
1020            during the commit */
1021         ecode = ntdb_lock_open(ntdb, F_WRLCK, NTDB_LOCK_WAIT|NTDB_LOCK_NOCHECK);
1022         if (ecode != NTDB_SUCCESS) {
1023                 return ecode;
1024         }
1025
1026         /* Since we have whole db locked, we don't need the expansion lock. */
1027         if (!(ntdb->flags & NTDB_NOSYNC)) {
1028                 /* Sets up ntdb->transaction->recovery and
1029                  * ntdb->transaction->magic_offset. */
1030                 ecode = transaction_setup_recovery(ntdb);
1031                 if (ecode != NTDB_SUCCESS) {
1032                         return ecode;
1033                 }
1034         }
1035
1036         ntdb->transaction->prepared = true;
1037
1038         /* expand the file to the new size if needed */
1039         if (ntdb->file->map_size != ntdb->transaction->old_map_size) {
1040                 ntdb_len_t add;
1041
1042                 add = ntdb->file->map_size - ntdb->transaction->old_map_size;
1043                 /* Restore original map size for ntdb_expand_file */
1044                 ntdb->file->map_size = ntdb->transaction->old_map_size;
1045                 ecode = methods->expand_file(ntdb, add);
1046                 if (ecode != NTDB_SUCCESS) {
1047                         return ecode;
1048                 }
1049         }
1050
1051         /* Keep the open lock until the actual commit */
1052         return NTDB_SUCCESS;
1053 }
1054
1055 /*
1056    prepare to commit the current transaction
1057 */
1058 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
1059 {
1060         return ntdb->last_error = _ntdb_transaction_prepare_commit(ntdb);
1061 }
1062
1063 /*
1064   commit the current transaction
1065 */
1066 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_commit(struct ntdb_context *ntdb)
1067 {
1068         const struct ntdb_methods *methods;
1069         int i;
1070         enum NTDB_ERROR ecode;
1071
1072         if (ntdb->transaction == NULL) {
1073                 return ntdb->last_error = ntdb_logerr(ntdb, NTDB_ERR_EINVAL,
1074                                                     NTDB_LOG_USE_ERROR,
1075                                                     "ntdb_transaction_commit:"
1076                                                     " no transaction");
1077         }
1078
1079         ntdb_trace(ntdb, "ntdb_transaction_commit");
1080
1081         if (ntdb->transaction->nesting != 0) {
1082                 ntdb->transaction->nesting--;
1083                 return ntdb->last_error = NTDB_SUCCESS;
1084         }
1085
1086         /* check for a null transaction */
1087         if (ntdb->transaction->blocks == NULL) {
1088                 _ntdb_transaction_cancel(ntdb);
1089                 return ntdb->last_error = NTDB_SUCCESS;
1090         }
1091
1092         if (!ntdb->transaction->prepared) {
1093                 ecode = _ntdb_transaction_prepare_commit(ntdb);
1094                 if (ecode != NTDB_SUCCESS) {
1095                         _ntdb_transaction_cancel(ntdb);
1096                         return ntdb->last_error = ecode;
1097                 }
1098         }
1099
1100         methods = ntdb->transaction->io_methods;
1101
1102         /* perform all the writes */
1103         for (i=0;i<ntdb->transaction->num_blocks;i++) {
1104                 ntdb_off_t offset;
1105                 ntdb_len_t length;
1106
1107                 if (ntdb->transaction->blocks[i] == NULL) {
1108                         continue;
1109                 }
1110
1111                 offset = i * PAGESIZE;
1112                 length = PAGESIZE;
1113                 if (i == ntdb->transaction->num_blocks-1) {
1114                         length = ntdb->transaction->last_block_size;
1115                 }
1116
1117                 ecode = methods->twrite(ntdb, offset,
1118                                         ntdb->transaction->blocks[i], length);
1119                 if (ecode != NTDB_SUCCESS) {
1120                         /* we've overwritten part of the data and
1121                            possibly expanded the file, so we need to
1122                            run the crash recovery code */
1123                         ntdb->io = methods;
1124                         ntdb_transaction_recover(ntdb);
1125
1126                         _ntdb_transaction_cancel(ntdb);
1127
1128                         return ntdb->last_error = ecode;
1129                 }
1130                 SAFE_FREE(ntdb->transaction->blocks[i]);
1131         }
1132
1133         SAFE_FREE(ntdb->transaction->blocks);
1134         ntdb->transaction->num_blocks = 0;
1135
1136         /* ensure the new data is on disk */
1137         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1138         if (ecode != NTDB_SUCCESS) {
1139                 return ntdb->last_error = ecode;
1140         }
1141
1142         /*
1143           TODO: maybe write to some dummy hdr field, or write to magic
1144           offset without mmap, before the last sync, instead of the
1145           utime() call
1146         */
1147
1148         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1149            don't change the mtime of the file, this means the file may
1150            not be backed up (as ntdb rounding to block sizes means that
1151            file size changes are quite rare too). The following forces
1152            mtime changes when a transaction completes */
1153 #if HAVE_UTIME
1154         utime(ntdb->name, NULL);
1155 #endif
1156
1157         /* use a transaction cancel to free memory and remove the
1158            transaction locks: it "restores" map_size, too. */
1159         ntdb->transaction->old_map_size = ntdb->file->map_size;
1160         _ntdb_transaction_cancel(ntdb);
1161
1162         return ntdb->last_error = NTDB_SUCCESS;
1163 }
1164
1165
1166 /*
1167   recover from an aborted transaction. Must be called with exclusive
1168   database write access already established (including the open
1169   lock to prevent new processes attaching)
1170 */
1171 enum NTDB_ERROR ntdb_transaction_recover(struct ntdb_context *ntdb)
1172 {
1173         ntdb_off_t recovery_head, recovery_eof;
1174         unsigned char *data, *p;
1175         struct ntdb_recovery_record rec;
1176         enum NTDB_ERROR ecode;
1177
1178         /* find the recovery area */
1179         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1180         if (NTDB_OFF_IS_ERR(recovery_head)) {
1181                 ecode = NTDB_OFF_TO_ERR(recovery_head);
1182                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1183                                   "ntdb_transaction_recover:"
1184                                   " failed to read recovery head");
1185         }
1186
1187         if (recovery_head == 0) {
1188                 /* we have never allocated a recovery record */
1189                 return NTDB_SUCCESS;
1190         }
1191
1192         /* read the recovery record */
1193         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1194         if (ecode != NTDB_SUCCESS) {
1195                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1196                                   "ntdb_transaction_recover:"
1197                                   " failed to read recovery record");
1198         }
1199
1200         if (rec.magic != NTDB_RECOVERY_MAGIC) {
1201                 /* there is no valid recovery data */
1202                 return NTDB_SUCCESS;
1203         }
1204
1205         if (ntdb->flags & NTDB_RDONLY) {
1206                 return ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
1207                                   "ntdb_transaction_recover:"
1208                                   " attempt to recover read only database");
1209         }
1210
1211         recovery_eof = rec.eof;
1212
1213         data = (unsigned char *)malloc(rec.len);
1214         if (data == NULL) {
1215                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
1216                                   "ntdb_transaction_recover:"
1217                                   " failed to allocate recovery data");
1218         }
1219
1220         /* read the full recovery data */
1221         ecode = ntdb->io->tread(ntdb, recovery_head + sizeof(rec), data,
1222                                     rec.len);
1223         if (ecode != NTDB_SUCCESS) {
1224                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1225                                   "ntdb_transaction_recover:"
1226                                   " failed to read recovery data");
1227         }
1228
1229         /* recover the file data */
1230         p = data;
1231         while (p+sizeof(ntdb_off_t)+sizeof(ntdb_len_t) < data + rec.len) {
1232                 ntdb_off_t ofs;
1233                 ntdb_len_t len;
1234                 ntdb_convert(ntdb, p, sizeof(ofs) + sizeof(len));
1235                 memcpy(&ofs, p, sizeof(ofs));
1236                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1237                 p += sizeof(ofs) + sizeof(len);
1238
1239                 ecode = ntdb->io->twrite(ntdb, ofs, p, len);
1240                 if (ecode != NTDB_SUCCESS) {
1241                         free(data);
1242                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1243                                           "ntdb_transaction_recover:"
1244                                           " failed to recover %zu bytes"
1245                                           " at offset %zu",
1246                                           (size_t)len, (size_t)ofs);
1247                 }
1248                 p += len;
1249         }
1250
1251         free(data);
1252
1253         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1254         if (ecode != NTDB_SUCCESS) {
1255                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1256                                   "ntdb_transaction_recover:"
1257                                   " failed to sync recovery");
1258         }
1259
1260         /* if the recovery area is after the recovered eof then remove it */
1261         if (recovery_eof <= recovery_head) {
1262                 ecode = ntdb_write_off(ntdb, offsetof(struct ntdb_header,
1263                                                     recovery),
1264                                       0);
1265                 if (ecode != NTDB_SUCCESS) {
1266                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1267                                           "ntdb_transaction_recover:"
1268                                           " failed to remove recovery head");
1269                 }
1270         }
1271
1272         /* remove the recovery magic */
1273         ecode = ntdb_write_off(ntdb,
1274                               recovery_head
1275                               + offsetof(struct ntdb_recovery_record, magic),
1276                               NTDB_RECOVERY_INVALID_MAGIC);
1277         if (ecode != NTDB_SUCCESS) {
1278                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1279                                   "ntdb_transaction_recover:"
1280                                   " failed to remove recovery magic");
1281         }
1282
1283         ecode = transaction_sync(ntdb, 0, recovery_eof);
1284         if (ecode != NTDB_SUCCESS) {
1285                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1286                                   "ntdb_transaction_recover:"
1287                                   " failed to sync2 recovery");
1288         }
1289
1290         ntdb_logerr(ntdb, NTDB_SUCCESS, NTDB_LOG_WARNING,
1291                    "ntdb_transaction_recover: recovered %zu byte database",
1292                    (size_t)recovery_eof);
1293
1294         /* all done */
1295         return NTDB_SUCCESS;
1296 }
1297
1298 ntdb_bool_err ntdb_needs_recovery(struct ntdb_context *ntdb)
1299 {
1300         ntdb_off_t recovery_head;
1301         struct ntdb_recovery_record rec;
1302         enum NTDB_ERROR ecode;
1303
1304         /* find the recovery area */
1305         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1306         if (NTDB_OFF_IS_ERR(recovery_head)) {
1307                 return recovery_head;
1308         }
1309
1310         if (recovery_head == 0) {
1311                 /* we have never allocated a recovery record */
1312                 return false;
1313         }
1314
1315         /* read the recovery record */
1316         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1317         if (ecode != NTDB_SUCCESS) {
1318                 return NTDB_ERR_TO_OFF(ecode);
1319         }
1320
1321         return (rec.magic == NTDB_RECOVERY_MAGIC);
1322 }