lib/tdb: TDB_TRACE support (for developers)
[samba.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* set when a prepare has already occurred */
120         bool prepared;
121         tdb_off_t magic_offset;
122
123         /* old file size before transaction */
124         tdb_len_t old_map_size;
125
126         /* we should re-pack on commit */
127         bool need_repack;
128 };
129
130
131 /*
132   read while in a transaction. We need to check first if the data is in our list
133   of transaction elements, then if not do a real read
134 */
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
136                             tdb_len_t len, int cv)
137 {
138         uint32_t blk;
139
140         /* break it down into block sized ops */
141         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
142                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
143                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
144                         return -1;
145                 }
146                 len -= len2;
147                 off += len2;
148                 buf = (void *)(len2 + (char *)buf);
149         }
150
151         if (len == 0) {
152                 return 0;
153         }
154
155         blk = off / tdb->transaction->block_size;
156
157         /* see if we have it in the block list */
158         if (tdb->transaction->num_blocks <= blk ||
159             tdb->transaction->blocks[blk] == NULL) {
160                 /* nope, do a real read */
161                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
162                         goto fail;
163                 }
164                 return 0;
165         }
166
167         /* it is in the block list. Now check for the last block */
168         if (blk == tdb->transaction->num_blocks-1) {
169                 if (len > tdb->transaction->last_block_size) {
170                         goto fail;
171                 }
172         }
173         
174         /* now copy it out of this block */
175         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
176         if (cv) {
177                 tdb_convert(buf, len);
178         }
179         return 0;
180
181 fail:
182         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
183         tdb->ecode = TDB_ERR_IO;
184         tdb->transaction->transaction_error = 1;
185         return -1;
186 }
187
188
189 /*
190   write while in a transaction
191 */
192 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
193                              const void *buf, tdb_len_t len)
194 {
195         uint32_t blk;
196
197         /* Only a commit is allowed on a prepared transaction */
198         if (tdb->transaction->prepared) {
199                 tdb->ecode = TDB_ERR_EINVAL;
200                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
201                 tdb->transaction->transaction_error = 1;
202                 return -1;
203         }
204
205         /* if the write is to a hash head, then update the transaction
206            hash heads */
207         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
208             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
209                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
210                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
211         }
212
213         /* break it up into block sized chunks */
214         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
215                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
216                 if (transaction_write(tdb, off, buf, len2) != 0) {
217                         return -1;
218                 }
219                 len -= len2;
220                 off += len2;
221                 if (buf != NULL) {
222                         buf = (const void *)(len2 + (const char *)buf);
223                 }
224         }
225
226         if (len == 0) {
227                 return 0;
228         }
229
230         blk = off / tdb->transaction->block_size;
231         off = off % tdb->transaction->block_size;
232
233         if (tdb->transaction->num_blocks <= blk) {
234                 uint8_t **new_blocks;
235                 /* expand the blocks array */
236                 if (tdb->transaction->blocks == NULL) {
237                         new_blocks = (uint8_t **)malloc(
238                                 (blk+1)*sizeof(uint8_t *));
239                 } else {
240                         new_blocks = (uint8_t **)realloc(
241                                 tdb->transaction->blocks,
242                                 (blk+1)*sizeof(uint8_t *));
243                 }
244                 if (new_blocks == NULL) {
245                         tdb->ecode = TDB_ERR_OOM;
246                         goto fail;
247                 }
248                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
249                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
250                 tdb->transaction->blocks = new_blocks;
251                 tdb->transaction->num_blocks = blk+1;
252                 tdb->transaction->last_block_size = 0;
253         }
254
255         /* allocate and fill a block? */
256         if (tdb->transaction->blocks[blk] == NULL) {
257                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
258                 if (tdb->transaction->blocks[blk] == NULL) {
259                         tdb->ecode = TDB_ERR_OOM;
260                         tdb->transaction->transaction_error = 1;
261                         return -1;                      
262                 }
263                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
264                         tdb_len_t len2 = tdb->transaction->block_size;
265                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
266                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
267                         }
268                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
269                                                                    tdb->transaction->blocks[blk], 
270                                                                    len2, 0) != 0) {
271                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
272                                 tdb->ecode = TDB_ERR_IO;
273                                 goto fail;
274                         }
275                         if (blk == tdb->transaction->num_blocks-1) {
276                                 tdb->transaction->last_block_size = len2;
277                         }                       
278                 }
279         }
280         
281         /* overwrite part of an existing block */
282         if (buf == NULL) {
283                 memset(tdb->transaction->blocks[blk] + off, 0, len);
284         } else {
285                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
286         }
287         if (blk == tdb->transaction->num_blocks-1) {
288                 if (len + off > tdb->transaction->last_block_size) {
289                         tdb->transaction->last_block_size = len + off;
290                 }
291         }
292
293         return 0;
294
295 fail:
296         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
297                  (blk*tdb->transaction->block_size) + off, len));
298         tdb->transaction->transaction_error = 1;
299         return -1;
300 }
301
302
303 /*
304   write while in a transaction - this varient never expands the transaction blocks, it only
305   updates existing blocks. This means it cannot change the recovery size
306 */
307 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
308                                       const void *buf, tdb_len_t len)
309 {
310         uint32_t blk;
311
312         /* break it up into block sized chunks */
313         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
314                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
315                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
316                         return -1;
317                 }
318                 len -= len2;
319                 off += len2;
320                 if (buf != NULL) {
321                         buf = (const void *)(len2 + (const char *)buf);
322                 }
323         }
324
325         if (len == 0) {
326                 return 0;
327         }
328
329         blk = off / tdb->transaction->block_size;
330         off = off % tdb->transaction->block_size;
331
332         if (tdb->transaction->num_blocks <= blk ||
333             tdb->transaction->blocks[blk] == NULL) {
334                 return 0;
335         }
336
337         if (blk == tdb->transaction->num_blocks-1 &&
338             off + len > tdb->transaction->last_block_size) {
339                 if (off >= tdb->transaction->last_block_size) {
340                         return 0;
341                 }
342                 len = tdb->transaction->last_block_size - off;
343         }
344
345         /* overwrite part of an existing block */
346         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
347
348         return 0;
349 }
350
351
352 /*
353   accelerated hash chain head search, using the cached hash heads
354 */
355 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
356 {
357         uint32_t h = *chain;
358         for (;h < tdb->header.hash_size;h++) {
359                 /* the +1 takes account of the freelist */
360                 if (0 != tdb->transaction->hash_heads[h+1]) {
361                         break;
362                 }
363         }
364         (*chain) = h;
365 }
366
367 /*
368   out of bounds check during a transaction
369 */
370 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
371 {
372         if (len <= tdb->map_size) {
373                 return 0;
374         }
375         return TDB_ERRCODE(TDB_ERR_IO, -1);
376 }
377
378 /*
379   transaction version of tdb_expand().
380 */
381 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
382                                    tdb_off_t addition)
383 {
384         /* add a write to the transaction elements, so subsequent
385            reads see the zero data */
386         if (transaction_write(tdb, size, NULL, addition) != 0) {
387                 return -1;
388         }
389
390         tdb->transaction->need_repack = true;
391
392         return 0;
393 }
394
395 /*
396   brlock during a transaction - ignore them
397 */
398 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
399                               int rw_type, int lck_type, int probe, size_t len)
400 {
401         return 0;
402 }
403
404 static const struct tdb_methods transaction_methods = {
405         transaction_read,
406         transaction_write,
407         transaction_next_hash_chain,
408         transaction_oob,
409         transaction_expand_file,
410         transaction_brlock
411 };
412
413
414 /*
415   start a tdb transaction. No token is returned, as only a single
416   transaction is allowed to be pending per tdb_context
417 */
418 int tdb_transaction_start(struct tdb_context *tdb)
419 {
420         /* some sanity checks */
421         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
422                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
423                 tdb->ecode = TDB_ERR_EINVAL;
424                 return -1;
425         }
426
427         /* cope with nested tdb_transaction_start() calls */
428         if (tdb->transaction != NULL) {
429                 tdb->transaction->nesting++;
430                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
431                          tdb->transaction->nesting));
432                 return 0;
433         }
434
435         if (tdb->num_locks != 0 || tdb->global_lock.count) {
436                 /* the caller must not have any locks when starting a
437                    transaction as otherwise we'll be screwed by lack
438                    of nested locks in posix */
439                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
440                 tdb->ecode = TDB_ERR_LOCK;
441                 return -1;
442         }
443
444         if (tdb->travlocks.next != NULL) {
445                 /* you cannot use transactions inside a traverse (although you can use
446                    traverse inside a transaction) as otherwise you can end up with
447                    deadlock */
448                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
449                 tdb->ecode = TDB_ERR_LOCK;
450                 return -1;
451         }
452
453         tdb->transaction = (struct tdb_transaction *)
454                 calloc(sizeof(struct tdb_transaction), 1);
455         if (tdb->transaction == NULL) {
456                 tdb->ecode = TDB_ERR_OOM;
457                 return -1;
458         }
459
460         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
461         tdb->transaction->block_size = tdb->page_size;
462
463         /* get the transaction write lock. This is a blocking lock. As
464            discussed with Volker, there are a number of ways we could
465            make this async, which we will probably do in the future */
466         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
467                 SAFE_FREE(tdb->transaction->blocks);
468                 SAFE_FREE(tdb->transaction);
469                 return -1;
470         }
471         
472         /* get a read lock from the freelist to the end of file. This
473            is upgraded to a write lock during the commit */
474         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
475                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
476                 tdb->ecode = TDB_ERR_LOCK;
477                 goto fail;
478         }
479
480         /* setup a copy of the hash table heads so the hash scan in
481            traverse can be fast */
482         tdb->transaction->hash_heads = (uint32_t *)
483                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
484         if (tdb->transaction->hash_heads == NULL) {
485                 tdb->ecode = TDB_ERR_OOM;
486                 goto fail;
487         }
488         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
489                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
490                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
491                 tdb->ecode = TDB_ERR_IO;
492                 goto fail;
493         }
494
495         /* make sure we know about any file expansions already done by
496            anyone else */
497         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
498         tdb->transaction->old_map_size = tdb->map_size;
499
500         /* finally hook the io methods, replacing them with
501            transaction specific methods */
502         tdb->transaction->io_methods = tdb->methods;
503         tdb->methods = &transaction_methods;
504
505         /* Trace at the end, so we get sequence number correct. */
506         tdb_trace(tdb, "tdb_transaction_start");
507         return 0;
508         
509 fail:
510         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
511         tdb_transaction_unlock(tdb);
512         SAFE_FREE(tdb->transaction->blocks);
513         SAFE_FREE(tdb->transaction->hash_heads);
514         SAFE_FREE(tdb->transaction);
515         return -1;
516 }
517
518
519 /*
520   sync to disk
521 */
522 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
523 {       
524         if (tdb->flags & TDB_NOSYNC) {
525                 return 0;
526         }
527
528         if (fsync(tdb->fd) != 0) {
529                 tdb->ecode = TDB_ERR_IO;
530                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
531                 return -1;
532         }
533 #ifdef HAVE_MMAP
534         if (tdb->map_ptr) {
535                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
536                 if (msync(moffset + (char *)tdb->map_ptr, 
537                           length + (offset - moffset), MS_SYNC) != 0) {
538                         tdb->ecode = TDB_ERR_IO;
539                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
540                                  strerror(errno)));
541                         return -1;
542                 }
543         }
544 #endif
545         return 0;
546 }
547
548
549 int _tdb_transaction_cancel(struct tdb_context *tdb)
550 {       
551         int i, ret = 0;
552
553         if (tdb->transaction == NULL) {
554                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
555                 return -1;
556         }
557
558         if (tdb->transaction->nesting != 0) {
559                 tdb->transaction->transaction_error = 1;
560                 tdb->transaction->nesting--;
561                 return 0;
562         }               
563
564         tdb->map_size = tdb->transaction->old_map_size;
565
566         /* free all the transaction blocks */
567         for (i=0;i<tdb->transaction->num_blocks;i++) {
568                 if (tdb->transaction->blocks[i] != NULL) {
569                         free(tdb->transaction->blocks[i]);
570                 }
571         }
572         SAFE_FREE(tdb->transaction->blocks);
573
574         if (tdb->transaction->magic_offset) {
575                 const struct tdb_methods *methods = tdb->transaction->io_methods;
576                 uint32_t zero = 0;
577
578                 /* remove the recovery marker */
579                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
580                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
581                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
582                         ret = -1;
583                 }
584         }
585
586         /* remove any global lock created during the transaction */
587         if (tdb->global_lock.count != 0) {
588                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
589                 tdb->global_lock.count = 0;
590         }
591
592         /* remove any locks created during the transaction */
593         if (tdb->num_locks != 0) {
594                 for (i=0;i<tdb->num_lockrecs;i++) {
595                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
596                                    F_UNLCK,F_SETLKW, 0, 1);
597                 }
598                 tdb->num_locks = 0;
599                 tdb->num_lockrecs = 0;
600                 SAFE_FREE(tdb->lockrecs);
601         }
602
603         /* restore the normal io methods */
604         tdb->methods = tdb->transaction->io_methods;
605
606         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
607         tdb_transaction_unlock(tdb);
608         SAFE_FREE(tdb->transaction->hash_heads);
609         SAFE_FREE(tdb->transaction);
610         
611         return ret;
612 }
613
/*
  cancel the current transaction. Emits a trace record and then hands
  over to _tdb_transaction_cancel(), whose result is returned.
*/
int tdb_transaction_cancel(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_cancel");
	ret = _tdb_transaction_cancel(tdb);

	return ret;
}
622
623 /*
624   work out how much space the linearised recovery data will consume
625 */
626 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
627 {
628         tdb_len_t recovery_size = 0;
629         int i;
630
631         recovery_size = sizeof(uint32_t);
632         for (i=0;i<tdb->transaction->num_blocks;i++) {
633                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
634                         break;
635                 }
636                 if (tdb->transaction->blocks[i] == NULL) {
637                         continue;
638                 }
639                 recovery_size += 2*sizeof(tdb_off_t);
640                 if (i == tdb->transaction->num_blocks-1) {
641                         recovery_size += tdb->transaction->last_block_size;
642                 } else {
643                         recovery_size += tdb->transaction->block_size;
644                 }
645         }       
646
647         return recovery_size;
648 }
649
650 /*
651   allocate the recovery area, or use an existing recovery area if it is
652   large enough
653 */
654 static int tdb_recovery_allocate(struct tdb_context *tdb, 
655                                  tdb_len_t *recovery_size,
656                                  tdb_off_t *recovery_offset,
657                                  tdb_len_t *recovery_max_size)
658 {
659         struct list_struct rec;
660         const struct tdb_methods *methods = tdb->transaction->io_methods;
661         tdb_off_t recovery_head;
662
663         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
664                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
665                 return -1;
666         }
667
668         rec.rec_len = 0;
669
670         if (recovery_head != 0 && 
671             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
672                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
673                 return -1;
674         }
675
676         *recovery_size = tdb_recovery_size(tdb);
677
678         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
679                 /* it fits in the existing area */
680                 *recovery_max_size = rec.rec_len;
681                 *recovery_offset = recovery_head;
682                 return 0;
683         }
684
685         /* we need to free up the old recovery area, then allocate a
686            new one at the end of the file. Note that we cannot use
687            tdb_allocate() to allocate the new one as that might return
688            us an area that is being currently used (as of the start of
689            the transaction) */
690         if (recovery_head != 0) {
691                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
692                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
693                         return -1;
694                 }
695         }
696
697         /* the tdb_free() call might have increased the recovery size */
698         *recovery_size = tdb_recovery_size(tdb);
699
700         /* round up to a multiple of page size */
701         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
702         *recovery_offset = tdb->map_size;
703         recovery_head = *recovery_offset;
704
705         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
706                                      (tdb->map_size - tdb->transaction->old_map_size) +
707                                      sizeof(rec) + *recovery_max_size) == -1) {
708                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
709                 return -1;
710         }
711
712         /* remap the file (if using mmap) */
713         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
714
715         /* we have to reset the old map size so that we don't try to expand the file
716            again in the transaction commit, which would destroy the recovery area */
717         tdb->transaction->old_map_size = tdb->map_size;
718
719         /* write the recovery header offset and sync - we can sync without a race here
720            as the magic ptr in the recovery record has not been set */
721         CONVERT(recovery_head);
722         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
723                                &recovery_head, sizeof(tdb_off_t)) == -1) {
724                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
725                 return -1;
726         }
727         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
728                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
729                 return -1;
730         }
731
732         return 0;
733 }
734
735
736 /*
737   setup the recovery data that will be used on a crash during commit
738 */
739 static int transaction_setup_recovery(struct tdb_context *tdb, 
740                                       tdb_off_t *magic_offset)
741 {
742         tdb_len_t recovery_size;
743         unsigned char *data, *p;
744         const struct tdb_methods *methods = tdb->transaction->io_methods;
745         struct list_struct *rec;
746         tdb_off_t recovery_offset, recovery_max_size;
747         tdb_off_t old_map_size = tdb->transaction->old_map_size;
748         uint32_t magic, tailer;
749         int i;
750
751         /*
752           check that the recovery area has enough space
753         */
754         if (tdb_recovery_allocate(tdb, &recovery_size, 
755                                   &recovery_offset, &recovery_max_size) == -1) {
756                 return -1;
757         }
758
759         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
760         if (data == NULL) {
761                 tdb->ecode = TDB_ERR_OOM;
762                 return -1;
763         }
764
765         rec = (struct list_struct *)data;
766         memset(rec, 0, sizeof(*rec));
767
768         rec->magic    = 0;
769         rec->data_len = recovery_size;
770         rec->rec_len  = recovery_max_size;
771         rec->key_len  = old_map_size;
772         CONVERT(rec);
773
774         /* build the recovery data into a single blob to allow us to do a single
775            large write, which should be more efficient */
776         p = data + sizeof(*rec);
777         for (i=0;i<tdb->transaction->num_blocks;i++) {
778                 tdb_off_t offset;
779                 tdb_len_t length;
780
781                 if (tdb->transaction->blocks[i] == NULL) {
782                         continue;
783                 }
784
785                 offset = i * tdb->transaction->block_size;
786                 length = tdb->transaction->block_size;
787                 if (i == tdb->transaction->num_blocks-1) {
788                         length = tdb->transaction->last_block_size;
789                 }
790                 
791                 if (offset >= old_map_size) {
792                         continue;
793                 }
794                 if (offset + length > tdb->transaction->old_map_size) {
795                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
796                         free(data);
797                         tdb->ecode = TDB_ERR_CORRUPT;
798                         return -1;
799                 }
800                 memcpy(p, &offset, 4);
801                 memcpy(p+4, &length, 4);
802                 if (DOCONV()) {
803                         tdb_convert(p, 8);
804                 }
805                 /* the recovery area contains the old data, not the
806                    new data, so we have to call the original tdb_read
807                    method to get it */
808                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
809                         free(data);
810                         tdb->ecode = TDB_ERR_IO;
811                         return -1;
812                 }
813                 p += 8 + length;
814         }
815
816         /* and the tailer */
817         tailer = sizeof(*rec) + recovery_max_size;
818         memcpy(p, &tailer, 4);
819         CONVERT(p);
820
821         /* write the recovery data to the recovery area */
822         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
823                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
824                 free(data);
825                 tdb->ecode = TDB_ERR_IO;
826                 return -1;
827         }
828         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
829                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
830                 free(data);
831                 tdb->ecode = TDB_ERR_IO;
832                 return -1;
833         }
834
835         /* as we don't have ordered writes, we have to sync the recovery
836            data before we update the magic to indicate that the recovery
837            data is present */
838         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
839                 free(data);
840                 return -1;
841         }
842
843         free(data);
844
845         magic = TDB_RECOVERY_MAGIC;
846         CONVERT(magic);
847
848         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
849
850         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
851                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
852                 tdb->ecode = TDB_ERR_IO;
853                 return -1;
854         }
855         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
856                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
857                 tdb->ecode = TDB_ERR_IO;
858                 return -1;
859         }
860
861         /* ensure the recovery magic marker is on disk */
862         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
863                 return -1;
864         }
865
866         return 0;
867 }
868
869 static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
870 {       
871         const struct tdb_methods *methods;
872
873         if (tdb->transaction == NULL) {
874                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
875                 return -1;
876         }
877
878         if (tdb->transaction->prepared) {
879                 tdb->ecode = TDB_ERR_EINVAL;
880                 _tdb_transaction_cancel(tdb);
881                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
882                 return -1;
883         }
884
885         if (tdb->transaction->transaction_error) {
886                 tdb->ecode = TDB_ERR_IO;
887                 _tdb_transaction_cancel(tdb);
888                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
889                 return -1;
890         }
891
892
893         if (tdb->transaction->nesting != 0) {
894                 return 0;
895         }               
896
897         /* check for a null transaction */
898         if (tdb->transaction->blocks == NULL) {
899                 return 0;
900         }
901
902         methods = tdb->transaction->io_methods;
903         
904         /* if there are any locks pending then the caller has not
905            nested their locks properly, so fail the transaction */
906         if (tdb->num_locks || tdb->global_lock.count) {
907                 tdb->ecode = TDB_ERR_LOCK;
908                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
909                 _tdb_transaction_cancel(tdb);
910                 return -1;
911         }
912
913         /* upgrade the main transaction lock region to a write lock */
914         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
915                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
916                 tdb->ecode = TDB_ERR_LOCK;
917                 _tdb_transaction_cancel(tdb);
918                 return -1;
919         }
920
921         /* get the global lock - this prevents new users attaching to the database
922            during the commit */
923         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
924                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
925                 tdb->ecode = TDB_ERR_LOCK;
926                 _tdb_transaction_cancel(tdb);
927                 return -1;
928         }
929
930         if (!(tdb->flags & TDB_NOSYNC)) {
931                 /* write the recovery data to the end of the file */
932                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
933                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
934                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
935                         _tdb_transaction_cancel(tdb);
936                         return -1;
937                 }
938         }
939
940         tdb->transaction->prepared = true;
941
942         /* expand the file to the new size if needed */
943         if (tdb->map_size != tdb->transaction->old_map_size) {
944                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
945                                              tdb->map_size - 
946                                              tdb->transaction->old_map_size) == -1) {
947                         tdb->ecode = TDB_ERR_IO;
948                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
949                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
950                         _tdb_transaction_cancel(tdb);
951                         return -1;
952                 }
953                 tdb->map_size = tdb->transaction->old_map_size;
954                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
955         }
956
957         /* Keep the global lock until the actual commit */
958
959         return 0;
960 }
961
/*
   prepare to commit the current transaction

   Public entry point: emits a trace record, then hands over to
   _tdb_transaction_prepare_commit() and returns its result.
*/
int tdb_transaction_prepare_commit(struct tdb_context *tdb)
{
	int ret;

	tdb_trace(tdb, "tdb_transaction_prepare_commit");
	ret = _tdb_transaction_prepare_commit(tdb);

	return ret;
}
970
971 /*
972   commit the current transaction
973 */
974 int tdb_transaction_commit(struct tdb_context *tdb)
975 {       
976         const struct tdb_methods *methods;
977         int i;
978         bool need_repack;
979
980         if (tdb->transaction == NULL) {
981                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
982                 return -1;
983         }
984
985         tdb_trace(tdb, "tdb_transaction_commit");
986
987         if (tdb->transaction->transaction_error) {
988                 tdb->ecode = TDB_ERR_IO;
989                 _tdb_transaction_cancel(tdb);
990                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
991                 return -1;
992         }
993
994
995         if (tdb->transaction->nesting != 0) {
996                 tdb->transaction->nesting--;
997                 return 0;
998         }
999
1000         /* check for a null transaction */
1001         if (tdb->transaction->blocks == NULL) {
1002                 _tdb_transaction_cancel(tdb);
1003                 return 0;
1004         }
1005
1006         if (!tdb->transaction->prepared) {
1007                 int ret = _tdb_transaction_prepare_commit(tdb);
1008                 if (ret)
1009                         return ret;
1010         }
1011
1012         methods = tdb->transaction->io_methods;
1013
1014         /* perform all the writes */
1015         for (i=0;i<tdb->transaction->num_blocks;i++) {
1016                 tdb_off_t offset;
1017                 tdb_len_t length;
1018
1019                 if (tdb->transaction->blocks[i] == NULL) {
1020                         continue;
1021                 }
1022
1023                 offset = i * tdb->transaction->block_size;
1024                 length = tdb->transaction->block_size;
1025                 if (i == tdb->transaction->num_blocks-1) {
1026                         length = tdb->transaction->last_block_size;
1027                 }
1028
1029                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1030                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1031                         
1032                         /* we've overwritten part of the data and
1033                            possibly expanded the file, so we need to
1034                            run the crash recovery code */
1035                         tdb->methods = methods;
1036                         tdb_transaction_recover(tdb); 
1037
1038                         _tdb_transaction_cancel(tdb);
1039                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1040
1041                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1042                         return -1;
1043                 }
1044                 SAFE_FREE(tdb->transaction->blocks[i]);
1045         } 
1046
1047         SAFE_FREE(tdb->transaction->blocks);
1048         tdb->transaction->num_blocks = 0;
1049
1050         /* ensure the new data is on disk */
1051         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1052                 return -1;
1053         }
1054
1055         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1056
1057         /*
1058           TODO: maybe write to some dummy hdr field, or write to magic
1059           offset without mmap, before the last sync, instead of the
1060           utime() call
1061         */
1062
1063         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1064            don't change the mtime of the file, this means the file may
1065            not be backed up (as tdb rounding to block sizes means that
1066            file size changes are quite rare too). The following forces
1067            mtime changes when a transaction completes */
1068 #ifdef HAVE_UTIME
1069         utime(tdb->name, NULL);
1070 #endif
1071
1072         need_repack = tdb->transaction->need_repack;
1073
1074         /* use a transaction cancel to free memory and remove the
1075            transaction locks */
1076         _tdb_transaction_cancel(tdb);
1077
1078         if (need_repack) {
1079                 return tdb_repack(tdb);
1080         }
1081
1082         return 0;
1083 }
1084
1085
1086 /*
1087   recover from an aborted transaction. Must be called with exclusive
1088   database write access already established (including the global
1089   lock to prevent new processes attaching)
1090 */
1091 int tdb_transaction_recover(struct tdb_context *tdb)
1092 {
1093         tdb_off_t recovery_head, recovery_eof;
1094         unsigned char *data, *p;
1095         uint32_t zero = 0;
1096         struct list_struct rec;
1097
1098         /* find the recovery area */
1099         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1100                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1101                 tdb->ecode = TDB_ERR_IO;
1102                 return -1;
1103         }
1104
1105         if (recovery_head == 0) {
1106                 /* we have never allocated a recovery record */
1107                 return 0;
1108         }
1109
1110         /* read the recovery record */
1111         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1112                                    sizeof(rec), DOCONV()) == -1) {
1113                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1114                 tdb->ecode = TDB_ERR_IO;
1115                 return -1;
1116         }
1117
1118         if (rec.magic != TDB_RECOVERY_MAGIC) {
1119                 /* there is no valid recovery data */
1120                 return 0;
1121         }
1122
1123         if (tdb->read_only) {
1124                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1125                 tdb->ecode = TDB_ERR_CORRUPT;
1126                 return -1;
1127         }
1128
1129         recovery_eof = rec.key_len;
1130
1131         data = (unsigned char *)malloc(rec.data_len);
1132         if (data == NULL) {
1133                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1134                 tdb->ecode = TDB_ERR_OOM;
1135                 return -1;
1136         }
1137
1138         /* read the full recovery data */
1139         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1140                                    rec.data_len, 0) == -1) {
1141                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1142                 tdb->ecode = TDB_ERR_IO;
1143                 return -1;
1144         }
1145
1146         /* recover the file data */
1147         p = data;
1148         while (p+8 < data + rec.data_len) {
1149                 uint32_t ofs, len;
1150                 if (DOCONV()) {
1151                         tdb_convert(p, 8);
1152                 }
1153                 memcpy(&ofs, p, 4);
1154                 memcpy(&len, p+4, 4);
1155
1156                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1157                         free(data);
1158                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1159                         tdb->ecode = TDB_ERR_IO;
1160                         return -1;
1161                 }
1162                 p += 8 + len;
1163         }
1164
1165         free(data);
1166
1167         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1168                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1169                 tdb->ecode = TDB_ERR_IO;
1170                 return -1;
1171         }
1172
1173         /* if the recovery area is after the recovered eof then remove it */
1174         if (recovery_eof <= recovery_head) {
1175                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1176                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1177                         tdb->ecode = TDB_ERR_IO;
1178                         return -1;                      
1179                 }
1180         }
1181
1182         /* remove the recovery magic */
1183         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1184                           &zero) == -1) {
1185                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1186                 tdb->ecode = TDB_ERR_IO;
1187                 return -1;                      
1188         }
1189         
1190         /* reduce the file size to the old size */
1191         tdb_munmap(tdb);
1192         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1193                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1194                 tdb->ecode = TDB_ERR_IO;
1195                 return -1;                      
1196         }
1197         tdb->map_size = recovery_eof;
1198         tdb_mmap(tdb);
1199
1200         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1201                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1202                 tdb->ecode = TDB_ERR_IO;
1203                 return -1;
1204         }
1205
1206         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1207                  recovery_eof));
1208
1209         /* all done */
1210         return 0;
1211 }