c5be9f842b8701de777cf3e183901dcc31530a10
[tridge/ctdb.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88   - if TDB_ALLOW_NESTING is passed to flags in tdb open, or added using
89     tdb_add_flags(), transaction nesting is enabled.
90     The default is that transaction nesting is not allowed and an attempt
91     to create a nested transaction will fail with TDB_ERR_NESTING.
92
93     Beware: when transactions are nested a transaction successfully
94     completed with tdb_transaction_commit() can be silently unrolled later.
95 */
96
97
98 /*
99   hold the context of any current transaction
100 */
101 struct tdb_transaction {
102         /* we keep a mirrored copy of the tdb hash heads here so
103            tdb_next_hash_chain() can operate efficiently */
104         uint32_t *hash_heads;
105
106         /* the original io methods - used to do IOs to the real db */
107         const struct tdb_methods *io_methods;
108
109         /* the list of transaction blocks. When a block is first
110            written to, it gets created in this list */
111         uint8_t **blocks;
112         uint32_t num_blocks;
113         uint32_t block_size;      /* bytes in each block */
114         uint32_t last_block_size; /* number of valid bytes in the last block */
115
116         /* non-zero when an internal transaction error has
117            occurred. All write operations will then fail until the
118            transaction is ended */
119         int transaction_error;
120
121         /* when inside a transaction we need to keep track of any
122            nested tdb_transaction_start() calls, as these are allowed,
123            but don't create a new transaction */
124         int nesting;
125
126         /* set when a prepare has already occurred */
127         bool prepared;
128         tdb_off_t magic_offset;
129
130         /* old file size before transaction */
131         tdb_len_t old_map_size;
132 };
133
134
135 /*
136   read while in a transaction. We need to check first if the data is in our list
137   of transaction elements, then if not do a real read
138 */
139 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
140                             tdb_len_t len, int cv)
141 {
142         uint32_t blk;
143
144         /* Only a commit is allowed on a prepared transaction */
145         if (tdb->transaction->prepared) {
146                 tdb->ecode = TDB_ERR_EINVAL;
147                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
148                 tdb->transaction->transaction_error = 1;
149                 return -1;
150         }
151
152         /* break it down into block sized ops */
153         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
154                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
155                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
156                         return -1;
157                 }
158                 len -= len2;
159                 off += len2;
160                 buf = (void *)(len2 + (char *)buf);
161         }
162
163         if (len == 0) {
164                 return 0;
165         }
166
167         blk = off / tdb->transaction->block_size;
168
169         /* see if we have it in the block list */
170         if (tdb->transaction->num_blocks <= blk ||
171             tdb->transaction->blocks[blk] == NULL) {
172                 /* nope, do a real read */
173                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
174                         goto fail;
175                 }
176                 return 0;
177         }
178
179         /* it is in the block list. Now check for the last block */
180         if (blk == tdb->transaction->num_blocks-1) {
181                 if (len > tdb->transaction->last_block_size) {
182                         goto fail;
183                 }
184         }
185         
186         /* now copy it out of this block */
187         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
188         if (cv) {
189                 tdb_convert(buf, len);
190         }
191         return 0;
192
193 fail:
194         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
195         tdb->ecode = TDB_ERR_IO;
196         tdb->transaction->transaction_error = 1;
197         return -1;
198 }
199
200
201 /*
202   write while in a transaction
203 */
204 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
205                              const void *buf, tdb_len_t len)
206 {
207         uint32_t blk;
208
209         /* Only a commit is allowed on a prepared transaction */
210         if (tdb->transaction->prepared) {
211                 tdb->ecode = TDB_ERR_EINVAL;
212                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
213                 tdb->transaction->transaction_error = 1;
214                 return -1;
215         }
216
217         /* if the write is to a hash head, then update the transaction
218            hash heads */
219         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
220             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
221                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
222                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
223         }
224
225         /* break it up into block sized chunks */
226         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
227                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
228                 if (transaction_write(tdb, off, buf, len2) != 0) {
229                         return -1;
230                 }
231                 len -= len2;
232                 off += len2;
233                 if (buf != NULL) {
234                         buf = (const void *)(len2 + (const char *)buf);
235                 }
236         }
237
238         if (len == 0) {
239                 return 0;
240         }
241
242         blk = off / tdb->transaction->block_size;
243         off = off % tdb->transaction->block_size;
244
245         if (tdb->transaction->num_blocks <= blk) {
246                 uint8_t **new_blocks;
247                 /* expand the blocks array */
248                 if (tdb->transaction->blocks == NULL) {
249                         new_blocks = (uint8_t **)malloc(
250                                 (blk+1)*sizeof(uint8_t *));
251                 } else {
252                         new_blocks = (uint8_t **)realloc(
253                                 tdb->transaction->blocks,
254                                 (blk+1)*sizeof(uint8_t *));
255                 }
256                 if (new_blocks == NULL) {
257                         tdb->ecode = TDB_ERR_OOM;
258                         goto fail;
259                 }
260                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
261                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
262                 tdb->transaction->blocks = new_blocks;
263                 tdb->transaction->num_blocks = blk+1;
264                 tdb->transaction->last_block_size = 0;
265         }
266
267         /* allocate and fill a block? */
268         if (tdb->transaction->blocks[blk] == NULL) {
269                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
270                 if (tdb->transaction->blocks[blk] == NULL) {
271                         tdb->ecode = TDB_ERR_OOM;
272                         tdb->transaction->transaction_error = 1;
273                         return -1;                      
274                 }
275                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
276                         tdb_len_t len2 = tdb->transaction->block_size;
277                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
278                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
279                         }
280                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
281                                                                    tdb->transaction->blocks[blk], 
282                                                                    len2, 0) != 0) {
283                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
284                                 tdb->ecode = TDB_ERR_IO;
285                                 goto fail;
286                         }
287                         if (blk == tdb->transaction->num_blocks-1) {
288                                 tdb->transaction->last_block_size = len2;
289                         }                       
290                 }
291         }
292         
293         /* overwrite part of an existing block */
294         if (buf == NULL) {
295                 memset(tdb->transaction->blocks[blk] + off, 0, len);
296         } else {
297                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
298         }
299         if (blk == tdb->transaction->num_blocks-1) {
300                 if (len + off > tdb->transaction->last_block_size) {
301                         tdb->transaction->last_block_size = len + off;
302                 }
303         }
304
305         return 0;
306
307 fail:
308         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
309                  (blk*tdb->transaction->block_size) + off, len));
310         tdb->transaction->transaction_error = 1;
311         return -1;
312 }
313
314
315 /*
316   write while in a transaction - this varient never expands the transaction blocks, it only
317   updates existing blocks. This means it cannot change the recovery size
318 */
319 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
320                                       const void *buf, tdb_len_t len)
321 {
322         uint32_t blk;
323
324         /* break it up into block sized chunks */
325         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
326                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
327                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
328                         return -1;
329                 }
330                 len -= len2;
331                 off += len2;
332                 if (buf != NULL) {
333                         buf = (const void *)(len2 + (const char *)buf);
334                 }
335         }
336
337         if (len == 0) {
338                 return 0;
339         }
340
341         blk = off / tdb->transaction->block_size;
342         off = off % tdb->transaction->block_size;
343
344         if (tdb->transaction->num_blocks <= blk ||
345             tdb->transaction->blocks[blk] == NULL) {
346                 return 0;
347         }
348
349         if (blk == tdb->transaction->num_blocks-1 &&
350             off + len > tdb->transaction->last_block_size) {
351                 if (off >= tdb->transaction->last_block_size) {
352                         return 0;
353                 }
354                 len = tdb->transaction->last_block_size - off;
355         }
356
357         /* overwrite part of an existing block */
358         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
359
360         return 0;
361 }
362
363
364 /*
365   accelerated hash chain head search, using the cached hash heads
366 */
367 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
368 {
369         uint32_t h = *chain;
370         for (;h < tdb->header.hash_size;h++) {
371                 /* the +1 takes account of the freelist */
372                 if (0 != tdb->transaction->hash_heads[h+1]) {
373                         break;
374                 }
375         }
376         (*chain) = h;
377 }
378
379 /*
380   out of bounds check during a transaction
381 */
382 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
383 {
384         if (len <= tdb->map_size) {
385                 return 0;
386         }
387         return TDB_ERRCODE(TDB_ERR_IO, -1);
388 }
389
390 /*
391   transaction version of tdb_expand().
392 */
393 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
394                                    tdb_off_t addition)
395 {
396         /* add a write to the transaction elements, so subsequent
397            reads see the zero data */
398         if (transaction_write(tdb, size, NULL, addition) != 0) {
399                 return -1;
400         }
401
402         return 0;
403 }
404
405 /*
406   brlock during a transaction - ignore them
407 */
408 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
409                               int rw_type, int lck_type, int probe, size_t len)
410 {
411         return 0;
412 }
413
414 static const struct tdb_methods transaction_methods = {
415         transaction_read,
416         transaction_write,
417         transaction_next_hash_chain,
418         transaction_oob,
419         transaction_expand_file,
420         transaction_brlock
421 };
422
423
424 /*
425   start a tdb transaction. No token is returned, as only a single
426   transaction is allowed to be pending per tdb_context
427 */
428 int tdb_transaction_start(struct tdb_context *tdb)
429 {
430         /* some sanity checks */
431         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
432                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
433                 tdb->ecode = TDB_ERR_EINVAL;
434                 return -1;
435         }
436
437         /* cope with nested tdb_transaction_start() calls */
438         if (tdb->transaction != NULL) {
439                 if (!(tdb->flags & TDB_ALLOW_NESTING)) {
440                         tdb->ecode = TDB_ERR_NESTING;
441                         return -1;
442                 }
443                 tdb->transaction->nesting++;
444                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
445                          tdb->transaction->nesting));
446                 return 0;
447         }
448
449         if (tdb->num_locks != 0 || tdb->global_lock.count) {
450                 /* the caller must not have any locks when starting a
451                    transaction as otherwise we'll be screwed by lack
452                    of nested locks in posix */
453                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
454                 tdb->ecode = TDB_ERR_LOCK;
455                 return -1;
456         }
457
458         if (tdb->travlocks.next != NULL) {
459                 /* you cannot use transactions inside a traverse (although you can use
460                    traverse inside a transaction) as otherwise you can end up with
461                    deadlock */
462                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
463                 tdb->ecode = TDB_ERR_LOCK;
464                 return -1;
465         }
466
467         tdb->transaction = (struct tdb_transaction *)
468                 calloc(sizeof(struct tdb_transaction), 1);
469         if (tdb->transaction == NULL) {
470                 tdb->ecode = TDB_ERR_OOM;
471                 return -1;
472         }
473
474         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
475         tdb->transaction->block_size = tdb->page_size;
476
477         /* get the transaction write lock. This is a blocking lock. As
478            discussed with Volker, there are a number of ways we could
479            make this async, which we will probably do in the future */
480         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
481                 SAFE_FREE(tdb->transaction->blocks);
482                 SAFE_FREE(tdb->transaction);
483                 return -1;
484         }
485         
486         /* get a read lock from the freelist to the end of file. This
487            is upgraded to a write lock during the commit */
488         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
489                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
490                 tdb->ecode = TDB_ERR_LOCK;
491                 goto fail;
492         }
493
494         /* setup a copy of the hash table heads so the hash scan in
495            traverse can be fast */
496         tdb->transaction->hash_heads = (uint32_t *)
497                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
498         if (tdb->transaction->hash_heads == NULL) {
499                 tdb->ecode = TDB_ERR_OOM;
500                 goto fail;
501         }
502         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
503                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
504                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
505                 tdb->ecode = TDB_ERR_IO;
506                 goto fail;
507         }
508
509         /* make sure we know about any file expansions already done by
510            anyone else */
511         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
512         tdb->transaction->old_map_size = tdb->map_size;
513
514         /* finally hook the io methods, replacing them with
515            transaction specific methods */
516         tdb->transaction->io_methods = tdb->methods;
517         tdb->methods = &transaction_methods;
518
519         return 0;
520         
521 fail:
522         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
523         tdb_transaction_unlock(tdb);
524         SAFE_FREE(tdb->transaction->blocks);
525         SAFE_FREE(tdb->transaction->hash_heads);
526         SAFE_FREE(tdb->transaction);
527         return -1;
528 }
529
530
531 /*
532   sync to disk
533 */
534 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
535 {       
536         if (fsync(tdb->fd) != 0) {
537                 tdb->ecode = TDB_ERR_IO;
538                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
539                 return -1;
540         }
541 #ifdef MS_SYNC
542         if (tdb->map_ptr) {
543                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
544                 if (msync(moffset + (char *)tdb->map_ptr, 
545                           length + (offset - moffset), MS_SYNC) != 0) {
546                         tdb->ecode = TDB_ERR_IO;
547                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
548                                  strerror(errno)));
549                         return -1;
550                 }
551         }
552 #endif
553         return 0;
554 }
555
556
557 /*
558   cancel the current transaction
559 */
560 int tdb_transaction_cancel(struct tdb_context *tdb)
561 {       
562         int i, ret = 0;
563
564         if (tdb->transaction == NULL) {
565                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
566                 return -1;
567         }
568
569         if (tdb->transaction->nesting != 0) {
570                 tdb->transaction->transaction_error = 1;
571                 tdb->transaction->nesting--;
572                 return 0;
573         }               
574
575         tdb->map_size = tdb->transaction->old_map_size;
576
577         /* free all the transaction blocks */
578         for (i=0;i<tdb->transaction->num_blocks;i++) {
579                 if (tdb->transaction->blocks[i] != NULL) {
580                         free(tdb->transaction->blocks[i]);
581                 }
582         }
583         SAFE_FREE(tdb->transaction->blocks);
584
585         if (tdb->transaction->magic_offset) {
586                 const struct tdb_methods *methods = tdb->transaction->io_methods;
587                 uint32_t zero = 0;
588
589                 /* remove the recovery marker */
590                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
591                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
592                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
593                         ret = -1;
594                 }
595         }
596
597         /* remove any global lock created during the transaction */
598         if (tdb->global_lock.count != 0) {
599                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
600                 tdb->global_lock.count = 0;
601         }
602
603         /* remove any locks created during the transaction */
604         if (tdb->num_locks != 0) {
605                 for (i=0;i<tdb->num_lockrecs;i++) {
606                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
607                                    F_UNLCK,F_SETLKW, 0, 1);
608                 }
609                 tdb->num_locks = 0;
610                 tdb->num_lockrecs = 0;
611                 SAFE_FREE(tdb->lockrecs);
612         }
613
614         /* restore the normal io methods */
615         tdb->methods = tdb->transaction->io_methods;
616
617         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
618         tdb_transaction_unlock(tdb);
619         SAFE_FREE(tdb->transaction->hash_heads);
620         SAFE_FREE(tdb->transaction);
621         
622         return ret;
623 }
624
625
626 /*
627   work out how much space the linearised recovery data will consume
628 */
629 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
630 {
631         tdb_len_t recovery_size = 0;
632         int i;
633
634         recovery_size = sizeof(uint32_t);
635         for (i=0;i<tdb->transaction->num_blocks;i++) {
636                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
637                         break;
638                 }
639                 if (tdb->transaction->blocks[i] == NULL) {
640                         continue;
641                 }
642                 recovery_size += 2*sizeof(tdb_off_t);
643                 if (i == tdb->transaction->num_blocks-1) {
644                         recovery_size += tdb->transaction->last_block_size;
645                 } else {
646                         recovery_size += tdb->transaction->block_size;
647                 }
648         }       
649
650         return recovery_size;
651 }
652
653 /*
654   allocate the recovery area, or use an existing recovery area if it is
655   large enough
656 */
657 static int tdb_recovery_allocate(struct tdb_context *tdb, 
658                                  tdb_len_t *recovery_size,
659                                  tdb_off_t *recovery_offset,
660                                  tdb_len_t *recovery_max_size)
661 {
662         struct list_struct rec;
663         const struct tdb_methods *methods = tdb->transaction->io_methods;
664         tdb_off_t recovery_head;
665
666         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
667                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
668                 return -1;
669         }
670
671         rec.rec_len = 0;
672
673         if (recovery_head != 0 && 
674             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
675                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
676                 return -1;
677         }
678
679         *recovery_size = tdb_recovery_size(tdb);
680
681         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
682                 /* it fits in the existing area */
683                 *recovery_max_size = rec.rec_len;
684                 *recovery_offset = recovery_head;
685                 return 0;
686         }
687
688         /* we need to free up the old recovery area, then allocate a
689            new one at the end of the file. Note that we cannot use
690            tdb_allocate() to allocate the new one as that might return
691            us an area that is being currently used (as of the start of
692            the transaction) */
693         if (recovery_head != 0) {
694                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
695                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
696                         return -1;
697                 }
698         }
699
700         /* the tdb_free() call might have increased the recovery size */
701         *recovery_size = tdb_recovery_size(tdb);
702
703         /* round up to a multiple of page size */
704         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
705         *recovery_offset = tdb->map_size;
706         recovery_head = *recovery_offset;
707
708         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
709                                      (tdb->map_size - tdb->transaction->old_map_size) +
710                                      sizeof(rec) + *recovery_max_size) == -1) {
711                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
712                 return -1;
713         }
714
715         /* remap the file (if using mmap) */
716         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
717
718         /* we have to reset the old map size so that we don't try to expand the file
719            again in the transaction commit, which would destroy the recovery area */
720         tdb->transaction->old_map_size = tdb->map_size;
721
722         /* write the recovery header offset and sync - we can sync without a race here
723            as the magic ptr in the recovery record has not been set */
724         CONVERT(recovery_head);
725         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
726                                &recovery_head, sizeof(tdb_off_t)) == -1) {
727                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
728                 return -1;
729         }
730         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
731                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
732                 return -1;
733         }
734
735         return 0;
736 }
737
738
739 /*
740   setup the recovery data that will be used on a crash during commit
741 */
742 static int transaction_setup_recovery(struct tdb_context *tdb, 
743                                       tdb_off_t *magic_offset)
744 {
745         tdb_len_t recovery_size;
746         unsigned char *data, *p;
747         const struct tdb_methods *methods = tdb->transaction->io_methods;
748         struct list_struct *rec;
749         tdb_off_t recovery_offset, recovery_max_size;
750         tdb_off_t old_map_size = tdb->transaction->old_map_size;
751         uint32_t magic, tailer;
752         int i;
753
754         /*
755           check that the recovery area has enough space
756         */
757         if (tdb_recovery_allocate(tdb, &recovery_size, 
758                                   &recovery_offset, &recovery_max_size) == -1) {
759                 return -1;
760         }
761
762         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
763         if (data == NULL) {
764                 tdb->ecode = TDB_ERR_OOM;
765                 return -1;
766         }
767
768         rec = (struct list_struct *)data;
769         memset(rec, 0, sizeof(*rec));
770
771         rec->magic    = 0;
772         rec->data_len = recovery_size;
773         rec->rec_len  = recovery_max_size;
774         rec->key_len  = old_map_size;
775         CONVERT(rec);
776
777         /* build the recovery data into a single blob to allow us to do a single
778            large write, which should be more efficient */
779         p = data + sizeof(*rec);
780         for (i=0;i<tdb->transaction->num_blocks;i++) {
781                 tdb_off_t offset;
782                 tdb_len_t length;
783
784                 if (tdb->transaction->blocks[i] == NULL) {
785                         continue;
786                 }
787
788                 offset = i * tdb->transaction->block_size;
789                 length = tdb->transaction->block_size;
790                 if (i == tdb->transaction->num_blocks-1) {
791                         length = tdb->transaction->last_block_size;
792                 }
793                 
794                 if (offset >= old_map_size) {
795                         continue;
796                 }
797                 if (offset + length > tdb->transaction->old_map_size) {
798                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
799                         free(data);
800                         tdb->ecode = TDB_ERR_CORRUPT;
801                         return -1;
802                 }
803                 memcpy(p, &offset, 4);
804                 memcpy(p+4, &length, 4);
805                 if (DOCONV()) {
806                         tdb_convert(p, 8);
807                 }
808                 /* the recovery area contains the old data, not the
809                    new data, so we have to call the original tdb_read
810                    method to get it */
811                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
812                         free(data);
813                         tdb->ecode = TDB_ERR_IO;
814                         return -1;
815                 }
816                 p += 8 + length;
817         }
818
819         /* and the tailer */
820         tailer = sizeof(*rec) + recovery_max_size;
821         memcpy(p, &tailer, 4);
822         CONVERT(p);
823
824         /* write the recovery data to the recovery area */
825         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
826                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
827                 free(data);
828                 tdb->ecode = TDB_ERR_IO;
829                 return -1;
830         }
831         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
832                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
833                 free(data);
834                 tdb->ecode = TDB_ERR_IO;
835                 return -1;
836         }
837
838         /* as we don't have ordered writes, we have to sync the recovery
839            data before we update the magic to indicate that the recovery
840            data is present */
841         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
842                 free(data);
843                 return -1;
844         }
845
846         free(data);
847
848         magic = TDB_RECOVERY_MAGIC;
849         CONVERT(magic);
850
851         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
852
853         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
854                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
855                 tdb->ecode = TDB_ERR_IO;
856                 return -1;
857         }
858         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
859                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
860                 tdb->ecode = TDB_ERR_IO;
861                 return -1;
862         }
863
864         /* ensure the recovery magic marker is on disk */
865         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
866                 return -1;
867         }
868
869         return 0;
870 }
871
872 /*
873   prepare to commit the current transaction
874 */
875 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
876 {       
877         const struct tdb_methods *methods;
878         int i;
879
880         if (tdb->transaction == NULL) {
881                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
882                 return -1;
883         }
884
885         if (tdb->transaction->prepared) {
886                 tdb->ecode = TDB_ERR_EINVAL;
887                 tdb_transaction_cancel(tdb);
888                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
889                 return -1;
890         }
891
892         if (tdb->transaction->transaction_error) {
893                 tdb->ecode = TDB_ERR_IO;
894                 tdb_transaction_cancel(tdb);
895                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
896                 return -1;
897         }
898
899
900         if (tdb->transaction->nesting != 0) {
901                 return 0;
902         }               
903
904         /* check for a null transaction */
905         if (tdb->transaction->blocks == NULL) {
906                 return 0;
907         }
908
909         methods = tdb->transaction->io_methods;
910         
911         /* if there are any locks pending then the caller has not
912            nested their locks properly, so fail the transaction */
913         if (tdb->num_locks || tdb->global_lock.count) {
914                 tdb->ecode = TDB_ERR_LOCK;
915                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
916                 tdb_transaction_cancel(tdb);
917                 return -1;
918         }
919
920         /* upgrade the main transaction lock region to a write lock */
921         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
922                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
923                 tdb->ecode = TDB_ERR_LOCK;
924                 tdb_transaction_cancel(tdb);
925                 return -1;
926         }
927
928         /* get the global lock - this prevents new users attaching to the database
929            during the commit */
930         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
931                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
932                 tdb->ecode = TDB_ERR_LOCK;
933                 tdb_transaction_cancel(tdb);
934                 return -1;
935         }
936
937         if (!(tdb->flags & TDB_NOSYNC)) {
938                 /* write the recovery data to the end of the file */
939                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
940                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
941                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
942                         tdb_transaction_cancel(tdb);
943                         return -1;
944                 }
945         }
946
947         tdb->transaction->prepared = true;
948
949         /* expand the file to the new size if needed */
950         if (tdb->map_size != tdb->transaction->old_map_size) {
951                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
952                                              tdb->map_size - 
953                                              tdb->transaction->old_map_size) == -1) {
954                         tdb->ecode = TDB_ERR_IO;
955                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
956                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
957                         tdb_transaction_cancel(tdb);
958                         return -1;
959                 }
960                 tdb->map_size = tdb->transaction->old_map_size;
961                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
962         }
963
964         /* Keep the global lock until the actual commit */
965
966         return 0;
967 }
968
969 /*
970   commit the current transaction
971 */
972 int tdb_transaction_commit(struct tdb_context *tdb)
973 {       
974         const struct tdb_methods *methods;
975         int i;
976
977         if (tdb->transaction == NULL) {
978                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
979                 return -1;
980         }
981
982         if (tdb->transaction->transaction_error) {
983                 tdb->ecode = TDB_ERR_IO;
984                 tdb_transaction_cancel(tdb);
985                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
986                 return -1;
987         }
988
989
990         if (tdb->transaction->nesting != 0) {
991                 tdb->transaction->nesting--;
992                 return 0;
993         }
994
995         /* check for a null transaction */
996         if (tdb->transaction->blocks == NULL) {
997                 tdb_transaction_cancel(tdb);
998                 return 0;
999         }
1000
1001         if (!tdb->transaction->prepared) {
1002                 int ret = tdb_transaction_prepare_commit(tdb);
1003                 if (ret)
1004                         return ret;
1005         }
1006
1007         methods = tdb->transaction->io_methods;
1008
1009         /* perform all the writes */
1010         for (i=0;i<tdb->transaction->num_blocks;i++) {
1011                 tdb_off_t offset;
1012                 tdb_len_t length;
1013
1014                 if (tdb->transaction->blocks[i] == NULL) {
1015                         continue;
1016                 }
1017
1018                 offset = i * tdb->transaction->block_size;
1019                 length = tdb->transaction->block_size;
1020                 if (i == tdb->transaction->num_blocks-1) {
1021                         length = tdb->transaction->last_block_size;
1022                 }
1023
1024                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1025                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1026                         
1027                         /* we've overwritten part of the data and
1028                            possibly expanded the file, so we need to
1029                            run the crash recovery code */
1030                         tdb->methods = methods;
1031                         tdb_transaction_recover(tdb); 
1032
1033                         tdb_transaction_cancel(tdb);
1034                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1035
1036                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1037                         return -1;
1038                 }
1039                 SAFE_FREE(tdb->transaction->blocks[i]);
1040         } 
1041
1042         SAFE_FREE(tdb->transaction->blocks);
1043         tdb->transaction->num_blocks = 0;
1044
1045         if (!(tdb->flags & TDB_NOSYNC)) {
1046                 /* ensure the new data is on disk */
1047                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1048                         return -1;
1049                 }
1050         }
1051
1052         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1053
1054         /*
1055           TODO: maybe write to some dummy hdr field, or write to magic
1056           offset without mmap, before the last sync, instead of the
1057           utime() call
1058         */
1059
1060         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1061            don't change the mtime of the file, this means the file may
1062            not be backed up (as tdb rounding to block sizes means that
1063            file size changes are quite rare too). The following forces
1064            mtime changes when a transaction completes */
1065 #ifdef HAVE_UTIME
1066         utime(tdb->name, NULL);
1067 #endif
1068
1069         /* use a transaction cancel to free memory and remove the
1070            transaction locks */
1071         tdb_transaction_cancel(tdb);
1072
1073         return 0;
1074 }
1075
1076
1077 /*
1078   recover from an aborted transaction. Must be called with exclusive
1079   database write access already established (including the global
1080   lock to prevent new processes attaching)
1081 */
1082 int tdb_transaction_recover(struct tdb_context *tdb)
1083 {
1084         tdb_off_t recovery_head, recovery_eof;
1085         unsigned char *data, *p;
1086         uint32_t zero = 0;
1087         struct list_struct rec;
1088
1089         /* find the recovery area */
1090         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1091                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1092                 tdb->ecode = TDB_ERR_IO;
1093                 return -1;
1094         }
1095
1096         if (recovery_head == 0) {
1097                 /* we have never allocated a recovery record */
1098                 return 0;
1099         }
1100
1101         /* read the recovery record */
1102         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1103                                    sizeof(rec), DOCONV()) == -1) {
1104                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1105                 tdb->ecode = TDB_ERR_IO;
1106                 return -1;
1107         }
1108
1109         if (rec.magic != TDB_RECOVERY_MAGIC) {
1110                 /* there is no valid recovery data */
1111                 return 0;
1112         }
1113
1114         if (tdb->read_only) {
1115                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1116                 tdb->ecode = TDB_ERR_CORRUPT;
1117                 return -1;
1118         }
1119
1120         recovery_eof = rec.key_len;
1121
1122         data = (unsigned char *)malloc(rec.data_len);
1123         if (data == NULL) {
1124                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1125                 tdb->ecode = TDB_ERR_OOM;
1126                 return -1;
1127         }
1128
1129         /* read the full recovery data */
1130         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1131                                    rec.data_len, 0) == -1) {
1132                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1133                 tdb->ecode = TDB_ERR_IO;
1134                 return -1;
1135         }
1136
1137         /* recover the file data */
1138         p = data;
1139         while (p+8 < data + rec.data_len) {
1140                 uint32_t ofs, len;
1141                 if (DOCONV()) {
1142                         tdb_convert(p, 8);
1143                 }
1144                 memcpy(&ofs, p, 4);
1145                 memcpy(&len, p+4, 4);
1146
1147                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1148                         free(data);
1149                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1150                         tdb->ecode = TDB_ERR_IO;
1151                         return -1;
1152                 }
1153                 p += 8 + len;
1154         }
1155
1156         free(data);
1157
1158         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1159                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1160                 tdb->ecode = TDB_ERR_IO;
1161                 return -1;
1162         }
1163
1164         /* if the recovery area is after the recovered eof then remove it */
1165         if (recovery_eof <= recovery_head) {
1166                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1167                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1168                         tdb->ecode = TDB_ERR_IO;
1169                         return -1;                      
1170                 }
1171         }
1172
1173         /* remove the recovery magic */
1174         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1175                           &zero) == -1) {
1176                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1177                 tdb->ecode = TDB_ERR_IO;
1178                 return -1;                      
1179         }
1180         
1181         /* reduce the file size to the old size */
1182         tdb_munmap(tdb);
1183         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1184                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1185                 tdb->ecode = TDB_ERR_IO;
1186                 return -1;                      
1187         }
1188         tdb->map_size = recovery_eof;
1189         tdb_mmap(tdb);
1190
1191         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1192                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1193                 tdb->ecode = TDB_ERR_IO;
1194                 return -1;
1195         }
1196
1197         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1198                  recovery_eof));
1199
1200         /* all done */
1201         return 0;
1202 }