auto-repack in transactions that expand the tdb
[ira/wip.git] / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90
91 /*
92   hold the context of any current transaction
93 */
94 struct tdb_transaction {
95         /* we keep a mirrored copy of the tdb hash heads here so
96            tdb_next_hash_chain() can operate efficiently */
97         uint32_t *hash_heads;
98
99         /* the original io methods - used to do IOs to the real db */
100         const struct tdb_methods *io_methods;
101
102         /* the list of transaction blocks. When a block is first
103            written to, it gets created in this list */
104         uint8_t **blocks;
105         uint32_t num_blocks;
106         uint32_t block_size;      /* bytes in each block */
107         uint32_t last_block_size; /* number of valid bytes in the last block */
108
109         /* non-zero when an internal transaction error has
110            occurred. All write operations will then fail until the
111            transaction is ended */
112         int transaction_error;
113
114         /* when inside a transaction we need to keep track of any
115            nested tdb_transaction_start() calls, as these are allowed,
116            but don't create a new transaction */
117         int nesting;
118
119         /* set when a prepare has already occurred */
120         bool prepared;
121         tdb_off_t magic_offset;
122
123         /* old file size before transaction */
124         tdb_len_t old_map_size;
125
126         /* we should re-pack on commit */
127         bool need_repack;
128 };
129
130
131 /*
132   read while in a transaction. We need to check first if the data is in our list
133   of transaction elements, then if not do a real read
134 */
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
136                             tdb_len_t len, int cv)
137 {
138         uint32_t blk;
139
140         /* Only a commit is allowed on a prepared transaction */
141         if (tdb->transaction->prepared) {
142                 tdb->ecode = TDB_ERR_EINVAL;
143                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: transaction already prepared, read not allowed\n"));
144                 tdb->transaction->transaction_error = 1;
145                 return -1;
146         }
147
148         /* break it down into block sized ops */
149         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
150                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
151                 if (transaction_read(tdb, off, buf, len2, cv) != 0) {
152                         return -1;
153                 }
154                 len -= len2;
155                 off += len2;
156                 buf = (void *)(len2 + (char *)buf);
157         }
158
159         if (len == 0) {
160                 return 0;
161         }
162
163         blk = off / tdb->transaction->block_size;
164
165         /* see if we have it in the block list */
166         if (tdb->transaction->num_blocks <= blk ||
167             tdb->transaction->blocks[blk] == NULL) {
168                 /* nope, do a real read */
169                 if (tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv) != 0) {
170                         goto fail;
171                 }
172                 return 0;
173         }
174
175         /* it is in the block list. Now check for the last block */
176         if (blk == tdb->transaction->num_blocks-1) {
177                 if (len > tdb->transaction->last_block_size) {
178                         goto fail;
179                 }
180         }
181         
182         /* now copy it out of this block */
183         memcpy(buf, tdb->transaction->blocks[blk] + (off % tdb->transaction->block_size), len);
184         if (cv) {
185                 tdb_convert(buf, len);
186         }
187         return 0;
188
189 fail:
190         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
191         tdb->ecode = TDB_ERR_IO;
192         tdb->transaction->transaction_error = 1;
193         return -1;
194 }
195
196
197 /*
198   write while in a transaction
199 */
200 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
201                              const void *buf, tdb_len_t len)
202 {
203         uint32_t blk;
204
205         /* Only a commit is allowed on a prepared transaction */
206         if (tdb->transaction->prepared) {
207                 tdb->ecode = TDB_ERR_EINVAL;
208                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: transaction already prepared, write not allowed\n"));
209                 tdb->transaction->transaction_error = 1;
210                 return -1;
211         }
212
213         /* if the write is to a hash head, then update the transaction
214            hash heads */
215         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
216             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
217                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
218                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
219         }
220
221         /* break it up into block sized chunks */
222         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
223                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
224                 if (transaction_write(tdb, off, buf, len2) != 0) {
225                         return -1;
226                 }
227                 len -= len2;
228                 off += len2;
229                 if (buf != NULL) {
230                         buf = (const void *)(len2 + (const char *)buf);
231                 }
232         }
233
234         if (len == 0) {
235                 return 0;
236         }
237
238         blk = off / tdb->transaction->block_size;
239         off = off % tdb->transaction->block_size;
240
241         if (tdb->transaction->num_blocks <= blk) {
242                 uint8_t **new_blocks;
243                 /* expand the blocks array */
244                 if (tdb->transaction->blocks == NULL) {
245                         new_blocks = (uint8_t **)malloc(
246                                 (blk+1)*sizeof(uint8_t *));
247                 } else {
248                         new_blocks = (uint8_t **)realloc(
249                                 tdb->transaction->blocks,
250                                 (blk+1)*sizeof(uint8_t *));
251                 }
252                 if (new_blocks == NULL) {
253                         tdb->ecode = TDB_ERR_OOM;
254                         goto fail;
255                 }
256                 memset(&new_blocks[tdb->transaction->num_blocks], 0, 
257                        (1+(blk - tdb->transaction->num_blocks))*sizeof(uint8_t *));
258                 tdb->transaction->blocks = new_blocks;
259                 tdb->transaction->num_blocks = blk+1;
260                 tdb->transaction->last_block_size = 0;
261         }
262
263         /* allocate and fill a block? */
264         if (tdb->transaction->blocks[blk] == NULL) {
265                 tdb->transaction->blocks[blk] = (uint8_t *)calloc(tdb->transaction->block_size, 1);
266                 if (tdb->transaction->blocks[blk] == NULL) {
267                         tdb->ecode = TDB_ERR_OOM;
268                         tdb->transaction->transaction_error = 1;
269                         return -1;                      
270                 }
271                 if (tdb->transaction->old_map_size > blk * tdb->transaction->block_size) {
272                         tdb_len_t len2 = tdb->transaction->block_size;
273                         if (len2 + (blk * tdb->transaction->block_size) > tdb->transaction->old_map_size) {
274                                 len2 = tdb->transaction->old_map_size - (blk * tdb->transaction->block_size);
275                         }
276                         if (tdb->transaction->io_methods->tdb_read(tdb, blk * tdb->transaction->block_size, 
277                                                                    tdb->transaction->blocks[blk], 
278                                                                    len2, 0) != 0) {
279                                 SAFE_FREE(tdb->transaction->blocks[blk]);                               
280                                 tdb->ecode = TDB_ERR_IO;
281                                 goto fail;
282                         }
283                         if (blk == tdb->transaction->num_blocks-1) {
284                                 tdb->transaction->last_block_size = len2;
285                         }                       
286                 }
287         }
288         
289         /* overwrite part of an existing block */
290         if (buf == NULL) {
291                 memset(tdb->transaction->blocks[blk] + off, 0, len);
292         } else {
293                 memcpy(tdb->transaction->blocks[blk] + off, buf, len);
294         }
295         if (blk == tdb->transaction->num_blocks-1) {
296                 if (len + off > tdb->transaction->last_block_size) {
297                         tdb->transaction->last_block_size = len + off;
298                 }
299         }
300
301         return 0;
302
303 fail:
304         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", 
305                  (blk*tdb->transaction->block_size) + off, len));
306         tdb->transaction->transaction_error = 1;
307         return -1;
308 }
309
310
311 /*
312   write while in a transaction - this varient never expands the transaction blocks, it only
313   updates existing blocks. This means it cannot change the recovery size
314 */
315 static int transaction_write_existing(struct tdb_context *tdb, tdb_off_t off, 
316                                       const void *buf, tdb_len_t len)
317 {
318         uint32_t blk;
319
320         /* break it up into block sized chunks */
321         while (len + (off % tdb->transaction->block_size) > tdb->transaction->block_size) {
322                 tdb_len_t len2 = tdb->transaction->block_size - (off % tdb->transaction->block_size);
323                 if (transaction_write_existing(tdb, off, buf, len2) != 0) {
324                         return -1;
325                 }
326                 len -= len2;
327                 off += len2;
328                 if (buf != NULL) {
329                         buf = (const void *)(len2 + (const char *)buf);
330                 }
331         }
332
333         if (len == 0) {
334                 return 0;
335         }
336
337         blk = off / tdb->transaction->block_size;
338         off = off % tdb->transaction->block_size;
339
340         if (tdb->transaction->num_blocks <= blk ||
341             tdb->transaction->blocks[blk] == NULL) {
342                 return 0;
343         }
344
345         if (blk == tdb->transaction->num_blocks-1 &&
346             off + len > tdb->transaction->last_block_size) {
347                 if (off >= tdb->transaction->last_block_size) {
348                         return 0;
349                 }
350                 len = tdb->transaction->last_block_size - off;
351         }
352
353         /* overwrite part of an existing block */
354         memcpy(tdb->transaction->blocks[blk] + off, buf, len);
355
356         return 0;
357 }
358
359
360 /*
361   accelerated hash chain head search, using the cached hash heads
362 */
363 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
364 {
365         uint32_t h = *chain;
366         for (;h < tdb->header.hash_size;h++) {
367                 /* the +1 takes account of the freelist */
368                 if (0 != tdb->transaction->hash_heads[h+1]) {
369                         break;
370                 }
371         }
372         (*chain) = h;
373 }
374
375 /*
376   out of bounds check during a transaction
377 */
378 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
379 {
380         if (len <= tdb->map_size) {
381                 return 0;
382         }
383         return TDB_ERRCODE(TDB_ERR_IO, -1);
384 }
385
386 /*
387   transaction version of tdb_expand().
388 */
389 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
390                                    tdb_off_t addition)
391 {
392         /* add a write to the transaction elements, so subsequent
393            reads see the zero data */
394         if (transaction_write(tdb, size, NULL, addition) != 0) {
395                 return -1;
396         }
397
398         tdb->transaction->need_repack = true;
399
400         return 0;
401 }
402
403 /*
404   brlock during a transaction - ignore them
405 */
406 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
407                               int rw_type, int lck_type, int probe, size_t len)
408 {
409         return 0;
410 }
411
412 static const struct tdb_methods transaction_methods = {
413         transaction_read,
414         transaction_write,
415         transaction_next_hash_chain,
416         transaction_oob,
417         transaction_expand_file,
418         transaction_brlock
419 };
420
421
422 /*
423   start a tdb transaction. No token is returned, as only a single
424   transaction is allowed to be pending per tdb_context
425 */
426 int tdb_transaction_start(struct tdb_context *tdb)
427 {
428         /* some sanity checks */
429         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
430                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
431                 tdb->ecode = TDB_ERR_EINVAL;
432                 return -1;
433         }
434
435         /* cope with nested tdb_transaction_start() calls */
436         if (tdb->transaction != NULL) {
437                 tdb->transaction->nesting++;
438                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
439                          tdb->transaction->nesting));
440                 return 0;
441         }
442
443         if (tdb->num_locks != 0 || tdb->global_lock.count) {
444                 /* the caller must not have any locks when starting a
445                    transaction as otherwise we'll be screwed by lack
446                    of nested locks in posix */
447                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
448                 tdb->ecode = TDB_ERR_LOCK;
449                 return -1;
450         }
451
452         if (tdb->travlocks.next != NULL) {
453                 /* you cannot use transactions inside a traverse (although you can use
454                    traverse inside a transaction) as otherwise you can end up with
455                    deadlock */
456                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
457                 tdb->ecode = TDB_ERR_LOCK;
458                 return -1;
459         }
460
461         tdb->transaction = (struct tdb_transaction *)
462                 calloc(sizeof(struct tdb_transaction), 1);
463         if (tdb->transaction == NULL) {
464                 tdb->ecode = TDB_ERR_OOM;
465                 return -1;
466         }
467
468         /* a page at a time seems like a reasonable compromise between compactness and efficiency */
469         tdb->transaction->block_size = tdb->page_size;
470
471         /* get the transaction write lock. This is a blocking lock. As
472            discussed with Volker, there are a number of ways we could
473            make this async, which we will probably do in the future */
474         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
475                 SAFE_FREE(tdb->transaction->blocks);
476                 SAFE_FREE(tdb->transaction);
477                 return -1;
478         }
479         
480         /* get a read lock from the freelist to the end of file. This
481            is upgraded to a write lock during the commit */
482         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
483                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
484                 tdb->ecode = TDB_ERR_LOCK;
485                 goto fail;
486         }
487
488         /* setup a copy of the hash table heads so the hash scan in
489            traverse can be fast */
490         tdb->transaction->hash_heads = (uint32_t *)
491                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
492         if (tdb->transaction->hash_heads == NULL) {
493                 tdb->ecode = TDB_ERR_OOM;
494                 goto fail;
495         }
496         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
497                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
498                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
499                 tdb->ecode = TDB_ERR_IO;
500                 goto fail;
501         }
502
503         /* make sure we know about any file expansions already done by
504            anyone else */
505         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
506         tdb->transaction->old_map_size = tdb->map_size;
507
508         /* finally hook the io methods, replacing them with
509            transaction specific methods */
510         tdb->transaction->io_methods = tdb->methods;
511         tdb->methods = &transaction_methods;
512
513         return 0;
514         
515 fail:
516         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
517         tdb_transaction_unlock(tdb);
518         SAFE_FREE(tdb->transaction->blocks);
519         SAFE_FREE(tdb->transaction->hash_heads);
520         SAFE_FREE(tdb->transaction);
521         return -1;
522 }
523
524
525 /*
526   sync to disk
527 */
528 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
529 {       
530         if (tdb->flags & TDB_NOSYNC) {
531                 return 0;
532         }
533
534         if (fsync(tdb->fd) != 0) {
535                 tdb->ecode = TDB_ERR_IO;
536                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
537                 return -1;
538         }
539 #ifdef HAVE_MMAP
540         if (tdb->map_ptr) {
541                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
542                 if (msync(moffset + (char *)tdb->map_ptr, 
543                           length + (offset - moffset), MS_SYNC) != 0) {
544                         tdb->ecode = TDB_ERR_IO;
545                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
546                                  strerror(errno)));
547                         return -1;
548                 }
549         }
550 #endif
551         return 0;
552 }
553
554
555 /*
556   cancel the current transaction
557 */
558 int tdb_transaction_cancel(struct tdb_context *tdb)
559 {       
560         int i, ret = 0;
561
562         if (tdb->transaction == NULL) {
563                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
564                 return -1;
565         }
566
567         if (tdb->transaction->nesting != 0) {
568                 tdb->transaction->transaction_error = 1;
569                 tdb->transaction->nesting--;
570                 return 0;
571         }               
572
573         tdb->map_size = tdb->transaction->old_map_size;
574
575         /* free all the transaction blocks */
576         for (i=0;i<tdb->transaction->num_blocks;i++) {
577                 if (tdb->transaction->blocks[i] != NULL) {
578                         free(tdb->transaction->blocks[i]);
579                 }
580         }
581         SAFE_FREE(tdb->transaction->blocks);
582
583         if (tdb->transaction->magic_offset) {
584                 const struct tdb_methods *methods = tdb->transaction->io_methods;
585                 uint32_t zero = 0;
586
587                 /* remove the recovery marker */
588                 if (methods->tdb_write(tdb, tdb->transaction->magic_offset, &zero, 4) == -1 ||
589                 transaction_sync(tdb, tdb->transaction->magic_offset, 4) == -1) {
590                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_cancel: failed to remove recovery magic\n"));
591                         ret = -1;
592                 }
593         }
594
595         /* remove any global lock created during the transaction */
596         if (tdb->global_lock.count != 0) {
597                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
598                 tdb->global_lock.count = 0;
599         }
600
601         /* remove any locks created during the transaction */
602         if (tdb->num_locks != 0) {
603                 for (i=0;i<tdb->num_lockrecs;i++) {
604                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
605                                    F_UNLCK,F_SETLKW, 0, 1);
606                 }
607                 tdb->num_locks = 0;
608                 tdb->num_lockrecs = 0;
609                 SAFE_FREE(tdb->lockrecs);
610         }
611
612         /* restore the normal io methods */
613         tdb->methods = tdb->transaction->io_methods;
614
615         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
616         tdb_transaction_unlock(tdb);
617         SAFE_FREE(tdb->transaction->hash_heads);
618         SAFE_FREE(tdb->transaction);
619         
620         return ret;
621 }
622
623
624 /*
625   work out how much space the linearised recovery data will consume
626 */
627 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
628 {
629         tdb_len_t recovery_size = 0;
630         int i;
631
632         recovery_size = sizeof(uint32_t);
633         for (i=0;i<tdb->transaction->num_blocks;i++) {
634                 if (i * tdb->transaction->block_size >= tdb->transaction->old_map_size) {
635                         break;
636                 }
637                 if (tdb->transaction->blocks[i] == NULL) {
638                         continue;
639                 }
640                 recovery_size += 2*sizeof(tdb_off_t);
641                 if (i == tdb->transaction->num_blocks-1) {
642                         recovery_size += tdb->transaction->last_block_size;
643                 } else {
644                         recovery_size += tdb->transaction->block_size;
645                 }
646         }       
647
648         return recovery_size;
649 }
650
651 /*
652   allocate the recovery area, or use an existing recovery area if it is
653   large enough
654 */
655 static int tdb_recovery_allocate(struct tdb_context *tdb, 
656                                  tdb_len_t *recovery_size,
657                                  tdb_off_t *recovery_offset,
658                                  tdb_len_t *recovery_max_size)
659 {
660         struct list_struct rec;
661         const struct tdb_methods *methods = tdb->transaction->io_methods;
662         tdb_off_t recovery_head;
663
664         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
665                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
666                 return -1;
667         }
668
669         rec.rec_len = 0;
670
671         if (recovery_head != 0 && 
672             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
673                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
674                 return -1;
675         }
676
677         *recovery_size = tdb_recovery_size(tdb);
678
679         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
680                 /* it fits in the existing area */
681                 *recovery_max_size = rec.rec_len;
682                 *recovery_offset = recovery_head;
683                 return 0;
684         }
685
686         /* we need to free up the old recovery area, then allocate a
687            new one at the end of the file. Note that we cannot use
688            tdb_allocate() to allocate the new one as that might return
689            us an area that is being currently used (as of the start of
690            the transaction) */
691         if (recovery_head != 0) {
692                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
693                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
694                         return -1;
695                 }
696         }
697
698         /* the tdb_free() call might have increased the recovery size */
699         *recovery_size = tdb_recovery_size(tdb);
700
701         /* round up to a multiple of page size */
702         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
703         *recovery_offset = tdb->map_size;
704         recovery_head = *recovery_offset;
705
706         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
707                                      (tdb->map_size - tdb->transaction->old_map_size) +
708                                      sizeof(rec) + *recovery_max_size) == -1) {
709                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
710                 return -1;
711         }
712
713         /* remap the file (if using mmap) */
714         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
715
716         /* we have to reset the old map size so that we don't try to expand the file
717            again in the transaction commit, which would destroy the recovery area */
718         tdb->transaction->old_map_size = tdb->map_size;
719
720         /* write the recovery header offset and sync - we can sync without a race here
721            as the magic ptr in the recovery record has not been set */
722         CONVERT(recovery_head);
723         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
724                                &recovery_head, sizeof(tdb_off_t)) == -1) {
725                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
726                 return -1;
727         }
728         if (transaction_write_existing(tdb, TDB_RECOVERY_HEAD, &recovery_head, sizeof(tdb_off_t)) == -1) {
729                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
730                 return -1;
731         }
732
733         return 0;
734 }
735
736
737 /*
738   setup the recovery data that will be used on a crash during commit
739 */
740 static int transaction_setup_recovery(struct tdb_context *tdb, 
741                                       tdb_off_t *magic_offset)
742 {
743         tdb_len_t recovery_size;
744         unsigned char *data, *p;
745         const struct tdb_methods *methods = tdb->transaction->io_methods;
746         struct list_struct *rec;
747         tdb_off_t recovery_offset, recovery_max_size;
748         tdb_off_t old_map_size = tdb->transaction->old_map_size;
749         uint32_t magic, tailer;
750         int i;
751
752         /*
753           check that the recovery area has enough space
754         */
755         if (tdb_recovery_allocate(tdb, &recovery_size, 
756                                   &recovery_offset, &recovery_max_size) == -1) {
757                 return -1;
758         }
759
760         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
761         if (data == NULL) {
762                 tdb->ecode = TDB_ERR_OOM;
763                 return -1;
764         }
765
766         rec = (struct list_struct *)data;
767         memset(rec, 0, sizeof(*rec));
768
769         rec->magic    = 0;
770         rec->data_len = recovery_size;
771         rec->rec_len  = recovery_max_size;
772         rec->key_len  = old_map_size;
773         CONVERT(rec);
774
775         /* build the recovery data into a single blob to allow us to do a single
776            large write, which should be more efficient */
777         p = data + sizeof(*rec);
778         for (i=0;i<tdb->transaction->num_blocks;i++) {
779                 tdb_off_t offset;
780                 tdb_len_t length;
781
782                 if (tdb->transaction->blocks[i] == NULL) {
783                         continue;
784                 }
785
786                 offset = i * tdb->transaction->block_size;
787                 length = tdb->transaction->block_size;
788                 if (i == tdb->transaction->num_blocks-1) {
789                         length = tdb->transaction->last_block_size;
790                 }
791                 
792                 if (offset >= old_map_size) {
793                         continue;
794                 }
795                 if (offset + length > tdb->transaction->old_map_size) {
796                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
797                         free(data);
798                         tdb->ecode = TDB_ERR_CORRUPT;
799                         return -1;
800                 }
801                 memcpy(p, &offset, 4);
802                 memcpy(p+4, &length, 4);
803                 if (DOCONV()) {
804                         tdb_convert(p, 8);
805                 }
806                 /* the recovery area contains the old data, not the
807                    new data, so we have to call the original tdb_read
808                    method to get it */
809                 if (methods->tdb_read(tdb, offset, p + 8, length, 0) != 0) {
810                         free(data);
811                         tdb->ecode = TDB_ERR_IO;
812                         return -1;
813                 }
814                 p += 8 + length;
815         }
816
817         /* and the tailer */
818         tailer = sizeof(*rec) + recovery_max_size;
819         memcpy(p, &tailer, 4);
820         CONVERT(p);
821
822         /* write the recovery data to the recovery area */
823         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
824                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
825                 free(data);
826                 tdb->ecode = TDB_ERR_IO;
827                 return -1;
828         }
829         if (transaction_write_existing(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
830                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery data\n"));
831                 free(data);
832                 tdb->ecode = TDB_ERR_IO;
833                 return -1;
834         }
835
836         /* as we don't have ordered writes, we have to sync the recovery
837            data before we update the magic to indicate that the recovery
838            data is present */
839         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
840                 free(data);
841                 return -1;
842         }
843
844         free(data);
845
846         magic = TDB_RECOVERY_MAGIC;
847         CONVERT(magic);
848
849         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
850
851         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
852                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
853                 tdb->ecode = TDB_ERR_IO;
854                 return -1;
855         }
856         if (transaction_write_existing(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
857                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write secondary recovery magic\n"));
858                 tdb->ecode = TDB_ERR_IO;
859                 return -1;
860         }
861
862         /* ensure the recovery magic marker is on disk */
863         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
864                 return -1;
865         }
866
867         return 0;
868 }
869
870 /*
871   prepare to commit the current transaction
872 */
873 int tdb_transaction_prepare_commit(struct tdb_context *tdb)
874 {       
875         const struct tdb_methods *methods;
876
877         if (tdb->transaction == NULL) {
878                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: no transaction\n"));
879                 return -1;
880         }
881
882         if (tdb->transaction->prepared) {
883                 tdb->ecode = TDB_ERR_EINVAL;
884                 tdb_transaction_cancel(tdb);
885                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction already prepared\n"));
886                 return -1;
887         }
888
889         if (tdb->transaction->transaction_error) {
890                 tdb->ecode = TDB_ERR_IO;
891                 tdb_transaction_cancel(tdb);
892                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: transaction error pending\n"));
893                 return -1;
894         }
895
896
897         if (tdb->transaction->nesting != 0) {
898                 return 0;
899         }               
900
901         /* check for a null transaction */
902         if (tdb->transaction->blocks == NULL) {
903                 return 0;
904         }
905
906         methods = tdb->transaction->io_methods;
907         
908         /* if there are any locks pending then the caller has not
909            nested their locks properly, so fail the transaction */
910         if (tdb->num_locks || tdb->global_lock.count) {
911                 tdb->ecode = TDB_ERR_LOCK;
912                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: locks pending on commit\n"));
913                 tdb_transaction_cancel(tdb);
914                 return -1;
915         }
916
917         /* upgrade the main transaction lock region to a write lock */
918         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
919                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to upgrade hash locks\n"));
920                 tdb->ecode = TDB_ERR_LOCK;
921                 tdb_transaction_cancel(tdb);
922                 return -1;
923         }
924
925         /* get the global lock - this prevents new users attaching to the database
926            during the commit */
927         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
928                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_prepare_commit: failed to get global lock\n"));
929                 tdb->ecode = TDB_ERR_LOCK;
930                 tdb_transaction_cancel(tdb);
931                 return -1;
932         }
933
934         if (!(tdb->flags & TDB_NOSYNC)) {
935                 /* write the recovery data to the end of the file */
936                 if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
937                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
938                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
939                         tdb_transaction_cancel(tdb);
940                         return -1;
941                 }
942         }
943
944         tdb->transaction->prepared = true;
945
946         /* expand the file to the new size if needed */
947         if (tdb->map_size != tdb->transaction->old_map_size) {
948                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
949                                              tdb->map_size - 
950                                              tdb->transaction->old_map_size) == -1) {
951                         tdb->ecode = TDB_ERR_IO;
952                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
953                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
954                         tdb_transaction_cancel(tdb);
955                         return -1;
956                 }
957                 tdb->map_size = tdb->transaction->old_map_size;
958                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
959         }
960
961         /* Keep the global lock until the actual commit */
962
963         return 0;
964 }
965
966 /*
967   commit the current transaction
968 */
969 int tdb_transaction_commit(struct tdb_context *tdb)
970 {       
971         const struct tdb_methods *methods;
972         int i;
973         bool need_repack;
974
975         if (tdb->transaction == NULL) {
976                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
977                 return -1;
978         }
979
980         if (tdb->transaction->transaction_error) {
981                 tdb->ecode = TDB_ERR_IO;
982                 tdb_transaction_cancel(tdb);
983                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
984                 return -1;
985         }
986
987
988         if (tdb->transaction->nesting != 0) {
989                 tdb->transaction->nesting--;
990                 return 0;
991         }
992
993         /* check for a null transaction */
994         if (tdb->transaction->blocks == NULL) {
995                 tdb_transaction_cancel(tdb);
996                 return 0;
997         }
998
999         if (!tdb->transaction->prepared) {
1000                 int ret = tdb_transaction_prepare_commit(tdb);
1001                 if (ret)
1002                         return ret;
1003         }
1004
1005         methods = tdb->transaction->io_methods;
1006
1007         /* perform all the writes */
1008         for (i=0;i<tdb->transaction->num_blocks;i++) {
1009                 tdb_off_t offset;
1010                 tdb_len_t length;
1011
1012                 if (tdb->transaction->blocks[i] == NULL) {
1013                         continue;
1014                 }
1015
1016                 offset = i * tdb->transaction->block_size;
1017                 length = tdb->transaction->block_size;
1018                 if (i == tdb->transaction->num_blocks-1) {
1019                         length = tdb->transaction->last_block_size;
1020                 }
1021
1022                 if (methods->tdb_write(tdb, offset, tdb->transaction->blocks[i], length) == -1) {
1023                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
1024                         
1025                         /* we've overwritten part of the data and
1026                            possibly expanded the file, so we need to
1027                            run the crash recovery code */
1028                         tdb->methods = methods;
1029                         tdb_transaction_recover(tdb); 
1030
1031                         tdb_transaction_cancel(tdb);
1032                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1033
1034                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
1035                         return -1;
1036                 }
1037                 SAFE_FREE(tdb->transaction->blocks[i]);
1038         } 
1039
1040         SAFE_FREE(tdb->transaction->blocks);
1041         tdb->transaction->num_blocks = 0;
1042
1043         /* ensure the new data is on disk */
1044         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1045                 return -1;
1046         }
1047
1048         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
1049
1050         /*
1051           TODO: maybe write to some dummy hdr field, or write to magic
1052           offset without mmap, before the last sync, instead of the
1053           utime() call
1054         */
1055
1056         /* on some systems (like Linux 2.6.x) changes via mmap/msync
1057            don't change the mtime of the file, this means the file may
1058            not be backed up (as tdb rounding to block sizes means that
1059            file size changes are quite rare too). The following forces
1060            mtime changes when a transaction completes */
1061 #ifdef HAVE_UTIME
1062         utime(tdb->name, NULL);
1063 #endif
1064
1065         need_repack = tdb->transaction->need_repack;
1066
1067         /* use a transaction cancel to free memory and remove the
1068            transaction locks */
1069         tdb_transaction_cancel(tdb);
1070
1071         if (need_repack) {
1072                 return tdb_repack(tdb);
1073         }
1074
1075         return 0;
1076 }
1077
1078
1079 /*
1080   recover from an aborted transaction. Must be called with exclusive
1081   database write access already established (including the global
1082   lock to prevent new processes attaching)
1083 */
1084 int tdb_transaction_recover(struct tdb_context *tdb)
1085 {
1086         tdb_off_t recovery_head, recovery_eof;
1087         unsigned char *data, *p;
1088         uint32_t zero = 0;
1089         struct list_struct rec;
1090
1091         /* find the recovery area */
1092         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
1093                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
1094                 tdb->ecode = TDB_ERR_IO;
1095                 return -1;
1096         }
1097
1098         if (recovery_head == 0) {
1099                 /* we have never allocated a recovery record */
1100                 return 0;
1101         }
1102
1103         /* read the recovery record */
1104         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
1105                                    sizeof(rec), DOCONV()) == -1) {
1106                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
1107                 tdb->ecode = TDB_ERR_IO;
1108                 return -1;
1109         }
1110
1111         if (rec.magic != TDB_RECOVERY_MAGIC) {
1112                 /* there is no valid recovery data */
1113                 return 0;
1114         }
1115
1116         if (tdb->read_only) {
1117                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
1118                 tdb->ecode = TDB_ERR_CORRUPT;
1119                 return -1;
1120         }
1121
1122         recovery_eof = rec.key_len;
1123
1124         data = (unsigned char *)malloc(rec.data_len);
1125         if (data == NULL) {
1126                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
1127                 tdb->ecode = TDB_ERR_OOM;
1128                 return -1;
1129         }
1130
1131         /* read the full recovery data */
1132         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
1133                                    rec.data_len, 0) == -1) {
1134                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
1135                 tdb->ecode = TDB_ERR_IO;
1136                 return -1;
1137         }
1138
1139         /* recover the file data */
1140         p = data;
1141         while (p+8 < data + rec.data_len) {
1142                 uint32_t ofs, len;
1143                 if (DOCONV()) {
1144                         tdb_convert(p, 8);
1145                 }
1146                 memcpy(&ofs, p, 4);
1147                 memcpy(&len, p+4, 4);
1148
1149                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
1150                         free(data);
1151                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1152                         tdb->ecode = TDB_ERR_IO;
1153                         return -1;
1154                 }
1155                 p += 8 + len;
1156         }
1157
1158         free(data);
1159
1160         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1161                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1162                 tdb->ecode = TDB_ERR_IO;
1163                 return -1;
1164         }
1165
1166         /* if the recovery area is after the recovered eof then remove it */
1167         if (recovery_eof <= recovery_head) {
1168                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1169                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1170                         tdb->ecode = TDB_ERR_IO;
1171                         return -1;                      
1172                 }
1173         }
1174
1175         /* remove the recovery magic */
1176         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1177                           &zero) == -1) {
1178                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1179                 tdb->ecode = TDB_ERR_IO;
1180                 return -1;                      
1181         }
1182         
1183         /* reduce the file size to the old size */
1184         tdb_munmap(tdb);
1185         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1186                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1187                 tdb->ecode = TDB_ERR_IO;
1188                 return -1;                      
1189         }
1190         tdb->map_size = recovery_eof;
1191         tdb_mmap(tdb);
1192
1193         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1194                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1195                 tdb->ecode = TDB_ERR_IO;
1196                 return -1;
1197         }
1198
1199         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1200                  recovery_eof));
1201
1202         /* all done */
1203         return 0;
1204 }