035b4e1d54cec932d728683967337f84d1f1adc2
[samba.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* set when a prepare has already occurred */
120         bool prepared;
121         tdb_off_t magic_offset;
122
123         /* old file size before transaction */
124         tdb_len_t old_map_size;
125
126         /* we should re-pack on commit */
127         bool need_repack;
128 };
129
130
131 /*
132   read while in a transaction. We need to check first if the data is in our list
133   of transaction elements, then if not do a real read
134 */
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
136                             tdb_len_t len, int cv)
137 {
138         uint32_t blk;
139
140         /* break it down into block sized ops */
141         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
142                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
143                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
144                         return -1;
145                 }
146                 len -= len2;
147                 off += len2;
148                 buf = (void *)(len2 + (char *)buf);
149         }
150
151         if (len == 0) {
152                 return 0;
153         }
154
155         blk = off / tdb->transaction->block_size;
156
157         /* see if we have it in the block list */
158         if (tdb->transaction->num_blocks <= blk ||
159             tdb->transaction->blocks[blk] == NULL) {
160                 /* nope, do a real read */
161                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
162                         goto fail;
163                 }
164                 return 0;
165         }
166
167         /* it is in the block list. Now check for the last block */
168         if (blk == tdb->transaction->num_blocks-1) {
169                 if (len > tdb->transaction->last_block_size) {
170                         goto fail;
171                 }
172         }
173         
174         /* now copy it out of this block */
175         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
176         if (cv) {
177                 tdb_convert(buf, len);
178         }
179         return 0;
180
181 fail:
182         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
183         tdb->ecode = TDB_ERR_IO;
184         tdb->transaction->transaction_error = 1;
185         return -1;
186 }
187
188
189 /*
190   write while in a transaction
191 */
192 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
193                              const void *buf, tdb_len_t len)
194 {
195         uint32_t blk;
196
197         /* Only a commit is allowed on a prepared transaction */
198         if (tdb->transaction->prepared) {
199                 tdb->ecode = TDB_ERR_EINVAL;
200                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
201                 tdb->transaction->transaction_error = 1;
202                 return -1;
203         }
204
205         /* if the write is to a hash head, then update the transaction
206            hash heads */
207         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
208             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
209                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
210                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
211         }
212
213         /* break it up into block sized chunks */
214         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
215                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
216                 if (transaction_write(tdb, off, buf, len2) != 0) {
217                         return -1;
218                 }
219                 len -= len2;
220                 off += len2;
221                 if (buf != NULL) {
222                         buf = (const void *)(len2 + (const char *)buf);
223                 }
224         }
225
226         if (len == 0) {
227                 return 0;
228         }
229
230         blk = off / tdb->transaction->block_size;
231         off = off % tdb->transaction->block_size;
232
233         if (tdb->transaction->num_blocks <= blk) {
234                 uint8_t **new_blocks;
235                 /* expand the blocks array */
236                 if (tdb->transaction->blocks == NULL) {
237                         new_blocks = (uint8_t **)malloc(
238                                 (blk+1)*sizeof(uint8_t *));
239                 } else {
240                         new_blocks = (uint8_t **)realloc(
241                                 tdb->transaction->blocks,
242                                 (blk+1)*sizeof(uint8_t *));
243                 }
244                 if (new_blocks == NULL) {
245                         tdb->ecode = TDB_ERR_OOM;
246                         goto fail;
247                 }
248                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
249                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
250                 tdb->transaction->blocks = new_blocks;
251                 tdb->transaction->num_blocks = blk+1;
252                 tdb->transaction->last_block_size = 0;
253         }
254
255         /* allocate and fill a block? */
256         if (tdb->transaction->blocks[blk] == NULL) {
257                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
258                 if (tdb->transaction->blocks[blk] == NULL) {
259                         tdb->ecode = TDB_ERR_OOM;
260                         tdb->transaction->transaction_error = 1;
261                         return -1;                      
262                 }
263                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
264                         tdb_len_t len2 = tdb->transaction->block_size;
265                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
266                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
267                         }
268                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
269                                                                    tdb->transaction->blocks[blk], 
270                                                                    len2, 0) != 0) {
271                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
272                                 tdb->ecode = TDB_ERR_IO;
273                                 goto fail;
274                         }
275                         if (blk == tdb->transaction->num_blocks-1) {
276                                 tdb->transaction->last_block_size = len2;
277                         }                       
278                 }
279         }
280         
281         /* overwrite part of an existing block */
282         if (buf == NULL) {
283                 memset(tdb->transaction->blocks[blk] + off, 0, len);
284         } else {
285                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
286         }
287         if (blk == tdb->transaction->num_blocks-1) {
288                 if (len + off > tdb->transaction->last_block_size) {
289                         tdb->transaction->last_block_size = len + off;
290                 }
291         }
292
293         return 0;
294
295 fail:
296         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
297                  (blk*tdb->transaction->block_size) + off, len));
298         tdb->transaction->transaction_error = 1;
299         return -1;
300 }
301
302
303 /*
304   write while in a transaction - this varient never expands the transaction blocks, it only
305   updates existing blocks. This means it cannot change the recovery size
306 */
307 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
308                                       const void *buf, tdb_len_t len)
309 {
310         uint32_t blk;
311
312         /* break it up into block sized chunks */
313         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
314                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
315                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
316                         return -1;
317                 }
318                 len -= len2;
319                 off += len2;
320                 if (buf != NULL) {
321                         buf = (const void *)(len2 + (const char *)buf);
322                 }
323         }
324
325         if (len == 0) {
326                 return 0;
327         }
328
329         blk = off / tdb->transaction->block_size;
330         off = off % tdb->transaction->block_size;
331
332         if (tdb->transaction->num_blocks <= blk ||
333             tdb->transaction->blocks[blk] == NULL) {
334                 return 0;
335         }
336
337         if (blk == tdb->transaction->num_blocks-1 &&
338             off + len > tdb->transaction->last_block_size) {
339                 if (off >= tdb->transaction->last_block_size) {
340                         return 0;
341                 }
342                 len = tdb->transaction->last_block_size - off;
343         }
344
345         /* overwrite part of an existing block */
346         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
347
348         return 0;
349 }
350
351
352 /*
353   accelerated hash chain head search, using the cached hash heads
354 */
355 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
356 {
357         uint32_t h = *chain;
358         for (;h < tdb->header.hash_size;h++) {
359                 /* the +1 takes account of the freelist */
360                 if (0 != tdb->transaction->hash_heads[h+1]) {
361                         break;
362                 }
363         }
364         (*chain) = h;
365 }
366
367 /*
368   out of bounds check during a transaction
369 */
370 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
371 {
372         if (len <= tdb->map_size) {
373                 return 0;
374         }
375         tdb->ecode = TDB_ERR_IO;
376         return -1;
377 }
378
379 /*
380   transaction version of tdb_expand().
381 */
382 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
383                                    tdb_off_t addition)
384 {
385         /* add a write to the transaction elements, so subsequent
386            reads see the zero data */
387         if (transaction_write(tdb, size, NULL, addition) != 0) {
388                 return -1;
389         }
390
391         tdb->transaction->need_repack = true;
392
393         return 0;
394 }
395
396 /*
397   brlock during a transaction - ignore them
398 */
399 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
400                               int rw_type, int lck_type, int probe, size_t len)
401 {
402         return 0;
403 }
404
405 static const struct tdb_methods transaction_methods = {
406         transaction_read,
407         transaction_write,
408         transaction_next_hash_chain,
409         transaction_oob,
410         transaction_expand_file,
411         transaction_brlock
412 };
413
414
415 /*
416   start a tdb transaction. No token is returned, as only a single
417   transaction is allowed to be pending per tdb_context
418 */
419 int tdb_transaction_start(struct tdb_context *tdb)
420 {
421         /* some sanity checks */
422         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
423                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
424                 tdb->ecode = TDB_ERR_EINVAL;
425                 return -1;
426         }
427
428         /* cope with nested tdb_transaction_start() calls */
429         if (tdb->transaction != NULL) {
430                 tdb->transaction->nesting++;
431                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
432                          tdb->transaction->nesting));
433                 return 0;
434         }
435
436         if (tdb->num_locks != 0 || tdb->global_lock.count) {
437                 /* the caller must not have any locks when starting a
438                    transaction as otherwise we'll be screwed by lack
439                    of nested locks in posix */
440                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
441                 tdb->ecode = TDB_ERR_LOCK;
442                 return -1;
443         }
444
445         if (tdb->travlocks.next != NULL) {
446                 /* you cannot use transactions inside a traverse (although you can use
447                    traverse inside a transaction) as otherwise you can end up with
448                    deadlock */
449                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
450                 tdb->ecode = TDB_ERR_LOCK;
451                 return -1;
452         }
453
454         tdb->transaction = (struct tdb_transaction *)
455                 calloc(sizeof(struct tdb_transaction), 1);
456         if (tdb->transaction == NULL) {
457                 tdb->ecode = TDB_ERR_OOM;
458                 return -1;
459         }
460
461         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
462         tdb->transaction->block_size = tdb->page_size;
463
464         /* get the transaction write lock. This is a blocking lock. As
465            discussed with Volker, there are a number of ways we could
466            make this async, which we will probably do in the future */
467         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
468                 SAFE_FREE(tdb->transaction->blocks);
469                 SAFE_FREE(tdb->transaction);
470                 return -1;
471         }
472         
473         /* get a read lock from the freelist to the end of file. This
474            is upgraded to a write lock during the commit */
475         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
476                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
477                 tdb->ecode = TDB_ERR_LOCK;
478                 goto fail;
479         }
480
481         /* setup a copy of the hash table heads so the hash scan in
482            traverse can be fast */
483         tdb->transaction->hash_heads = (uint32_t *)
484                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
485         if (tdb->transaction->hash_heads == NULL) {
486                 tdb->ecode = TDB_ERR_OOM;
487                 goto fail;
488         }
489         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
490                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
491                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
492                 tdb->ecode = TDB_ERR_IO;
493                 goto fail;
494         }
495
496         /* make sure we know about any file expansions already done by
497            anyone else */
498         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
499         tdb->transaction->old_map_size = tdb->map_size;
500
501         /* finally hook the io methods, replacing them with
502            transaction specific methods */
503         tdb->transaction->io_methods = tdb->methods;
504         tdb->methods = &transaction_methods;
505
506         /* Trace at the end, so we get sequence number correct. */
507         tdb_trace(tdb, "tdb_transaction_start");
508         return 0;
509         
510 fail:
511         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
512         tdb_transaction_unlock(tdb);
513         SAFE_FREE(tdb->transaction->blocks);
514         SAFE_FREE(tdb->transaction->hash_heads);
515         SAFE_FREE(tdb->transaction);
516         return -1;
517 }
518
519
520 /*
521   sync to disk
522 */
523 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
524 {       
525         if (tdb->flags & TDB_NOSYNC) {
526                 return 0;
527         }
528
529         if (fsync(tdb->fd) != 0) {
530                 tdb->ecode = TDB_ERR_IO;
531                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
532                 return -1;
533         }
534 #ifdef HAVE_MMAP
535         if (tdb->map_ptr) {
536                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
537                 if (msync(moffset + (char *)tdb->map_ptr, 
538                           length + (offset - moffset), MS_SYNC) != 0) {
539                         tdb->ecode = TDB_ERR_IO;
540                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
541                                  strerror(errno)));
542                         return -1;
543                 }
544         }
545 #endif
546         return 0;
547 }
548
549
550 int _tdb_transaction_cancel(struct tdb_context *tdb)
551 {       
552         int i, ret = 0;
553
554         if (tdb->transaction == NULL) {
555                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
556                 return -1;
557         }
558
559         if (tdb->transaction->nesting != 0) {
560                 tdb->transaction->transaction_error = 1;
561                 tdb->transaction->nesting--;
562                 return 0;
563         }               
564
565         tdb->map_size = tdb->transaction->old_map_size;
566
567         /* free all the transaction blocks */
568         for (i=0;i<tdb->transaction->num_blocks;i++) {
569                 if (tdb->transaction->blocks[i] != NULL) {
570                         free(tdb->transaction->blocks[i]);
571                 }
572         }
573         SAFE_FREE(tdb->transaction->blocks);
574
575         if (tdb->transaction->magic_offset) {
576                 const struct tdb_methods *methods = tdb->transaction->io_methods;
577                 uint32_t zero = 0;
578
579                 /* remove the recovery marker */
580                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
581                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
582                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
583                         ret = -1;
584                 }
585         }
586
587         /* remove any global lock created during the transaction */
588         if (tdb->global_lock.count != 0) {
589                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
590                 tdb->global_lock.count = 0;
591         }
592
593         /* remove any locks created during the transaction */
594         if (tdb->num_locks != 0) {
595                 for (i=0;i<tdb->num_lockrecs;i++) {
596                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
597                                    F_UNLCK,F_SETLKW, 0, 1);
598                 }
599                 tdb->num_locks = 0;
600                 tdb->num_lockrecs = 0;
601                 SAFE_FREE(tdb->lockrecs);
602         }
603
604         /* restore the normal io methods */
605         tdb->methods = tdb->transaction->io_methods;
606
607         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
608         tdb_transaction_unlock(tdb);
609         SAFE_FREE(tdb->transaction->hash_heads);
610         SAFE_FREE(tdb->transaction);
611         
612         return ret;
613 }
614
/*
  cancel the current transaction

  Emits a trace record and then hands over to
  _tdb_transaction_cancel(), whose result is returned unchanged.
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
        int ret;

        tdb_trace(tdb, "tdb_transaction_cancel");
        ret = _tdb_transaction_cancel(tdb);

        return ret;
}
623
624 /*
625   work out how much space the linearised recovery data will consume
626 */
627 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
628 {
629         tdb_len_t recovery_size = 0;
630         int i;
631
632         recovery_size = sizeof(uint32_t);
633         for (i=0;i<tdb->transaction->num_blocks;i++) {
634                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
635                         break;
636                 }
637                 if (tdb->transaction->blocks[i] == NULL) {
638                         continue;
639                 }
640                 recovery_size += 2*sizeof(tdb_off_t);
641                 if (i == tdb->transaction->num_blocks-1) {
642                         recovery_size += tdb->transaction->last_block_size;
643                 } else {
644                         recovery_size += tdb->transaction->block_size;
645                 }
646         }       
647
648         return recovery_size;
649 }
650
651 /*
652   allocate the recovery area, or use an existing recovery area if it is
653   large enough
654 */
655 static int tdb_recovery_allocate(struct tdb_context *tdb, 
656                                  tdb_len_t *recovery_size,
657                                  tdb_off_t *recovery_offset,
658                                  tdb_len_t *recovery_max_size)
659 {
660         struct tdb_record rec;
661         const struct tdb_methods *methods = tdb->transaction->io_methods;
662         tdb_off_t recovery_head;
663
664         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
665                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
666                 return -1;
667         }
668
669         rec.rec_len = 0;
670
671         if (recovery_head != 0 && 
672             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
673                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
674                 return -1;
675         }
676
677         *recovery_size = tdb_recovery_size(tdb);
678
679         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
680                 /* it fits in the existing area */
681                 *recovery_max_size = rec.rec_len;
682                 *recovery_offset = recovery_head;
683                 return 0;
684         }
685
686         /* we need to free up the old recovery area, then allocate a
687            new one at the end of the file. Note that we cannot use
688            tdb_allocate() to allocate the new one as that might return
689            us an area that is being currently used (as of the start of
690            the transaction) */
691         if (recovery_head != 0) {
692                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
693                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
694                         return -1;
695                 }
696         }
697
698         /* the tdb_free() call might have increased the recovery size */
699         *recovery_size = tdb_recovery_size(tdb);
700
701         /* round up to a multiple of page size */
702         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
703         *recovery_offset = tdb->map_size;
704         recovery_head = *recovery_offset;
705
706         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
707                                      (tdb->map_size - tdb->transaction->old_map_size) +
708                                      sizeof(rec) + *recovery_max_size) == -1) {
709                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
710                 return -1;
711         }
712
713         /* remap the file (if using mmap) */
714         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
715
716         /* we have to reset the old map size so that we don't try to expand the file
717            again in the transaction commit, which would destroy the recovery area */
718         tdb->transaction->old_map_size = tdb->map_size;
719
720         /* write the recovery header offset and sync - we can sync without a race here
721            as the magic ptr in the recovery record has not been set */
722         CONVERT(recovery_head);
723         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
724                                &recovery_head, sizeof(tdb_off_t)) == -1) {
725                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
726                 return -1;
727         }
728         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
729                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
730                 return -1;
731         }
732
733         return 0;
734 }
735
736
737 /*
738   setup the recovery data that will be used on a crash during commit
739 */
740 static int transaction_setup_recovery(struct tdb_context *tdb, 
741                                       tdb_off_t *magic_offset)
742 {
743         tdb_len_t recovery_size;
744         unsigned char *data, *p;
745         const struct tdb_methods *methods = tdb->transaction->io_methods;
746         struct tdb_record *rec;
747         tdb_off_t recovery_offset, recovery_max_size;
748         tdb_off_t old_map_size = tdb->transaction->old_map_size;
749         uint32_t magic, tailer;
750         int i;
751
752         /*
753           check that the recovery area has enough space
754         */
755         if (tdb_recovery_allocate(tdb, &recovery_size, 
756                                   &recovery_offset, &recovery_max_size) == -1) {
757                 return -1;
758         }
759
760         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
761         if (data == NULL) {
762                 tdb->ecode = TDB_ERR_OOM;
763                 return -1;
764         }
765
766         rec = (struct tdb_record *)data;
767         memset(rec, 0, sizeof(*rec));
768
769         rec->magic    = 0;
770         rec->data_len = recovery_size;
771         rec->rec_len  = recovery_max_size;
772         rec->key_len  = old_map_size;
773         CONVERT(rec);
774
775         /* build the recovery data into a single blob to allow us to do a single
776            large write, which should be more efficient */
777         p = data + sizeof(*rec);
778         for (i=0;i<tdb->transaction->num_blocks;i++) {
779                 tdb_off_t offset;
780                 tdb_len_t length;
781
782                 if (tdb->transaction->blocks[i] == NULL) {
783                         continue;
784                 }
785
786                 offset = i * tdb->transaction->block_size;
787                 length = tdb->transaction->block_size;
788                 if (i == tdb->transaction->num_blocks-1) {
789                         length = tdb->transaction->last_block_size;
790                 }
791                 
792                 if (offset >= old_map_size) {
793                         continue;
794                 }
795                 if (offset + length > tdb->transaction->old_map_size) {
796                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
797                         free(data);
798                         tdb->ecode = TDB_ERR_CORRUPT;
799                         return -1;
800                 }
801                 memcpy(p, &offset, 4);
802                 memcpy(p+4, &length, 4);
803                 if (DOCONV()) {
804                         tdb_convert(p, 8);
805                 }
806                 /* the recovery area contains the old data, not the
807                    new data, so we have to call the original tdb_read
808                    method to get it */
809                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
810                         free(data);
811                         tdb->ecode = TDB_ERR_IO;
812                         return -1;
813                 }
814                 p += 8 + length;
815         }
816
817         /* and the tailer */
818         tailer = sizeof(*rec) + recovery_max_size;
819         memcpy(p, &tailer, 4);
820         CONVERT(p);
821
822         /* write the recovery data to the recovery area */
823         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
824                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
825                 free(data);
826                 tdb->ecode = TDB_ERR_IO;
827                 return -1;
828         }
829         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
830                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
831                 free(data);
832                 tdb->ecode = TDB_ERR_IO;
833                 return -1;
834         }
835
836         /* as we don't have ordered writes, we have to sync the recovery
837            data before we update the magic to indicate that the recovery
838            data is present */
839         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
840                 free(data);
841                 return -1;
842         }
843
844         free(data);
845
846         magic = TDB_RECOVERY_MAGIC;
847         CONVERT(magic);
848
849         *magic_offset = recovery_offset + offsetof(struct tdb_record, magic);
850
851         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
852                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
853                 tdb->ecode = TDB_ERR_IO;
854                 return -1;
855         }
856         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
857                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
858                 tdb->ecode = TDB_ERR_IO;
859                 return -1;
860         }
861
862         /* ensure the recovery magic marker is on disk */
863         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
864                 return -1;
865         }
866
867         return 0;
868 }
869
870 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
871 {       
872         const struct tdb_methods *methods;
873
874         if (tdb->transaction == NULL) {
875                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
876                 return -1;
877         }
878
879         if (tdb->transaction->prepared) {
880                 tdb->ecode = TDB_ERR_EINVAL;
881                 _tdb_transaction_cancel(tdb);
882                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
883                 return -1;
884         }
885
886         if (tdb->transaction->transaction_error) {
887                 tdb->ecode = TDB_ERR_IO;
888                 _tdb_transaction_cancel(tdb);
889                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
890                 return -1;
891         }
892
893
894         if (tdb->transaction->nesting != 0) {
895                 return 0;
896         }               
897
898         /* check for a null transaction */
899         if (tdb->transaction->blocks == NULL) {
900                 return 0;
901         }
902
903         methods = tdb->transaction->io_methods;
904         
905         /* if there are any locks pending then the caller has not
906            nested their locks properly, so fail the transaction */
907         if (tdb->num_locks || tdb->global_lock.count) {
908                 tdb->ecode = TDB_ERR_LOCK;
909                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
910                 _tdb_transaction_cancel(tdb);
911                 return -1;
912         }
913
914         /* upgrade the main transaction lock region to a write lock */
915         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
916                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
917                 tdb->ecode = TDB_ERR_LOCK;
918                 _tdb_transaction_cancel(tdb);
919                 return -1;
920         }
921
922         /* get the global lock - this prevents new users attaching to the database
923            during the commit */
924         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
925                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
926                 tdb->ecode = TDB_ERR_LOCK;
927                 _tdb_transaction_cancel(tdb);
928                 return -1;
929         }
930
931         if (!(tdb->flags & TDB_NOSYNC)) {
932                 /* write the recovery data to the end of the file */
933                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
934                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
935                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
936                         _tdb_transaction_cancel(tdb);
937                         return -1;
938                 }
939         }
940
941         tdb->transaction->prepared = true;
942
943         /* expand the file to the new size if needed */
944         if (tdb->map_size != tdb->transaction->old_map_size) {
945                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
946                                              tdb->map_size - 
947                                              tdb->transaction->old_map_size) == -1) {
948                         tdb->ecode = TDB_ERR_IO;
949                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
950                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
951                         _tdb_transaction_cancel(tdb);
952                         return -1;
953                 }
954                 tdb->map_size = tdb->transaction->old_map_size;
955                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
956         }
957
958         /* Keep the global lock until the actual commit */
959
960         return 0;
961 }
962
/*
   prepare to commit the current transaction

   Public entry point: records a trace event and then runs the
   internal prepare step, returning its result unchanged.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_prepare_commit");
	ret = _tdb_transaction_prepare_commit(tdb);

	return ret;
}
971
972 /*
973   commit the current transaction
974 */
975 int tdb_transaction_commit(struct tdb_context *tdb)
976 {       
977         const struct tdb_methods *methods;
978         int i;
979         bool need_repack;
980
981         if (tdb->transaction == NULL) {
982                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
983                 return -1;
984         }
985
986         tdb_trace(tdb, "tdb_transaction_commit");
987
988         if (tdb->transaction->transaction_error) {
989                 tdb->ecode = TDB_ERR_IO;
990                 _tdb_transaction_cancel(tdb);
991                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
992                 return -1;
993         }
994
995
996         if (tdb->transaction->nesting != 0) {
997                 tdb->transaction->nesting--;
998                 return 0;
999         }
1000
1001         /* check for a null transaction */
1002         if (tdb->transaction->blocks == NULL) {
1003                 _tdb_transaction_cancel(tdb);
1004                 return 0;
1005         }
1006
1007         if (!tdb->transaction->prepared) {
1008                 int ret = _tdb_transaction_prepare_commit(tdb);
1009                 if (ret)
1010                         return ret;
1011         }
1012
1013         methods = tdb->transaction->io_methods;
1014
1015         /* perform all the writes */
1016         for (i=0;i<tdb->transaction->num_blocks;i++) {
1017                 tdb_off_t offset;
1018                 tdb_len_t length;
1019
1020                 if (tdb->transaction->blocks[i] == NULL) {
1021                         continue;
1022                 }
1023
1024                 offset = i * tdb->transaction->block_size;
1025                 length = tdb->transaction->block_size;
1026                 if (i == tdb->transaction->num_blocks-1) {
1027                         length = tdb->transaction->last_block_size;
1028                 }
1029
1030                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1031                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1032                         
1033                         /* we've overwritten part of the data and
1034                            possibly expanded the file, so we need to
1035                            run the crash recovery code */
1036                         tdb->methods = methods;
1037                         tdb_transaction_recover(tdb); 
1038
1039                         _tdb_transaction_cancel(tdb);
1040                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1041
1042                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1043                         return -1;
1044                 }
1045                 SAFE_FREE(tdb->transaction->blocks[i]);
1046         } 
1047
1048         SAFE_FREE(tdb->transaction->blocks);
1049         tdb->transaction->num_blocks = 0;
1050
1051         /* ensure the new data is on disk */
1052         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1053                 return -1;
1054         }
1055
1056         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1057
1058         /*
1059           TODO: maybe write to some dummy hdr field, or write to magic
1060           offset without mmap, before the last sync, instead of the
1061           utime() call
1062         */
1063
1064         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1065            don't change the mtime of the file, this means the file may
1066            not be backed up (as tdb rounding to block sizes means that
1067            file size changes are quite rare too). The following forces
1068            mtime changes when a transaction completes */
1069 #ifdef HAVE_UTIME
1070         utime(tdb->name, NULL);
1071 #endif
1072
1073         need_repack = tdb->transaction->need_repack;
1074
1075         /* use a transaction cancel to free memory and remove the
1076            transaction locks */
1077         _tdb_transaction_cancel(tdb);
1078
1079         if (need_repack) {
1080                 return tdb_repack(tdb);
1081         }
1082
1083         return 0;
1084 }
1085
1086
1087 /*
1088   recover from an aborted transaction. Must be called with exclusive
1089   database write access already established (including the global
1090   lock to prevent new processes attaching)
1091 */
1092 int tdb_transaction_recover(struct tdb_context *tdb)
1093 {
1094         tdb_off_t recovery_head, recovery_eof;
1095         unsigned char *data, *p;
1096         uint32_t zero = 0;
1097         struct tdb_record rec;
1098
1099         /* find the recovery area */
1100         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1101                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1102                 tdb->ecode = TDB_ERR_IO;
1103                 return -1;
1104         }
1105
1106         if (recovery_head == 0) {
1107                 /* we have never allocated a recovery record */
1108                 return 0;
1109         }
1110
1111         /* read the recovery record */
1112         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1113                                    sizeof(rec), DOCONV()) == -1) {
1114                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1115                 tdb->ecode = TDB_ERR_IO;
1116                 return -1;
1117         }
1118
1119         if (rec.magic != TDB_RECOVERY_MAGIC) {
1120                 /* there is no valid recovery data */
1121                 return 0;
1122         }
1123
1124         if (tdb->read_only) {
1125                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1126                 tdb->ecode = TDB_ERR_CORRUPT;
1127                 return -1;
1128         }
1129
1130         recovery_eof = rec.key_len;
1131
1132         data = (unsigned char *)malloc(rec.data_len);
1133         if (data == NULL) {
1134                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1135                 tdb->ecode = TDB_ERR_OOM;
1136                 return -1;
1137         }
1138
1139         /* read the full recovery data */
1140         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1141                                    rec.data_len, 0) == -1) {
1142                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1143                 tdb->ecode = TDB_ERR_IO;
1144                 return -1;
1145         }
1146
1147         /* recover the file data */
1148         p = data;
1149         while (p+8 < data + rec.data_len) {
1150                 uint32_t ofs, len;
1151                 if (DOCONV()) {
1152                         tdb_convert(p, 8);
1153                 }
1154                 memcpy(&ofs, p, 4);
1155                 memcpy(&len, p+4, 4);
1156
1157                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1158                         free(data);
1159                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1160                         tdb->ecode = TDB_ERR_IO;
1161                         return -1;
1162                 }
1163                 p += 8 + len;
1164         }
1165
1166         free(data);
1167
1168         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1169                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1170                 tdb->ecode = TDB_ERR_IO;
1171                 return -1;
1172         }
1173
1174         /* if the recovery area is after the recovered eof then remove it */
1175         if (recovery_eof <= recovery_head) {
1176                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1177                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1178                         tdb->ecode = TDB_ERR_IO;
1179                         return -1;                      
1180                 }
1181         }
1182
1183         /* remove the recovery magic */
1184         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct tdb_record, magic),
1185                           &zero) == -1) {
1186                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1187                 tdb->ecode = TDB_ERR_IO;
1188                 return -1;                      
1189         }
1190         
1191         /* reduce the file size to the old size */
1192         tdb_munmap(tdb);
1193         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1194                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1195                 tdb->ecode = TDB_ERR_IO;
1196                 return -1;                      
1197         }
1198         tdb->map_size = recovery_eof;
1199         tdb_mmap(tdb);
1200
1201         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1202                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1203                 tdb->ecode = TDB_ERR_IO;
1204                 return -1;
1205         }
1206
1207         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1208                  recovery_eof));
1209
1210         /* all done */
1211         return 0;
1212 }