Move common libraries from root to lib/.
[kai/samba-autobuild/.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
129                             tdb_len_t len, int cv)
130 {
131         uint32_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / tdb->transaction->block_size;
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166         
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
169         if (cv) {
170                 tdb_convert(buf, len);
171         }
172         return 0;
173
174 fail:
175         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176         tdb->ecode = TDB_ERR_IO;
177         tdb->transaction->transaction_error = 1;
178         return -1;
179 }
180
181
182 /*
183   write while in a transaction
184 */
185 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
186                              const void *buf, tdb_len_t len)
187 {
188         uint32_t blk;
189
190         /* if the write is to a hash head, then update the transaction
191            hash heads */
192         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
196         }
197
198         /* break it up into block sized chunks */
199         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201                 if (transaction_write(tdb, off, buf, len2) != 0) {
202                         return -1;
203                 }
204                 len -= len2;
205                 off += len2;
206                 if (buf != NULL) {
207                         buf = (const void *)(len2 + (const char *)buf);
208                 }
209         }
210
211         if (len == 0) {
212                 return 0;
213         }
214
215         blk = off / tdb->transaction->block_size;
216         off = off % tdb->transaction->block_size;
217
218         if (tdb->transaction->num_blocks <= blk) {
219                 uint8_t **new_blocks;
220                 /* expand the blocks array */
221                 if (tdb->transaction->blocks == NULL) {
222                         new_blocks = (uint8_t **)malloc(
223                                 (blk+1)*sizeof(uint8_t *));
224                 } else {
225                         new_blocks = (uint8_t **)realloc(
226                                 tdb->transaction->blocks,
227                                 (blk+1)*sizeof(uint8_t *));
228                 }
229                 if (new_blocks == NULL) {
230                         tdb->ecode = TDB_ERR_OOM;
231                         goto fail;
232                 }
233                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
234                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
235                 tdb->transaction->blocks = new_blocks;
236                 tdb->transaction->num_blocks = blk+1;
237                 tdb->transaction->last_block_size = 0;
238         }
239
240         /* allocate and fill a block? */
241         if (tdb->transaction->blocks[blk] == NULL) {
242                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
243                 if (tdb->transaction->blocks[blk] == NULL) {
244                         tdb->ecode = TDB_ERR_OOM;
245                         tdb->transaction->transaction_error = 1;
246                         return -1;                      
247                 }
248                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
249                         tdb_len_t len2 = tdb->transaction->block_size;
250                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
251                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
252                         }
253                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
254                                                                    tdb->transaction->blocks[blk], 
255                                                                    len2, 0) != 0) {
256                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
257                                 tdb->ecode = TDB_ERR_IO;
258                                 goto fail;
259                         }
260                         if (blk == tdb->transaction->num_blocks-1) {
261                                 tdb->transaction->last_block_size = len2;
262                         }                       
263                 }
264         }
265         
266         /* overwrite part of an existing block */
267         if (buf == NULL) {
268                 memset(tdb->transaction->blocks[blk] + off, 0, len);
269         } else {
270                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
271         }
272         if (blk == tdb->transaction->num_blocks-1) {
273                 if (len + off > tdb->transaction->last_block_size) {
274                         tdb->transaction->last_block_size = len + off;
275                 }
276         }
277
278         return 0;
279
280 fail:
281         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
282                  (blk*tdb->transaction->block_size) + off, len));
283         tdb->transaction->transaction_error = 1;
284         return -1;
285 }
286
287
288 /*
289   write while in a transaction - this varient never expands the transaction blocks, it only
290   updates existing blocks. This means it cannot change the recovery size
291 */
292 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
293                                       const void *buf, tdb_len_t len)
294 {
295         uint32_t blk;
296
297         /* break it up into block sized chunks */
298         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
299                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
300                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
301                         return -1;
302                 }
303                 len -= len2;
304                 off += len2;
305                 if (buf != NULL) {
306                         buf = (const void *)(len2 + (const char *)buf);
307                 }
308         }
309
310         if (len == 0) {
311                 return 0;
312         }
313
314         blk = off / tdb->transaction->block_size;
315         off = off % tdb->transaction->block_size;
316
317         if (tdb->transaction->num_blocks <= blk ||
318             tdb->transaction->blocks[blk] == NULL) {
319                 return 0;
320         }
321
322         if (blk == tdb->transaction->num_blocks-1 &&
323             off + len > tdb->transaction->last_block_size) {
324                 if (off >= tdb->transaction->last_block_size) {
325                         return 0;
326                 }
327                 len = tdb->transaction->last_block_size - off;
328         }
329
330         /* overwrite part of an existing block */
331         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
332
333         return 0;
334 }
335
336
337 /*
338   accelerated hash chain head search, using the cached hash heads
339 */
340 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
341 {
342         uint32_t h = *chain;
343         for (;h < tdb->header.hash_size;h++) {
344                 /* the +1 takes account of the freelist */
345                 if (0 != tdb->transaction->hash_heads[h+1]) {
346                         break;
347                 }
348         }
349         (*chain) = h;
350 }
351
352 /*
353   out of bounds check during a transaction
354 */
355 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
356 {
357         if (len <= tdb->map_size) {
358                 return 0;
359         }
360         return TDB_ERRCODE(TDB_ERR_IO, -1);
361 }
362
363 /*
364   transaction version of tdb_expand().
365 */
366 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
367                                    tdb_off_t addition)
368 {
369         /* add a write to the transaction elements, so subsequent
370            reads see the zero data */
371         if (transaction_write(tdb, size, NULL, addition) != 0) {
372                 return -1;
373         }
374
375         return 0;
376 }
377
378 /*
379   brlock during a transaction - ignore them
380 */
381 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
382                               int rw_type, int lck_type, int probe, size_t len)
383 {
384         return 0;
385 }
386
387 static const struct tdb_methods transaction_methods = {
388         transaction_read,
389         transaction_write,
390         transaction_next_hash_chain,
391         transaction_oob,
392         transaction_expand_file,
393         transaction_brlock
394 };
395
396
397 /*
398   start a tdb transaction. No token is returned, as only a single
399   transaction is allowed to be pending per tdb_context
400 */
401 int tdb_transaction_start(struct tdb_context *tdb)
402 {
403         /* some sanity checks */
404         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
405                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
406                 tdb->ecode = TDB_ERR_EINVAL;
407                 return -1;
408         }
409
410         /* cope with nested tdb_transaction_start() calls */
411         if (tdb->transaction != NULL) {
412                 tdb->transaction->nesting++;
413                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
414                          tdb->transaction->nesting));
415                 return 0;
416         }
417
418         if (tdb->num_locks != 0 || tdb->global_lock.count) {
419                 /* the caller must not have any locks when starting a
420                    transaction as otherwise we'll be screwed by lack
421                    of nested locks in posix */
422                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
423                 tdb->ecode = TDB_ERR_LOCK;
424                 return -1;
425         }
426
427         if (tdb->travlocks.next != NULL) {
428                 /* you cannot use transactions inside a traverse (although you can use
429                    traverse inside a transaction) as otherwise you can end up with
430                    deadlock */
431                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
432                 tdb->ecode = TDB_ERR_LOCK;
433                 return -1;
434         }
435
436         tdb->transaction = (struct tdb_transaction *)
437                 calloc(sizeof(struct tdb_transaction), 1);
438         if (tdb->transaction == NULL) {
439                 tdb->ecode = TDB_ERR_OOM;
440                 return -1;
441         }
442
443         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
444         tdb->transaction->block_size = tdb->page_size;
445
446         /* get the transaction write lock. This is a blocking lock. As
447            discussed with Volker, there are a number of ways we could
448            make this async, which we will probably do in the future */
449         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
450                 SAFE_FREE(tdb->transaction->blocks);
451                 SAFE_FREE(tdb->transaction);
452                 return -1;
453         }
454         
455         /* get a read lock from the freelist to the end of file. This
456            is upgraded to a write lock during the commit */
457         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
458                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
459                 tdb->ecode = TDB_ERR_LOCK;
460                 goto fail;
461         }
462
463         /* setup a copy of the hash table heads so the hash scan in
464            traverse can be fast */
465         tdb->transaction->hash_heads = (uint32_t *)
466                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
467         if (tdb->transaction->hash_heads == NULL) {
468                 tdb->ecode = TDB_ERR_OOM;
469                 goto fail;
470         }
471         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
472                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
473                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
474                 tdb->ecode = TDB_ERR_IO;
475                 goto fail;
476         }
477
478         /* make sure we know about any file expansions already done by
479            anyone else */
480         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
481         tdb->transaction->old_map_size = tdb->map_size;
482
483         /* finally hook the io methods, replacing them with
484            transaction specific methods */
485         tdb->transaction->io_methods = tdb->methods;
486         tdb->methods = &transaction_methods;
487
488         return 0;
489         
490 fail:
491         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
492         tdb_transaction_unlock(tdb);
493         SAFE_FREE(tdb->transaction->blocks);
494         SAFE_FREE(tdb->transaction->hash_heads);
495         SAFE_FREE(tdb->transaction);
496         return -1;
497 }
498
499
500 /*
501   cancel the current transaction
502 */
503 int tdb_transaction_cancel(struct tdb_context *tdb)
504 {       
505         int i;
506
507         if (tdb->transaction == NULL) {
508                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
509                 return -1;
510         }
511
512         if (tdb->transaction->nesting != 0) {
513                 tdb->transaction->transaction_error = 1;
514                 tdb->transaction->nesting--;
515                 return 0;
516         }               
517
518         tdb->map_size = tdb->transaction->old_map_size;
519
520         /* free all the transaction blocks */
521         for (i=0;i<tdb->transaction->num_blocks;i++) {
522                 if (tdb->transaction->blocks[i] != NULL) {
523                         free(tdb->transaction->blocks[i]);
524                 }
525         }
526         SAFE_FREE(tdb->transaction->blocks);
527
528         /* remove any global lock created during the transaction */
529         if (tdb->global_lock.count != 0) {
530                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
531                 tdb->global_lock.count = 0;
532         }
533
534         /* remove any locks created during the transaction */
535         if (tdb->num_locks != 0) {
536                 for (i=0;i<tdb->num_lockrecs;i++) {
537                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
538                                    F_UNLCK,F_SETLKW, 0, 1);
539                 }
540                 tdb->num_locks = 0;
541                 tdb->num_lockrecs = 0;
542                 SAFE_FREE(tdb->lockrecs);
543         }
544
545         /* restore the normal io methods */
546         tdb->methods = tdb->transaction->io_methods;
547
548         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
549         tdb_transaction_unlock(tdb);
550         SAFE_FREE(tdb->transaction->hash_heads);
551         SAFE_FREE(tdb->transaction);
552         
553         return 0;
554 }
555
556 /*
557   sync to disk
558 */
559 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
560 {       
561         if (fsync(tdb->fd) != 0) {
562                 tdb->ecode = TDB_ERR_IO;
563                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
564                 return -1;
565         }
566 #ifdef HAVE_MMAP
567         if (tdb->map_ptr) {
568                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
569                 if (msync(moffset + (char *)tdb->map_ptr, 
570                           length + (offset - moffset), MS_SYNC) != 0) {
571                         tdb->ecode = TDB_ERR_IO;
572                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
573                                  strerror(errno)));
574                         return -1;
575                 }
576         }
577 #endif
578         return 0;
579 }
580
581
582 /*
583   work out how much space the linearised recovery data will consume
584 */
585 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
586 {
587         tdb_len_t recovery_size = 0;
588         int i;
589
590         recovery_size = sizeof(uint32_t);
591         for (i=0;i<tdb->transaction->num_blocks;i++) {
592                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
593                         break;
594                 }
595                 if (tdb->transaction->blocks[i] == NULL) {
596                         continue;
597                 }
598                 recovery_size += 2*sizeof(tdb_off_t);
599                 if (i == tdb->transaction->num_blocks-1) {
600                         recovery_size += tdb->transaction->last_block_size;
601                 } else {
602                         recovery_size += tdb->transaction->block_size;
603                 }
604         }       
605
606         return recovery_size;
607 }
608
609 /*
610   allocate the recovery area, or use an existing recovery area if it is
611   large enough
612 */
613 static int tdb_recovery_allocate(struct tdb_context *tdb, 
614                                  tdb_len_t *recovery_size,
615                                  tdb_off_t *recovery_offset,
616                                  tdb_len_t *recovery_max_size)
617 {
618         struct list_struct rec;
619         const struct tdb_methods *methods = tdb->transaction->io_methods;
620         tdb_off_t recovery_head;
621
622         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
623                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
624                 return -1;
625         }
626
627         rec.rec_len = 0;
628
629         if (recovery_head != 0 && 
630             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
631                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
632                 return -1;
633         }
634
635         *recovery_size = tdb_recovery_size(tdb);
636
637         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
638                 /* it fits in the existing area */
639                 *recovery_max_size = rec.rec_len;
640                 *recovery_offset = recovery_head;
641                 return 0;
642         }
643
644         /* we need to free up the old recovery area, then allocate a
645            new one at the end of the file. Note that we cannot use
646            tdb_allocate() to allocate the new one as that might return
647            us an area that is being currently used (as of the start of
648            the transaction) */
649         if (recovery_head != 0) {
650                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
651                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
652                         return -1;
653                 }
654         }
655
656         /* the tdb_free() call might have increased the recovery size */
657         *recovery_size = tdb_recovery_size(tdb);
658
659         /* round up to a multiple of page size */
660         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
661         *recovery_offset = tdb->map_size;
662         recovery_head = *recovery_offset;
663
664         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
665                                      (tdb->map_size - tdb->transaction->old_map_size) +
666                                      sizeof(rec) + *recovery_max_size) == -1) {
667                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
668                 return -1;
669         }
670
671         /* remap the file (if using mmap) */
672         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
673
674         /* we have to reset the old map size so that we don't try to expand the file
675            again in the transaction commit, which would destroy the recovery area */
676         tdb->transaction->old_map_size = tdb->map_size;
677
678         /* write the recovery header offset and sync - we can sync without a race here
679            as the magic ptr in the recovery record has not been set */
680         CONVERT(recovery_head);
681         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
682                                &recovery_head, sizeof(tdb_off_t)) == -1) {
683                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
684                 return -1;
685         }
686         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
687                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
688                 return -1;
689         }
690
691         return 0;
692 }
693
694
695 /*
696   setup the recovery data that will be used on a crash during commit
697 */
698 static int transaction_setup_recovery(struct tdb_context *tdb, 
699                                       tdb_off_t *magic_offset)
700 {
701         tdb_len_t recovery_size;
702         unsigned char *data, *p;
703         const struct tdb_methods *methods = tdb->transaction->io_methods;
704         struct list_struct *rec;
705         tdb_off_t recovery_offset, recovery_max_size;
706         tdb_off_t old_map_size = tdb->transaction->old_map_size;
707         uint32_t magic, tailer;
708         int i;
709
710         /*
711           check that the recovery area has enough space
712         */
713         if (tdb_recovery_allocate(tdb, &recovery_size, 
714                                   &recovery_offset, &recovery_max_size) == -1) {
715                 return -1;
716         }
717
718         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
719         if (data == NULL) {
720                 tdb->ecode = TDB_ERR_OOM;
721                 return -1;
722         }
723
724         rec = (struct list_struct *)data;
725         memset(rec, 0, sizeof(*rec));
726
727         rec->magic    = 0;
728         rec->data_len = recovery_size;
729         rec->rec_len  = recovery_max_size;
730         rec->key_len  = old_map_size;
731         CONVERT(rec);
732
733         /* build the recovery data into a single blob to allow us to do a single
734            large write, which should be more efficient */
735         p = data + sizeof(*rec);
736         for (i=0;i<tdb->transaction->num_blocks;i++) {
737                 tdb_off_t offset;
738                 tdb_len_t length;
739
740                 if (tdb->transaction->blocks[i] == NULL) {
741                         continue;
742                 }
743
744                 offset = i * tdb->transaction->block_size;
745                 length = tdb->transaction->block_size;
746                 if (i == tdb->transaction->num_blocks-1) {
747                         length = tdb->transaction->last_block_size;
748                 }
749                 
750                 if (offset >= old_map_size) {
751                         continue;
752                 }
753                 if (offset + length > tdb->transaction->old_map_size) {
754                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
755                         free(data);
756                         tdb->ecode = TDB_ERR_CORRUPT;
757                         return -1;
758                 }
759                 memcpy(p, &offset, 4);
760                 memcpy(p+4, &length, 4);
761                 if (DOCONV()) {
762                         tdb_convert(p, 8);
763                 }
764                 /* the recovery area contains the old data, not the
765                    new data, so we have to call the original tdb_read
766                    method to get it */
767                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
768                         free(data);
769                         tdb->ecode = TDB_ERR_IO;
770                         return -1;
771                 }
772                 p += 8 + length;
773         }
774
775         /* and the tailer */
776         tailer = sizeof(*rec) + recovery_max_size;
777         memcpy(p, &tailer, 4);
778         CONVERT(p);
779
780         /* write the recovery data to the recovery area */
781         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
782                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
783                 free(data);
784                 tdb->ecode = TDB_ERR_IO;
785                 return -1;
786         }
787         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
788                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
789                 free(data);
790                 tdb->ecode = TDB_ERR_IO;
791                 return -1;
792         }
793
794         /* as we don't have ordered writes, we have to sync the recovery
795            data before we update the magic to indicate that the recovery
796            data is present */
797         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
798                 free(data);
799                 return -1;
800         }
801
802         free(data);
803
804         magic = TDB_RECOVERY_MAGIC;
805         CONVERT(magic);
806
807         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
808
809         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
810                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
811                 tdb->ecode = TDB_ERR_IO;
812                 return -1;
813         }
814         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
815                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
816                 tdb->ecode = TDB_ERR_IO;
817                 return -1;
818         }
819
820         /* ensure the recovery magic marker is on disk */
821         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
822                 return -1;
823         }
824
825         return 0;
826 }
827
828 /*
829   commit the current transaction
830 */
831 int tdb_transaction_commit(struct tdb_context *tdb)
832 {       
833         const struct tdb_methods *methods;
834         tdb_off_t magic_offset = 0;
835         uint32_t zero = 0;
836         int i;
837
838         if (tdb->transaction == NULL) {
839                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
840                 return -1;
841         }
842
843         if (tdb->transaction->transaction_error) {
844                 tdb->ecode = TDB_ERR_IO;
845                 tdb_transaction_cancel(tdb);
846                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
847                 return -1;
848         }
849
850
851         if (tdb->transaction->nesting != 0) {
852                 tdb->transaction->nesting--;
853                 return 0;
854         }               
855
856         /* check for a null transaction */
857         if (tdb->transaction->blocks == NULL) {
858                 tdb_transaction_cancel(tdb);
859                 return 0;
860         }
861
862         methods = tdb->transaction->io_methods;
863         
864         /* if there are any locks pending then the caller has not
865            nested their locks properly, so fail the transaction */
866         if (tdb->num_locks || tdb->global_lock.count) {
867                 tdb->ecode = TDB_ERR_LOCK;
868                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
869                 tdb_transaction_cancel(tdb);
870                 return -1;
871         }
872
873         /* upgrade the main transaction lock region to a write lock */
874         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
875                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
876                 tdb->ecode = TDB_ERR_LOCK;
877                 tdb_transaction_cancel(tdb);
878                 return -1;
879         }
880
881         /* get the global lock - this prevents new users attaching to the database
882            during the commit */
883         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
884                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
885                 tdb->ecode = TDB_ERR_LOCK;
886                 tdb_transaction_cancel(tdb);
887                 return -1;
888         }
889
890         if (!(tdb->flags & TDB_NOSYNC)) {
891                 /* write the recovery data to the end of the file */
892                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
893                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
894                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
895                         tdb_transaction_cancel(tdb);
896                         return -1;
897                 }
898         }
899
900         /* expand the file to the new size if needed */
901         if (tdb->map_size != tdb->transaction->old_map_size) {
902                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
903                                              tdb->map_size - 
904                                              tdb->transaction->old_map_size) == -1) {
905                         tdb->ecode = TDB_ERR_IO;
906                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
907                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
908                         tdb_transaction_cancel(tdb);
909                         return -1;
910                 }
911                 tdb->map_size = tdb->transaction->old_map_size;
912                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
913         }
914
915         /* perform all the writes */
916         for (i=0;i<tdb->transaction->num_blocks;i++) {
917                 tdb_off_t offset;
918                 tdb_len_t length;
919
920                 if (tdb->transaction->blocks[i] == NULL) {
921                         continue;
922                 }
923
924                 offset = i * tdb->transaction->block_size;
925                 length = tdb->transaction->block_size;
926                 if (i == tdb->transaction->num_blocks-1) {
927                         length = tdb->transaction->last_block_size;
928                 }
929
930                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
931                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
932                         
933                         /* we've overwritten part of the data and
934                            possibly expanded the file, so we need to
935                            run the crash recovery code */
936                         tdb->methods = methods;
937                         tdb_transaction_recover(tdb); 
938
939                         tdb_transaction_cancel(tdb);
940                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
941
942                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
943                         return -1;
944                 }
945                 SAFE_FREE(tdb->transaction->blocks[i]);
946         } 
947
948         SAFE_FREE(tdb->transaction->blocks);
949         tdb->transaction->num_blocks = 0;
950
951         if (!(tdb->flags & TDB_NOSYNC)) {
952                 /* ensure the new data is on disk */
953                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
954                         return -1;
955                 }
956
957                 /* remove the recovery marker */
958                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
959                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
960                         return -1;
961                 }
962
963                 /* ensure the recovery marker has been removed on disk */
964                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
965                         return -1;
966                 }
967         }
968
969         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
970
971         /*
972           TODO: maybe write to some dummy hdr field, or write to magic
973           offset without mmap, before the last sync, instead of the
974           utime() call
975         */
976
977         /* on some systems (like Linux 2.6.x) changes via mmap/msync
978            don't change the mtime of the file, this means the file may
979            not be backed up (as tdb rounding to block sizes means that
980            file size changes are quite rare too). The following forces
981            mtime changes when a transaction completes */
982 #ifdef HAVE_UTIME
983         utime(tdb->name, NULL);
984 #endif
985
986         /* use a transaction cancel to free memory and remove the
987            transaction locks */
988         tdb_transaction_cancel(tdb);
989
990         return 0;
991 }
992
993
994 /*
995   recover from an aborted transaction. Must be called with exclusive
996   database write access already established (including the global
997   lock to prevent new processes attaching)
998 */
999 int tdb_transaction_recover(struct tdb_context *tdb)
1000 {
1001         tdb_off_t recovery_head, recovery_eof;
1002         unsigned char *data, *p;
1003         uint32_t zero = 0;
1004         struct list_struct rec;
1005
1006         /* find the recovery area */
1007         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1008                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1009                 tdb->ecode = TDB_ERR_IO;
1010                 return -1;
1011         }
1012
1013         if (recovery_head == 0) {
1014                 /* we have never allocated a recovery record */
1015                 return 0;
1016         }
1017
1018         /* read the recovery record */
1019         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1020                                    sizeof(rec), DOCONV()) == -1) {
1021                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1022                 tdb->ecode = TDB_ERR_IO;
1023                 return -1;
1024         }
1025
1026         if (rec.magic != TDB_RECOVERY_MAGIC) {
1027                 /* there is no valid recovery data */
1028                 return 0;
1029         }
1030
1031         if (tdb->read_only) {
1032                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1033                 tdb->ecode = TDB_ERR_CORRUPT;
1034                 return -1;
1035         }
1036
1037         recovery_eof = rec.key_len;
1038
1039         data = (unsigned char *)malloc(rec.data_len);
1040         if (data == NULL) {
1041                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1042                 tdb->ecode = TDB_ERR_OOM;
1043                 return -1;
1044         }
1045
1046         /* read the full recovery data */
1047         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1048                                    rec.data_len, 0) == -1) {
1049                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1050                 tdb->ecode = TDB_ERR_IO;
1051                 return -1;
1052         }
1053
1054         /* recover the file data */
1055         p = data;
1056         while (p+8 < data + rec.data_len) {
1057                 uint32_t ofs, len;
1058                 if (DOCONV()) {
1059                         tdb_convert(p, 8);
1060                 }
1061                 memcpy(&ofs, p, 4);
1062                 memcpy(&len, p+4, 4);
1063
1064                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1065                         free(data);
1066                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1067                         tdb->ecode = TDB_ERR_IO;
1068                         return -1;
1069                 }
1070                 p += 8 + len;
1071         }
1072
1073         free(data);
1074
1075         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1076                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1077                 tdb->ecode = TDB_ERR_IO;
1078                 return -1;
1079         }
1080
1081         /* if the recovery area is after the recovered eof then remove it */
1082         if (recovery_eof <= recovery_head) {
1083                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1084                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1085                         tdb->ecode = TDB_ERR_IO;
1086                         return -1;                      
1087                 }
1088         }
1089
1090         /* remove the recovery magic */
1091         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1092                           &zero) == -1) {
1093                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1094                 tdb->ecode = TDB_ERR_IO;
1095                 return -1;                      
1096         }
1097         
1098         /* reduce the file size to the old size */
1099         tdb_munmap(tdb);
1100         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1101                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1102                 tdb->ecode = TDB_ERR_IO;
1103                 return -1;                      
1104         }
1105         tdb->map_size = recovery_eof;
1106         tdb_mmap(tdb);
1107
1108         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1109                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1110                 tdb->ecode = TDB_ERR_IO;
1111                 return -1;
1112         }
1113
1114         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1115                  recovery_eof));
1116
1117         /* all done */
1118         return 0;
1119 }