a265252174b155f7123628f209b53d94f56a7494
[kai/samba-autobuild/.git] / lib / ntdb / transaction.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7    Copyright (C) Rusty Russell                2010
8
9      ** NOTE! The following LGPL license applies to the ntdb
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "private.h"
28 #include <assert.h>
29 #define SAFE_FREE(ntdb, x) do { if ((x) != NULL) {ntdb->free_fn((void *)x, ntdb->alloc_data); (x)=NULL;} } while(0)
30
31 /*
32   transaction design:
33
34   - only allow a single transaction at a time per database. This makes
35     using the transaction API simpler, as otherwise the caller would
36     have to cope with temporary failures in transactions that conflict
37     with other current transactions
38
39   - keep the transaction recovery information in the same file as the
40     database, using a special 'transaction recovery' record pointed at
41     by the header. This removes the need for extra journal files as
42     used by some other databases
43
44   - dynamically allocated the transaction recover record, re-using it
45     for subsequent transactions. If a larger record is needed then
46     ntdb_free() the old record to place it on the normal ntdb freelist
47     before allocating the new record
48
49   - during transactions, keep a linked list of writes all that have
50     been performed by intercepting all ntdb_write() calls. The hooked
51     transaction versions of ntdb_read() and ntdb_write() check this
52     linked list and try to use the elements of the list in preference
53     to the real database.
54
55   - don't allow any locks to be held when a transaction starts,
56     otherwise we can end up with deadlock (plus lack of lock nesting
57     in POSIX locks would mean the lock is lost)
58
59   - if the caller gains a lock during the transaction but doesn't
60     release it then fail the commit
61
62   - allow for nested calls to ntdb_transaction_start(), re-using the
63     existing transaction record. If the inner transaction is canceled
64     then a subsequent commit will fail
65
66   - keep a mirrored copy of the ntdb hash chain heads to allow for the
67     fast hash heads scan on traverse, updating the mirrored copy in
68     the transaction version of ntdb_write
69
70   - allow callers to mix transaction and non-transaction use of ntdb,
71     although once a transaction is started then an exclusive lock is
72     gained until the transaction is committed or canceled
73
74   - the commit stategy involves first saving away all modified data
75     into a linearised buffer in the transaction recovery area, then
76     marking the transaction recovery area with a magic value to
77     indicate a valid recovery record. In total 4 fsync/msync calls are
78     needed per commit to prevent race conditions. It might be possible
79     to reduce this to 3 or even 2 with some more work.
80
81   - check for a valid recovery record on open of the ntdb, while the
82     open lock is held. Automatically recover from the transaction
83     recovery area if needed, then continue with the open as
84     usual. This allows for smooth crash recovery with no administrator
85     intervention.
86
87   - if NTDB_NOSYNC is passed to flags in ntdb_open then transactions are
88     still available, but fsync/msync calls are made.  This means we
89     still are safe against unexpected death during transaction commit,
90     but not against machine reboots.
91 */
92
93 /*
94   hold the context of any current transaction
95 */
96 struct ntdb_transaction {
97         /* the original io methods - used to do IOs to the real db */
98         const struct ntdb_methods *io_methods;
99
100         /* the list of transaction blocks. When a block is first
101            written to, it gets created in this list */
102         uint8_t **blocks;
103         size_t num_blocks;
104
105         /* non-zero when an internal transaction error has
106            occurred. All write operations will then fail until the
107            transaction is ended */
108         int transaction_error;
109
110         /* when inside a transaction we need to keep track of any
111            nested ntdb_transaction_start() calls, as these are allowed,
112            but don't create a new transaction */
113         unsigned int nesting;
114
115         /* set when a prepare has already occurred */
116         bool prepared;
117         ntdb_off_t magic_offset;
118
119         /* old file size before transaction */
120         ntdb_len_t old_map_size;
121 };
122
123 /*
124   read while in a transaction. We need to check first if the data is in our list
125   of transaction elements, then if not do a real read
126 */
127 static enum NTDB_ERROR transaction_read(struct ntdb_context *ntdb, ntdb_off_t off,
128                                        void *buf, ntdb_len_t len)
129 {
130         size_t blk;
131         enum NTDB_ERROR ecode;
132
133         /* break it down into block sized ops */
134         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
135                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
136                 ecode = transaction_read(ntdb, off, buf, len2);
137                 if (ecode != NTDB_SUCCESS) {
138                         return ecode;
139                 }
140                 len -= len2;
141                 off += len2;
142                 buf = (void *)(len2 + (char *)buf);
143         }
144
145         if (len == 0) {
146                 return NTDB_SUCCESS;
147         }
148
149         blk = off / NTDB_PGSIZE;
150
151         /* see if we have it in the block list */
152         if (ntdb->transaction->num_blocks <= blk ||
153             ntdb->transaction->blocks[blk] == NULL) {
154                 /* nope, do a real read */
155                 ecode = ntdb->transaction->io_methods->tread(ntdb, off, buf, len);
156                 if (ecode != NTDB_SUCCESS) {
157                         goto fail;
158                 }
159                 return 0;
160         }
161
162         /* now copy it out of this block */
163         memcpy(buf, ntdb->transaction->blocks[blk] + (off % NTDB_PGSIZE), len);
164         return NTDB_SUCCESS;
165
166 fail:
167         ntdb->transaction->transaction_error = 1;
168         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
169                           "transaction_read: failed at off=%zu len=%zu",
170                           (size_t)off, (size_t)len);
171 }
172
173
174 /*
175   write while in a transaction
176 */
177 static enum NTDB_ERROR transaction_write(struct ntdb_context *ntdb, ntdb_off_t off,
178                                         const void *buf, ntdb_len_t len)
179 {
180         size_t blk;
181         enum NTDB_ERROR ecode;
182
183         /* Only a commit is allowed on a prepared transaction */
184         if (ntdb->transaction->prepared) {
185                 ecode = ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
186                                    "transaction_write: transaction already"
187                                    " prepared, write not allowed");
188                 goto fail;
189         }
190
191         /* break it up into block sized chunks */
192         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
193                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
194                 ecode = transaction_write(ntdb, off, buf, len2);
195                 if (ecode != NTDB_SUCCESS) {
196                         return ecode;
197                 }
198                 len -= len2;
199                 off += len2;
200                 if (buf != NULL) {
201                         buf = (const void *)(len2 + (const char *)buf);
202                 }
203         }
204
205         if (len == 0) {
206                 return NTDB_SUCCESS;
207         }
208
209         blk = off / NTDB_PGSIZE;
210         off = off % NTDB_PGSIZE;
211
212         if (ntdb->transaction->num_blocks <= blk) {
213                 uint8_t **new_blocks;
214                 /* expand the blocks array */
215                 if (ntdb->transaction->blocks == NULL) {
216                         new_blocks = (uint8_t **)ntdb->alloc_fn(ntdb,
217                                     (blk+1)*sizeof(uint8_t *), ntdb->alloc_data);
218                 } else {
219                         new_blocks = (uint8_t **)ntdb->expand_fn(
220                                 ntdb->transaction->blocks,
221                                 (blk+1)*sizeof(uint8_t *), ntdb->alloc_data);
222                 }
223                 if (new_blocks == NULL) {
224                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
225                                            "transaction_write:"
226                                            " failed to allocate");
227                         goto fail;
228                 }
229                 memset(&new_blocks[ntdb->transaction->num_blocks], 0,
230                        (1+(blk - ntdb->transaction->num_blocks))*sizeof(uint8_t *));
231                 ntdb->transaction->blocks = new_blocks;
232                 ntdb->transaction->num_blocks = blk+1;
233         }
234
235         /* allocate and fill a block? */
236         if (ntdb->transaction->blocks[blk] == NULL) {
237                 ntdb->transaction->blocks[blk] = (uint8_t *)
238                         ntdb->alloc_fn(ntdb->transaction->blocks, NTDB_PGSIZE,
239                                    ntdb->alloc_data);
240                 if (ntdb->transaction->blocks[blk] == NULL) {
241                         ecode = ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
242                                            "transaction_write:"
243                                            " failed to allocate");
244                         goto fail;
245                 }
246                 memset(ntdb->transaction->blocks[blk], 0, NTDB_PGSIZE);
247                 if (ntdb->transaction->old_map_size > blk * NTDB_PGSIZE) {
248                         ntdb_len_t len2 = NTDB_PGSIZE;
249                         if (len2 + (blk * NTDB_PGSIZE) > ntdb->transaction->old_map_size) {
250                                 len2 = ntdb->transaction->old_map_size - (blk * NTDB_PGSIZE);
251                         }
252                         ecode = ntdb->transaction->io_methods->tread(ntdb,
253                                         blk * NTDB_PGSIZE,
254                                         ntdb->transaction->blocks[blk],
255                                         len2);
256                         if (ecode != NTDB_SUCCESS) {
257                                 ecode = ntdb_logerr(ntdb, ecode,
258                                                    NTDB_LOG_ERROR,
259                                                    "transaction_write:"
260                                                    " failed to"
261                                                    " read old block: %s",
262                                                    strerror(errno));
263                                 SAFE_FREE(ntdb, ntdb->transaction->blocks[blk]);
264                                 goto fail;
265                         }
266                 }
267         }
268
269         /* overwrite part of an existing block */
270         if (buf == NULL) {
271                 memset(ntdb->transaction->blocks[blk] + off, 0, len);
272         } else {
273                 memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
274         }
275         return NTDB_SUCCESS;
276
277 fail:
278         ntdb->transaction->transaction_error = 1;
279         return ecode;
280 }
281
282
283 /*
284   write while in a transaction - this variant never expands the transaction blocks, it only
285   updates existing blocks. This means it cannot change the recovery size
286 */
287 static void transaction_write_existing(struct ntdb_context *ntdb, ntdb_off_t off,
288                                        const void *buf, ntdb_len_t len)
289 {
290         size_t blk;
291
292         /* break it up into block sized chunks */
293         while (len + (off % NTDB_PGSIZE) > NTDB_PGSIZE) {
294                 ntdb_len_t len2 = NTDB_PGSIZE - (off % NTDB_PGSIZE);
295                 transaction_write_existing(ntdb, off, buf, len2);
296                 len -= len2;
297                 off += len2;
298                 if (buf != NULL) {
299                         buf = (const void *)(len2 + (const char *)buf);
300                 }
301         }
302
303         if (len == 0) {
304                 return;
305         }
306
307         blk = off / NTDB_PGSIZE;
308         off = off % NTDB_PGSIZE;
309
310         if (ntdb->transaction->num_blocks <= blk ||
311             ntdb->transaction->blocks[blk] == NULL) {
312                 return;
313         }
314
315         /* overwrite part of an existing block */
316         memcpy(ntdb->transaction->blocks[blk] + off, buf, len);
317 }
318
319
320 /*
321   out of bounds check during a transaction
322 */
323 static enum NTDB_ERROR transaction_oob(struct ntdb_context *ntdb,
324                                       ntdb_off_t off, ntdb_len_t len, bool probe)
325 {
326         if ((off + len >= off && off + len <= ntdb->file->map_size) || probe) {
327                 return NTDB_SUCCESS;
328         }
329
330         ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
331                    "ntdb_oob len %lld beyond transaction size %lld",
332                    (long long)(off + len),
333                    (long long)ntdb->file->map_size);
334         return NTDB_ERR_IO;
335 }
336
337 /*
338   transaction version of ntdb_expand().
339 */
340 static enum NTDB_ERROR transaction_expand_file(struct ntdb_context *ntdb,
341                                               ntdb_off_t addition)
342 {
343         enum NTDB_ERROR ecode;
344
345         assert((ntdb->file->map_size + addition) % NTDB_PGSIZE == 0);
346
347         /* add a write to the transaction elements, so subsequent
348            reads see the zero data */
349         ecode = transaction_write(ntdb, ntdb->file->map_size, NULL, addition);
350         if (ecode == NTDB_SUCCESS) {
351                 ntdb->file->map_size += addition;
352         }
353         return ecode;
354 }
355
356 static void *transaction_direct(struct ntdb_context *ntdb, ntdb_off_t off,
357                                 size_t len, bool write_mode)
358 {
359         size_t blk = off / NTDB_PGSIZE, end_blk;
360
361         /* This is wrong for zero-length blocks, but will fail gracefully */
362         end_blk = (off + len - 1) / NTDB_PGSIZE;
363
364         /* Can only do direct if in single block and we've already copied. */
365         if (write_mode) {
366                 ntdb->stats.transaction_write_direct++;
367                 if (blk != end_blk
368                     || blk >= ntdb->transaction->num_blocks
369                     || ntdb->transaction->blocks[blk] == NULL) {
370                         ntdb->stats.transaction_write_direct_fail++;
371                         return NULL;
372                 }
373                 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
374         }
375
376         ntdb->stats.transaction_read_direct++;
377         /* Single which we have copied? */
378         if (blk == end_blk
379             && blk < ntdb->transaction->num_blocks
380             && ntdb->transaction->blocks[blk])
381                 return ntdb->transaction->blocks[blk] + off % NTDB_PGSIZE;
382
383         /* Otherwise must be all not copied. */
384         while (blk <= end_blk) {
385                 if (blk >= ntdb->transaction->num_blocks)
386                         break;
387                 if (ntdb->transaction->blocks[blk]) {
388                         ntdb->stats.transaction_read_direct_fail++;
389                         return NULL;
390                 }
391                 blk++;
392         }
393         return ntdb->transaction->io_methods->direct(ntdb, off, len, false);
394 }
395
396 static const struct ntdb_methods transaction_methods = {
397         transaction_read,
398         transaction_write,
399         transaction_oob,
400         transaction_expand_file,
401         transaction_direct,
402 };
403
404 /*
405   sync to disk
406 */
407 static enum NTDB_ERROR transaction_sync(struct ntdb_context *ntdb,
408                                        ntdb_off_t offset, ntdb_len_t length)
409 {
410         if (ntdb->flags & NTDB_NOSYNC) {
411                 return NTDB_SUCCESS;
412         }
413
414         if (fsync(ntdb->file->fd) != 0) {
415                 return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
416                                   "ntdb_transaction: fsync failed: %s",
417                                   strerror(errno));
418         }
419 #ifdef MS_SYNC
420         if (ntdb->file->map_ptr) {
421                 ntdb_off_t moffset = offset & ~(getpagesize()-1);
422                 if (msync(moffset + (char *)ntdb->file->map_ptr,
423                           length + (offset - moffset), MS_SYNC) != 0) {
424                         return ntdb_logerr(ntdb, NTDB_ERR_IO, NTDB_LOG_ERROR,
425                                           "ntdb_transaction: msync failed: %s",
426                                           strerror(errno));
427                 }
428         }
429 #endif
430         return NTDB_SUCCESS;
431 }
432
433
434 static void _ntdb_transaction_cancel(struct ntdb_context *ntdb)
435 {
436         int i;
437         enum NTDB_ERROR ecode;
438
439         if (ntdb->transaction == NULL) {
440                 ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
441                            "ntdb_transaction_cancel: no transaction");
442                 return;
443         }
444
445         if (ntdb->transaction->nesting != 0) {
446                 ntdb->transaction->transaction_error = 1;
447                 ntdb->transaction->nesting--;
448                 return;
449         }
450
451         ntdb->file->map_size = ntdb->transaction->old_map_size;
452
453         /* free all the transaction blocks */
454         for (i=0;i<ntdb->transaction->num_blocks;i++) {
455                 if (ntdb->transaction->blocks[i] != NULL) {
456                         ntdb->free_fn(ntdb->transaction->blocks[i],
457                                       ntdb->alloc_data);
458                 }
459         }
460         SAFE_FREE(ntdb, ntdb->transaction->blocks);
461
462         if (ntdb->transaction->magic_offset) {
463                 const struct ntdb_methods *methods = ntdb->transaction->io_methods;
464                 uint64_t invalid = NTDB_RECOVERY_INVALID_MAGIC;
465
466                 /* remove the recovery marker */
467                 ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
468                                         &invalid, sizeof(invalid));
469                 if (ecode == NTDB_SUCCESS)
470                         ecode = transaction_sync(ntdb,
471                                                  ntdb->transaction->magic_offset,
472                                                  sizeof(invalid));
473                 if (ecode != NTDB_SUCCESS) {
474                         ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
475                                    "ntdb_transaction_cancel: failed to remove"
476                                    " recovery magic");
477                 }
478         }
479
480         if (ntdb->file->allrecord_lock.count)
481                 ntdb_allrecord_unlock(ntdb, ntdb->file->allrecord_lock.ltype);
482
483         /* restore the normal io methods */
484         ntdb->io = ntdb->transaction->io_methods;
485
486         ntdb_transaction_unlock(ntdb, F_WRLCK);
487
488         if (ntdb_has_open_lock(ntdb))
489                 ntdb_unlock_open(ntdb, F_WRLCK);
490
491         SAFE_FREE(ntdb, ntdb->transaction);
492 }
493
494 /*
495   start a ntdb transaction. No token is returned, as only a single
496   transaction is allowed to be pending per ntdb_context
497 */
498 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_start(struct ntdb_context *ntdb)
499 {
500         enum NTDB_ERROR ecode;
501
502         ntdb->stats.transactions++;
503         /* some sanity checks */
504         if (ntdb->flags & NTDB_INTERNAL) {
505                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
506                                    "ntdb_transaction_start:"
507                                    " cannot start a transaction on an"
508                                    " internal ntdb");
509         }
510
511         if (ntdb->flags & NTDB_RDONLY) {
512                 return ntdb_logerr(ntdb, NTDB_ERR_RDONLY, NTDB_LOG_USE_ERROR,
513                                    "ntdb_transaction_start:"
514                                    " cannot start a transaction on a"
515                                    " read-only ntdb");
516         }
517
518         /* cope with nested ntdb_transaction_start() calls */
519         if (ntdb->transaction != NULL) {
520                 if (!(ntdb->flags & NTDB_ALLOW_NESTING)) {
521                         return ntdb_logerr(ntdb, NTDB_ERR_IO,
522                                            NTDB_LOG_USE_ERROR,
523                                            "ntdb_transaction_start:"
524                                            " already inside transaction");
525                 }
526                 ntdb->transaction->nesting++;
527                 ntdb->stats.transaction_nest++;
528                 return 0;
529         }
530
531         if (ntdb_has_hash_locks(ntdb)) {
532                 /* the caller must not have any locks when starting a
533                    transaction as otherwise we'll be screwed by lack
534                    of nested locks in POSIX */
535                 return ntdb_logerr(ntdb, NTDB_ERR_LOCK,
536                                    NTDB_LOG_USE_ERROR,
537                                    "ntdb_transaction_start:"
538                                    " cannot start a transaction with locks"
539                                    " held");
540         }
541
542         ntdb->transaction = (struct ntdb_transaction *)
543                 ntdb->alloc_fn(ntdb, sizeof(struct ntdb_transaction),
544                                ntdb->alloc_data);
545         if (ntdb->transaction == NULL) {
546                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
547                                    "ntdb_transaction_start:"
548                                    " cannot allocate");
549         }
550         memset(ntdb->transaction, 0, sizeof(*ntdb->transaction));
551
552         /* get the transaction write lock. This is a blocking lock. As
553            discussed with Volker, there are a number of ways we could
554            make this async, which we will probably do in the future */
555         ecode = ntdb_transaction_lock(ntdb, F_WRLCK);
556         if (ecode != NTDB_SUCCESS) {
557                 SAFE_FREE(ntdb, ntdb->transaction->blocks);
558                 SAFE_FREE(ntdb, ntdb->transaction);
559                 return ecode;
560         }
561
562         /* get a read lock over entire file. This is upgraded to a write
563            lock during the commit */
564         ecode = ntdb_allrecord_lock(ntdb, F_RDLCK, NTDB_LOCK_WAIT, true);
565         if (ecode != NTDB_SUCCESS) {
566                 goto fail_allrecord_lock;
567         }
568
569         /* make sure we know about any file expansions already done by
570            anyone else */
571         ntdb_oob(ntdb, ntdb->file->map_size, 1, true);
572         ntdb->transaction->old_map_size = ntdb->file->map_size;
573
574         /* finally hook the io methods, replacing them with
575            transaction specific methods */
576         ntdb->transaction->io_methods = ntdb->io;
577         ntdb->io = &transaction_methods;
578         return NTDB_SUCCESS;
579
580 fail_allrecord_lock:
581         ntdb_transaction_unlock(ntdb, F_WRLCK);
582         SAFE_FREE(ntdb, ntdb->transaction->blocks);
583         SAFE_FREE(ntdb, ntdb->transaction);
584         return ecode;
585 }
586
587
588 /*
589   cancel the current transaction
590 */
591 _PUBLIC_ void ntdb_transaction_cancel(struct ntdb_context *ntdb)
592 {
593         ntdb->stats.transaction_cancel++;
594         _ntdb_transaction_cancel(ntdb);
595 }
596
597 /*
598   work out how much space the linearised recovery data will consume (worst case)
599 */
600 static ntdb_len_t ntdb_recovery_size(struct ntdb_context *ntdb)
601 {
602         ntdb_len_t recovery_size = 0;
603         int i;
604
605         recovery_size = 0;
606         for (i=0;i<ntdb->transaction->num_blocks;i++) {
607                 if (i * NTDB_PGSIZE >= ntdb->transaction->old_map_size) {
608                         break;
609                 }
610                 if (ntdb->transaction->blocks[i] == NULL) {
611                         continue;
612                 }
613                 recovery_size += 2*sizeof(ntdb_off_t) + NTDB_PGSIZE;
614         }
615
616         return recovery_size;
617 }
618
619 static enum NTDB_ERROR ntdb_recovery_area(struct ntdb_context *ntdb,
620                                         const struct ntdb_methods *methods,
621                                         ntdb_off_t *recovery_offset,
622                                         struct ntdb_recovery_record *rec)
623 {
624         enum NTDB_ERROR ecode;
625
626         *recovery_offset = ntdb_read_off(ntdb,
627                                         offsetof(struct ntdb_header, recovery));
628         if (NTDB_OFF_IS_ERR(*recovery_offset)) {
629                 return NTDB_OFF_TO_ERR(*recovery_offset);
630         }
631
632         if (*recovery_offset == 0) {
633                 rec->max_len = 0;
634                 return NTDB_SUCCESS;
635         }
636
637         ecode = methods->tread(ntdb, *recovery_offset, rec, sizeof(*rec));
638         if (ecode != NTDB_SUCCESS)
639                 return ecode;
640
641         ntdb_convert(ntdb, rec, sizeof(*rec));
642         /* ignore invalid recovery regions: can happen in crash */
643         if (rec->magic != NTDB_RECOVERY_MAGIC &&
644             rec->magic != NTDB_RECOVERY_INVALID_MAGIC) {
645                 *recovery_offset = 0;
646                 rec->max_len = 0;
647         }
648         return NTDB_SUCCESS;
649 }
650
651 static unsigned int same(const unsigned char *new,
652                          const unsigned char *old,
653                          unsigned int length)
654 {
655         unsigned int i;
656
657         for (i = 0; i < length; i++) {
658                 if (new[i] != old[i])
659                         break;
660         }
661         return i;
662 }
663
664 static unsigned int different(const unsigned char *new,
665                               const unsigned char *old,
666                               unsigned int length,
667                               unsigned int min_same,
668                               unsigned int *samelen)
669 {
670         unsigned int i;
671
672         *samelen = 0;
673         for (i = 0; i < length; i++) {
674                 if (new[i] == old[i]) {
675                         (*samelen)++;
676                 } else {
677                         if (*samelen >= min_same) {
678                                 return i - *samelen;
679                         }
680                         *samelen = 0;
681                 }
682         }
683
684         if (*samelen < min_same)
685                 *samelen = 0;
686         return length - *samelen;
687 }
688
689 /* Allocates recovery blob, without ntdb_recovery_record at head set up. */
690 static struct ntdb_recovery_record *alloc_recovery(struct ntdb_context *ntdb,
691                                                   ntdb_len_t *len)
692 {
693         struct ntdb_recovery_record *rec;
694         size_t i;
695         enum NTDB_ERROR ecode;
696         unsigned char *p;
697         const struct ntdb_methods *old_methods = ntdb->io;
698
699         rec = ntdb->alloc_fn(ntdb, sizeof(*rec) + ntdb_recovery_size(ntdb),
700                          ntdb->alloc_data);
701         if (!rec) {
702                 ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
703                            "transaction_setup_recovery:"
704                            " cannot allocate");
705                 return NTDB_ERR_PTR(NTDB_ERR_OOM);
706         }
707
708         /* We temporarily revert to the old I/O methods, so we can use
709          * ntdb_access_read */
710         ntdb->io = ntdb->transaction->io_methods;
711
712         /* build the recovery data into a single blob to allow us to do a single
713            large write, which should be more efficient */
714         p = (unsigned char *)(rec + 1);
715         for (i=0;i<ntdb->transaction->num_blocks;i++) {
716                 ntdb_off_t offset;
717                 ntdb_len_t length;
718                 unsigned int off;
719                 const unsigned char *buffer;
720
721                 if (ntdb->transaction->blocks[i] == NULL) {
722                         continue;
723                 }
724
725                 offset = i * NTDB_PGSIZE;
726                 length = NTDB_PGSIZE;
727                 if (offset >= ntdb->transaction->old_map_size) {
728                         continue;
729                 }
730
731                 if (offset + length > ntdb->file->map_size) {
732                         ecode = ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
733                                            "ntdb_transaction_setup_recovery:"
734                                            " transaction data over new region"
735                                            " boundary");
736                         goto fail;
737                 }
738                 buffer = ntdb_access_read(ntdb, offset, length, false);
739                 if (NTDB_PTR_IS_ERR(buffer)) {
740                         ecode = NTDB_PTR_ERR(buffer);
741                         goto fail;
742                 }
743
744                 /* Skip over anything the same at the start. */
745                 off = same(ntdb->transaction->blocks[i], buffer, length);
746                 offset += off;
747
748                 while (off < length) {
749                         ntdb_len_t len1;
750                         unsigned int samelen;
751
752                         len1 = different(ntdb->transaction->blocks[i] + off,
753                                         buffer + off, length - off,
754                                         sizeof(offset) + sizeof(len1) + 1,
755                                         &samelen);
756
757                         memcpy(p, &offset, sizeof(offset));
758                         memcpy(p + sizeof(offset), &len1, sizeof(len1));
759                         ntdb_convert(ntdb, p, sizeof(offset) + sizeof(len1));
760                         p += sizeof(offset) + sizeof(len1);
761                         memcpy(p, buffer + off, len1);
762                         p += len1;
763                         off += len1 + samelen;
764                         offset += len1 + samelen;
765                 }
766                 ntdb_access_release(ntdb, buffer);
767         }
768
769         *len = p - (unsigned char *)(rec + 1);
770         ntdb->io = old_methods;
771         return rec;
772
773 fail:
774         ntdb->free_fn(rec, ntdb->alloc_data);
775         ntdb->io = old_methods;
776         return NTDB_ERR_PTR(ecode);
777 }
778
779 static ntdb_off_t create_recovery_area(struct ntdb_context *ntdb,
780                                       ntdb_len_t rec_length,
781                                       struct ntdb_recovery_record *rec)
782 {
783         ntdb_off_t off, recovery_off;
784         ntdb_len_t addition;
785         enum NTDB_ERROR ecode;
786         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
787
788         /* round up to a multiple of page size. Overallocate, since each
789          * such allocation forces us to expand the file. */
790         rec->max_len = ntdb_expand_adjust(ntdb->file->map_size, rec_length);
791
792         /* Round up to a page. */
793         rec->max_len = ((sizeof(*rec) + rec->max_len + NTDB_PGSIZE-1)
794                         & ~(NTDB_PGSIZE-1))
795                 - sizeof(*rec);
796
797         off = ntdb->file->map_size;
798
799         /* Restore ->map_size before calling underlying expand_file.
800            Also so that we don't try to expand the file again in the
801            transaction commit, which would destroy the recovery
802            area */
803         addition = (ntdb->file->map_size - ntdb->transaction->old_map_size) +
804                 sizeof(*rec) + rec->max_len;
805         ntdb->file->map_size = ntdb->transaction->old_map_size;
806         ntdb->stats.transaction_expand_file++;
807         ecode = methods->expand_file(ntdb, addition);
808         if (ecode != NTDB_SUCCESS) {
809                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
810                            "ntdb_recovery_allocate:"
811                            " failed to create recovery area");
812                 return NTDB_ERR_TO_OFF(ecode);
813         }
814
815         /* we have to reset the old map size so that we don't try to
816            expand the file again in the transaction commit, which
817            would destroy the recovery area */
818         ntdb->transaction->old_map_size = ntdb->file->map_size;
819
820         /* write the recovery header offset and sync - we can sync without a race here
821            as the magic ptr in the recovery record has not been set */
822         recovery_off = off;
823         ntdb_convert(ntdb, &recovery_off, sizeof(recovery_off));
824         ecode = methods->twrite(ntdb, offsetof(struct ntdb_header, recovery),
825                                 &recovery_off, sizeof(ntdb_off_t));
826         if (ecode != NTDB_SUCCESS) {
827                 ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
828                            "ntdb_recovery_allocate:"
829                            " failed to write recovery head");
830                 return NTDB_ERR_TO_OFF(ecode);
831         }
832         transaction_write_existing(ntdb, offsetof(struct ntdb_header, recovery),
833                                    &recovery_off,
834                                    sizeof(ntdb_off_t));
835         return off;
836 }
837
838 /*
839   setup the recovery data that will be used on a crash during commit
840 */
841 static enum NTDB_ERROR transaction_setup_recovery(struct ntdb_context *ntdb)
842 {
843         ntdb_len_t recovery_size = 0;
844         ntdb_off_t recovery_off = 0;
845         ntdb_off_t old_map_size = ntdb->transaction->old_map_size;
846         struct ntdb_recovery_record *recovery;
847         const struct ntdb_methods *methods = ntdb->transaction->io_methods;
848         uint64_t magic;
849         enum NTDB_ERROR ecode;
850
851         recovery = alloc_recovery(ntdb, &recovery_size);
852         if (NTDB_PTR_IS_ERR(recovery))
853                 return NTDB_PTR_ERR(recovery);
854
855         ecode = ntdb_recovery_area(ntdb, methods, &recovery_off, recovery);
856         if (ecode) {
857                 ntdb->free_fn(recovery, ntdb->alloc_data);
858                 return ecode;
859         }
860
861         if (recovery->max_len < recovery_size) {
862                 /* Not large enough. Free up old recovery area. */
863                 if (recovery_off) {
864                         ntdb->stats.frees++;
865                         ecode = add_free_record(ntdb, recovery_off,
866                                                 sizeof(*recovery)
867                                                 + recovery->max_len,
868                                                 NTDB_LOCK_WAIT, true);
869                         ntdb->free_fn(recovery, ntdb->alloc_data);
870                         if (ecode != NTDB_SUCCESS) {
871                                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
872                                                   "ntdb_recovery_allocate:"
873                                                   " failed to free previous"
874                                                   " recovery area");
875                         }
876
877                         /* Refresh recovery after add_free_record above. */
878                         recovery = alloc_recovery(ntdb, &recovery_size);
879                         if (NTDB_PTR_IS_ERR(recovery))
880                                 return NTDB_PTR_ERR(recovery);
881                 }
882
883                 recovery_off = create_recovery_area(ntdb, recovery_size,
884                                                     recovery);
885                 if (NTDB_OFF_IS_ERR(recovery_off)) {
886                         ntdb->free_fn(recovery, ntdb->alloc_data);
887                         return NTDB_OFF_TO_ERR(recovery_off);
888                 }
889         }
890
891         /* Now we know size, convert rec header. */
892         recovery->magic = NTDB_RECOVERY_INVALID_MAGIC;
893         recovery->len = recovery_size;
894         recovery->eof = old_map_size;
895         ntdb_convert(ntdb, recovery, sizeof(*recovery));
896
897         /* write the recovery data to the recovery area */
898         ecode = methods->twrite(ntdb, recovery_off, recovery,
899                                 sizeof(*recovery) + recovery_size);
900         if (ecode != NTDB_SUCCESS) {
901                 ntdb->free_fn(recovery, ntdb->alloc_data);
902                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
903                                   "ntdb_transaction_setup_recovery:"
904                                   " failed to write recovery data");
905         }
906         transaction_write_existing(ntdb, recovery_off, recovery, recovery_size);
907
908         ntdb->free_fn(recovery, ntdb->alloc_data);
909
910         /* as we don't have ordered writes, we have to sync the recovery
911            data before we update the magic to indicate that the recovery
912            data is present */
913         ecode = transaction_sync(ntdb, recovery_off, recovery_size);
914         if (ecode != NTDB_SUCCESS)
915                 return ecode;
916
917         magic = NTDB_RECOVERY_MAGIC;
918         ntdb_convert(ntdb, &magic, sizeof(magic));
919
920         ntdb->transaction->magic_offset
921                 = recovery_off + offsetof(struct ntdb_recovery_record, magic);
922
923         ecode = methods->twrite(ntdb, ntdb->transaction->magic_offset,
924                                 &magic, sizeof(magic));
925         if (ecode != NTDB_SUCCESS) {
926                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
927                                   "ntdb_transaction_setup_recovery:"
928                                   " failed to write recovery magic");
929         }
930         transaction_write_existing(ntdb, ntdb->transaction->magic_offset,
931                                    &magic, sizeof(magic));
932
933         /* ensure the recovery magic marker is on disk */
934         return transaction_sync(ntdb, ntdb->transaction->magic_offset,
935                                 sizeof(magic));
936 }
937
938 static enum NTDB_ERROR _ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
939 {
940         const struct ntdb_methods *methods;
941         enum NTDB_ERROR ecode;
942
943         if (ntdb->transaction == NULL) {
944                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
945                                   "ntdb_transaction_prepare_commit:"
946                                   " no transaction");
947         }
948
949         if (ntdb->transaction->prepared) {
950                 _ntdb_transaction_cancel(ntdb);
951                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
952                                   "ntdb_transaction_prepare_commit:"
953                                   " transaction already prepared");
954         }
955
956         if (ntdb->transaction->transaction_error) {
957                 _ntdb_transaction_cancel(ntdb);
958                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_ERROR,
959                                   "ntdb_transaction_prepare_commit:"
960                                   " transaction error pending");
961         }
962
963
964         if (ntdb->transaction->nesting != 0) {
965                 return NTDB_SUCCESS;
966         }
967
968         /* check for a null transaction */
969         if (ntdb->transaction->blocks == NULL) {
970                 return NTDB_SUCCESS;
971         }
972
973         methods = ntdb->transaction->io_methods;
974
975         /* upgrade the main transaction lock region to a write lock */
976         ecode = ntdb_allrecord_upgrade(ntdb, NTDB_HASH_LOCK_START);
977         if (ecode != NTDB_SUCCESS) {
978                 return ecode;
979         }
980
981         /* get the open lock - this prevents new users attaching to the database
982            during the commit */
983         ecode = ntdb_lock_open(ntdb, F_WRLCK, NTDB_LOCK_WAIT|NTDB_LOCK_NOCHECK);
984         if (ecode != NTDB_SUCCESS) {
985                 return ecode;
986         }
987
988         /* Sets up ntdb->transaction->recovery and
989          * ntdb->transaction->magic_offset. */
990         ecode = transaction_setup_recovery(ntdb);
991         if (ecode != NTDB_SUCCESS) {
992                 return ecode;
993         }
994
995         ntdb->transaction->prepared = true;
996
997         /* expand the file to the new size if needed */
998         if (ntdb->file->map_size != ntdb->transaction->old_map_size) {
999                 ntdb_len_t add;
1000
1001                 add = ntdb->file->map_size - ntdb->transaction->old_map_size;
1002                 /* Restore original map size for ntdb_expand_file */
1003                 ntdb->file->map_size = ntdb->transaction->old_map_size;
1004                 ecode = methods->expand_file(ntdb, add);
1005                 if (ecode != NTDB_SUCCESS) {
1006                         return ecode;
1007                 }
1008         }
1009
1010         /* Keep the open lock until the actual commit */
1011         return NTDB_SUCCESS;
1012 }
1013
1014 /*
1015    prepare to commit the current transaction
1016 */
1017 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_prepare_commit(struct ntdb_context *ntdb)
1018 {
1019         return _ntdb_transaction_prepare_commit(ntdb);
1020 }
1021
1022 /*
1023   commit the current transaction
1024 */
1025 _PUBLIC_ enum NTDB_ERROR ntdb_transaction_commit(struct ntdb_context *ntdb)
1026 {
1027         const struct ntdb_methods *methods;
1028         int i;
1029         enum NTDB_ERROR ecode;
1030
1031         if (ntdb->transaction == NULL) {
1032                 return ntdb_logerr(ntdb, NTDB_ERR_EINVAL, NTDB_LOG_USE_ERROR,
1033                                    "ntdb_transaction_commit:"
1034                                    " no transaction");
1035         }
1036
1037         ntdb_trace(ntdb, "ntdb_transaction_commit");
1038
1039         if (ntdb->transaction->nesting != 0) {
1040                 ntdb->transaction->nesting--;
1041                 return NTDB_SUCCESS;
1042         }
1043
1044         /* check for a null transaction */
1045         if (ntdb->transaction->blocks == NULL) {
1046                 _ntdb_transaction_cancel(ntdb);
1047                 return NTDB_SUCCESS;
1048         }
1049
1050         if (!ntdb->transaction->prepared) {
1051                 ecode = _ntdb_transaction_prepare_commit(ntdb);
1052                 if (ecode != NTDB_SUCCESS) {
1053                         _ntdb_transaction_cancel(ntdb);
1054                         return ecode;
1055                 }
1056         }
1057
1058         methods = ntdb->transaction->io_methods;
1059
1060         /* perform all the writes */
1061         for (i=0;i<ntdb->transaction->num_blocks;i++) {
1062                 ntdb_off_t offset;
1063                 ntdb_len_t length;
1064
1065                 if (ntdb->transaction->blocks[i] == NULL) {
1066                         continue;
1067                 }
1068
1069                 offset = i * NTDB_PGSIZE;
1070                 length = NTDB_PGSIZE;
1071
1072                 ecode = methods->twrite(ntdb, offset,
1073                                         ntdb->transaction->blocks[i], length);
1074                 if (ecode != NTDB_SUCCESS) {
1075                         /* we've overwritten part of the data and
1076                            possibly expanded the file, so we need to
1077                            run the crash recovery code */
1078                         ntdb->io = methods;
1079                         ntdb_transaction_recover(ntdb);
1080
1081                         _ntdb_transaction_cancel(ntdb);
1082
1083                         return ecode;
1084                 }
1085                 SAFE_FREE(ntdb, ntdb->transaction->blocks[i]);
1086         }
1087
1088         SAFE_FREE(ntdb, ntdb->transaction->blocks);
1089         ntdb->transaction->num_blocks = 0;
1090
1091         /* ensure the new data is on disk */
1092         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1093         if (ecode != NTDB_SUCCESS) {
1094                 return ecode;
1095         }
1096
1097         /*
1098           TODO: maybe write to some dummy hdr field, or write to magic
1099           offset without mmap, before the last sync, instead of the
1100           utime() call
1101         */
1102
1103         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1104            don't change the mtime of the file, this means the file may
1105            not be backed up (as ntdb rounding to block sizes means that
1106            file size changes are quite rare too). The following forces
1107            mtime changes when a transaction completes */
1108 #if HAVE_UTIME
1109         utime(ntdb->name, NULL);
1110 #endif
1111
1112         /* use a transaction cancel to free memory and remove the
1113            transaction locks: it "restores" map_size, too. */
1114         ntdb->transaction->old_map_size = ntdb->file->map_size;
1115         _ntdb_transaction_cancel(ntdb);
1116
1117         return NTDB_SUCCESS;
1118 }
1119
1120
1121 /*
1122   recover from an aborted transaction. Must be called with exclusive
1123   database write access already established (including the open
1124   lock to prevent new processes attaching)
1125 */
1126 enum NTDB_ERROR ntdb_transaction_recover(struct ntdb_context *ntdb)
1127 {
1128         ntdb_off_t recovery_head, recovery_eof;
1129         unsigned char *data, *p;
1130         struct ntdb_recovery_record rec;
1131         enum NTDB_ERROR ecode;
1132
1133         /* find the recovery area */
1134         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1135         if (NTDB_OFF_IS_ERR(recovery_head)) {
1136                 ecode = NTDB_OFF_TO_ERR(recovery_head);
1137                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1138                                   "ntdb_transaction_recover:"
1139                                   " failed to read recovery head");
1140         }
1141
1142         if (recovery_head == 0) {
1143                 /* we have never allocated a recovery record */
1144                 return NTDB_SUCCESS;
1145         }
1146
1147         /* read the recovery record */
1148         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1149         if (ecode != NTDB_SUCCESS) {
1150                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1151                                   "ntdb_transaction_recover:"
1152                                   " failed to read recovery record");
1153         }
1154
1155         if (rec.magic != NTDB_RECOVERY_MAGIC) {
1156                 /* there is no valid recovery data */
1157                 return NTDB_SUCCESS;
1158         }
1159
1160         if (ntdb->flags & NTDB_RDONLY) {
1161                 return ntdb_logerr(ntdb, NTDB_ERR_CORRUPT, NTDB_LOG_ERROR,
1162                                   "ntdb_transaction_recover:"
1163                                   " attempt to recover read only database");
1164         }
1165
1166         recovery_eof = rec.eof;
1167
1168         data = (unsigned char *)ntdb->alloc_fn(ntdb, rec.len, ntdb->alloc_data);
1169         if (data == NULL) {
1170                 return ntdb_logerr(ntdb, NTDB_ERR_OOM, NTDB_LOG_ERROR,
1171                                   "ntdb_transaction_recover:"
1172                                   " failed to allocate recovery data");
1173         }
1174
1175         /* read the full recovery data */
1176         ecode = ntdb->io->tread(ntdb, recovery_head + sizeof(rec), data,
1177                                     rec.len);
1178         if (ecode != NTDB_SUCCESS) {
1179                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1180                                   "ntdb_transaction_recover:"
1181                                   " failed to read recovery data");
1182         }
1183
1184         /* recover the file data */
1185         p = data;
1186         while (p+sizeof(ntdb_off_t)+sizeof(ntdb_len_t) < data + rec.len) {
1187                 ntdb_off_t ofs;
1188                 ntdb_len_t len;
1189                 ntdb_convert(ntdb, p, sizeof(ofs) + sizeof(len));
1190                 memcpy(&ofs, p, sizeof(ofs));
1191                 memcpy(&len, p + sizeof(ofs), sizeof(len));
1192                 p += sizeof(ofs) + sizeof(len);
1193
1194                 ecode = ntdb->io->twrite(ntdb, ofs, p, len);
1195                 if (ecode != NTDB_SUCCESS) {
1196                         ntdb->free_fn(data, ntdb->alloc_data);
1197                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1198                                           "ntdb_transaction_recover:"
1199                                           " failed to recover %zu bytes"
1200                                           " at offset %zu",
1201                                           (size_t)len, (size_t)ofs);
1202                 }
1203                 p += len;
1204         }
1205
1206         ntdb->free_fn(data, ntdb->alloc_data);
1207
1208         ecode = transaction_sync(ntdb, 0, ntdb->file->map_size);
1209         if (ecode != NTDB_SUCCESS) {
1210                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1211                                   "ntdb_transaction_recover:"
1212                                   " failed to sync recovery");
1213         }
1214
1215         /* if the recovery area is after the recovered eof then remove it */
1216         if (recovery_eof <= recovery_head) {
1217                 ecode = ntdb_write_off(ntdb, offsetof(struct ntdb_header,
1218                                                     recovery),
1219                                       0);
1220                 if (ecode != NTDB_SUCCESS) {
1221                         return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1222                                           "ntdb_transaction_recover:"
1223                                           " failed to remove recovery head");
1224                 }
1225         }
1226
1227         /* remove the recovery magic */
1228         ecode = ntdb_write_off(ntdb,
1229                               recovery_head
1230                               + offsetof(struct ntdb_recovery_record, magic),
1231                               NTDB_RECOVERY_INVALID_MAGIC);
1232         if (ecode != NTDB_SUCCESS) {
1233                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1234                                   "ntdb_transaction_recover:"
1235                                   " failed to remove recovery magic");
1236         }
1237
1238         ecode = transaction_sync(ntdb, 0, recovery_eof);
1239         if (ecode != NTDB_SUCCESS) {
1240                 return ntdb_logerr(ntdb, ecode, NTDB_LOG_ERROR,
1241                                   "ntdb_transaction_recover:"
1242                                   " failed to sync2 recovery");
1243         }
1244
1245         ntdb_logerr(ntdb, NTDB_SUCCESS, NTDB_LOG_WARNING,
1246                    "ntdb_transaction_recover: recovered %zu byte database",
1247                    (size_t)recovery_eof);
1248
1249         /* all done */
1250         return NTDB_SUCCESS;
1251 }
1252
1253 ntdb_bool_err ntdb_needs_recovery(struct ntdb_context *ntdb)
1254 {
1255         ntdb_off_t recovery_head;
1256         struct ntdb_recovery_record rec;
1257         enum NTDB_ERROR ecode;
1258
1259         /* find the recovery area */
1260         recovery_head = ntdb_read_off(ntdb, offsetof(struct ntdb_header,recovery));
1261         if (NTDB_OFF_IS_ERR(recovery_head)) {
1262                 return recovery_head;
1263         }
1264
1265         if (recovery_head == 0) {
1266                 /* we have never allocated a recovery record */
1267                 return false;
1268         }
1269
1270         /* read the recovery record */
1271         ecode = ntdb_read_convert(ntdb, recovery_head, &rec, sizeof(rec));
1272         if (ecode != NTDB_SUCCESS) {
1273                 return NTDB_ERR_TO_OFF(ecode);
1274         }
1275
1276         return (rec.magic == NTDB_RECOVERY_MAGIC);
1277 }