ntdb: make sure file is always a multiple of PAGESIZE (now NTDB_PGSIZE)
[kai/samba-autobuild/.git] / lib / ntdb / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the ntdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #include <assert.h>
/* Free x and reset it to NULL so the stale pointer cannot be reused.
   free(NULL) is a no-op, so no NULL guard is needed. x must be a
   modifiable pointer lvalue; it is evaluated twice. */
#define SAFE_FREE(x) do { free((void *)(x)); (x) = NULL; } while (0)
30
31 /*
32   transaction design:
33
34   - only allow a single transaction at a time per database. This makes
35     using the transaction API simpler, as otherwise the caller would
36     have to cope with temporary failures in transactions that conflict
37     with other current transactions
38
39   - keep the transaction recovery information in the same file as the
40     database, using a special 'transaction recovery' record pointed at
41     by the header. This removes the need for extra journal files as
42     used by some other databases
43
44   - dynamically allocate the transaction recovery record, re-using it
45     for subsequent transactions. If a larger record is needed then
46     ntdb_free() the old record to place it on the normal ntdb freelist
47     before allocating the new record
48
49   - during transactions, keep a linked list of all writes that have
50     been performed by intercepting all ntdb_write() calls. The hooked
51     transaction versions of ntdb_read() and ntdb_write() check this
52     linked list and try to use the elements of the list in preference
53     to the real database.
54
55   - don't allow any locks to be held when a transaction starts,
56     otherwise we can end up with deadlock (plus lack of lock nesting
57     in POSIX locks would mean the lock is lost)
58
59   - if the caller gains a lock during the transaction but doesn't
60     release it then fail the commit
61
62   - allow for nested calls to ntdb_transaction_start(), re-using the
63     existing transaction record. If the inner transaction is canceled
64     then a subsequent commit will fail
65
66   - keep a mirrored copy of the ntdb hash chain heads to allow for the
67     fast hash heads scan on traverse, updating the mirrored copy in
68     the transaction version of ntdb_write
69
70   - allow callers to mix transaction and non-transaction use of ntdb,
71     although once a transaction is started then an exclusive lock is
72     gained until the transaction is committed or canceled
73
74   - the commit strategy involves first saving away all modified data
75     into a linearised buffer in the transaction recovery area, then
76     marking the transaction recovery area with a magic value to
77     indicate a valid recovery record. In total 4 fsync/msync calls are
78     needed per commit to prevent race conditions. It might be possible
79     to reduce this to 3 or even 2 with some more work.
80
81   - check for a valid recovery record on open of the ntdb, while the
82     open lock is held. Automatically recover from the transaction
83     recovery area if needed, then continue with the open as
84     usual. This allows for smooth crash recovery with no administrator
85     intervention.
86
87   - if NTDB_NOSYNC is passed to flags in ntdb_open then transactions are
88     still available, but no transaction recovery area is used and no
89     fsync/msync calls are made.
90 */
91
92 /*
93   hold the context of any current transaction
94 */
95 struct ntdb_transaction {
96         /* the original io methods - used to do IOs to the real db */
97         const struct ntdb_methods *io_methods;
98
99         /* the list of transaction blocks. When a block is first
100            written to, it gets created in this list */
101         uint8_t **blocks;
102         size_t num_blocks;
103         size_t last_block_size; /* number of valid bytes in the last block */
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested ntdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         unsigned int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         ntdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         ntdb_len_t old_map_size;
121 };
122
123 /*
124   read while in a transaction. We need to check first if the data is in our list
125   of transaction elements, then if not do a real read
126 */
127 static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t off,
128                                        void *buf, ntdb_len_t len)
129 {
130         size_t blk;
131         enum NTDB_ERROR ecode;
132
133         /* break it down into block sized ops */
134         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
135                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
136                 ecode = transaction_read(ntdb, off, buf, len2);
137                 if (ecode != NTDB_SUCCESS) {
138                         return ecode;
139                 }
140                 len -= len2;
141                 off += len2;
142                 buf = (void *)(len2 + (char *)buf);
143         }
144
145         if (len == 0) {
146                 return NTDB_SUCCESS;
147         }
148
149         blk = off / NTDB_PGSIZE;
150
151         /* see if we have it in the block list */
152         if (ntdb->transaction->num_blocks <= blk ||
153             ntdb->transaction->blocks[blk] == NULL) {
154                 /* nope, do a real read */
155                 ecode = ntdb->transaction->io_methods->tread(ntdb, off, buf, len);
156                 if (ecode != NTDB_SUCCESS) {
157                         goto fail;
158                 }
159                 return 0;
160         }
161
162         /* it is in the block list. Now check for the last block */
163         if (blk == ntdb->transaction->num_blocks-1) {
164                 if (len > ntdb->transaction->last_block_size) {
165                         ecode = NTDB_ERR_IO;
166                         goto fail;
167                 }
168         }
169
170         /* now copy it out of this block */
171         memcpy(buf, ntdb->transaction->blocks[blk] + (off % NTDB_PGSIZE), len);
172         return NTDB_SUCCESS;
173
174 fail:
175         ntdb->transaction->transaction_error = 1;
176         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
177                           "transaction_read: failed at off=%zu len=%zu",
178                           (size_t)off, (size_t)len);
179 }
180
181
182 /*
183   write while in a transaction
184 */
185 static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t off,
186                                         const void *buf, ntdb_len_t len)
187 {
188         size_t blk;
189         enum NTDB_ERROR ecode;
190
191         /* Only a commit is allowed on a prepared transaction */
192         if (ntdb->transaction->prepared) {
193                 ecode = ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
194                                    "transaction_write: transaction already"
195                                    " prepared, write not allowed");
196                 goto fail;
197         }
198
199         /* break it up into block sized chunks */
200         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
201                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
202                 ecode = transaction_write(ntdb, off, buf, len2);
203                 if (ecode != NTDB_SUCCESS) {
204                         return ecode;
205                 }
206                 len -= len2;
207                 off += len2;
208                 if (buf != NULL) {
209                         buf = (const void *)(len2 + (const char *)buf);
210                 }
211         }
212
213         if (len == 0) {
214                 return NTDB_SUCCESS;
215         }
216
217         blk = off / NTDB_PGSIZE;
218         off = off % NTDB_PGSIZE;
219
220         if (ntdb->transaction->num_blocks <= blk) {
221                 uint8_t **new_blocks;
222                 /* expand the blocks array */
223                 if (ntdb->transaction->blocks == NULL) {
224                         new_blocks = (uint8_t **)malloc(
225                                 (blk+1)*sizeof(uint8_t *));
226                 } else {
227                         new_blocks = (uint8_t **)realloc(
228                                 ntdb->transaction->blocks,
229                                 (blk+1)*sizeof(uint8_t *));
230                 }
231                 if (new_blocks == NULL) {
232                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
233                                            "transaction_write:"
234                                            " failed to allocate");
235                         goto fail;
236                 }
237                 memset(&new_blocks[ntdb->transaction->num_blocks], 0,
238                        (1+(blk - ntdb->transaction->num_blocks))*sizeof(uint8_t *));
239                 ntdb->transaction->blocks = new_blocks;
240                 ntdb->transaction->num_blocks = blk+1;
241                 ntdb->transaction->last_block_size = 0;
242         }
243
244         /* allocate and fill a block? */
245         if (ntdb->transaction->blocks[blk] == NULL) {
246                 ntdb->transaction->blocks[blk] = (uint8_t *)calloc(NTDB_PGSIZE, 1);
247                 if (ntdb->transaction->blocks[blk] == NULL) {
248                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
249                                            "transaction_write:"
250                                            " failed to allocate");
251                         goto fail;
252                 }
253                 if (ntdb->transaction->old_map_size > blk * NTDB_PGSIZE) {
254                         ntdb_len_t len2 = NTDB_PGSIZE;
255                         if (len2 + (blk * NTDB_PGSIZE) > ntdb->transaction->old_map_size) {
256                                 len2 = ntdb->transaction->old_map_size - (blk * NTDB_PGSIZE);
257                         }
258                         ecode = ntdb->transaction->io_methods->tread(ntdb,
259                                         blk * NTDB_PGSIZE,
260                                         ntdb->transaction->blocks[blk],
261                                         len2);
262                         if (ecode != NTDB_SUCCESS) {
263                                 ecode = ntdb_logerr(ntdb, ecode,
264                                                    NTDB_LOG_ERROR,
265                                                    "transaction_write:"
266                                                    " failed to"
267                                                    " read old block: %s",
268                                                    strerror(errno));
269                                 SAFE_FREE(ntdb->transaction->blocks[blk]);
270                                 goto fail;
271                         }
272                         if (blk == ntdb->transaction->num_blocks-1) {
273                                 ntdb->transaction->last_block_size = len2;
274                         }
275                 }
276         }
277
278         /* overwrite part of an existing block */
279         if (buf == NULL) {
280                 memset(ntdb->transaction->blocks[blk] + off, 0, len);
281         } else {
282                 memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
283         }
284         if (blk == ntdb->transaction->num_blocks-1) {
285                 if (len + off > ntdb->transaction->last_block_size) {
286                         ntdb->transaction->last_block_size = len + off;
287                 }
288         }
289
290         return NTDB_SUCCESS;
291
292 fail:
293         ntdb->transaction->transaction_error = 1;
294         return ecode;
295 }
296
297
298 /*
299   write while in a transaction - this variant never expands the transaction blocks, it only
300   updates existing blocks. This means it cannot change the recovery size
301 */
302 static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off,
303                                        const void *buf, ntdb_len_t len)
304 {
305         size_t blk;
306
307         /* break it up into block sized chunks */
308         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
309                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
310                 transaction_write_existing(ntdb, off, buf, len2);
311                 len -= len2;
312                 off += len2;
313                 if (buf != NULL) {
314                         buf = (const void *)(len2 + (const char *)buf);
315                 }
316         }
317
318         if (len == 0) {
319                 return;
320         }
321
322         blk = off / NTDB_PGSIZE;
323         off = off % NTDB_PGSIZE;
324
325         if (ntdb->transaction->num_blocks <= blk ||
326             ntdb->transaction->blocks[blk] == NULL) {
327                 return;
328         }
329
330         if (blk == ntdb->transaction->num_blocks-1 &&
331             off + len > ntdb->transaction->last_block_size) {
332                 if (off >= ntdb->transaction->last_block_size) {
333                         return;
334                 }
335                 len = ntdb->transaction->last_block_size - off;
336         }
337
338         /* overwrite part of an existing block */
339         memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
340 }
341
342
343 /*
344   out of bounds check during a transaction
345 */
346 static enum NTDB_ERROR transaction_oob(struct ntdb_context *ntdb,
347                                       ntdb_off_t off, ntdb_len_t len, bool probe)
348 {
349         if ((off + len >= off && off + len <= ntdb->file->map_size) || probe) {
350                 return NTDB_SUCCESS;
351         }
352
353         ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
354                    "ntdb_oob len %lld beyond transaction size %lld",
355                    (long long)(off + len),
356                    (long long)ntdb->file->map_size);
357         return NTDB_ERR_IO;
358 }
359
360 /*
361   transaction version of ntdb_expand().
362 */
363 static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
364                                               ntdb_off_t addition)
365 {
366         enum NTDB_ERROR ecode;
367
368         assert((ntdb->file->map_size + addition) % NTDB_PGSIZE == 0);
369
370         /* add a write to the transaction elements, so subsequent
371            reads see the zero data */
372         ecode = transaction_write(ntdb, ntdb->file->map_size, NULL, addition);
373         if (ecode == NTDB_SUCCESS) {
374                 ntdb->file->map_size += addition;
375         }
376         return ecode;
377 }
378
379 static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
380                                 size_t len, bool write_mode)
381 {
382         size_t blk = off / NTDB_PGSIZE, end_blk;
383
384         /* This is wrong for zero-length blocks, but will fail gracefully */
385         end_blk = (off + len - 1) / NTDB_PGSIZE;
386
387         /* Can only do direct if in single block and we've already copied. */
388         if (write_mode) {
389                 ntdb->stats.transaction_write_direct++;
390                 if (blk != end_blk
391                     || blk >= ntdb->transaction->num_blocks
392                     || ntdb->transaction->blocks[blk] == NULL) {
393                         ntdb->stats.transaction_write_direct_fail++;
394                         return NULL;
395                 }
396                 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
397         }
398
399         ntdb->stats.transaction_read_direct++;
400         /* Single which we have copied? */
401         if (blk == end_blk
402             && blk < ntdb->transaction->num_blocks
403             && ntdb->transaction->blocks[blk])
404                 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
405
406         /* Otherwise must be all not copied. */
407         while (blk <= end_blk) {
408                 if (blk >= ntdb->transaction->num_blocks)
409                         break;
410                 if (ntdb->transaction->blocks[blk]) {
411                         ntdb->stats.transaction_read_direct_fail++;
412                         return NULL;
413                 }
414                 blk++;
415         }
416         return ntdb->transaction->io_methods->direct(ntdb, off, len, false);
417 }
418
419 static const struct ntdb_methods transaction_methods = {
420         transaction_read,
421         transaction_write,
422         transaction_oob,
423         transaction_expand_file,
424         transaction_direct,
425 };
426
427 /*
428   sync to disk
429 */
430 static enum NTDB_ERROR transaction_sync(struct ntdb_context *ntdb,
431                                        ntdb_off_t offset, ntdb_len_t length)
432 {
433         if (ntdb->flags & NTDB_NOSYNC) {
434                 return NTDB_SUCCESS;
435         }
436
437         if (fsync(ntdb->file->fd) != 0) {
438                 return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
439                                   "ntdb_transaction: fsync failed: %s",
440                                   strerror(errno));
441         }
442 #ifdef MS_SYNC
443         if (ntdb->file->map_ptr) {
444                 ntdb_off_t moffset = offset & ~(getpagesize()-1);
445                 if (msync(moffset + (char *)ntdb->file->map_ptr,
446                           length + (offset - moffset), MS_SYNC) != 0) {
447                         return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
448                                           "ntdb_transaction: msync failed: %s",
449                                           strerror(errno));
450                 }
451         }
452 #endif
453         return NTDB_SUCCESS;
454 }
455
456
457 static void _ntdb_transaction_cancel(struct ntdb_context *ntdb)
458 {
459         int i;
460         enum NTDB_ERROR ecode;
461
462         if (ntdb->transaction == NULL) {
463                 ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
464                            "ntdb_transaction_cancel: no transaction");
465                 return;
466         }
467
468         if (ntdb->transaction->nesting != 0) {
469                 ntdb->transaction->transaction_error = 1;
470                 ntdb->transaction->nesting--;
471                 return;
472         }
473
474         ntdb->file->map_size = ntdb->transaction->old_map_size;
475
476         /* free all the transaction blocks */
477         for (i=0;i<ntdb->transaction->num_blocks;i++) {
478                 if (ntdb->transaction->blocks[i] != NULL) {
479                         free(ntdb->transaction->blocks[i]);
480                 }
481         }
482         SAFE_FREE(ntdb->transaction->blocks);
483
484         if (ntdb->transaction->magic_offset) {
485                 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
486                 uint64_t invalid = NTDB_RECOVERY_INVALID_MAGIC;
487
488                 /* remove the recovery marker */
489                 ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
490                                         &invalid, sizeof(invalid));
491                 if (ecode == NTDB_SUCCESS)
492                         ecode = transaction_sync(ntdb,
493                                                  ntdb->transaction->magic_offset,
494                                                  sizeof(invalid));
495                 if (ecode != NTDB_SUCCESS) {
496                         ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
497                                    "ntdb_transaction_cancel: failed to remove"
498                                    " recovery magic");
499                 }
500         }
501
502         if (ntdb->file->allrecord_lock.count)
503                 ntdb_allrecord_unlock(ntdb, ntdb->file->allrecord_lock.ltype);
504
505         /* restore the normal io methods */
506         ntdb->io = ntdb->transaction->io_methods;
507
508         ntdb_transaction_unlock(ntdb, F_WRLCK);
509
510         if (ntdb_has_open_lock(ntdb))
511                 ntdb_unlock_open(ntdb, F_WRLCK);
512
513         SAFE_FREE(ntdb->transaction);
514 }
515
516 /*
517   start a ntdb transaction. No token is returned, as only a single
518   transaction is allowed to be pending per ntdb_context
519 */
520 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_start(struct ntdb_context *ntdb)
521 {
522         enum NTDB_ERROR ecode;
523
524         ntdb->stats.transactions++;
525         /* some sanity checks */
526         if (ntdb->flags & NTDB_INTERNAL) {
527                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
528                                    "ntdb_transaction_start:"
529                                    " cannot start a transaction on an"
530                                    " internal ntdb");
531         }
532
533         if (ntdb->flags & NTDB_RDONLY) {
534                 return ntdb_logerr(ntdb, NTDB_ERR_RDONLY, NTDB_LOG_USE_ERROR,
535                                    "ntdb_transaction_start:"
536                                    " cannot start a transaction on a"
537                                    " read-only ntdb");
538         }
539
540         /* cope with nested ntdb_transaction_start() calls */
541         if (ntdb->transaction != NULL) {
542                 if (!(ntdb->flags & NTDB_ALLOW_NESTING)) {
543                         return ntdb_logerr(ntdb, NTDB_ERR_IO,
544                                            NTDB_LOG_USE_ERROR,
545                                            "ntdb_transaction_start:"
546                                            " already inside transaction");
547                 }
548                 ntdb->transaction->nesting++;
549                 ntdb->stats.transaction_nest++;
550                 return 0;
551         }
552
553         if (ntdb_has_hash_locks(ntdb)) {
554                 /* the caller must not have any locks when starting a
555                    transaction as otherwise we'll be screwed by lack
556                    of nested locks in POSIX */
557                 return ntdb_logerr(ntdb, NTDB_ERR_LOCK,
558                                    NTDB_LOG_USE_ERROR,
559                                    "ntdb_transaction_start:"
560                                    " cannot start a transaction with locks"
561                                    " held");
562         }
563
564         ntdb->transaction = (struct ntdb_transaction *)
565                 calloc(sizeof(struct ntdb_transaction), 1);
566         if (ntdb->transaction == NULL) {
567                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
568                                    "ntdb_transaction_start:"
569                                    " cannot allocate");
570         }
571
572         /* get the transaction write lock. This is a blocking lock. As
573            discussed with Volker, there are a number of ways we could
574            make this async, which we will probably do in the future */
575         ecode = ntdb_transaction_lock(ntdb, F_WRLCK);
576         if (ecode != NTDB_SUCCESS) {
577                 SAFE_FREE(ntdb->transaction->blocks);
578                 SAFE_FREE(ntdb->transaction);
579                 return ecode;
580         }
581
582         /* get a read lock over entire file. This is upgraded to a write
583            lock during the commit */
584         ecode = ntdb_allrecord_lock(ntdb, F_RDLCK, NTDB_LOCK_WAIT, true);
585         if (ecode != NTDB_SUCCESS) {
586                 goto fail_allrecord_lock;
587         }
588
589         /* make sure we know about any file expansions already done by
590            anyone else */
591         ntdb->io->oob(ntdb, ntdb->file->map_size, 1, true);
592         ntdb->transaction->old_map_size = ntdb->file->map_size;
593
594         /* finally hook the io methods, replacing them with
595            transaction specific methods */
596         ntdb->transaction->io_methods = ntdb->io;
597         ntdb->io = &transaction_methods;
598         return NTDB_SUCCESS;
599
600 fail_allrecord_lock:
601         ntdb_transaction_unlock(ntdb, F_WRLCK);
602         SAFE_FREE(ntdb->transaction->blocks);
603         SAFE_FREE(ntdb->transaction);
604         return ecode;
605 }
606
607
608 /*
609   cancel the current transaction
610 */
611 _PUBLIC_ void ntdb_transaction_cancel(struct ntdb_context *ntdb)
612 {
613         ntdb->stats.transaction_cancel++;
614         _ntdb_transaction_cancel(ntdb);
615 }
616
617 /*
618   work out how much space the linearised recovery data will consume (worst case)
619 */
620 static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
621 {
622         ntdb_len_t recovery_size = 0;
623         int i;
624
625         recovery_size = 0;
626         for (i=0;i<ntdb->transaction->num_blocks;i++) {
627                 if (i * NTDB_PGSIZE >= ntdb->transaction->old_map_size) {
628                         break;
629                 }
630                 if (ntdb->transaction->blocks[i] == NULL) {
631                         continue;
632                 }
633                 recovery_size += 2*sizeof(ntdb_off_t);
634                 if (i == ntdb->transaction->num_blocks-1) {
635                         recovery_size += ntdb->transaction->last_block_size;
636                 } else {
637                         recovery_size += NTDB_PGSIZE;
638                 }
639         }
640
641         return recovery_size;
642 }
643
644 static enum NTDB_ERROR ntdb_recovery_area(struct ntdb_context *ntdb,
645                                         const struct ntdb_methods *methods,
646                                         ntdb_off_t *recovery_offset,
647                                         struct ntdb_recovery_record *rec)
648 {
649         enum NTDB_ERROR ecode;
650
651         *recovery_offset = ntdb_read_off(ntdb,
652                                         offsetof(struct ntdb_header, recovery));
653         if (NTDB_OFF_IS_ERR(*recovery_offset)) {
654                 return NTDB_OFF_TO_ERR(*recovery_offset);
655         }
656
657         if (*recovery_offset == 0) {
658                 rec->max_len = 0;
659                 return NTDB_SUCCESS;
660         }
661
662         ecode = methods->tread(ntdb, *recovery_offset, rec, sizeof(*rec));
663         if (ecode != NTDB_SUCCESS)
664                 return ecode;
665
666         ntdb_convert(ntdb, rec, sizeof(*rec));
667         /* ignore invalid recovery regions: can happen in crash */
668         if (rec->magic != NTDB_RECOVERY_MAGIC &&
669             rec->magic != NTDB_RECOVERY_INVALID_MAGIC) {
670                 *recovery_offset = 0;
671                 rec->max_len = 0;
672         }
673         return NTDB_SUCCESS;
674 }
675
/* Return the length of the common prefix of new and old, looking at
   no more than length bytes. */
static unsigned int same(const unsigned char *new,
			 const unsigned char *old,
			 unsigned int length)
{
	unsigned int matched = 0;

	while (matched < length && new[matched] == old[matched]) {
		matched++;
	}
	return matched;
}
688
/* Measure the leading "different" region of new versus old.

   Scans up to length bytes. Runs of equal bytes shorter than min_same
   are counted as part of the different region. The scan stops at the
   first mismatch that follows a run of at least min_same equal bytes.

   Returns the length of the different region and stores in *samelen
   the length of the equal run that follows it (0 if that run is
   shorter than min_same). */
static unsigned int different(const unsigned char *new,
			      const unsigned char *old,
			      unsigned int length,
			      unsigned int min_same,
			      unsigned int *samelen)
{
	unsigned int pos;
	unsigned int run = 0;	/* length of the current equal run */

	for (pos = 0; pos < length; pos++) {
		if (new[pos] == old[pos]) {
			run++;
			continue;
		}
		if (run >= min_same) {
			/* the equal run just before pos ends the region */
			*samelen = run;
			return pos - run;
		}
		run = 0;
	}

	/* a trailing run that is too short is not worth splitting off */
	if (run < min_same) {
		run = 0;
	}
	*samelen = run;
	return length - run;
}
713
714 /* Allocates recovery blob, without ntdb_recovery_record at head set up. */
715 static struct ntdb_recovery_record *alloc_recovery(struct ntdb_context *ntdb,
716                                                   ntdb_len_t *len)
717 {
718         struct ntdb_recovery_record *rec;
719         size_t i;
720         enum NTDB_ERROR ecode;
721         unsigned char *p;
722         const struct ntdb_methods *old_methods = ntdb->io;
723
724         rec = malloc(sizeof(*rec) + ntdb_recovery_size(ntdb));
725         if (!rec) {
726                 ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
727                            "transaction_setup_recovery:"
728                            " cannot allocate");
729                 return NTDB_ERR_PTR(NTDB_ERR_OOM);
730         }
731
732         /* We temporarily revert to the old I/O methods, so we can use
733          * ntdb_access_read */
734         ntdb->io = ntdb->transaction->io_methods;
735
736         /* build the recovery data into a single blob to allow us to do a single
737            large write, which should be more efficient */
738         p = (unsigned char *)(rec + 1);
739         for (i=0;i<ntdb->transaction->num_blocks;i++) {
740                 ntdb_off_t offset;
741                 ntdb_len_t length;
742                 unsigned int off;
743                 const unsigned char *buffer;
744
745                 if (ntdb->transaction->blocks[i] == NULL) {
746                         continue;
747                 }
748
749                 offset = i * NTDB_PGSIZE;
750                 length = NTDB_PGSIZE;
751                 if (i == ntdb->transaction->num_blocks-1) {
752                         length = ntdb->transaction->last_block_size;
753                 }
754
755                 if (offset >= ntdb->transaction->old_map_size) {
756                         continue;
757                 }
758
759                 if (offset + length > ntdb->file->map_size) {
760                         ecode = ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
761                                            "ntdb_transaction_setup_recovery:"
762                                            " transaction data over new region"
763                                            " boundary");
764                         goto fail;
765                 }
766                 if (offset + length > ntdb->transaction->old_map_size) {
767                         /* Short read at EOF. */
768                         length = ntdb->transaction->old_map_size - offset;
769                 }
770                 buffer = ntdb_access_read(ntdb, offset, length, false);
771                 if (NTDB_PTR_IS_ERR(buffer)) {
772                         ecode = NTDB_PTR_ERR(buffer);
773                         goto fail;
774                 }
775
776                 /* Skip over anything the same at the start. */
777                 off = same(ntdb->transaction->blocks[i], buffer, length);
778                 offset += off;
779
780                 while (off < length) {
781                         ntdb_len_t len1;
782                         unsigned int samelen;
783
784                         len1 = different(ntdb->transaction->blocks[i] + off,
785                                         buffer + off, length - off,
786                                         sizeof(offset) + sizeof(len1) + 1,
787                                         &samelen);
788
789                         memcpy(p, &offset, sizeof(offset));
790                         memcpy(p + sizeof(offset), &len1, sizeof(len1));
791                         ntdb_convert(ntdb, p, sizeof(offset) + sizeof(len1));
792                         p += sizeof(offset) + sizeof(len1);
793                         memcpy(p, buffer + off, len1);
794                         p += len1;
795                         off += len1 + samelen;
796                         offset += len1 + samelen;
797                 }
798                 ntdb_access_release(ntdb, buffer);
799         }
800
801         *len = p - (unsigned char *)(rec + 1);
802         ntdb->io = old_methods;
803         return rec;
804
805 fail:
806         free(rec);
807         ntdb->io = old_methods;
808         return NTDB_ERR_PTR(ecode);
809 }
810
811 static ntdb_off_t create_recovery_area(struct ntdb_context *ntdb,
812                                       ntdb_len_t rec_length,
813                                       struct ntdb_recovery_record *rec)
814 {
815         ntdb_off_t off, recovery_off;
816         ntdb_len_t addition;
817         enum NTDB_ERROR ecode;
818         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
819
820         /* round up to a multiple of page size. Overallocate, since each
821          * such allocation forces us to expand the file. */
822         rec->max_len = ntdb_expand_adjust(ntdb->file->map_size, rec_length);
823
824         /* Round up to a page. */
825         rec->max_len = ((sizeof(*rec) + rec->max_len + NTDB_PGSIZE-1)
826                         & ~(NTDB_PGSIZE-1))
827                 - sizeof(*rec);
828
829         off = ntdb->file->map_size;
830
831         /* Restore ->map_size before calling underlying expand_file.
832            Also so that we don't try to expand the file again in the
833            transaction commit, which would destroy the recovery
834            area */
835         addition = (ntdb->file->map_size - ntdb->transaction->old_map_size) +
836                 sizeof(*rec) + rec->max_len;
837         ntdb->file->map_size = ntdb->transaction->old_map_size;
838         ntdb->stats.transaction_expand_file++;
839         ecode = methods->expand_file(ntdb, addition);
840         if (ecode != NTDB_SUCCESS) {
841                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
842                            "ntdb_recovery_allocate:"
843                            " failed to create recovery area");
844                 return NTDB_ERR_TO_OFF(ecode);
845         }
846
847         /* we have to reset the old map size so that we don't try to
848            expand the file again in the transaction commit, which
849            would destroy the recovery area */
850         ntdb->transaction->old_map_size = ntdb->file->map_size;
851
852         /* write the recovery header offset and sync - we can sync without a race here
853            as the magic ptr in the recovery record has not been set */
854         recovery_off = off;
855         ntdb_convert(ntdb, &recovery_off, sizeof(recovery_off));
856         ecode = methods->twrite(ntdb, offsetof(struct ntdb_header, recovery),
857                                 &recovery_off, sizeof(ntdb_off_t));
858         if (ecode != NTDB_SUCCESS) {
859                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
860                            "ntdb_recovery_allocate:"
861                            " failed to write recovery head");
862                 return NTDB_ERR_TO_OFF(ecode);
863         }
864         transaction_write_existing(ntdb, offsetof(struct ntdb_header, recovery),
865                                    &recovery_off,
866                                    sizeof(ntdb_off_t));
867         return off;
868 }
869
870 /*
871   setup the recovery data that will be used on a crash during commit
872 */
873 static enum NTDB_ERROR transaction_setup_recovery(struct ntdb_context *ntdb)
874 {
875         ntdb_len_t recovery_size = 0;
876         ntdb_off_t recovery_off = 0;
877         ntdb_off_t old_map_size = ntdb->transaction->old_map_size;
878         struct ntdb_recovery_record *recovery;
879         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
880         uint64_t magic;
881         enum NTDB_ERROR ecode;
882
883         recovery = alloc_recovery(ntdb, &recovery_size);
884         if (NTDB_PTR_IS_ERR(recovery))
885                 return NTDB_PTR_ERR(recovery);
886
887         ecode = ntdb_recovery_area(ntdb, methods, &recovery_off, recovery);
888         if (ecode) {
889                 free(recovery);
890                 return ecode;
891         }
892
893         if (recovery->max_len < recovery_size) {
894                 /* Not large enough. Free up old recovery area. */
895                 if (recovery_off) {
896                         ntdb->stats.frees++;
897                         ecode = add_free_record(ntdb, recovery_off,
898                                                 sizeof(*recovery)
899                                                 + recovery->max_len,
900                                                 NTDB_LOCK_WAIT, true);
901                         free(recovery);
902                         if (ecode != NTDB_SUCCESS) {
903                                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
904                                                   "ntdb_recovery_allocate:"
905                                                   " failed to free previous"
906                                                   " recovery area");
907                         }
908
909                         /* Refresh recovery after add_free_record above. */
910                         recovery = alloc_recovery(ntdb, &recovery_size);
911                         if (NTDB_PTR_IS_ERR(recovery))
912                                 return NTDB_PTR_ERR(recovery);
913                 }
914
915                 recovery_off = create_recovery_area(ntdb, recovery_size,
916                                                     recovery);
917                 if (NTDB_OFF_IS_ERR(recovery_off)) {
918                         free(recovery);
919                         return NTDB_OFF_TO_ERR(recovery_off);
920                 }
921         }
922
923         /* Now we know size, convert rec header. */
924         recovery->magic = NTDB_RECOVERY_INVALID_MAGIC;
925         recovery->len = recovery_size;
926         recovery->eof = old_map_size;
927         ntdb_convert(ntdb, recovery, sizeof(*recovery));
928
929         /* write the recovery data to the recovery area */
930         ecode = methods->twrite(ntdb, recovery_off, recovery,
931                                 sizeof(*recovery) + recovery_size);
932         if (ecode != NTDB_SUCCESS) {
933                 free(recovery);
934                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
935                                   "ntdb_transaction_setup_recovery:"
936                                   " failed to write recovery data");
937         }
938         transaction_write_existing(ntdb, recovery_off, recovery, recovery_size);
939
940         free(recovery);
941
942         /* as we don't have ordered writes, we have to sync the recovery
943            data before we update the magic to indicate that the recovery
944            data is present */
945         ecode = transaction_sync(ntdb, recovery_off, recovery_size);
946         if (ecode != NTDB_SUCCESS)
947                 return ecode;
948
949         magic = NTDB_RECOVERY_MAGIC;
950         ntdb_convert(ntdb, &magic, sizeof(magic));
951
952         ntdb->transaction->magic_offset
953                 = recovery_off + offsetof(struct ntdb_recovery_record, magic);
954
955         ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
956                                 &magic, sizeof(magic));
957         if (ecode != NTDB_SUCCESS) {
958                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
959                                   "ntdb_transaction_setup_recovery:"
960                                   " failed to write recovery magic");
961         }
962         transaction_write_existing(ntdb, ntdb->transaction->magic_offset,
963                                    &magic, sizeof(magic));
964
965         /* ensure the recovery magic marker is on disk */
966         return transaction_sync(ntdb, ntdb->transaction->magic_offset,
967                                 sizeof(magic));
968 }
969
970 static enum NTDB_ERROR _ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
971 {
972         const struct ntdb_methods *methods;
973         enum NTDB_ERROR ecode;
974
975         if (ntdb->transaction == NULL) {
976                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
977                                   "ntdb_transaction_prepare_commit:"
978                                   " no transaction");
979         }
980
981         if (ntdb->transaction->prepared) {
982                 _ntdb_transaction_cancel(ntdb);
983                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
984                                   "ntdb_transaction_prepare_commit:"
985                                   " transaction already prepared");
986         }
987
988         if (ntdb->transaction->transaction_error) {
989                 _ntdb_transaction_cancel(ntdb);
990                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
991                                   "ntdb_transaction_prepare_commit:"
992                                   " transaction error pending");
993         }
994
995
996         if (ntdb->transaction->nesting != 0) {
997                 return NTDB_SUCCESS;
998         }
999
1000         /* check for a null transaction */
1001         if (ntdb->transaction->blocks == NULL) {
1002                 return NTDB_SUCCESS;
1003         }
1004
1005         methods = ntdb->transaction->io_methods;
1006
1007         /* upgrade the main transaction lock region to a write lock */
1008         ecode = ntdb_allrecord_upgrade(ntdb, NTDB_HASH_LOCK_START);
1009         if (ecode != NTDB_SUCCESS) {
1010                 return ecode;
1011         }
1012
1013         /* get the open lock - this prevents new users attaching to the database
1014            during the commit */
1015         ecode = ntdb_lock_open(ntdb, F_WRLCK, NTDB_LOCK_WAIT|NTDB_LOCK_NOCHECK);
1016         if (ecode != NTDB_SUCCESS) {
1017                 return ecode;
1018         }
1019
1020         /* Since we have whole db locked, we don't need the expansion lock. */
1021         if (!(ntdb->flags & NTDB_NOSYNC)) {
1022                 /* Sets up ntdb->transaction->recovery and
1023                  * ntdb->transaction->magic_offset. */
1024                 ecode = transaction_setup_recovery(ntdb);
1025                 if (ecode != NTDB_SUCCESS) {
1026                         return ecode;
1027                 }
1028         }
1029
1030         ntdb->transaction->prepared = true;
1031
1032         /* expand the file to the new size if needed */
1033         if (ntdb->file->map_size != ntdb->transaction->old_map_size) {
1034                 ntdb_len_t add;
1035
1036                 add = ntdb->file->map_size - ntdb->transaction->old_map_size;
1037                 /* Restore original map size for ntdb_expand_file */
1038                 ntdb->file->map_size = ntdb->transaction->old_map_size;
1039                 ecode = methods->expand_file(ntdb, add);
1040                 if (ecode != NTDB_SUCCESS) {
1041                         return ecode;
1042                 }
1043         }
1044
1045         /* Keep the open lock until the actual commit */
1046         return NTDB_SUCCESS;
1047 }
1048
1049 /*
1050    prepare to commit the current transaction
1051 */
1052 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
1053 {
1054         return _ntdb_transaction_prepare_commit(ntdb);
1055 }
1056
1057 /*
1058   commit the current transaction
1059 */
1060 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_commit(struct ntdb_context *ntdb)
1061 {
1062         const struct ntdb_methods *methods;
1063         int i;
1064         enum NTDB_ERROR ecode;
1065
1066         if (ntdb->transaction == NULL) {
1067                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
1068                                    "ntdb_transaction_commit:"
1069                                    " no transaction");
1070         }
1071
1072         ntdb_trace(ntdb, "ntdb_transaction_commit");
1073
1074         if (ntdb->transaction->nesting != 0) {
1075                 ntdb->transaction->nesting--;
1076                 return NTDB_SUCCESS;
1077         }
1078
1079         /* check for a null transaction */
1080         if (ntdb->transaction->blocks == NULL) {
1081                 _ntdb_transaction_cancel(ntdb);
1082                 return NTDB_SUCCESS;
1083         }
1084
1085         if (!ntdb->transaction->prepared) {
1086                 ecode = _ntdb_transaction_prepare_commit(ntdb);
1087                 if (ecode != NTDB_SUCCESS) {
1088                         _ntdb_transaction_cancel(ntdb);
1089                         return ecode;
1090                 }
1091         }
1092
1093         methods = ntdb->transaction->io_methods;
1094
1095         /* perform all the writes */
1096         for (i=0;i<ntdb->transaction->num_blocks;i++) {
1097                 ntdb_off_t offset;
1098                 ntdb_len_t length;
1099
1100                 if (ntdb->transaction->blocks[i] == NULL) {
1101                         continue;
1102                 }
1103
1104                 offset = i * NTDB_PGSIZE;
1105                 length = NTDB_PGSIZE;
1106                 if (i == ntdb->transaction->num_blocks-1) {
1107                         length = ntdb->transaction->last_block_size;
1108                 }
1109
1110                 ecode = methods->twrite(ntdb, offset,
1111                                         ntdb->transaction->blocks[i], length);
1112                 if (ecode != NTDB_SUCCESS) {
1113                         /* we've overwritten part of the data and
1114                            possibly expanded the file, so we need to
1115                            run the crash recovery code */
1116                         ntdb->io = methods;
1117                         ntdb_transaction_recover(ntdb);
1118
1119                         _ntdb_transaction_cancel(ntdb);
1120
1121                         return ecode;
1122                 }
1123                 SAFE_FREE(ntdb->transaction->blocks[i]);
1124         }
1125
1126         SAFE_FREE(ntdb->transaction->blocks);
1127         ntdb->transaction->num_blocks = 0;
1128
1129         /* ensure the new data is on disk */
1130         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1131         if (ecode != NTDB_SUCCESS) {
1132                 return ecode;
1133         }
1134
1135         /*
1136           TODO: maybe write to some dummy hdr field, or write to magic
1137           offset without mmap, before the last sync, instead of the
1138           utime() call
1139         */
1140
1141         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1142            don't change the mtime of the file, this means the file may
1143            not be backed up (as ntdb rounding to block sizes means that
1144            file size changes are quite rare too). The following forces
1145            mtime changes when a transaction completes */
1146 #if HAVE_UTIME
1147         utime(ntdb->name, NULL);
1148 #endif
1149
1150         /* use a transaction cancel to free memory and remove the
1151            transaction locks: it "restores" map_size, too. */
1152         ntdb->transaction->old_map_size = ntdb->file->map_size;
1153         _ntdb_transaction_cancel(ntdb);
1154
1155         return NTDB_SUCCESS;
1156 }
1157
1158
1159 /*
1160   recover from an aborted transaction. Must be called with exclusive
1161   database write access already established (including the open
1162   lock to prevent new processes attaching)
1163 */
1164 enum NTDB_ERROR ntdb_transaction_recover(struct ntdb_context *ntdb)
1165 {
1166         ntdb_off_t recovery_head, recovery_eof;
1167         unsigned char *data, *p;
1168         struct ntdb_recovery_record rec;
1169         enum NTDB_ERROR ecode;
1170
1171         /* find the recovery area */
1172         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1173         if (NTDB_OFF_IS_ERR(recovery_head)) {
1174                 ecode = NTDB_OFF_TO_ERR(recovery_head);
1175                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1176                                   "ntdb_transaction_recover:"
1177                                   " failed to read recovery head");
1178         }
1179
1180         if (recovery_head == 0) {
1181                 /* we have never allocated a recovery record */
1182                 return NTDB_SUCCESS;
1183         }
1184
1185         /* read the recovery record */
1186         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1187         if (ecode != NTDB_SUCCESS) {
1188                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1189                                   "ntdb_transaction_recover:"
1190                                   " failed to read recovery record");
1191         }
1192
1193         if (rec.magic != NTDB_RECOVERY_MAGIC) {
1194                 /* there is no valid recovery data */
1195                 return NTDB_SUCCESS;
1196         }
1197
1198         if (ntdb->flags & NTDB_RDONLY) {
1199                 return ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
1200                                   "ntdb_transaction_recover:"
1201                                   " attempt to recover read only database");
1202         }
1203
1204         recovery_eof = rec.eof;
1205
1206         data = (unsigned char *)malloc(rec.len);
1207         if (data == NULL) {
1208                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
1209                                   "ntdb_transaction_recover:"
1210                                   " failed to allocate recovery data");
1211         }
1212
1213         /* read the full recovery data */
1214         ecode = ntdb->io->tread(ntdb, recovery_head + sizeof(rec), data,
1215                                     rec.len);
1216         if (ecode != NTDB_SUCCESS) {
1217                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1218                                   "ntdb_transaction_recover:"
1219                                   " failed to read recovery data");
1220         }
1221
1222         /* recover the file data */
1223         p = data;
1224         while (p+sizeof(ntdb_off_t)+sizeof(ntdb_len_t) < data + rec.len) {
1225                 ntdb_off_t ofs;
1226                 ntdb_len_t len;
1227                 ntdb_convert(ntdb, p, sizeof(ofs) + sizeof(len));
1228                 memcpy(&ofs, p, sizeof(ofs));
1229                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1230                 p += sizeof(ofs) + sizeof(len);
1231
1232                 ecode = ntdb->io->twrite(ntdb, ofs, p, len);
1233                 if (ecode != NTDB_SUCCESS) {
1234                         free(data);
1235                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1236                                           "ntdb_transaction_recover:"
1237                                           " failed to recover %zu bytes"
1238                                           " at offset %zu",
1239                                           (size_t)len, (size_t)ofs);
1240                 }
1241                 p += len;
1242         }
1243
1244         free(data);
1245
1246         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1247         if (ecode != NTDB_SUCCESS) {
1248                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1249                                   "ntdb_transaction_recover:"
1250                                   " failed to sync recovery");
1251         }
1252
1253         /* if the recovery area is after the recovered eof then remove it */
1254         if (recovery_eof <= recovery_head) {
1255                 ecode = ntdb_write_off(ntdb, offsetof(struct ntdb_header,
1256                                                     recovery),
1257                                       0);
1258                 if (ecode != NTDB_SUCCESS) {
1259                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1260                                           "ntdb_transaction_recover:"
1261                                           " failed to remove recovery head");
1262                 }
1263         }
1264
1265         /* remove the recovery magic */
1266         ecode = ntdb_write_off(ntdb,
1267                               recovery_head
1268                               + offsetof(struct ntdb_recovery_record, magic),
1269                               NTDB_RECOVERY_INVALID_MAGIC);
1270         if (ecode != NTDB_SUCCESS) {
1271                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1272                                   "ntdb_transaction_recover:"
1273                                   " failed to remove recovery magic");
1274         }
1275
1276         ecode = transaction_sync(ntdb, 0, recovery_eof);
1277         if (ecode != NTDB_SUCCESS) {
1278                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1279                                   "ntdb_transaction_recover:"
1280                                   " failed to sync2 recovery");
1281         }
1282
1283         ntdb_logerr(ntdb, NTDB_SUCCESS, NTDB_LOG_WARNING,
1284                    "ntdb_transaction_recover: recovered %zu byte database",
1285                    (size_t)recovery_eof);
1286
1287         /* all done */
1288         return NTDB_SUCCESS;
1289 }
1290
1291 ntdb_bool_err ntdb_needs_recovery(struct ntdb_context *ntdb)
1292 {
1293         ntdb_off_t recovery_head;
1294         struct ntdb_recovery_record rec;
1295         enum NTDB_ERROR ecode;
1296
1297         /* find the recovery area */
1298         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1299         if (NTDB_OFF_IS_ERR(recovery_head)) {
1300                 return recovery_head;
1301         }
1302
1303         if (recovery_head == 0) {
1304                 /* we have never allocated a recovery record */
1305                 return false;
1306         }
1307
1308         /* read the recovery record */
1309         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1310         if (ecode != NTDB_SUCCESS) {
1311                 return NTDB_ERR_TO_OFF(ecode);
1312         }
1313
1314         return (rec.magic == NTDB_RECOVERY_MAGIC);
1315 }