Port from ctdb:
[ira/wip.git] / source / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocated the transaction recover record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of writes all that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit stategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
129                             tdb_len_t len, int cv)
130 {
131         uint32_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / tdb->transaction->block_size;
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166         
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
169         if (cv) {
170                 tdb_convert(buf, len);
171         }
172         return 0;
173
174 fail:
175         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176         tdb->ecode = TDB_ERR_IO;
177         tdb->transaction->transaction_error = 1;
178         return -1;
179 }
180
181
182 /*
183   write while in a transaction
184 */
185 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
186                              const void *buf, tdb_len_t len)
187 {
188         uint32_t blk;
189
190         /* if the write is to a hash head, then update the transaction
191            hash heads */
192         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
196         }
197
198         /* break it up into block sized chunks */
199         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201                 if (transaction_write(tdb, off, buf, len2) != 0) {
202                         return -1;
203                 }
204                 len -= len2;
205                 off += len2;
206                 if (buf != NULL) {
207                         buf = (const void *)(len2 + (const char *)buf);
208                 }
209         }
210
211         if (len == 0) {
212                 return 0;
213         }
214
215         blk = off / tdb->transaction->block_size;
216         off = off % tdb->transaction->block_size;
217
218         if (tdb->transaction->num_blocks <= blk) {
219                 uint8_t **new_blocks;
220                 /* expand the blocks array */
221                 if (tdb->transaction->blocks == NULL) {
222                         new_blocks = malloc((blk+1)*sizeof(uint8_t *));
223                 } else {
224                         new_blocks = realloc(tdb->transaction->blocks, (blk+1)*sizeof(uint8_t *));
225                 }
226                 if (new_blocks == NULL) {
227                         tdb->ecode = TDB_ERR_OOM;
228                         goto fail;
229                 }
230                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
231                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
232                 tdb->transaction->blocks = new_blocks;
233                 tdb->transaction->num_blocks = blk+1;
234                 tdb->transaction->last_block_size = 0;
235         }
236
237         /* allocate and fill a block? */
238         if (tdb->transaction->blocks[blk] == NULL) {
239                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
240                 if (tdb->transaction->blocks[blk] == NULL) {
241                         tdb->ecode = TDB_ERR_OOM;
242                         tdb->transaction->transaction_error = 1;
243                         return -1;                      
244                 }
245                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
246                         tdb_len_t len2 = tdb->transaction->block_size;
247                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
248                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
249                         }
250                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
251                                                                    tdb->transaction->blocks[blk], 
252                                                                    len2, 0) != 0) {
253                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
254                                 tdb->ecode = TDB_ERR_IO;
255                                 goto fail;
256                         }
257                         if (blk == tdb->transaction->num_blocks-1) {
258                                 tdb->transaction->last_block_size = len2;
259                         }                       
260                 }
261         }
262         
263         /* overwrite part of an existing block */
264         if (buf == NULL) {
265                 memset(tdb->transaction->blocks[blk] + off, 0, len);
266         } else {
267                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
268         }
269         if (blk == tdb->transaction->num_blocks-1) {
270                 if (len + off > tdb->transaction->last_block_size) {
271                         tdb->transaction->last_block_size = len + off;
272                 }
273         }
274
275         return 0;
276
277 fail:
278         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
279                  (blk*tdb->transaction->block_size) + off, len));
280         tdb->transaction->transaction_error = 1;
281         return -1;
282 }
283
284
285 /*
286   write while in a transaction - this varient never expands the transaction blocks, it only
287   updates existing blocks. This means it cannot change the recovery size
288 */
289 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
290                                       const void *buf, tdb_len_t len)
291 {
292         uint32_t blk;
293
294         /* break it up into block sized chunks */
295         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
296                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
297                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
298                         return -1;
299                 }
300                 len -= len2;
301                 off += len2;
302                 if (buf != NULL) {
303                         buf = (const void *)(len2 + (const char *)buf);
304                 }
305         }
306
307         if (len == 0) {
308                 return 0;
309         }
310
311         blk = off / tdb->transaction->block_size;
312         off = off % tdb->transaction->block_size;
313
314         if (tdb->transaction->num_blocks <= blk ||
315             tdb->transaction->blocks[blk] == NULL) {
316                 return 0;
317         }
318
319         if (blk == tdb->transaction->num_blocks-1 &&
320             off + len > tdb->transaction->last_block_size) {
321                 len = tdb->transaction->last_block_size - off;
322         }
323
324         /* overwrite part of an existing block */
325         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
326
327         return 0;
328 }
329
330
331 /*
332   accelerated hash chain head search, using the cached hash heads
333 */
334 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
335 {
336         uint32_t h = *chain;
337         for (;h < tdb->header.hash_size;h++) {
338                 /* the +1 takes account of the freelist */
339                 if (0 != tdb->transaction->hash_heads[h+1]) {
340                         break;
341                 }
342         }
343         (*chain) = h;
344 }
345
346 /*
347   out of bounds check during a transaction
348 */
349 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
350 {
351         if (len <= tdb->map_size) {
352                 return 0;
353         }
354         return TDB_ERRCODE(TDB_ERR_IO, -1);
355 }
356
357 /*
358   transaction version of tdb_expand().
359 */
360 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
361                                    tdb_off_t addition)
362 {
363         /* add a write to the transaction elements, so subsequent
364            reads see the zero data */
365         if (transaction_write(tdb, size, NULL, addition) != 0) {
366                 return -1;
367         }
368
369         return 0;
370 }
371
372 /*
373   brlock during a transaction - ignore them
374 */
375 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
376                               int rw_type, int lck_type, int probe, size_t len)
377 {
378         return 0;
379 }
380
381 static const struct tdb_methods transaction_methods = {
382         transaction_read,
383         transaction_write,
384         transaction_next_hash_chain,
385         transaction_oob,
386         transaction_expand_file,
387         transaction_brlock
388 };
389
390
391 /*
392   start a tdb transaction. No token is returned, as only a single
393   transaction is allowed to be pending per tdb_context
394 */
395 int tdb_transaction_start(struct tdb_context *tdb)
396 {
397         /* some sanity checks */
398         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
399                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
400                 tdb->ecode = TDB_ERR_EINVAL;
401                 return -1;
402         }
403
404         /* cope with nested tdb_transaction_start() calls */
405         if (tdb->transaction != NULL) {
406                 tdb->transaction->nesting++;
407                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
408                          tdb->transaction->nesting));
409                 return 0;
410         }
411
412         if (tdb->num_locks != 0 || tdb->global_lock.count) {
413                 /* the caller must not have any locks when starting a
414                    transaction as otherwise we'll be screwed by lack
415                    of nested locks in posix */
416                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
417                 tdb->ecode = TDB_ERR_LOCK;
418                 return -1;
419         }
420
421         if (tdb->travlocks.next != NULL) {
422                 /* you cannot use transactions inside a traverse (although you can use
423                    traverse inside a transaction) as otherwise you can end up with
424                    deadlock */
425                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
426                 tdb->ecode = TDB_ERR_LOCK;
427                 return -1;
428         }
429
430         tdb->transaction = (struct tdb_transaction *)
431                 calloc(sizeof(struct tdb_transaction), 1);
432         if (tdb->transaction == NULL) {
433                 tdb->ecode = TDB_ERR_OOM;
434                 return -1;
435         }
436
437         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
438         tdb->transaction->block_size = tdb->page_size;
439
440         /* get the transaction write lock. This is a blocking lock. As
441            discussed with Volker, there are a number of ways we could
442            make this async, which we will probably do in the future */
443         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
444                 SAFE_FREE(tdb->transaction->blocks);
445                 SAFE_FREE(tdb->transaction);
446                 return -1;
447         }
448         
449         /* get a read lock from the freelist to the end of file. This
450            is upgraded to a write lock during the commit */
451         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
452                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
453                 tdb->ecode = TDB_ERR_LOCK;
454                 goto fail;
455         }
456
457         /* setup a copy of the hash table heads so the hash scan in
458            traverse can be fast */
459         tdb->transaction->hash_heads = (uint32_t *)
460                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
461         if (tdb->transaction->hash_heads == NULL) {
462                 tdb->ecode = TDB_ERR_OOM;
463                 goto fail;
464         }
465         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
466                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
467                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
468                 tdb->ecode = TDB_ERR_IO;
469                 goto fail;
470         }
471
472         /* make sure we know about any file expansions already done by
473            anyone else */
474         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
475         tdb->transaction->old_map_size = tdb->map_size;
476
477         /* finally hook the io methods, replacing them with
478            transaction specific methods */
479         tdb->transaction->io_methods = tdb->methods;
480         tdb->methods = &transaction_methods;
481
482         return 0;
483         
484 fail:
485         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
486         tdb_transaction_unlock(tdb);
487         SAFE_FREE(tdb->transaction->blocks);
488         SAFE_FREE(tdb->transaction->hash_heads);
489         SAFE_FREE(tdb->transaction);
490         return -1;
491 }
492
493
494 /*
495   cancel the current transaction
496 */
497 int tdb_transaction_cancel(struct tdb_context *tdb)
498 {       
499         int i;
500
501         if (tdb->transaction == NULL) {
502                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
503                 return -1;
504         }
505
506         if (tdb->transaction->nesting != 0) {
507                 tdb->transaction->transaction_error = 1;
508                 tdb->transaction->nesting--;
509                 return 0;
510         }               
511
512         tdb->map_size = tdb->transaction->old_map_size;
513
514         /* free all the transaction blocks */
515         for (i=0;i<tdb->transaction->num_blocks;i++) {
516                 if (tdb->transaction->blocks[i] != NULL) {
517                         free(tdb->transaction->blocks[i]);
518                 }
519         }
520         SAFE_FREE(tdb->transaction->blocks);
521
522         /* remove any global lock created during the transaction */
523         if (tdb->global_lock.count != 0) {
524                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
525                 tdb->global_lock.count = 0;
526         }
527
528         /* remove any locks created during the transaction */
529         if (tdb->num_locks != 0) {
530                 for (i=0;i<tdb->num_lockrecs;i++) {
531                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
532                                    F_UNLCK,F_SETLKW, 0, 1);
533                 }
534                 tdb->num_locks = 0;
535                 tdb->num_lockrecs = 0;
536                 SAFE_FREE(tdb->lockrecs);
537         }
538
539         /* restore the normal io methods */
540         tdb->methods = tdb->transaction->io_methods;
541
542         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
543         tdb_transaction_unlock(tdb);
544         SAFE_FREE(tdb->transaction->hash_heads);
545         SAFE_FREE(tdb->transaction);
546         
547         return 0;
548 }
549
550 /*
551   sync to disk
552 */
553 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
554 {       
555         if (fsync(tdb->fd) != 0) {
556                 tdb->ecode = TDB_ERR_IO;
557                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
558                 return -1;
559         }
560 #ifdef MS_SYNC
561         if (tdb->map_ptr) {
562                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
563                 if (msync(moffset + (char *)tdb->map_ptr, 
564                           length + (offset - moffset), MS_SYNC) != 0) {
565                         tdb->ecode = TDB_ERR_IO;
566                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
567                                  strerror(errno)));
568                         return -1;
569                 }
570         }
571 #endif
572         return 0;
573 }
574
575
576 /*
577   work out how much space the linearised recovery data will consume
578 */
579 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
580 {
581         tdb_len_t recovery_size = 0;
582         int i;
583
584         recovery_size = sizeof(uint32_t);
585         for (i=0;i<tdb->transaction->num_blocks;i++) {
586                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
587                         break;
588                 }
589                 if (tdb->transaction->blocks[i] == NULL) {
590                         continue;
591                 }
592                 recovery_size += 2*sizeof(tdb_off_t);
593                 if (i == tdb->transaction->num_blocks-1) {
594                         recovery_size += tdb->transaction->last_block_size;
595                 } else {
596                         recovery_size += tdb->transaction->block_size;
597                 }
598         }       
599
600         return recovery_size;
601 }
602
603 /*
604   allocate the recovery area, or use an existing recovery area if it is
605   large enough
606 */
607 static int tdb_recovery_allocate(struct tdb_context *tdb, 
608                                  tdb_len_t *recovery_size,
609                                  tdb_off_t *recovery_offset,
610                                  tdb_len_t *recovery_max_size)
611 {
612         struct list_struct rec;
613         const struct tdb_methods *methods = tdb->transaction->io_methods;
614         tdb_off_t recovery_head;
615
616         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
617                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
618                 return -1;
619         }
620
621         rec.rec_len = 0;
622
623         if (recovery_head != 0 && 
624             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
625                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
626                 return -1;
627         }
628
629         *recovery_size = tdb_recovery_size(tdb);
630
631         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
632                 /* it fits in the existing area */
633                 *recovery_max_size = rec.rec_len;
634                 *recovery_offset = recovery_head;
635                 return 0;
636         }
637
638         /* we need to free up the old recovery area, then allocate a
639            new one at the end of the file. Note that we cannot use
640            tdb_allocate() to allocate the new one as that might return
641            us an area that is being currently used (as of the start of
642            the transaction) */
643         if (recovery_head != 0) {
644                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
645                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
646                         return -1;
647                 }
648         }
649
650         /* the tdb_free() call might have increased the recovery size */
651         *recovery_size = tdb_recovery_size(tdb);
652
653         /* round up to a multiple of page size */
654         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
655         *recovery_offset = tdb->map_size;
656         recovery_head = *recovery_offset;
657
658         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
659                                      (tdb->map_size - tdb->transaction->old_map_size) +
660                                      sizeof(rec) + *recovery_max_size) == -1) {
661                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
662                 return -1;
663         }
664
665         /* remap the file (if using mmap) */
666         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
667
668         /* we have to reset the old map size so that we don't try to expand the file
669            again in the transaction commit, which would destroy the recovery area */
670         tdb->transaction->old_map_size = tdb->map_size;
671
672         /* write the recovery header offset and sync - we can sync without a race here
673            as the magic ptr in the recovery record has not been set */
674         CONVERT(recovery_head);
675         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
676                                &recovery_head, sizeof(tdb_off_t)) == -1) {
677                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
678                 return -1;
679         }
680         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
681                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
682                 return -1;
683         }
684
685         return 0;
686 }
687
688
689 /*
690   setup the recovery data that will be used on a crash during commit
691 */
692 static int transaction_setup_recovery(struct tdb_context *tdb, 
693                                       tdb_off_t *magic_offset)
694 {
695         tdb_len_t recovery_size;
696         unsigned char *data, *p;
697         const struct tdb_methods *methods = tdb->transaction->io_methods;
698         struct list_struct *rec;
699         tdb_off_t recovery_offset, recovery_max_size;
700         tdb_off_t old_map_size = tdb->transaction->old_map_size;
701         uint32_t magic, tailer;
702         int i;
703
704         /*
705           check that the recovery area has enough space
706         */
707         if (tdb_recovery_allocate(tdb, &recovery_size, 
708                                   &recovery_offset, &recovery_max_size) == -1) {
709                 return -1;
710         }
711
712         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
713         if (data == NULL) {
714                 tdb->ecode = TDB_ERR_OOM;
715                 return -1;
716         }
717
718         rec = (struct list_struct *)data;
719         memset(rec, 0, sizeof(*rec));
720
721         rec->magic    = 0;
722         rec->data_len = recovery_size;
723         rec->rec_len  = recovery_max_size;
724         rec->key_len  = old_map_size;
725         CONVERT(rec);
726
727         /* build the recovery data into a single blob to allow us to do a single
728            large write, which should be more efficient */
729         p = data + sizeof(*rec);
730         for (i=0;i<tdb->transaction->num_blocks;i++) {
731                 tdb_off_t offset;
732                 tdb_len_t length;
733
734                 if (tdb->transaction->blocks[i] == NULL) {
735                         continue;
736                 }
737
738                 offset = i * tdb->transaction->block_size;
739                 length = tdb->transaction->block_size;
740                 if (i == tdb->transaction->num_blocks-1) {
741                         length = tdb->transaction->last_block_size;
742                 }
743                 
744                 if (offset >= old_map_size) {
745                         continue;
746                 }
747                 if (offset + length > tdb->transaction->old_map_size) {
748                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
749                         free(data);
750                         tdb->ecode = TDB_ERR_CORRUPT;
751                         return -1;
752                 }
753                 memcpy(p, &offset, 4);
754                 memcpy(p+4, &length, 4);
755                 if (DOCONV()) {
756                         tdb_convert(p, 8);
757                 }
758                 /* the recovery area contains the old data, not the
759                    new data, so we have to call the original tdb_read
760                    method to get it */
761                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
762                         free(data);
763                         tdb->ecode = TDB_ERR_IO;
764                         return -1;
765                 }
766                 p += 8 + length;
767         }
768
769         /* and the tailer */
770         tailer = sizeof(*rec) + recovery_max_size;
771         memcpy(p, &tailer, 4);
772         CONVERT(p);
773
774         /* write the recovery data to the recovery area */
775         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
776                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
777                 free(data);
778                 tdb->ecode = TDB_ERR_IO;
779                 return -1;
780         }
781         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
782                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
783                 free(data);
784                 tdb->ecode = TDB_ERR_IO;
785                 return -1;
786         }
787
788         /* as we don't have ordered writes, we have to sync the recovery
789            data before we update the magic to indicate that the recovery
790            data is present */
791         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
792                 free(data);
793                 return -1;
794         }
795
796         free(data);
797
798         magic = TDB_RECOVERY_MAGIC;
799         CONVERT(magic);
800
801         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
802
803         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
804                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
805                 tdb->ecode = TDB_ERR_IO;
806                 return -1;
807         }
808         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
809                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
810                 tdb->ecode = TDB_ERR_IO;
811                 return -1;
812         }
813
814         /* ensure the recovery magic marker is on disk */
815         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
816                 return -1;
817         }
818
819         return 0;
820 }
821
822 /*
823   commit the current transaction
824 */
825 int tdb_transaction_commit(struct tdb_context *tdb)
826 {       
827         const struct tdb_methods *methods;
828         tdb_off_t magic_offset = 0;
829         uint32_t zero = 0;
830         int i;
831
832         if (tdb->transaction == NULL) {
833                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
834                 return -1;
835         }
836
837         if (tdb->transaction->transaction_error) {
838                 tdb->ecode = TDB_ERR_IO;
839                 tdb_transaction_cancel(tdb);
840                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
841                 return -1;
842         }
843
844
845         if (tdb->transaction->nesting != 0) {
846                 tdb->transaction->nesting--;
847                 return 0;
848         }               
849
850         /* check for a null transaction */
851         if (tdb->transaction->blocks == NULL) {
852                 tdb_transaction_cancel(tdb);
853                 return 0;
854         }
855
856         methods = tdb->transaction->io_methods;
857         
858         /* if there are any locks pending then the caller has not
859            nested their locks properly, so fail the transaction */
860         if (tdb->num_locks || tdb->global_lock.count) {
861                 tdb->ecode = TDB_ERR_LOCK;
862                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
863                 tdb_transaction_cancel(tdb);
864                 return -1;
865         }
866
867         /* upgrade the main transaction lock region to a write lock */
868         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
869                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
870                 tdb->ecode = TDB_ERR_LOCK;
871                 tdb_transaction_cancel(tdb);
872                 return -1;
873         }
874
875         /* get the global lock - this prevents new users attaching to the database
876            during the commit */
877         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
878                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
879                 tdb->ecode = TDB_ERR_LOCK;
880                 tdb_transaction_cancel(tdb);
881                 return -1;
882         }
883
884         if (!(tdb->flags & TDB_NOSYNC)) {
885                 /* write the recovery data to the end of the file */
886                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
887                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
888                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
889                         tdb_transaction_cancel(tdb);
890                         return -1;
891                 }
892         }
893
894         /* expand the file to the new size if needed */
895         if (tdb->map_size != tdb->transaction->old_map_size) {
896                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
897                                              tdb->map_size - 
898                                              tdb->transaction->old_map_size) == -1) {
899                         tdb->ecode = TDB_ERR_IO;
900                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
901                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
902                         tdb_transaction_cancel(tdb);
903                         return -1;
904                 }
905                 tdb->map_size = tdb->transaction->old_map_size;
906                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
907         }
908
909         /* perform all the writes */
910         for (i=0;i<tdb->transaction->num_blocks;i++) {
911                 tdb_off_t offset;
912                 tdb_len_t length;
913
914                 if (tdb->transaction->blocks[i] == NULL) {
915                         continue;
916                 }
917
918                 offset = i * tdb->transaction->block_size;
919                 length = tdb->transaction->block_size;
920                 if (i == tdb->transaction->num_blocks-1) {
921                         length = tdb->transaction->last_block_size;
922                 }
923
924                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
925                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
926                         
927                         /* we've overwritten part of the data and
928                            possibly expanded the file, so we need to
929                            run the crash recovery code */
930                         tdb->methods = methods;
931                         tdb_transaction_recover(tdb); 
932
933                         tdb_transaction_cancel(tdb);
934                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
935
936                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
937                         return -1;
938                 }
939                 SAFE_FREE(tdb->transaction->blocks[i]);
940         } 
941
942         SAFE_FREE(tdb->transaction->blocks);
943         tdb->transaction->num_blocks = 0;
944
945         if (!(tdb->flags & TDB_NOSYNC)) {
946                 /* ensure the new data is on disk */
947                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
948                         return -1;
949                 }
950
951                 /* remove the recovery marker */
952                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
953                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
954                         return -1;
955                 }
956
957                 /* ensure the recovery marker has been removed on disk */
958                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
959                         return -1;
960                 }
961         }
962
963         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
964
965         /*
966           TODO: maybe write to some dummy hdr field, or write to magic
967           offset without mmap, before the last sync, instead of the
968           utime() call
969         */
970
971         /* on some systems (like Linux 2.6.x) changes via mmap/msync
972            don't change the mtime of the file, this means the file may
973            not be backed up (as tdb rounding to block sizes means that
974            file size changes are quite rare too). The following forces
975            mtime changes when a transaction completes */
976 #ifdef HAVE_UTIME
977         utime(tdb->name, NULL);
978 #endif
979
980         /* use a transaction cancel to free memory and remove the
981            transaction locks */
982         tdb_transaction_cancel(tdb);
983
984         return 0;
985 }
986
987
988 /*
989   recover from an aborted transaction. Must be called with exclusive
990   database write access already established (including the global
991   lock to prevent new processes attaching)
992 */
993 int tdb_transaction_recover(struct tdb_context *tdb)
994 {
995         tdb_off_t recovery_head, recovery_eof;
996         unsigned char *data, *p;
997         uint32_t zero = 0;
998         struct list_struct rec;
999
1000         /* find the recovery area */
1001         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1002                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1003                 tdb->ecode = TDB_ERR_IO;
1004                 return -1;
1005         }
1006
1007         if (recovery_head == 0) {
1008                 /* we have never allocated a recovery record */
1009                 return 0;
1010         }
1011
1012         /* read the recovery record */
1013         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1014                                    sizeof(rec), DOCONV()) == -1) {
1015                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1016                 tdb->ecode = TDB_ERR_IO;
1017                 return -1;
1018         }
1019
1020         if (rec.magic != TDB_RECOVERY_MAGIC) {
1021                 /* there is no valid recovery data */
1022                 return 0;
1023         }
1024
1025         if (tdb->read_only) {
1026                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1027                 tdb->ecode = TDB_ERR_CORRUPT;
1028                 return -1;
1029         }
1030
1031         recovery_eof = rec.key_len;
1032
1033         data = (unsigned char *)malloc(rec.data_len);
1034         if (data == NULL) {
1035                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1036                 tdb->ecode = TDB_ERR_OOM;
1037                 return -1;
1038         }
1039
1040         /* read the full recovery data */
1041         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1042                                    rec.data_len, 0) == -1) {
1043                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1044                 tdb->ecode = TDB_ERR_IO;
1045                 return -1;
1046         }
1047
1048         /* recover the file data */
1049         p = data;
1050         while (p+8 < data + rec.data_len) {
1051                 uint32_t ofs, len;
1052                 if (DOCONV()) {
1053                         tdb_convert(p, 8);
1054                 }
1055                 memcpy(&ofs, p, 4);
1056                 memcpy(&len, p+4, 4);
1057
1058                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1059                         free(data);
1060                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1061                         tdb->ecode = TDB_ERR_IO;
1062                         return -1;
1063                 }
1064                 p += 8 + len;
1065         }
1066
1067         free(data);
1068
1069         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1070                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1071                 tdb->ecode = TDB_ERR_IO;
1072                 return -1;
1073         }
1074
1075         /* if the recovery area is after the recovered eof then remove it */
1076         if (recovery_eof <= recovery_head) {
1077                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1078                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1079                         tdb->ecode = TDB_ERR_IO;
1080                         return -1;                      
1081                 }
1082         }
1083
1084         /* remove the recovery magic */
1085         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1086                           &zero) == -1) {
1087                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1088                 tdb->ecode = TDB_ERR_IO;
1089                 return -1;                      
1090         }
1091         
1092         /* reduce the file size to the old size */
1093         tdb_munmap(tdb);
1094         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1095                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1096                 tdb->ecode = TDB_ERR_IO;
1097                 return -1;                      
1098         }
1099         tdb->map_size = recovery_eof;
1100         tdb_mmap(tdb);
1101
1102         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1103                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1104                 tdb->ecode = TDB_ERR_IO;
1105                 return -1;
1106         }
1107
1108         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1109                  recovery_eof));
1110
1111         /* all done */
1112         return 0;
1113 }