Merge commit 'samba/v3-2-test' into v3-2-stable
[bbaumbach/samba-autobuild/.git] / source / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocated the transaction recover record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of writes all that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit stategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* old file size before transaction */
120         tdb_len_t old_map_size;
121 };
122
123
124 /*
125   read while in a transaction. We need to check first if the data is in our list
126   of transaction elements, then if not do a real read
127 */
128 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
129                             tdb_len_t len, int cv)
130 {
131         uint32_t blk;
132
133         /* break it down into block sized ops */
134         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
135                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
136                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
137                         return -1;
138                 }
139                 len -= len2;
140                 off += len2;
141                 buf = (void *)(len2 + (char *)buf);
142         }
143
144         if (len == 0) {
145                 return 0;
146         }
147
148         blk = off / tdb->transaction->block_size;
149
150         /* see if we have it in the block list */
151         if (tdb->transaction->num_blocks <= blk ||
152             tdb->transaction->blocks[blk] == NULL) {
153                 /* nope, do a real read */
154                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
155                         goto fail;
156                 }
157                 return 0;
158         }
159
160         /* it is in the block list. Now check for the last block */
161         if (blk == tdb->transaction->num_blocks-1) {
162                 if (len > tdb->transaction->last_block_size) {
163                         goto fail;
164                 }
165         }
166         
167         /* now copy it out of this block */
168         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
169         if (cv) {
170                 tdb_convert(buf, len);
171         }
172         return 0;
173
174 fail:
175         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
176         tdb->ecode = TDB_ERR_IO;
177         tdb->transaction->transaction_error = 1;
178         return -1;
179 }
180
181
182 /*
183   write while in a transaction
184 */
185 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
186                              const void *buf, tdb_len_t len)
187 {
188         uint32_t blk;
189
190         /* if the write is to a hash head, then update the transaction
191            hash heads */
192         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
193             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
194                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
195                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
196         }
197
198         /* break it up into block sized chunks */
199         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
200                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
201                 if (transaction_write(tdb, off, buf, len2) != 0) {
202                         return -1;
203                 }
204                 len -= len2;
205                 off += len2;
206                 if (buf != NULL) {
207                         buf = (const void *)(len2 + (const char *)buf);
208                 }
209         }
210
211         if (len == 0) {
212                 return 0;
213         }
214
215         blk = off / tdb->transaction->block_size;
216         off = off % tdb->transaction->block_size;
217
218         if (tdb->transaction->num_blocks <= blk) {
219                 uint8_t **new_blocks;
220                 /* expand the blocks array */
221                 if (tdb->transaction->blocks == NULL) {
222                         new_blocks = malloc((blk+1)*sizeof(uint8_t *));
223                 } else {
224                         new_blocks = realloc(tdb->transaction->blocks, (blk+1)*sizeof(uint8_t *));
225                 }
226                 if (new_blocks == NULL) {
227                         tdb->ecode = TDB_ERR_OOM;
228                         goto fail;
229                 }
230                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
231                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
232                 tdb->transaction->blocks = new_blocks;
233                 tdb->transaction->num_blocks = blk+1;
234                 tdb->transaction->last_block_size = 0;
235         }
236
237         /* allocate and fill a block? */
238         if (tdb->transaction->blocks[blk] == NULL) {
239                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
240                 if (tdb->transaction->blocks[blk] == NULL) {
241                         tdb->ecode = TDB_ERR_OOM;
242                         tdb->transaction->transaction_error = 1;
243                         return -1;                      
244                 }
245                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
246                         tdb_len_t len2 = tdb->transaction->block_size;
247                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
248                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
249                         }
250                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
251                                                                    tdb->transaction->blocks[blk], 
252                                                                    len2, 0) != 0) {
253                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
254                                 tdb->ecode = TDB_ERR_IO;
255                                 goto fail;
256                         }
257                         if (blk == tdb->transaction->num_blocks-1) {
258                                 tdb->transaction->last_block_size = len2;
259                         }                       
260                 }
261         }
262         
263         /* overwrite part of an existing block */
264         if (buf == NULL) {
265                 memset(tdb->transaction->blocks[blk] + off, 0, len);
266         } else {
267                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
268         }
269         if (blk == tdb->transaction->num_blocks-1) {
270                 if (len + off > tdb->transaction->last_block_size) {
271                         tdb->transaction->last_block_size = len + off;
272                 }
273         }
274
275         return 0;
276
277 fail:
278         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
279                  (blk*tdb->transaction->block_size) + off, len));
280         tdb->transaction->transaction_error = 1;
281         return -1;
282 }
283
284
285 /*
286   write while in a transaction - this varient never expands the transaction blocks, it only
287   updates existing blocks. This means it cannot change the recovery size
288 */
289 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
290                                       const void *buf, tdb_len_t len)
291 {
292         uint32_t blk;
293
294         /* break it up into block sized chunks */
295         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
296                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
297                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
298                         return -1;
299                 }
300                 len -= len2;
301                 off += len2;
302                 if (buf != NULL) {
303                         buf = (const void *)(len2 + (const char *)buf);
304                 }
305         }
306
307         if (len == 0) {
308                 return 0;
309         }
310
311         blk = off / tdb->transaction->block_size;
312         off = off % tdb->transaction->block_size;
313
314         if (tdb->transaction->num_blocks <= blk ||
315             tdb->transaction->blocks[blk] == NULL) {
316                 return 0;
317         }
318
319         /* overwrite part of an existing block */
320         if (buf == NULL) {
321                 memset(tdb->transaction->blocks[blk] + off, 0, len);
322         } else {
323                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
324         }
325         if (blk == tdb->transaction->num_blocks-1) {
326                 if (len + off > tdb->transaction->last_block_size) {
327                         tdb->transaction->last_block_size = len + off;
328                 }
329         }
330
331         return 0;
332
333 fail:
334         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
335                  (blk*tdb->transaction->block_size) + off, len));
336         tdb->transaction->transaction_error = 1;
337         return -1;
338 }
339
340
341 /*
342   accelerated hash chain head search, using the cached hash heads
343 */
344 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
345 {
346         uint32_t h = *chain;
347         for (;h < tdb->header.hash_size;h++) {
348                 /* the +1 takes account of the freelist */
349                 if (0 != tdb->transaction->hash_heads[h+1]) {
350                         break;
351                 }
352         }
353         (*chain) = h;
354 }
355
356 /*
357   out of bounds check during a transaction
358 */
359 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
360 {
361         if (len <= tdb->map_size) {
362                 return 0;
363         }
364         return TDB_ERRCODE(TDB_ERR_IO, -1);
365 }
366
367 /*
368   transaction version of tdb_expand().
369 */
370 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
371                                    tdb_off_t addition)
372 {
373         /* add a write to the transaction elements, so subsequent
374            reads see the zero data */
375         if (transaction_write(tdb, size, NULL, addition) != 0) {
376                 return -1;
377         }
378
379         return 0;
380 }
381
382 /*
383   brlock during a transaction - ignore them
384 */
385 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
386                               int rw_type, int lck_type, int probe, size_t len)
387 {
388         return 0;
389 }
390
391 static const struct tdb_methods transaction_methods = {
392         transaction_read,
393         transaction_write,
394         transaction_next_hash_chain,
395         transaction_oob,
396         transaction_expand_file,
397         transaction_brlock
398 };
399
400
401 /*
402   start a tdb transaction. No token is returned, as only a single
403   transaction is allowed to be pending per tdb_context
404 */
405 int tdb_transaction_start(struct tdb_context *tdb)
406 {
407         /* some sanity checks */
408         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
409                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
410                 tdb->ecode = TDB_ERR_EINVAL;
411                 return -1;
412         }
413
414         /* cope with nested tdb_transaction_start() calls */
415         if (tdb->transaction != NULL) {
416                 tdb->transaction->nesting++;
417                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
418                          tdb->transaction->nesting));
419                 return 0;
420         }
421
422         if (tdb->num_locks != 0 || tdb->global_lock.count) {
423                 /* the caller must not have any locks when starting a
424                    transaction as otherwise we'll be screwed by lack
425                    of nested locks in posix */
426                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
427                 tdb->ecode = TDB_ERR_LOCK;
428                 return -1;
429         }
430
431         if (tdb->travlocks.next != NULL) {
432                 /* you cannot use transactions inside a traverse (although you can use
433                    traverse inside a transaction) as otherwise you can end up with
434                    deadlock */
435                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
436                 tdb->ecode = TDB_ERR_LOCK;
437                 return -1;
438         }
439
440         tdb->transaction = (struct tdb_transaction *)
441                 calloc(sizeof(struct tdb_transaction), 1);
442         if (tdb->transaction == NULL) {
443                 tdb->ecode = TDB_ERR_OOM;
444                 return -1;
445         }
446
447         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
448         tdb->transaction->block_size = tdb->page_size;
449
450         /* get the transaction write lock. This is a blocking lock. As
451            discussed with Volker, there are a number of ways we could
452            make this async, which we will probably do in the future */
453         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
454                 SAFE_FREE(tdb->transaction->blocks);
455                 SAFE_FREE(tdb->transaction);
456                 return -1;
457         }
458         
459         /* get a read lock from the freelist to the end of file. This
460            is upgraded to a write lock during the commit */
461         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
462                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
463                 tdb->ecode = TDB_ERR_LOCK;
464                 goto fail;
465         }
466
467         /* setup a copy of the hash table heads so the hash scan in
468            traverse can be fast */
469         tdb->transaction->hash_heads = (uint32_t *)
470                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
471         if (tdb->transaction->hash_heads == NULL) {
472                 tdb->ecode = TDB_ERR_OOM;
473                 goto fail;
474         }
475         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
476                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
477                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
478                 tdb->ecode = TDB_ERR_IO;
479                 goto fail;
480         }
481
482         /* make sure we know about any file expansions already done by
483            anyone else */
484         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
485         tdb->transaction->old_map_size = tdb->map_size;
486
487         /* finally hook the io methods, replacing them with
488            transaction specific methods */
489         tdb->transaction->io_methods = tdb->methods;
490         tdb->methods = &transaction_methods;
491
492         return 0;
493         
494 fail:
495         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
496         tdb_transaction_unlock(tdb);
497         SAFE_FREE(tdb->transaction->blocks);
498         SAFE_FREE(tdb->transaction->hash_heads);
499         SAFE_FREE(tdb->transaction);
500         return -1;
501 }
502
503
504 /*
505   cancel the current transaction
506 */
507 int tdb_transaction_cancel(struct tdb_context *tdb)
508 {       
509         int i;
510
511         if (tdb->transaction == NULL) {
512                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
513                 return -1;
514         }
515
516         if (tdb->transaction->nesting != 0) {
517                 tdb->transaction->transaction_error = 1;
518                 tdb->transaction->nesting--;
519                 return 0;
520         }               
521
522         tdb->map_size = tdb->transaction->old_map_size;
523
524         /* free all the transaction blocks */
525         for (i=0;i<tdb->transaction->num_blocks;i++) {
526                 if (tdb->transaction->blocks[i] != NULL) {
527                         free(tdb->transaction->blocks[i]);
528                 }
529         }
530         SAFE_FREE(tdb->transaction->blocks);
531
532         /* remove any global lock created during the transaction */
533         if (tdb->global_lock.count != 0) {
534                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
535                 tdb->global_lock.count = 0;
536         }
537
538         /* remove any locks created during the transaction */
539         if (tdb->num_locks != 0) {
540                 for (i=0;i<tdb->num_lockrecs;i++) {
541                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
542                                    F_UNLCK,F_SETLKW, 0, 1);
543                 }
544                 tdb->num_locks = 0;
545                 tdb->num_lockrecs = 0;
546                 SAFE_FREE(tdb->lockrecs);
547         }
548
549         /* restore the normal io methods */
550         tdb->methods = tdb->transaction->io_methods;
551
552         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
553         tdb_transaction_unlock(tdb);
554         SAFE_FREE(tdb->transaction->hash_heads);
555         SAFE_FREE(tdb->transaction);
556         
557         return 0;
558 }
559
560 /*
561   sync to disk
562 */
563 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
564 {       
565         if (fsync(tdb->fd) != 0) {
566                 tdb->ecode = TDB_ERR_IO;
567                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
568                 return -1;
569         }
570 #ifdef MS_SYNC
571         if (tdb->map_ptr) {
572                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
573                 if (msync(moffset + (char *)tdb->map_ptr, 
574                           length + (offset - moffset), MS_SYNC) != 0) {
575                         tdb->ecode = TDB_ERR_IO;
576                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
577                                  strerror(errno)));
578                         return -1;
579                 }
580         }
581 #endif
582         return 0;
583 }
584
585
586 /*
587   work out how much space the linearised recovery data will consume
588 */
589 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
590 {
591         tdb_len_t recovery_size = 0;
592         int i;
593
594         recovery_size = sizeof(uint32_t);
595         for (i=0;i<tdb->transaction->num_blocks;i++) {
596                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
597                         break;
598                 }
599                 if (tdb->transaction->blocks[i] == NULL) {
600                         continue;
601                 }
602                 recovery_size += 2*sizeof(tdb_off_t);
603                 if (i == tdb->transaction->num_blocks-1) {
604                         recovery_size += tdb->transaction->last_block_size;
605                 } else {
606                         recovery_size += tdb->transaction->block_size;
607                 }
608         }       
609
610         return recovery_size;
611 }
612
613 /*
614   allocate the recovery area, or use an existing recovery area if it is
615   large enough
616 */
617 static int tdb_recovery_allocate(struct tdb_context *tdb, 
618                                  tdb_len_t *recovery_size,
619                                  tdb_off_t *recovery_offset,
620                                  tdb_len_t *recovery_max_size)
621 {
622         struct list_struct rec;
623         const struct tdb_methods *methods = tdb->transaction->io_methods;
624         tdb_off_t recovery_head;
625
626         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
627                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
628                 return -1;
629         }
630
631         rec.rec_len = 0;
632
633         if (recovery_head != 0 && 
634             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
635                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
636                 return -1;
637         }
638
639         *recovery_size = tdb_recovery_size(tdb);
640
641         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
642                 /* it fits in the existing area */
643                 *recovery_max_size = rec.rec_len;
644                 *recovery_offset = recovery_head;
645                 return 0;
646         }
647
648         /* we need to free up the old recovery area, then allocate a
649            new one at the end of the file. Note that we cannot use
650            tdb_allocate() to allocate the new one as that might return
651            us an area that is being currently used (as of the start of
652            the transaction) */
653         if (recovery_head != 0) {
654                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
655                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
656                         return -1;
657                 }
658         }
659
660         /* the tdb_free() call might have increased the recovery size */
661         *recovery_size = tdb_recovery_size(tdb);
662
663         /* round up to a multiple of page size */
664         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
665         *recovery_offset = tdb->map_size;
666         recovery_head = *recovery_offset;
667
668         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
669                                      (tdb->map_size - tdb->transaction->old_map_size) +
670                                      sizeof(rec) + *recovery_max_size) == -1) {
671                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
672                 return -1;
673         }
674
675         /* remap the file (if using mmap) */
676         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
677
678         /* we have to reset the old map size so that we don't try to expand the file
679            again in the transaction commit, which would destroy the recovery area */
680         tdb->transaction->old_map_size = tdb->map_size;
681
682         /* write the recovery header offset and sync - we can sync without a race here
683            as the magic ptr in the recovery record has not been set */
684         CONVERT(recovery_head);
685         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
686                                &recovery_head, sizeof(tdb_off_t)) == -1) {
687                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
688                 return -1;
689         }
690         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
691                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
692                 return -1;
693         }
694
695         return 0;
696 }
697
698
699 /*
700   setup the recovery data that will be used on a crash during commit
701 */
702 static int transaction_setup_recovery(struct tdb_context *tdb, 
703                                       tdb_off_t *magic_offset)
704 {
705         tdb_len_t recovery_size;
706         unsigned char *data, *p;
707         const struct tdb_methods *methods = tdb->transaction->io_methods;
708         struct list_struct *rec;
709         tdb_off_t recovery_offset, recovery_max_size;
710         tdb_off_t old_map_size = tdb->transaction->old_map_size;
711         uint32_t magic, tailer;
712         int i;
713
714         /*
715           check that the recovery area has enough space
716         */
717         if (tdb_recovery_allocate(tdb, &recovery_size, 
718                                   &recovery_offset, &recovery_max_size) == -1) {
719                 return -1;
720         }
721
722         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
723         if (data == NULL) {
724                 tdb->ecode = TDB_ERR_OOM;
725                 return -1;
726         }
727
728         rec = (struct list_struct *)data;
729         memset(rec, 0, sizeof(*rec));
730
731         rec->magic    = 0;
732         rec->data_len = recovery_size;
733         rec->rec_len  = recovery_max_size;
734         rec->key_len  = old_map_size;
735         CONVERT(rec);
736
737         /* build the recovery data into a single blob to allow us to do a single
738            large write, which should be more efficient */
739         p = data + sizeof(*rec);
740         for (i=0;i<tdb->transaction->num_blocks;i++) {
741                 tdb_off_t offset;
742                 tdb_len_t length;
743
744                 if (tdb->transaction->blocks[i] == NULL) {
745                         continue;
746                 }
747
748                 offset = i * tdb->transaction->block_size;
749                 length = tdb->transaction->block_size;
750                 if (i == tdb->transaction->num_blocks-1) {
751                         length = tdb->transaction->last_block_size;
752                 }
753                 
754                 if (offset >= old_map_size) {
755                         continue;
756                 }
757                 if (offset + length > tdb->transaction->old_map_size) {
758                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
759                         free(data);
760                         tdb->ecode = TDB_ERR_CORRUPT;
761                         return -1;
762                 }
763                 memcpy(p, &offset, 4);
764                 memcpy(p+4, &length, 4);
765                 if (DOCONV()) {
766                         tdb_convert(p, 8);
767                 }
768                 /* the recovery area contains the old data, not the
769                    new data, so we have to call the original tdb_read
770                    method to get it */
771                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
772                         free(data);
773                         tdb->ecode = TDB_ERR_IO;
774                         return -1;
775                 }
776                 p += 8 + length;
777         }
778
779         /* and the tailer */
780         tailer = sizeof(*rec) + recovery_max_size;
781         memcpy(p, &tailer, 4);
782         CONVERT(p);
783
784         /* write the recovery data to the recovery area */
785         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
786                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
787                 free(data);
788                 tdb->ecode = TDB_ERR_IO;
789                 return -1;
790         }
791         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
792                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
793                 free(data);
794                 tdb->ecode = TDB_ERR_IO;
795                 return -1;
796         }
797
798         /* as we don't have ordered writes, we have to sync the recovery
799            data before we update the magic to indicate that the recovery
800            data is present */
801         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
802                 free(data);
803                 return -1;
804         }
805
806         free(data);
807
808         magic = TDB_RECOVERY_MAGIC;
809         CONVERT(magic);
810
811         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
812
813         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
814                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
815                 tdb->ecode = TDB_ERR_IO;
816                 return -1;
817         }
818         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
819                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
820                 tdb->ecode = TDB_ERR_IO;
821                 return -1;
822         }
823
824         /* ensure the recovery magic marker is on disk */
825         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
826                 return -1;
827         }
828
829         return 0;
830 }
831
832 /*
833   commit the current transaction
834 */
835 int tdb_transaction_commit(struct tdb_context *tdb)
836 {       
837         const struct tdb_methods *methods;
838         tdb_off_t magic_offset = 0;
839         uint32_t zero = 0;
840         int i;
841
842         if (tdb->transaction == NULL) {
843                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
844                 return -1;
845         }
846
847         if (tdb->transaction->transaction_error) {
848                 tdb->ecode = TDB_ERR_IO;
849                 tdb_transaction_cancel(tdb);
850                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
851                 return -1;
852         }
853
854
855         if (tdb->transaction->nesting != 0) {
856                 tdb->transaction->nesting--;
857                 return 0;
858         }               
859
860         /* check for a null transaction */
861         if (tdb->transaction->blocks == NULL) {
862                 tdb_transaction_cancel(tdb);
863                 return 0;
864         }
865
866         methods = tdb->transaction->io_methods;
867         
868         /* if there are any locks pending then the caller has not
869            nested their locks properly, so fail the transaction */
870         if (tdb->num_locks || tdb->global_lock.count) {
871                 tdb->ecode = TDB_ERR_LOCK;
872                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
873                 tdb_transaction_cancel(tdb);
874                 return -1;
875         }
876
877         /* upgrade the main transaction lock region to a write lock */
878         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
879                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
880                 tdb->ecode = TDB_ERR_LOCK;
881                 tdb_transaction_cancel(tdb);
882                 return -1;
883         }
884
885         /* get the global lock - this prevents new users attaching to the database
886            during the commit */
887         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
888                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
889                 tdb->ecode = TDB_ERR_LOCK;
890                 tdb_transaction_cancel(tdb);
891                 return -1;
892         }
893
894         if (!(tdb->flags & TDB_NOSYNC)) {
895                 /* write the recovery data to the end of the file */
896                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
897                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
898                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
899                         tdb_transaction_cancel(tdb);
900                         return -1;
901                 }
902         }
903
904         /* expand the file to the new size if needed */
905         if (tdb->map_size != tdb->transaction->old_map_size) {
906                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
907                                              tdb->map_size - 
908                                              tdb->transaction->old_map_size) == -1) {
909                         tdb->ecode = TDB_ERR_IO;
910                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
911                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
912                         tdb_transaction_cancel(tdb);
913                         return -1;
914                 }
915                 tdb->map_size = tdb->transaction->old_map_size;
916                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
917         }
918
919         /* perform all the writes */
920         for (i=0;i<tdb->transaction->num_blocks;i++) {
921                 tdb_off_t offset;
922                 tdb_len_t length;
923
924                 if (tdb->transaction->blocks[i] == NULL) {
925                         continue;
926                 }
927
928                 offset = i * tdb->transaction->block_size;
929                 length = tdb->transaction->block_size;
930                 if (i == tdb->transaction->num_blocks-1) {
931                         length = tdb->transaction->last_block_size;
932                 }
933
934                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
935                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
936                         
937                         /* we've overwritten part of the data and
938                            possibly expanded the file, so we need to
939                            run the crash recovery code */
940                         tdb->methods = methods;
941                         tdb_transaction_recover(tdb); 
942
943                         tdb_transaction_cancel(tdb);
944                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
945
946                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
947                         return -1;
948                 }
949                 SAFE_FREE(tdb->transaction->blocks[i]);
950         } 
951
952         SAFE_FREE(tdb->transaction->blocks);
953         tdb->transaction->num_blocks = 0;
954
955         if (!(tdb->flags & TDB_NOSYNC)) {
956                 /* ensure the new data is on disk */
957                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
958                         return -1;
959                 }
960
961                 /* remove the recovery marker */
962                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
963                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
964                         return -1;
965                 }
966
967                 /* ensure the recovery marker has been removed on disk */
968                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
969                         return -1;
970                 }
971         }
972
973         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
974
975         /*
976           TODO: maybe write to some dummy hdr field, or write to magic
977           offset without mmap, before the last sync, instead of the
978           utime() call
979         */
980
981         /* on some systems (like Linux 2.6.x) changes via mmap/msync
982            don't change the mtime of the file, this means the file may
983            not be backed up (as tdb rounding to block sizes means that
984            file size changes are quite rare too). The following forces
985            mtime changes when a transaction completes */
986 #ifdef HAVE_UTIME
987         utime(tdb->name, NULL);
988 #endif
989
990         /* use a transaction cancel to free memory and remove the
991            transaction locks */
992         tdb_transaction_cancel(tdb);
993
994         return 0;
995 }
996
997
998 /*
999   recover from an aborted transaction. Must be called with exclusive
1000   database write access already established (including the global
1001   lock to prevent new processes attaching)
1002 */
1003 int tdb_transaction_recover(struct tdb_context *tdb)
1004 {
1005         tdb_off_t recovery_head, recovery_eof;
1006         unsigned char *data, *p;
1007         uint32_t zero = 0;
1008         struct list_struct rec;
1009
1010         /* find the recovery area */
1011         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1012                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1013                 tdb->ecode = TDB_ERR_IO;
1014                 return -1;
1015         }
1016
1017         if (recovery_head == 0) {
1018                 /* we have never allocated a recovery record */
1019                 return 0;
1020         }
1021
1022         /* read the recovery record */
1023         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1024                                    sizeof(rec), DOCONV()) == -1) {
1025                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1026                 tdb->ecode = TDB_ERR_IO;
1027                 return -1;
1028         }
1029
1030         if (rec.magic != TDB_RECOVERY_MAGIC) {
1031                 /* there is no valid recovery data */
1032                 return 0;
1033         }
1034
1035         if (tdb->read_only) {
1036                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1037                 tdb->ecode = TDB_ERR_CORRUPT;
1038                 return -1;
1039         }
1040
1041         recovery_eof = rec.key_len;
1042
1043         data = (unsigned char *)malloc(rec.data_len);
1044         if (data == NULL) {
1045                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1046                 tdb->ecode = TDB_ERR_OOM;
1047                 return -1;
1048         }
1049
1050         /* read the full recovery data */
1051         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1052                                    rec.data_len, 0) == -1) {
1053                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1054                 tdb->ecode = TDB_ERR_IO;
1055                 return -1;
1056         }
1057
1058         /* recover the file data */
1059         p = data;
1060         while (p+8 < data + rec.data_len) {
1061                 uint32_t ofs, len;
1062                 if (DOCONV()) {
1063                         tdb_convert(p, 8);
1064                 }
1065                 memcpy(&ofs, p, 4);
1066                 memcpy(&len, p+4, 4);
1067
1068                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1069                         free(data);
1070                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1071                         tdb->ecode = TDB_ERR_IO;
1072                         return -1;
1073                 }
1074                 p += 8 + len;
1075         }
1076
1077         free(data);
1078
1079         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1080                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1081                 tdb->ecode = TDB_ERR_IO;
1082                 return -1;
1083         }
1084
1085         /* if the recovery area is after the recovered eof then remove it */
1086         if (recovery_eof <= recovery_head) {
1087                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1088                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1089                         tdb->ecode = TDB_ERR_IO;
1090                         return -1;                      
1091                 }
1092         }
1093
1094         /* remove the recovery magic */
1095         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1096                           &zero) == -1) {
1097                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1098                 tdb->ecode = TDB_ERR_IO;
1099                 return -1;                      
1100         }
1101         
1102         /* reduce the file size to the old size */
1103         tdb_munmap(tdb);
1104         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1105                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1106                 tdb->ecode = TDB_ERR_IO;
1107                 return -1;                      
1108         }
1109         tdb->map_size = recovery_eof;
1110         tdb_mmap(tdb);
1111
1112         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1113                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1114                 tdb->ecode = TDB_ERR_IO;
1115                 return -1;
1116         }
1117
1118         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1119                  recovery_eof));
1120
1121         /* all done */
1122         return 0;
1123 }