tdb: allow reads after prepare commit
[ira/wip.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* set when a prepare has already occurred */
120         bool prepared;
121         tdb_off_t magic_offset;
122
123         /* old file size before transaction */
124         tdb_len_t old_map_size;
125
126         /* we should re-pack on commit */
127         bool need_repack;
128 };
129
130
131 /*
132   read while in a transaction. We need to check first if the data is in our list
133   of transaction elements, then if not do a real read
134 */
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
136                             tdb_len_t len, int cv)
137 {
138         uint32_t blk;
139
140         /* break it down into block sized ops */
141         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
142                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
143                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
144                         return -1;
145                 }
146                 len -= len2;
147                 off += len2;
148                 buf = (void *)(len2 + (char *)buf);
149         }
150
151         if (len == 0) {
152                 return 0;
153         }
154
155         blk = off / tdb->transaction->block_size;
156
157         /* see if we have it in the block list */
158         if (tdb->transaction->num_blocks <= blk ||
159             tdb->transaction->blocks[blk] == NULL) {
160                 /* nope, do a real read */
161                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
162                         goto fail;
163                 }
164                 return 0;
165         }
166
167         /* it is in the block list. Now check for the last block */
168         if (blk == tdb->transaction->num_blocks-1) {
169                 if (len > tdb->transaction->last_block_size) {
170                         goto fail;
171                 }
172         }
173         
174         /* now copy it out of this block */
175         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
176         if (cv) {
177                 tdb_convert(buf, len);
178         }
179         return 0;
180
181 fail:
182         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
183         tdb->ecode = TDB_ERR_IO;
184         tdb->transaction->transaction_error = 1;
185         return -1;
186 }
187
188
189 /*
190   write while in a transaction
191 */
192 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
193                              const void *buf, tdb_len_t len)
194 {
195         uint32_t blk;
196
197         /* Only a commit is allowed on a prepared transaction */
198         if (tdb->transaction->prepared) {
199                 tdb->ecode = TDB_ERR_EINVAL;
200                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
201                 tdb->transaction->transaction_error = 1;
202                 return -1;
203         }
204
205         /* if the write is to a hash head, then update the transaction
206            hash heads */
207         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
208             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
209                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
210                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
211         }
212
213         /* break it up into block sized chunks */
214         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
215                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
216                 if (transaction_write(tdb, off, buf, len2) != 0) {
217                         return -1;
218                 }
219                 len -= len2;
220                 off += len2;
221                 if (buf != NULL) {
222                         buf = (const void *)(len2 + (const char *)buf);
223                 }
224         }
225
226         if (len == 0) {
227                 return 0;
228         }
229
230         blk = off / tdb->transaction->block_size;
231         off = off % tdb->transaction->block_size;
232
233         if (tdb->transaction->num_blocks <= blk) {
234                 uint8_t **new_blocks;
235                 /* expand the blocks array */
236                 if (tdb->transaction->blocks == NULL) {
237                         new_blocks = (uint8_t **)malloc(
238                                 (blk+1)*sizeof(uint8_t *));
239                 } else {
240                         new_blocks = (uint8_t **)realloc(
241                                 tdb->transaction->blocks,
242                                 (blk+1)*sizeof(uint8_t *));
243                 }
244                 if (new_blocks == NULL) {
245                         tdb->ecode = TDB_ERR_OOM;
246                         goto fail;
247                 }
248                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
249                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
250                 tdb->transaction->blocks = new_blocks;
251                 tdb->transaction->num_blocks = blk+1;
252                 tdb->transaction->last_block_size = 0;
253         }
254
255         /* allocate and fill a block? */
256         if (tdb->transaction->blocks[blk] == NULL) {
257                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
258                 if (tdb->transaction->blocks[blk] == NULL) {
259                         tdb->ecode = TDB_ERR_OOM;
260                         tdb->transaction->transaction_error = 1;
261                         return -1;                      
262                 }
263                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
264                         tdb_len_t len2 = tdb->transaction->block_size;
265                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
266                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
267                         }
268                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
269                                                                    tdb->transaction->blocks[blk], 
270                                                                    len2, 0) != 0) {
271                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
272                                 tdb->ecode = TDB_ERR_IO;
273                                 goto fail;
274                         }
275                         if (blk == tdb->transaction->num_blocks-1) {
276                                 tdb->transaction->last_block_size = len2;
277                         }                       
278                 }
279         }
280         
281         /* overwrite part of an existing block */
282         if (buf == NULL) {
283                 memset(tdb->transaction->blocks[blk] + off, 0, len);
284         } else {
285                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
286         }
287         if (blk == tdb->transaction->num_blocks-1) {
288                 if (len + off > tdb->transaction->last_block_size) {
289                         tdb->transaction->last_block_size = len + off;
290                 }
291         }
292
293         return 0;
294
295 fail:
296         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
297                  (blk*tdb->transaction->block_size) + off, len));
298         tdb->transaction->transaction_error = 1;
299         return -1;
300 }
301
302
303 /*
304   write while in a transaction - this varient never expands the transaction blocks, it only
305   updates existing blocks. This means it cannot change the recovery size
306 */
307 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
308                                       const void *buf, tdb_len_t len)
309 {
310         uint32_t blk;
311
312         /* break it up into block sized chunks */
313         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
314                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
315                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
316                         return -1;
317                 }
318                 len -= len2;
319                 off += len2;
320                 if (buf != NULL) {
321                         buf = (const void *)(len2 + (const char *)buf);
322                 }
323         }
324
325         if (len == 0) {
326                 return 0;
327         }
328
329         blk = off / tdb->transaction->block_size;
330         off = off % tdb->transaction->block_size;
331
332         if (tdb->transaction->num_blocks <= blk ||
333             tdb->transaction->blocks[blk] == NULL) {
334                 return 0;
335         }
336
337         if (blk == tdb->transaction->num_blocks-1 &&
338             off + len > tdb->transaction->last_block_size) {
339                 if (off >= tdb->transaction->last_block_size) {
340                         return 0;
341                 }
342                 len = tdb->transaction->last_block_size - off;
343         }
344
345         /* overwrite part of an existing block */
346         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
347
348         return 0;
349 }
350
351
352 /*
353   accelerated hash chain head search, using the cached hash heads
354 */
355 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
356 {
357         uint32_t h = *chain;
358         for (;h < tdb->header.hash_size;h++) {
359                 /* the +1 takes account of the freelist */
360                 if (0 != tdb->transaction->hash_heads[h+1]) {
361                         break;
362                 }
363         }
364         (*chain) = h;
365 }
366
367 /*
368   out of bounds check during a transaction
369 */
370 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
371 {
372         if (len <= tdb->map_size) {
373                 return 0;
374         }
375         return TDB_ERRCODE(TDB_ERR_IO, -1);
376 }
377
378 /*
379   transaction version of tdb_expand().
380 */
381 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
382                                    tdb_off_t addition)
383 {
384         /* add a write to the transaction elements, so subsequent
385            reads see the zero data */
386         if (transaction_write(tdb, size, NULL, addition) != 0) {
387                 return -1;
388         }
389
390         tdb->transaction->need_repack = true;
391
392         return 0;
393 }
394
395 /*
396   brlock during a transaction - ignore them
397 */
398 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
399                               int rw_type, int lck_type, int probe, size_t len)
400 {
401         return 0;
402 }
403
404 static const struct tdb_methods transaction_methods = {
405         transaction_read,
406         transaction_write,
407         transaction_next_hash_chain,
408         transaction_oob,
409         transaction_expand_file,
410         transaction_brlock
411 };
412
413
414 /*
415   start a tdb transaction. No token is returned, as only a single
416   transaction is allowed to be pending per tdb_context
417 */
418 int tdb_transaction_start(struct tdb_context *tdb)
419 {
420         /* some sanity checks */
421         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
422                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
423                 tdb->ecode = TDB_ERR_EINVAL;
424                 return -1;
425         }
426
427         /* cope with nested tdb_transaction_start() calls */
428         if (tdb->transaction != NULL) {
429                 tdb->transaction->nesting++;
430                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
431                          tdb->transaction->nesting));
432                 return 0;
433         }
434
435         if (tdb->num_locks != 0 || tdb->global_lock.count) {
436                 /* the caller must not have any locks when starting a
437                    transaction as otherwise we'll be screwed by lack
438                    of nested locks in posix */
439                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
440                 tdb->ecode = TDB_ERR_LOCK;
441                 return -1;
442         }
443
444         if (tdb->travlocks.next != NULL) {
445                 /* you cannot use transactions inside a traverse (although you can use
446                    traverse inside a transaction) as otherwise you can end up with
447                    deadlock */
448                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
449                 tdb->ecode = TDB_ERR_LOCK;
450                 return -1;
451         }
452
453         tdb->transaction = (struct tdb_transaction *)
454                 calloc(sizeof(struct tdb_transaction), 1);
455         if (tdb->transaction == NULL) {
456                 tdb->ecode = TDB_ERR_OOM;
457                 return -1;
458         }
459
460         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
461         tdb->transaction->block_size = tdb->page_size;
462
463         /* get the transaction write lock. This is a blocking lock. As
464            discussed with Volker, there are a number of ways we could
465            make this async, which we will probably do in the future */
466         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
467                 SAFE_FREE(tdb->transaction->blocks);
468                 SAFE_FREE(tdb->transaction);
469                 return -1;
470         }
471         
472         /* get a read lock from the freelist to the end of file. This
473            is upgraded to a write lock during the commit */
474         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
475                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
476                 tdb->ecode = TDB_ERR_LOCK;
477                 goto fail;
478         }
479
480         /* setup a copy of the hash table heads so the hash scan in
481            traverse can be fast */
482         tdb->transaction->hash_heads = (uint32_t *)
483                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
484         if (tdb->transaction->hash_heads == NULL) {
485                 tdb->ecode = TDB_ERR_OOM;
486                 goto fail;
487         }
488         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
489                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
490                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
491                 tdb->ecode = TDB_ERR_IO;
492                 goto fail;
493         }
494
495         /* make sure we know about any file expansions already done by
496            anyone else */
497         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
498         tdb->transaction->old_map_size = tdb->map_size;
499
500         /* finally hook the io methods, replacing them with
501            transaction specific methods */
502         tdb->transaction->io_methods = tdb->methods;
503         tdb->methods = &transaction_methods;
504
505         return 0;
506         
507 fail:
508         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
509         tdb_transaction_unlock(tdb);
510         SAFE_FREE(tdb->transaction->blocks);
511         SAFE_FREE(tdb->transaction->hash_heads);
512         SAFE_FREE(tdb->transaction);
513         return -1;
514 }
515
516
517 /*
518   sync to disk
519 */
520 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
521 {       
522         if (tdb->flags & TDB_NOSYNC) {
523                 return 0;
524         }
525
526         if (fsync(tdb->fd) != 0) {
527                 tdb->ecode = TDB_ERR_IO;
528                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
529                 return -1;
530         }
531 #ifdef HAVE_MMAP
532         if (tdb->map_ptr) {
533                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
534                 if (msync(moffset + (char *)tdb->map_ptr, 
535                           length + (offset - moffset), MS_SYNC) != 0) {
536                         tdb->ecode = TDB_ERR_IO;
537                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
538                                  strerror(errno)));
539                         return -1;
540                 }
541         }
542 #endif
543         return 0;
544 }
545
546
547 /*
548   cancel the current transaction
549 */
550 int tdb_transaction_cancel(struct tdb_context *tdb)
551 {       
552         int i, ret = 0;
553
554         if (tdb->transaction == NULL) {
555                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
556                 return -1;
557         }
558
559         if (tdb->transaction->nesting != 0) {
560                 tdb->transaction->transaction_error = 1;
561                 tdb->transaction->nesting--;
562                 return 0;
563         }               
564
565         tdb->map_size = tdb->transaction->old_map_size;
566
567         /* free all the transaction blocks */
568         for (i=0;i<tdb->transaction->num_blocks;i++) {
569                 if (tdb->transaction->blocks[i] != NULL) {
570                         free(tdb->transaction->blocks[i]);
571                 }
572         }
573         SAFE_FREE(tdb->transaction->blocks);
574
575         if (tdb->transaction->magic_offset) {
576                 const struct tdb_methods *methods = tdb->transaction->io_methods;
577                 uint32_t zero = 0;
578
579                 /* remove the recovery marker */
580                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
581                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
582                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
583                         ret = -1;
584                 }
585         }
586
587         /* remove any global lock created during the transaction */
588         if (tdb->global_lock.count != 0) {
589                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
590                 tdb->global_lock.count = 0;
591         }
592
593         /* remove any locks created during the transaction */
594         if (tdb->num_locks != 0) {
595                 for (i=0;i<tdb->num_lockrecs;i++) {
596                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
597                                    F_UNLCK,F_SETLKW, 0, 1);
598                 }
599                 tdb->num_locks = 0;
600                 tdb->num_lockrecs = 0;
601                 SAFE_FREE(tdb->lockrecs);
602         }
603
604         /* restore the normal io methods */
605         tdb->methods = tdb->transaction->io_methods;
606
607         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
608         tdb_transaction_unlock(tdb);
609         SAFE_FREE(tdb->transaction->hash_heads);
610         SAFE_FREE(tdb->transaction);
611         
612         return ret;
613 }
614
615
616 /*
617   work out how much space the linearised recovery data will consume
618 */
619 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
620 {
621         tdb_len_t recovery_size = 0;
622         int i;
623
624         recovery_size = sizeof(uint32_t);
625         for (i=0;i<tdb->transaction->num_blocks;i++) {
626                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
627                         break;
628                 }
629                 if (tdb->transaction->blocks[i] == NULL) {
630                         continue;
631                 }
632                 recovery_size += 2*sizeof(tdb_off_t);
633                 if (i == tdb->transaction->num_blocks-1) {
634                         recovery_size += tdb->transaction->last_block_size;
635                 } else {
636                         recovery_size += tdb->transaction->block_size;
637                 }
638         }       
639
640         return recovery_size;
641 }
642
643 /*
644   allocate the recovery area, or use an existing recovery area if it is
645   large enough
646 */
647 static int tdb_recovery_allocate(struct tdb_context *tdb, 
648                                  tdb_len_t *recovery_size,
649                                  tdb_off_t *recovery_offset,
650                                  tdb_len_t *recovery_max_size)
651 {
652         struct list_struct rec;
653         const struct tdb_methods *methods = tdb->transaction->io_methods;
654         tdb_off_t recovery_head;
655
656         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
657                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
658                 return -1;
659         }
660
661         rec.rec_len = 0;
662
663         if (recovery_head != 0 && 
664             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
665                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
666                 return -1;
667         }
668
669         *recovery_size = tdb_recovery_size(tdb);
670
671         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
672                 /* it fits in the existing area */
673                 *recovery_max_size = rec.rec_len;
674                 *recovery_offset = recovery_head;
675                 return 0;
676         }
677
678         /* we need to free up the old recovery area, then allocate a
679            new one at the end of the file. Note that we cannot use
680            tdb_allocate() to allocate the new one as that might return
681            us an area that is being currently used (as of the start of
682            the transaction) */
683         if (recovery_head != 0) {
684                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
685                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
686                         return -1;
687                 }
688         }
689
690         /* the tdb_free() call might have increased the recovery size */
691         *recovery_size = tdb_recovery_size(tdb);
692
693         /* round up to a multiple of page size */
694         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
695         *recovery_offset = tdb->map_size;
696         recovery_head = *recovery_offset;
697
698         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
699                                      (tdb->map_size - tdb->transaction->old_map_size) +
700                                      sizeof(rec) + *recovery_max_size) == -1) {
701                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
702                 return -1;
703         }
704
705         /* remap the file (if using mmap) */
706         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
707
708         /* we have to reset the old map size so that we don't try to expand the file
709            again in the transaction commit, which would destroy the recovery area */
710         tdb->transaction->old_map_size = tdb->map_size;
711
712         /* write the recovery header offset and sync - we can sync without a race here
713            as the magic ptr in the recovery record has not been set */
714         CONVERT(recovery_head);
715         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
716                                &recovery_head, sizeof(tdb_off_t)) == -1) {
717                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
718                 return -1;
719         }
720         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
721                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
722                 return -1;
723         }
724
725         return 0;
726 }
727
728
729 /*
730   setup the recovery data that will be used on a crash during commit
731 */
732 static int transaction_setup_recovery(struct tdb_context *tdb, 
733                                       tdb_off_t *magic_offset)
734 {
735         tdb_len_t recovery_size;
736         unsigned char *data, *p;
737         const struct tdb_methods *methods = tdb->transaction->io_methods;
738         struct list_struct *rec;
739         tdb_off_t recovery_offset, recovery_max_size;
740         tdb_off_t old_map_size = tdb->transaction->old_map_size;
741         uint32_t magic, tailer;
742         int i;
743
744         /*
745           check that the recovery area has enough space
746         */
747         if (tdb_recovery_allocate(tdb, &recovery_size, 
748                                   &recovery_offset, &recovery_max_size) == -1) {
749                 return -1;
750         }
751
752         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
753         if (data == NULL) {
754                 tdb->ecode = TDB_ERR_OOM;
755                 return -1;
756         }
757
758         rec = (struct list_struct *)data;
759         memset(rec, 0, sizeof(*rec));
760
761         rec->magic    = 0;
762         rec->data_len = recovery_size;
763         rec->rec_len  = recovery_max_size;
764         rec->key_len  = old_map_size;
765         CONVERT(rec);
766
767         /* build the recovery data into a single blob to allow us to do a single
768            large write, which should be more efficient */
769         p = data + sizeof(*rec);
770         for (i=0;i<tdb->transaction->num_blocks;i++) {
771                 tdb_off_t offset;
772                 tdb_len_t length;
773
774                 if (tdb->transaction->blocks[i] == NULL) {
775                         continue;
776                 }
777
778                 offset = i * tdb->transaction->block_size;
779                 length = tdb->transaction->block_size;
780                 if (i == tdb->transaction->num_blocks-1) {
781                         length = tdb->transaction->last_block_size;
782                 }
783                 
784                 if (offset >= old_map_size) {
785                         continue;
786                 }
787                 if (offset + length > tdb->transaction->old_map_size) {
788                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
789                         free(data);
790                         tdb->ecode = TDB_ERR_CORRUPT;
791                         return -1;
792                 }
793                 memcpy(p, &offset, 4);
794                 memcpy(p+4, &length, 4);
795                 if (DOCONV()) {
796                         tdb_convert(p, 8);
797                 }
798                 /* the recovery area contains the old data, not the
799                    new data, so we have to call the original tdb_read
800                    method to get it */
801                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
802                         free(data);
803                         tdb->ecode = TDB_ERR_IO;
804                         return -1;
805                 }
806                 p += 8 + length;
807         }
808
809         /* and the tailer */
810         tailer = sizeof(*rec) + recovery_max_size;
811         memcpy(p, &tailer, 4);
812         CONVERT(p);
813
814         /* write the recovery data to the recovery area */
815         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
816                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
817                 free(data);
818                 tdb->ecode = TDB_ERR_IO;
819                 return -1;
820         }
821         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
822                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
823                 free(data);
824                 tdb->ecode = TDB_ERR_IO;
825                 return -1;
826         }
827
828         /* as we don't have ordered writes, we have to sync the recovery
829            data before we update the magic to indicate that the recovery
830            data is present */
831         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
832                 free(data);
833                 return -1;
834         }
835
836         free(data);
837
838         magic = TDB_RECOVERY_MAGIC;
839         CONVERT(magic);
840
841         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
842
843         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
844                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
845                 tdb->ecode = TDB_ERR_IO;
846                 return -1;
847         }
848         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
849                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
850                 tdb->ecode = TDB_ERR_IO;
851                 return -1;
852         }
853
854         /* ensure the recovery magic marker is on disk */
855         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
856                 return -1;
857         }
858
859         return 0;
860 }
861
862 /*
863   prepare to commit the current transaction
864 */
865 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
866 {       
867         const struct tdb_methods *methods;
868
869         if (tdb->transaction == NULL) {
870                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
871                 return -1;
872         }
873
874         if (tdb->transaction->prepared) {
875                 tdb->ecode = TDB_ERR_EINVAL;
876                 tdb_transaction_cancel(tdb);
877                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
878                 return -1;
879         }
880
881         if (tdb->transaction->transaction_error) {
882                 tdb->ecode = TDB_ERR_IO;
883                 tdb_transaction_cancel(tdb);
884                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
885                 return -1;
886         }
887
888
889         if (tdb->transaction->nesting != 0) {
890                 return 0;
891         }               
892
893         /* check for a null transaction */
894         if (tdb->transaction->blocks == NULL) {
895                 return 0;
896         }
897
898         methods = tdb->transaction->io_methods;
899         
900         /* if there are any locks pending then the caller has not
901            nested their locks properly, so fail the transaction */
902         if (tdb->num_locks || tdb->global_lock.count) {
903                 tdb->ecode = TDB_ERR_LOCK;
904                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
905                 tdb_transaction_cancel(tdb);
906                 return -1;
907         }
908
909         /* upgrade the main transaction lock region to a write lock */
910         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
911                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
912                 tdb->ecode = TDB_ERR_LOCK;
913                 tdb_transaction_cancel(tdb);
914                 return -1;
915         }
916
917         /* get the global lock - this prevents new users attaching to the database
918            during the commit */
919         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
920                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
921                 tdb->ecode = TDB_ERR_LOCK;
922                 tdb_transaction_cancel(tdb);
923                 return -1;
924         }
925
926         if (!(tdb->flags & TDB_NOSYNC)) {
927                 /* write the recovery data to the end of the file */
928                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
929                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
930                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
931                         tdb_transaction_cancel(tdb);
932                         return -1;
933                 }
934         }
935
936         tdb->transaction->prepared = true;
937
938         /* expand the file to the new size if needed */
939         if (tdb->map_size != tdb->transaction->old_map_size) {
940                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
941                                              tdb->map_size - 
942                                              tdb->transaction->old_map_size) == -1) {
943                         tdb->ecode = TDB_ERR_IO;
944                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
945                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
946                         tdb_transaction_cancel(tdb);
947                         return -1;
948                 }
949                 tdb->map_size = tdb->transaction->old_map_size;
950                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
951         }
952
953         /* Keep the global lock until the actual commit */
954
955         return 0;
956 }
957
958 /*
959   commit the current transaction
960 */
961 int tdb_transaction_commit(struct tdb_context *tdb)
962 {       
963         const struct tdb_methods *methods;
964         int i;
965         bool need_repack;
966
967         if (tdb->transaction == NULL) {
968                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
969                 return -1;
970         }
971
972         if (tdb->transaction->transaction_error) {
973                 tdb->ecode = TDB_ERR_IO;
974                 tdb_transaction_cancel(tdb);
975                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
976                 return -1;
977         }
978
979
980         if (tdb->transaction->nesting != 0) {
981                 tdb->transaction->nesting--;
982                 return 0;
983         }
984
985         /* check for a null transaction */
986         if (tdb->transaction->blocks == NULL) {
987                 tdb_transaction_cancel(tdb);
988                 return 0;
989         }
990
991         if (!tdb->transaction->prepared) {
992                 int ret = tdb_transaction_prepare_commit(tdb);
993                 if (ret)
994                         return ret;
995         }
996
997         methods = tdb->transaction->io_methods;
998
999         /* perform all the writes */
1000         for (i=0;i<tdb->transaction->num_blocks;i++) {
1001                 tdb_off_t offset;
1002                 tdb_len_t length;
1003
1004                 if (tdb->transaction->blocks[i] == NULL) {
1005                         continue;
1006                 }
1007
1008                 offset = i * tdb->transaction->block_size;
1009                 length = tdb->transaction->block_size;
1010                 if (i == tdb->transaction->num_blocks-1) {
1011                         length = tdb->transaction->last_block_size;
1012                 }
1013
1014                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1015                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1016                         
1017                         /* we've overwritten part of the data and
1018                            possibly expanded the file, so we need to
1019                            run the crash recovery code */
1020                         tdb->methods = methods;
1021                         tdb_transaction_recover(tdb); 
1022
1023                         tdb_transaction_cancel(tdb);
1024                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1025
1026                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1027                         return -1;
1028                 }
1029                 SAFE_FREE(tdb->transaction->blocks[i]);
1030         } 
1031
1032         SAFE_FREE(tdb->transaction->blocks);
1033         tdb->transaction->num_blocks = 0;
1034
1035         /* ensure the new data is on disk */
1036         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1037                 return -1;
1038         }
1039
1040         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1041
1042         /*
1043           TODO: maybe write to some dummy hdr field, or write to magic
1044           offset without mmap, before the last sync, instead of the
1045           utime() call
1046         */
1047
1048         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1049            don't change the mtime of the file, this means the file may
1050            not be backed up (as tdb rounding to block sizes means that
1051            file size changes are quite rare too). The following forces
1052            mtime changes when a transaction completes */
1053 #ifdef HAVE_UTIME
1054         utime(tdb->name, NULL);
1055 #endif
1056
1057         need_repack = tdb->transaction->need_repack;
1058
1059         /* use a transaction cancel to free memory and remove the
1060            transaction locks */
1061         tdb_transaction_cancel(tdb);
1062
1063         if (need_repack) {
1064                 return tdb_repack(tdb);
1065         }
1066
1067         return 0;
1068 }
1069
1070
1071 /*
1072   recover from an aborted transaction. Must be called with exclusive
1073   database write access already established (including the global
1074   lock to prevent new processes attaching)
1075 */
1076 int tdb_transaction_recover(struct tdb_context *tdb)
1077 {
1078         tdb_off_t recovery_head, recovery_eof;
1079         unsigned char *data, *p;
1080         uint32_t zero = 0;
1081         struct list_struct rec;
1082
1083         /* find the recovery area */
1084         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1085                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1086                 tdb->ecode = TDB_ERR_IO;
1087                 return -1;
1088         }
1089
1090         if (recovery_head == 0) {
1091                 /* we have never allocated a recovery record */
1092                 return 0;
1093         }
1094
1095         /* read the recovery record */
1096         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1097                                    sizeof(rec), DOCONV()) == -1) {
1098                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1099                 tdb->ecode = TDB_ERR_IO;
1100                 return -1;
1101         }
1102
1103         if (rec.magic != TDB_RECOVERY_MAGIC) {
1104                 /* there is no valid recovery data */
1105                 return 0;
1106         }
1107
1108         if (tdb->read_only) {
1109                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1110                 tdb->ecode = TDB_ERR_CORRUPT;
1111                 return -1;
1112         }
1113
1114         recovery_eof = rec.key_len;
1115
1116         data = (unsigned char *)malloc(rec.data_len);
1117         if (data == NULL) {
1118                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1119                 tdb->ecode = TDB_ERR_OOM;
1120                 return -1;
1121         }
1122
1123         /* read the full recovery data */
1124         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1125                                    rec.data_len, 0) == -1) {
1126                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1127                 tdb->ecode = TDB_ERR_IO;
1128                 return -1;
1129         }
1130
1131         /* recover the file data */
1132         p = data;
1133         while (p+8 < data + rec.data_len) {
1134                 uint32_t ofs, len;
1135                 if (DOCONV()) {
1136                         tdb_convert(p, 8);
1137                 }
1138                 memcpy(&ofs, p, 4);
1139                 memcpy(&len, p+4, 4);
1140
1141                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1142                         free(data);
1143                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1144                         tdb->ecode = TDB_ERR_IO;
1145                         return -1;
1146                 }
1147                 p += 8 + len;
1148         }
1149
1150         free(data);
1151
1152         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1153                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1154                 tdb->ecode = TDB_ERR_IO;
1155                 return -1;
1156         }
1157
1158         /* if the recovery area is after the recovered eof then remove it */
1159         if (recovery_eof <= recovery_head) {
1160                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1161                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1162                         tdb->ecode = TDB_ERR_IO;
1163                         return -1;                      
1164                 }
1165         }
1166
1167         /* remove the recovery magic */
1168         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1169                           &zero) == -1) {
1170                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1171                 tdb->ecode = TDB_ERR_IO;
1172                 return -1;                      
1173         }
1174         
1175         /* reduce the file size to the old size */
1176         tdb_munmap(tdb);
1177         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1178                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1179                 tdb->ecode = TDB_ERR_IO;
1180                 return -1;                      
1181         }
1182         tdb->map_size = recovery_eof;
1183         tdb_mmap(tdb);
1184
1185         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1186                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1187                 tdb->ecode = TDB_ERR_IO;
1188                 return -1;
1189         }
1190
1191         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1192                  recovery_eof));
1193
1194         /* all done */
1195         return 0;
1196 }