tdb: Remove unused variable
[kai/samba-autobuild/.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocated the transaction recover record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of writes all that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit stategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* set when a prepare has already occurred */
120         bool prepared;
121         tdb_off_t magic_offset;
122
123         /* old file size before transaction */
124         tdb_len_t old_map_size;
125 };
126
127
128 /*
129   read while in a transaction. We need to check first if the data is in our list
130   of transaction elements, then if not do a real read
131 */
132 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
133                             tdb_len_t len, int cv)
134 {
135         uint32_t blk;
136
137         /* Only a commit is allowed on a prepared transaction */
138         if (tdb->transaction->prepared) {
139                 tdb->ecode = TDB_ERR_EINVAL;
140                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
141                 tdb->transaction->transaction_error = 1;
142                 return -1;
143         }
144
145         /* break it down into block sized ops */
146         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
147                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
148                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
149                         return -1;
150                 }
151                 len -= len2;
152                 off += len2;
153                 buf = (void *)(len2 + (char *)buf);
154         }
155
156         if (len == 0) {
157                 return 0;
158         }
159
160         blk = off / tdb->transaction->block_size;
161
162         /* see if we have it in the block list */
163         if (tdb->transaction->num_blocks <= blk ||
164             tdb->transaction->blocks[blk] == NULL) {
165                 /* nope, do a real read */
166                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
167                         goto fail;
168                 }
169                 return 0;
170         }
171
172         /* it is in the block list. Now check for the last block */
173         if (blk == tdb->transaction->num_blocks-1) {
174                 if (len > tdb->transaction->last_block_size) {
175                         goto fail;
176                 }
177         }
178         
179         /* now copy it out of this block */
180         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
181         if (cv) {
182                 tdb_convert(buf, len);
183         }
184         return 0;
185
186 fail:
187         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
188         tdb->ecode = TDB_ERR_IO;
189         tdb->transaction->transaction_error = 1;
190         return -1;
191 }
192
193
194 /*
195   write while in a transaction
196 */
197 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
198                              const void *buf, tdb_len_t len)
199 {
200         uint32_t blk;
201
202         /* Only a commit is allowed on a prepared transaction */
203         if (tdb->transaction->prepared) {
204                 tdb->ecode = TDB_ERR_EINVAL;
205                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
206                 tdb->transaction->transaction_error = 1;
207                 return -1;
208         }
209
210         /* if the write is to a hash head, then update the transaction
211            hash heads */
212         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
213             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
214                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
215                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
216         }
217
218         /* break it up into block sized chunks */
219         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
220                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
221                 if (transaction_write(tdb, off, buf, len2) != 0) {
222                         return -1;
223                 }
224                 len -= len2;
225                 off += len2;
226                 if (buf != NULL) {
227                         buf = (const void *)(len2 + (const char *)buf);
228                 }
229         }
230
231         if (len == 0) {
232                 return 0;
233         }
234
235         blk = off / tdb->transaction->block_size;
236         off = off % tdb->transaction->block_size;
237
238         if (tdb->transaction->num_blocks <= blk) {
239                 uint8_t **new_blocks;
240                 /* expand the blocks array */
241                 if (tdb->transaction->blocks == NULL) {
242                         new_blocks = (uint8_t **)malloc(
243                                 (blk+1)*sizeof(uint8_t *));
244                 } else {
245                         new_blocks = (uint8_t **)realloc(
246                                 tdb->transaction->blocks,
247                                 (blk+1)*sizeof(uint8_t *));
248                 }
249                 if (new_blocks == NULL) {
250                         tdb->ecode = TDB_ERR_OOM;
251                         goto fail;
252                 }
253                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
254                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
255                 tdb->transaction->blocks = new_blocks;
256                 tdb->transaction->num_blocks = blk+1;
257                 tdb->transaction->last_block_size = 0;
258         }
259
260         /* allocate and fill a block? */
261         if (tdb->transaction->blocks[blk] == NULL) {
262                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
263                 if (tdb->transaction->blocks[blk] == NULL) {
264                         tdb->ecode = TDB_ERR_OOM;
265                         tdb->transaction->transaction_error = 1;
266                         return -1;                      
267                 }
268                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
269                         tdb_len_t len2 = tdb->transaction->block_size;
270                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
271                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
272                         }
273                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
274                                                                    tdb->transaction->blocks[blk], 
275                                                                    len2, 0) != 0) {
276                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
277                                 tdb->ecode = TDB_ERR_IO;
278                                 goto fail;
279                         }
280                         if (blk == tdb->transaction->num_blocks-1) {
281                                 tdb->transaction->last_block_size = len2;
282                         }                       
283                 }
284         }
285         
286         /* overwrite part of an existing block */
287         if (buf == NULL) {
288                 memset(tdb->transaction->blocks[blk] + off, 0, len);
289         } else {
290                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
291         }
292         if (blk == tdb->transaction->num_blocks-1) {
293                 if (len + off > tdb->transaction->last_block_size) {
294                         tdb->transaction->last_block_size = len + off;
295                 }
296         }
297
298         return 0;
299
300 fail:
301         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
302                  (blk*tdb->transaction->block_size) + off, len));
303         tdb->transaction->transaction_error = 1;
304         return -1;
305 }
306
307
308 /*
309   write while in a transaction - this varient never expands the transaction blocks, it only
310   updates existing blocks. This means it cannot change the recovery size
311 */
312 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
313                                       const void *buf, tdb_len_t len)
314 {
315         uint32_t blk;
316
317         /* break it up into block sized chunks */
318         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
319                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
320                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
321                         return -1;
322                 }
323                 len -= len2;
324                 off += len2;
325                 if (buf != NULL) {
326                         buf = (const void *)(len2 + (const char *)buf);
327                 }
328         }
329
330         if (len == 0) {
331                 return 0;
332         }
333
334         blk = off / tdb->transaction->block_size;
335         off = off % tdb->transaction->block_size;
336
337         if (tdb->transaction->num_blocks <= blk ||
338             tdb->transaction->blocks[blk] == NULL) {
339                 return 0;
340         }
341
342         if (blk == tdb->transaction->num_blocks-1 &&
343             off + len > tdb->transaction->last_block_size) {
344                 if (off >= tdb->transaction->last_block_size) {
345                         return 0;
346                 }
347                 len = tdb->transaction->last_block_size - off;
348         }
349
350         /* overwrite part of an existing block */
351         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
352
353         return 0;
354 }
355
356
357 /*
358   accelerated hash chain head search, using the cached hash heads
359 */
360 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
361 {
362         uint32_t h = *chain;
363         for (;h < tdb->header.hash_size;h++) {
364                 /* the +1 takes account of the freelist */
365                 if (0 != tdb->transaction->hash_heads[h+1]) {
366                         break;
367                 }
368         }
369         (*chain) = h;
370 }
371
372 /*
373   out of bounds check during a transaction
374 */
375 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
376 {
377         if (len <= tdb->map_size) {
378                 return 0;
379         }
380         return TDB_ERRCODE(TDB_ERR_IO, -1);
381 }
382
383 /*
384   transaction version of tdb_expand().
385 */
386 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
387                                    tdb_off_t addition)
388 {
389         /* add a write to the transaction elements, so subsequent
390            reads see the zero data */
391         if (transaction_write(tdb, size, NULL, addition) != 0) {
392                 return -1;
393         }
394
395         return 0;
396 }
397
398 /*
399   brlock during a transaction - ignore them
400 */
401 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
402                               int rw_type, int lck_type, int probe, size_t len)
403 {
404         return 0;
405 }
406
407 static const struct tdb_methods transaction_methods = {
408         transaction_read,
409         transaction_write,
410         transaction_next_hash_chain,
411         transaction_oob,
412         transaction_expand_file,
413         transaction_brlock
414 };
415
416
417 /*
418   start a tdb transaction. No token is returned, as only a single
419   transaction is allowed to be pending per tdb_context
420 */
421 int tdb_transaction_start(struct tdb_context *tdb)
422 {
423         /* some sanity checks */
424         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
425                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
426                 tdb->ecode = TDB_ERR_EINVAL;
427                 return -1;
428         }
429
430         /* cope with nested tdb_transaction_start() calls */
431         if (tdb->transaction != NULL) {
432                 tdb->transaction->nesting++;
433                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
434                          tdb->transaction->nesting));
435                 return 0;
436         }
437
438         if (tdb->num_locks != 0 || tdb->global_lock.count) {
439                 /* the caller must not have any locks when starting a
440                    transaction as otherwise we'll be screwed by lack
441                    of nested locks in posix */
442                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
443                 tdb->ecode = TDB_ERR_LOCK;
444                 return -1;
445         }
446
447         if (tdb->travlocks.next != NULL) {
448                 /* you cannot use transactions inside a traverse (although you can use
449                    traverse inside a transaction) as otherwise you can end up with
450                    deadlock */
451                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
452                 tdb->ecode = TDB_ERR_LOCK;
453                 return -1;
454         }
455
456         tdb->transaction = (struct tdb_transaction *)
457                 calloc(sizeof(struct tdb_transaction), 1);
458         if (tdb->transaction == NULL) {
459                 tdb->ecode = TDB_ERR_OOM;
460                 return -1;
461         }
462
463         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
464         tdb->transaction->block_size = tdb->page_size;
465
466         /* get the transaction write lock. This is a blocking lock. As
467            discussed with Volker, there are a number of ways we could
468            make this async, which we will probably do in the future */
469         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
470                 SAFE_FREE(tdb->transaction->blocks);
471                 SAFE_FREE(tdb->transaction);
472                 return -1;
473         }
474         
475         /* get a read lock from the freelist to the end of file. This
476            is upgraded to a write lock during the commit */
477         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
478                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
479                 tdb->ecode = TDB_ERR_LOCK;
480                 goto fail;
481         }
482
483         /* setup a copy of the hash table heads so the hash scan in
484            traverse can be fast */
485         tdb->transaction->hash_heads = (uint32_t *)
486                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
487         if (tdb->transaction->hash_heads == NULL) {
488                 tdb->ecode = TDB_ERR_OOM;
489                 goto fail;
490         }
491         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
492                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
493                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
494                 tdb->ecode = TDB_ERR_IO;
495                 goto fail;
496         }
497
498         /* make sure we know about any file expansions already done by
499            anyone else */
500         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
501         tdb->transaction->old_map_size = tdb->map_size;
502
503         /* finally hook the io methods, replacing them with
504            transaction specific methods */
505         tdb->transaction->io_methods = tdb->methods;
506         tdb->methods = &transaction_methods;
507
508         return 0;
509         
510 fail:
511         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
512         tdb_transaction_unlock(tdb);
513         SAFE_FREE(tdb->transaction->blocks);
514         SAFE_FREE(tdb->transaction->hash_heads);
515         SAFE_FREE(tdb->transaction);
516         return -1;
517 }
518
519
520 /*
521   sync to disk
522 */
523 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
524 {       
525         if (fsync(tdb->fd) != 0) {
526                 tdb->ecode = TDB_ERR_IO;
527                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
528                 return -1;
529         }
530 #ifdef HAVE_MMAP
531         if (tdb->map_ptr) {
532                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
533                 if (msync(moffset + (char *)tdb->map_ptr, 
534                           length + (offset - moffset), MS_SYNC) != 0) {
535                         tdb->ecode = TDB_ERR_IO;
536                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
537                                  strerror(errno)));
538                         return -1;
539                 }
540         }
541 #endif
542         return 0;
543 }
544
545
546 /*
547   cancel the current transaction
548 */
549 int tdb_transaction_cancel(struct tdb_context *tdb)
550 {       
551         int i, ret = 0;
552
553         if (tdb->transaction == NULL) {
554                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
555                 return -1;
556         }
557
558         if (tdb->transaction->nesting != 0) {
559                 tdb->transaction->transaction_error = 1;
560                 tdb->transaction->nesting--;
561                 return 0;
562         }               
563
564         tdb->map_size = tdb->transaction->old_map_size;
565
566         /* free all the transaction blocks */
567         for (i=0;i<tdb->transaction->num_blocks;i++) {
568                 if (tdb->transaction->blocks[i] != NULL) {
569                         free(tdb->transaction->blocks[i]);
570                 }
571         }
572         SAFE_FREE(tdb->transaction->blocks);
573
574         if (tdb->transaction->magic_offset) {
575                 const struct tdb_methods *methods = tdb->transaction->io_methods;
576                 uint32_t zero = 0;
577
578                 /* remove the recovery marker */
579                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
580                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
581                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
582                         ret = -1;
583                 }
584         }
585
586         /* remove any global lock created during the transaction */
587         if (tdb->global_lock.count != 0) {
588                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
589                 tdb->global_lock.count = 0;
590         }
591
592         /* remove any locks created during the transaction */
593         if (tdb->num_locks != 0) {
594                 for (i=0;i<tdb->num_lockrecs;i++) {
595                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
596                                    F_UNLCK,F_SETLKW, 0, 1);
597                 }
598                 tdb->num_locks = 0;
599                 tdb->num_lockrecs = 0;
600                 SAFE_FREE(tdb->lockrecs);
601         }
602
603         /* restore the normal io methods */
604         tdb->methods = tdb->transaction->io_methods;
605
606         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
607         tdb_transaction_unlock(tdb);
608         SAFE_FREE(tdb->transaction->hash_heads);
609         SAFE_FREE(tdb->transaction);
610         
611         return ret;
612 }
613
614
615 /*
616   work out how much space the linearised recovery data will consume
617 */
618 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
619 {
620         tdb_len_t recovery_size = 0;
621         int i;
622
623         recovery_size = sizeof(uint32_t);
624         for (i=0;i<tdb->transaction->num_blocks;i++) {
625                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
626                         break;
627                 }
628                 if (tdb->transaction->blocks[i] == NULL) {
629                         continue;
630                 }
631                 recovery_size += 2*sizeof(tdb_off_t);
632                 if (i == tdb->transaction->num_blocks-1) {
633                         recovery_size += tdb->transaction->last_block_size;
634                 } else {
635                         recovery_size += tdb->transaction->block_size;
636                 }
637         }       
638
639         return recovery_size;
640 }
641
642 /*
643   allocate the recovery area, or use an existing recovery area if it is
644   large enough
645 */
646 static int tdb_recovery_allocate(struct tdb_context *tdb, 
647                                  tdb_len_t *recovery_size,
648                                  tdb_off_t *recovery_offset,
649                                  tdb_len_t *recovery_max_size)
650 {
651         struct list_struct rec;
652         const struct tdb_methods *methods = tdb->transaction->io_methods;
653         tdb_off_t recovery_head;
654
655         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
656                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
657                 return -1;
658         }
659
660         rec.rec_len = 0;
661
662         if (recovery_head != 0 && 
663             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
664                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
665                 return -1;
666         }
667
668         *recovery_size = tdb_recovery_size(tdb);
669
670         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
671                 /* it fits in the existing area */
672                 *recovery_max_size = rec.rec_len;
673                 *recovery_offset = recovery_head;
674                 return 0;
675         }
676
677         /* we need to free up the old recovery area, then allocate a
678            new one at the end of the file. Note that we cannot use
679            tdb_allocate() to allocate the new one as that might return
680            us an area that is being currently used (as of the start of
681            the transaction) */
682         if (recovery_head != 0) {
683                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
684                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
685                         return -1;
686                 }
687         }
688
689         /* the tdb_free() call might have increased the recovery size */
690         *recovery_size = tdb_recovery_size(tdb);
691
692         /* round up to a multiple of page size */
693         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
694         *recovery_offset = tdb->map_size;
695         recovery_head = *recovery_offset;
696
697         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
698                                      (tdb->map_size - tdb->transaction->old_map_size) +
699                                      sizeof(rec) + *recovery_max_size) == -1) {
700                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
701                 return -1;
702         }
703
704         /* remap the file (if using mmap) */
705         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
706
707         /* we have to reset the old map size so that we don't try to expand the file
708            again in the transaction commit, which would destroy the recovery area */
709         tdb->transaction->old_map_size = tdb->map_size;
710
711         /* write the recovery header offset and sync - we can sync without a race here
712            as the magic ptr in the recovery record has not been set */
713         CONVERT(recovery_head);
714         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
715                                &recovery_head, sizeof(tdb_off_t)) == -1) {
716                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
717                 return -1;
718         }
719         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
720                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
721                 return -1;
722         }
723
724         return 0;
725 }
726
727
728 /*
729   setup the recovery data that will be used on a crash during commit
730 */
731 static int transaction_setup_recovery(struct tdb_context *tdb, 
732                                       tdb_off_t *magic_offset)
733 {
734         tdb_len_t recovery_size;
735         unsigned char *data, *p;
736         const struct tdb_methods *methods = tdb->transaction->io_methods;
737         struct list_struct *rec;
738         tdb_off_t recovery_offset, recovery_max_size;
739         tdb_off_t old_map_size = tdb->transaction->old_map_size;
740         uint32_t magic, tailer;
741         int i;
742
743         /*
744           check that the recovery area has enough space
745         */
746         if (tdb_recovery_allocate(tdb, &recovery_size, 
747                                   &recovery_offset, &recovery_max_size) == -1) {
748                 return -1;
749         }
750
751         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
752         if (data == NULL) {
753                 tdb->ecode = TDB_ERR_OOM;
754                 return -1;
755         }
756
757         rec = (struct list_struct *)data;
758         memset(rec, 0, sizeof(*rec));
759
760         rec->magic    = 0;
761         rec->data_len = recovery_size;
762         rec->rec_len  = recovery_max_size;
763         rec->key_len  = old_map_size;
764         CONVERT(rec);
765
766         /* build the recovery data into a single blob to allow us to do a single
767            large write, which should be more efficient */
768         p = data + sizeof(*rec);
769         for (i=0;i<tdb->transaction->num_blocks;i++) {
770                 tdb_off_t offset;
771                 tdb_len_t length;
772
773                 if (tdb->transaction->blocks[i] == NULL) {
774                         continue;
775                 }
776
777                 offset = i * tdb->transaction->block_size;
778                 length = tdb->transaction->block_size;
779                 if (i == tdb->transaction->num_blocks-1) {
780                         length = tdb->transaction->last_block_size;
781                 }
782                 
783                 if (offset >= old_map_size) {
784                         continue;
785                 }
786                 if (offset + length > tdb->transaction->old_map_size) {
787                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
788                         free(data);
789                         tdb->ecode = TDB_ERR_CORRUPT;
790                         return -1;
791                 }
792                 memcpy(p, &offset, 4);
793                 memcpy(p+4, &length, 4);
794                 if (DOCONV()) {
795                         tdb_convert(p, 8);
796                 }
797                 /* the recovery area contains the old data, not the
798                    new data, so we have to call the original tdb_read
799                    method to get it */
800                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
801                         free(data);
802                         tdb->ecode = TDB_ERR_IO;
803                         return -1;
804                 }
805                 p += 8 + length;
806         }
807
808         /* and the tailer */
809         tailer = sizeof(*rec) + recovery_max_size;
810         memcpy(p, &tailer, 4);
811         CONVERT(p);
812
813         /* write the recovery data to the recovery area */
814         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
815                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
816                 free(data);
817                 tdb->ecode = TDB_ERR_IO;
818                 return -1;
819         }
820         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
821                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
822                 free(data);
823                 tdb->ecode = TDB_ERR_IO;
824                 return -1;
825         }
826
827         /* as we don't have ordered writes, we have to sync the recovery
828            data before we update the magic to indicate that the recovery
829            data is present */
830         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
831                 free(data);
832                 return -1;
833         }
834
835         free(data);
836
837         magic = TDB_RECOVERY_MAGIC;
838         CONVERT(magic);
839
840         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
841
842         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
843                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
844                 tdb->ecode = TDB_ERR_IO;
845                 return -1;
846         }
847         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
848                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
849                 tdb->ecode = TDB_ERR_IO;
850                 return -1;
851         }
852
853         /* ensure the recovery magic marker is on disk */
854         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
855                 return -1;
856         }
857
858         return 0;
859 }
860
861 /*
862   prepare to commit the current transaction
863 */
864 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
865 {       
866         const struct tdb_methods *methods;
867
868         if (tdb->transaction == NULL) {
869                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
870                 return -1;
871         }
872
873         if (tdb->transaction->prepared) {
874                 tdb->ecode = TDB_ERR_EINVAL;
875                 tdb_transaction_cancel(tdb);
876                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
877                 return -1;
878         }
879
880         if (tdb->transaction->transaction_error) {
881                 tdb->ecode = TDB_ERR_IO;
882                 tdb_transaction_cancel(tdb);
883                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
884                 return -1;
885         }
886
887
888         if (tdb->transaction->nesting != 0) {
889                 return 0;
890         }               
891
892         /* check for a null transaction */
893         if (tdb->transaction->blocks == NULL) {
894                 return 0;
895         }
896
897         methods = tdb->transaction->io_methods;
898         
899         /* if there are any locks pending then the caller has not
900            nested their locks properly, so fail the transaction */
901         if (tdb->num_locks || tdb->global_lock.count) {
902                 tdb->ecode = TDB_ERR_LOCK;
903                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
904                 tdb_transaction_cancel(tdb);
905                 return -1;
906         }
907
908         /* upgrade the main transaction lock region to a write lock */
909         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
910                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
911                 tdb->ecode = TDB_ERR_LOCK;
912                 tdb_transaction_cancel(tdb);
913                 return -1;
914         }
915
916         /* get the global lock - this prevents new users attaching to the database
917            during the commit */
918         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
919                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
920                 tdb->ecode = TDB_ERR_LOCK;
921                 tdb_transaction_cancel(tdb);
922                 return -1;
923         }
924
925         if (!(tdb->flags & TDB_NOSYNC)) {
926                 /* write the recovery data to the end of the file */
927                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
928                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
929                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
930                         tdb_transaction_cancel(tdb);
931                         return -1;
932                 }
933         }
934
935         tdb->transaction->prepared = true;
936
937         /* expand the file to the new size if needed */
938         if (tdb->map_size != tdb->transaction->old_map_size) {
939                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
940                                              tdb->map_size - 
941                                              tdb->transaction->old_map_size) == -1) {
942                         tdb->ecode = TDB_ERR_IO;
943                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
944                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
945                         tdb_transaction_cancel(tdb);
946                         return -1;
947                 }
948                 tdb->map_size = tdb->transaction->old_map_size;
949                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
950         }
951
952         /* Keep the global lock until the actual commit */
953
954         return 0;
955 }
956
957 /*
958   commit the current transaction
959 */
960 int tdb_transaction_commit(struct tdb_context *tdb)
961 {       
962         const struct tdb_methods *methods;
963         int i;
964
965         if (tdb->transaction == NULL) {
966                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
967                 return -1;
968         }
969
970         if (tdb->transaction->transaction_error) {
971                 tdb->ecode = TDB_ERR_IO;
972                 tdb_transaction_cancel(tdb);
973                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
974                 return -1;
975         }
976
977
978         if (tdb->transaction->nesting != 0) {
979                 tdb->transaction->nesting--;
980                 return 0;
981         }
982
983         /* check for a null transaction */
984         if (tdb->transaction->blocks == NULL) {
985                 tdb_transaction_cancel(tdb);
986                 return 0;
987         }
988
989         if (!tdb->transaction->prepared) {
990                 int ret = tdb_transaction_prepare_commit(tdb);
991                 if (ret)
992                         return ret;
993         }
994
995         methods = tdb->transaction->io_methods;
996
997         /* perform all the writes */
998         for (i=0;i<tdb->transaction->num_blocks;i++) {
999                 tdb_off_t offset;
1000                 tdb_len_t length;
1001
1002                 if (tdb->transaction->blocks[i] == NULL) {
1003                         continue;
1004                 }
1005
1006                 offset = i * tdb->transaction->block_size;
1007                 length = tdb->transaction->block_size;
1008                 if (i == tdb->transaction->num_blocks-1) {
1009                         length = tdb->transaction->last_block_size;
1010                 }
1011
1012                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1013                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1014                         
1015                         /* we've overwritten part of the data and
1016                            possibly expanded the file, so we need to
1017                            run the crash recovery code */
1018                         tdb->methods = methods;
1019                         tdb_transaction_recover(tdb); 
1020
1021                         tdb_transaction_cancel(tdb);
1022                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1023
1024                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1025                         return -1;
1026                 }
1027                 SAFE_FREE(tdb->transaction->blocks[i]);
1028         } 
1029
1030         SAFE_FREE(tdb->transaction->blocks);
1031         tdb->transaction->num_blocks = 0;
1032
1033         if (!(tdb->flags & TDB_NOSYNC)) {
1034                 /* ensure the new data is on disk */
1035                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1036                         return -1;
1037                 }
1038         }
1039
1040         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1041
1042         /*
1043           TODO: maybe write to some dummy hdr field, or write to magic
1044           offset without mmap, before the last sync, instead of the
1045           utime() call
1046         */
1047
1048         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1049            don't change the mtime of the file, this means the file may
1050            not be backed up (as tdb rounding to block sizes means that
1051            file size changes are quite rare too). The following forces
1052            mtime changes when a transaction completes */
1053 #ifdef HAVE_UTIME
1054         utime(tdb->name, NULL);
1055 #endif
1056
1057         /* use a transaction cancel to free memory and remove the
1058            transaction locks */
1059         tdb_transaction_cancel(tdb);
1060
1061         return 0;
1062 }
1063
1064
1065 /*
1066   recover from an aborted transaction. Must be called with exclusive
1067   database write access already established (including the global
1068   lock to prevent new processes attaching)
1069 */
1070 int tdb_transaction_recover(struct tdb_context *tdb)
1071 {
1072         tdb_off_t recovery_head, recovery_eof;
1073         unsigned char *data, *p;
1074         uint32_t zero = 0;
1075         struct list_struct rec;
1076
1077         /* find the recovery area */
1078         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1079                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1080                 tdb->ecode = TDB_ERR_IO;
1081                 return -1;
1082         }
1083
1084         if (recovery_head == 0) {
1085                 /* we have never allocated a recovery record */
1086                 return 0;
1087         }
1088
1089         /* read the recovery record */
1090         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1091                                    sizeof(rec), DOCONV()) == -1) {
1092                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1093                 tdb->ecode = TDB_ERR_IO;
1094                 return -1;
1095         }
1096
1097         if (rec.magic != TDB_RECOVERY_MAGIC) {
1098                 /* there is no valid recovery data */
1099                 return 0;
1100         }
1101
1102         if (tdb->read_only) {
1103                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1104                 tdb->ecode = TDB_ERR_CORRUPT;
1105                 return -1;
1106         }
1107
1108         recovery_eof = rec.key_len;
1109
1110         data = (unsigned char *)malloc(rec.data_len);
1111         if (data == NULL) {
1112                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1113                 tdb->ecode = TDB_ERR_OOM;
1114                 return -1;
1115         }
1116
1117         /* read the full recovery data */
1118         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1119                                    rec.data_len, 0) == -1) {
1120                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1121                 tdb->ecode = TDB_ERR_IO;
1122                 return -1;
1123         }
1124
1125         /* recover the file data */
1126         p = data;
1127         while (p+8 < data + rec.data_len) {
1128                 uint32_t ofs, len;
1129                 if (DOCONV()) {
1130                         tdb_convert(p, 8);
1131                 }
1132                 memcpy(&ofs, p, 4);
1133                 memcpy(&len, p+4, 4);
1134
1135                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1136                         free(data);
1137                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1138                         tdb->ecode = TDB_ERR_IO;
1139                         return -1;
1140                 }
1141                 p += 8 + len;
1142         }
1143
1144         free(data);
1145
1146         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1147                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1148                 tdb->ecode = TDB_ERR_IO;
1149                 return -1;
1150         }
1151
1152         /* if the recovery area is after the recovered eof then remove it */
1153         if (recovery_eof <= recovery_head) {
1154                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1155                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1156                         tdb->ecode = TDB_ERR_IO;
1157                         return -1;                      
1158                 }
1159         }
1160
1161         /* remove the recovery magic */
1162         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1163                           &zero) == -1) {
1164                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1165                 tdb->ecode = TDB_ERR_IO;
1166                 return -1;                      
1167         }
1168         
1169         /* reduce the file size to the old size */
1170         tdb_munmap(tdb);
1171         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1172                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1173                 tdb->ecode = TDB_ERR_IO;
1174                 return -1;                      
1175         }
1176         tdb->map_size = recovery_eof;
1177         tdb_mmap(tdb);
1178
1179         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1180                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1181                 tdb->ecode = TDB_ERR_IO;
1182                 return -1;
1183         }
1184
1185         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1186                  recovery_eof));
1187
1188         /* all done */
1189         return 0;
1190 }