7eaacf7a164fbad3052c8b211ca8f547af3ba3be
[ira/wip.git] / source / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocate the transaction recovery record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of all writes that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit strategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90 struct tdb_transaction_el {
91         struct tdb_transaction_el *next, *prev;
92         tdb_off_t offset;
93         tdb_len_t length;
94         unsigned char *data;
95 };
96
97 /*
98   hold the context of any current transaction
99 */
100 struct tdb_transaction {
101         /* we keep a mirrored copy of the tdb hash heads here so
102            tdb_next_hash_chain() can operate efficiently */
103         uint32_t *hash_heads;
104
105         /* the original io methods - used to do IOs to the real db */
106         const struct tdb_methods *io_methods;
107
108         /* the list of transaction elements. We use a doubly linked
109            list with a last pointer to allow us to keep the list
110            ordered, with first element at the front of the list. It
111            needs to be doubly linked as the read/write traversals need
112            to be backwards, while the commit needs to be forwards */
113         struct tdb_transaction_el *elements, *elements_last;
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* old file size before transaction */
126         tdb_len_t old_map_size;
127 };
128
129
130 /*
131   read while in a transaction. We need to check first if the data is in our list
132   of transaction elements, then if not do a real read
133 */
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
135                             tdb_len_t len, int cv)
136 {
137         struct tdb_transaction_el *el;
138
139         /* we need to walk the list backwards to get the most recent data */
140         for (el=tdb->transaction->elements_last;el;el=el->prev) {
141                 tdb_len_t partial;
142
143                 if (off+len <= el->offset) {
144                         continue;
145                 }
146                 if (off >= el->offset + el->length) {
147                         continue;
148                 }
149
150                 /* an overlapping read - needs to be split into up to
151                    2 reads and a memcpy */
152                 if (off < el->offset) {
153                         partial = el->offset - off;
154                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155                                 goto fail;
156                         }
157                         len -= partial;
158                         off += partial;
159                         buf = (void *)(partial + (char *)buf);
160                 }
161                 if (off + len <= el->offset + el->length) {
162                         partial = len;
163                 } else {
164                         partial = el->offset + el->length - off;
165                 }
166                 memcpy(buf, el->data + (off - el->offset), partial);
167                 if (cv) {
168                         tdb_convert(buf, len);
169                 }
170                 len -= partial;
171                 off += partial;
172                 buf = (void *)(partial + (char *)buf);
173                 
174                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177
178                 return 0;
179         }
180
181         /* its not in the transaction elements - do a real read */
182         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183
184 fail:
185         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
186         tdb->ecode = TDB_ERR_IO;
187         tdb->transaction->transaction_error = 1;
188         return -1;
189 }
190
191
192 /*
193   write while in a transaction
194 */
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
196                              const void *buf, tdb_len_t len)
197 {
198         struct tdb_transaction_el *el, *best_el=NULL;
199
200         if (len == 0) {
201                 return 0;
202         }
203         
204         /* if the write is to a hash head, then update the transaction
205            hash heads */
206         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208                 uint32_t chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
210         }
211
212         /* first see if we can replace an existing entry */
213         for (el=tdb->transaction->elements_last;el;el=el->prev) {
214                 tdb_len_t partial;
215
216                 if (best_el == NULL && off == el->offset+el->length) {
217                         best_el = el;
218                 }
219
220                 if (off+len <= el->offset) {
221                         continue;
222                 }
223                 if (off >= el->offset + el->length) {
224                         continue;
225                 }
226
227                 /* an overlapping write - needs to be split into up to
228                    2 writes and a memcpy */
229                 if (off < el->offset) {
230                         partial = el->offset - off;
231                         if (transaction_write(tdb, off, buf, partial) != 0) {
232                                 goto fail;
233                         }
234                         len -= partial;
235                         off += partial;
236                         buf = (const void *)(partial + (const char *)buf);
237                 }
238                 if (off + len <= el->offset + el->length) {
239                         partial = len;
240                 } else {
241                         partial = el->offset + el->length - off;
242                 }
243                 memcpy(el->data + (off - el->offset), buf, partial);
244                 len -= partial;
245                 off += partial;
246                 buf = (const void *)(partial + (const char *)buf);
247                 
248                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
249                         goto fail;
250                 }
251
252                 return 0;
253         }
254
255         /* see if we can append the new entry to an existing entry */
256         if (best_el && best_el->offset + best_el->length == off && 
257             (off+len < tdb->transaction->old_map_size ||
258              off > tdb->transaction->old_map_size)) {
259                 unsigned char *data = best_el->data;
260                 el = best_el;
261                 el->data = (unsigned char *)realloc(el->data,
262                                                     el->length + len);
263                 if (el->data == NULL) {
264                         tdb->ecode = TDB_ERR_OOM;
265                         tdb->transaction->transaction_error = 1;
266                         el->data = data;
267                         return -1;
268                 }
269                 if (buf) {
270                         memcpy(el->data + el->length, buf, len);
271                 } else {
272                         memset(el->data + el->length, TDB_PAD_BYTE, len);
273                 }
274                 el->length += len;
275                 return 0;
276         }
277
278         /* add a new entry at the end of the list */
279         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
280         if (el == NULL) {
281                 tdb->ecode = TDB_ERR_OOM;
282                 tdb->transaction->transaction_error = 1;                
283                 return -1;
284         }
285         el->next = NULL;
286         el->prev = tdb->transaction->elements_last;
287         el->offset = off;
288         el->length = len;
289         el->data = (unsigned char *)malloc(len);
290         if (el->data == NULL) {
291                 free(el);
292                 tdb->ecode = TDB_ERR_OOM;
293                 tdb->transaction->transaction_error = 1;                
294                 return -1;
295         }
296         if (buf) {
297                 memcpy(el->data, buf, len);
298         } else {
299                 memset(el->data, TDB_PAD_BYTE, len);
300         }
301         if (el->prev) {
302                 el->prev->next = el;
303         } else {
304                 tdb->transaction->elements = el;
305         }
306         tdb->transaction->elements_last = el;
307         return 0;
308
309 fail:
310         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
311         tdb->ecode = TDB_ERR_IO;
312         tdb->transaction->transaction_error = 1;
313         return -1;
314 }
315
316 /*
317   accelerated hash chain head search, using the cached hash heads
318 */
319 static void transaction_next_hash_chain(struct tdb_context *tdb, uint32_t *chain)
320 {
321         uint32_t h = *chain;
322         for (;h < tdb->header.hash_size;h++) {
323                 /* the +1 takes account of the freelist */
324                 if (0 != tdb->transaction->hash_heads[h+1]) {
325                         break;
326                 }
327         }
328         (*chain) = h;
329 }
330
331 /*
332   out of bounds check during a transaction
333 */
334 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
335 {
336         if (len <= tdb->map_size) {
337                 return 0;
338         }
339         return TDB_ERRCODE(TDB_ERR_IO, -1);
340 }
341
342 /*
343   transaction version of tdb_expand().
344 */
345 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
346                                    tdb_off_t addition)
347 {
348         /* add a write to the transaction elements, so subsequent
349            reads see the zero data */
350         if (transaction_write(tdb, size, NULL, addition) != 0) {
351                 return -1;
352         }
353
354         return 0;
355 }
356
357 /*
358   brlock during a transaction - ignore them
359 */
360 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
361                               int rw_type, int lck_type, int probe, size_t len)
362 {
363         return 0;
364 }
365
366 static const struct tdb_methods transaction_methods = {
367         transaction_read,
368         transaction_write,
369         transaction_next_hash_chain,
370         transaction_oob,
371         transaction_expand_file,
372         transaction_brlock
373 };
374
375
376 /*
377   start a tdb transaction. No token is returned, as only a single
378   transaction is allowed to be pending per tdb_context
379 */
380 int tdb_transaction_start(struct tdb_context *tdb)
381 {
382         /* some sanity checks */
383         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
384                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
385                 tdb->ecode = TDB_ERR_EINVAL;
386                 return -1;
387         }
388
389         /* cope with nested tdb_transaction_start() calls */
390         if (tdb->transaction != NULL) {
391                 tdb->transaction->nesting++;
392                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
393                          tdb->transaction->nesting));
394                 return 0;
395         }
396
397         if (tdb->num_locks != 0 || tdb->global_lock.count) {
398                 /* the caller must not have any locks when starting a
399                    transaction as otherwise we'll be screwed by lack
400                    of nested locks in posix */
401                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
402                 tdb->ecode = TDB_ERR_LOCK;
403                 return -1;
404         }
405
406         if (tdb->travlocks.next != NULL) {
407                 /* you cannot use transactions inside a traverse (although you can use
408                    traverse inside a transaction) as otherwise you can end up with
409                    deadlock */
410                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
411                 tdb->ecode = TDB_ERR_LOCK;
412                 return -1;
413         }
414
415         tdb->transaction = (struct tdb_transaction *)
416                 calloc(sizeof(struct tdb_transaction), 1);
417         if (tdb->transaction == NULL) {
418                 tdb->ecode = TDB_ERR_OOM;
419                 return -1;
420         }
421
422         /* get the transaction write lock. This is a blocking lock. As
423            discussed with Volker, there are a number of ways we could
424            make this async, which we will probably do in the future */
425         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
426                 SAFE_FREE(tdb->transaction);
427                 return -1;
428         }
429         
430         /* get a read lock from the freelist to the end of file. This
431            is upgraded to a write lock during the commit */
432         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
433                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
434                 tdb->ecode = TDB_ERR_LOCK;
435                 goto fail;
436         }
437
438         /* setup a copy of the hash table heads so the hash scan in
439            traverse can be fast */
440         tdb->transaction->hash_heads = (uint32_t *)
441                 calloc(tdb->header.hash_size+1, sizeof(uint32_t));
442         if (tdb->transaction->hash_heads == NULL) {
443                 tdb->ecode = TDB_ERR_OOM;
444                 goto fail;
445         }
446         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
447                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
448                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
449                 tdb->ecode = TDB_ERR_IO;
450                 goto fail;
451         }
452
453         /* make sure we know about any file expansions already done by
454            anyone else */
455         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
456         tdb->transaction->old_map_size = tdb->map_size;
457
458         /* finally hook the io methods, replacing them with
459            transaction specific methods */
460         tdb->transaction->io_methods = tdb->methods;
461         tdb->methods = &transaction_methods;
462
463         /* by calling this transaction write here, we ensure that we don't grow the
464            transaction linked list due to hash table updates */
465         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
466                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
467                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
468                 tdb->ecode = TDB_ERR_IO;
469                 tdb->methods = tdb->transaction->io_methods;
470                 goto fail;
471         }
472
473         return 0;
474         
475 fail:
476         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
477         tdb_transaction_unlock(tdb);
478         SAFE_FREE(tdb->transaction->hash_heads);
479         SAFE_FREE(tdb->transaction);
480         return -1;
481 }
482
483
484 /*
485   cancel the current transaction
486 */
487 int tdb_transaction_cancel(struct tdb_context *tdb)
488 {       
489         if (tdb->transaction == NULL) {
490                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
491                 return -1;
492         }
493
494         if (tdb->transaction->nesting != 0) {
495                 tdb->transaction->transaction_error = 1;
496                 tdb->transaction->nesting--;
497                 return 0;
498         }               
499
500         tdb->map_size = tdb->transaction->old_map_size;
501
502         /* free all the transaction elements */
503         while (tdb->transaction->elements) {
504                 struct tdb_transaction_el *el = tdb->transaction->elements;
505                 tdb->transaction->elements = el->next;
506                 free(el->data);
507                 free(el);
508         }
509
510         /* remove any global lock created during the transaction */
511         if (tdb->global_lock.count != 0) {
512                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
513                 tdb->global_lock.count = 0;
514         }
515
516         /* remove any locks created during the transaction */
517         if (tdb->num_locks != 0) {
518                 int i;
519                 for (i=0;i<tdb->num_lockrecs;i++) {
520                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
521                                    F_UNLCK,F_SETLKW, 0, 1);
522                 }
523                 tdb->num_locks = 0;
524                 tdb->num_lockrecs = 0;
525                 SAFE_FREE(tdb->lockrecs);
526         }
527
528         /* restore the normal io methods */
529         tdb->methods = tdb->transaction->io_methods;
530
531         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
532         tdb_transaction_unlock(tdb);
533         SAFE_FREE(tdb->transaction->hash_heads);
534         SAFE_FREE(tdb->transaction);
535         
536         return 0;
537 }
538
539 /*
540   sync to disk
541 */
542 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
543 {       
544         if (fsync(tdb->fd) != 0) {
545                 tdb->ecode = TDB_ERR_IO;
546                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
547                 return -1;
548         }
549 #ifdef MS_SYNC
550         if (tdb->map_ptr) {
551                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
552                 if (msync(moffset + (char *)tdb->map_ptr, 
553                           length + (offset - moffset), MS_SYNC) != 0) {
554                         tdb->ecode = TDB_ERR_IO;
555                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
556                                  strerror(errno)));
557                         return -1;
558                 }
559         }
560 #endif
561         return 0;
562 }
563
564
565 /*
566   work out how much space the linearised recovery data will consume
567 */
568 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
569 {
570         struct tdb_transaction_el *el;
571         tdb_len_t recovery_size = 0;
572
573         recovery_size = sizeof(uint32_t);
574         for (el=tdb->transaction->elements;el;el=el->next) {
575                 if (el->offset >= tdb->transaction->old_map_size) {
576                         continue;
577                 }
578                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
579         }
580
581         return recovery_size;
582 }
583
584 /*
585   allocate the recovery area, or use an existing recovery area if it is
586   large enough
587 */
588 static int tdb_recovery_allocate(struct tdb_context *tdb, 
589                                  tdb_len_t *recovery_size,
590                                  tdb_off_t *recovery_offset,
591                                  tdb_len_t *recovery_max_size)
592 {
593         struct list_struct rec;
594         const struct tdb_methods *methods = tdb->transaction->io_methods;
595         tdb_off_t recovery_head;
596
597         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
598                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
599                 return -1;
600         }
601
602         rec.rec_len = 0;
603
604         if (recovery_head != 0 && 
605             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
606                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
607                 return -1;
608         }
609
610         *recovery_size = tdb_recovery_size(tdb);
611
612         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
613                 /* it fits in the existing area */
614                 *recovery_max_size = rec.rec_len;
615                 *recovery_offset = recovery_head;
616                 return 0;
617         }
618
619         /* we need to free up the old recovery area, then allocate a
620            new one at the end of the file. Note that we cannot use
621            tdb_allocate() to allocate the new one as that might return
622            us an area that is being currently used (as of the start of
623            the transaction) */
624         if (recovery_head != 0) {
625                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
626                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
627                         return -1;
628                 }
629         }
630
631         /* the tdb_free() call might have increased the recovery size */
632         *recovery_size = tdb_recovery_size(tdb);
633
634         /* round up to a multiple of page size */
635         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
636         *recovery_offset = tdb->map_size;
637         recovery_head = *recovery_offset;
638
639         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
640                                      (tdb->map_size - tdb->transaction->old_map_size) +
641                                      sizeof(rec) + *recovery_max_size) == -1) {
642                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
643                 return -1;
644         }
645
646         /* remap the file (if using mmap) */
647         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
648
649         /* we have to reset the old map size so that we don't try to expand the file
650            again in the transaction commit, which would destroy the recovery area */
651         tdb->transaction->old_map_size = tdb->map_size;
652
653         /* write the recovery header offset and sync - we can sync without a race here
654            as the magic ptr in the recovery record has not been set */
655         CONVERT(recovery_head);
656         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
657                                &recovery_head, sizeof(tdb_off_t)) == -1) {
658                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
659                 return -1;
660         }
661
662         return 0;
663 }
664
665
666 /*
667   setup the recovery data that will be used on a crash during commit
668 */
669 static int transaction_setup_recovery(struct tdb_context *tdb, 
670                                       tdb_off_t *magic_offset)
671 {
672         struct tdb_transaction_el *el;
673         tdb_len_t recovery_size;
674         unsigned char *data, *p;
675         const struct tdb_methods *methods = tdb->transaction->io_methods;
676         struct list_struct *rec;
677         tdb_off_t recovery_offset, recovery_max_size;
678         tdb_off_t old_map_size = tdb->transaction->old_map_size;
679         uint32_t magic, tailer;
680
681         /*
682           check that the recovery area has enough space
683         */
684         if (tdb_recovery_allocate(tdb, &recovery_size, 
685                                   &recovery_offset, &recovery_max_size) == -1) {
686                 return -1;
687         }
688
689         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
690         if (data == NULL) {
691                 tdb->ecode = TDB_ERR_OOM;
692                 return -1;
693         }
694
695         rec = (struct list_struct *)data;
696         memset(rec, 0, sizeof(*rec));
697
698         rec->magic    = 0;
699         rec->data_len = recovery_size;
700         rec->rec_len  = recovery_max_size;
701         rec->key_len  = old_map_size;
702         CONVERT(rec);
703
704         /* build the recovery data into a single blob to allow us to do a single
705            large write, which should be more efficient */
706         p = data + sizeof(*rec);
707         for (el=tdb->transaction->elements;el;el=el->next) {
708                 if (el->offset >= old_map_size) {
709                         continue;
710                 }
711                 if (el->offset + el->length > tdb->transaction->old_map_size) {
712                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
713                         free(data);
714                         tdb->ecode = TDB_ERR_CORRUPT;
715                         return -1;
716                 }
717                 memcpy(p, &el->offset, 4);
718                 memcpy(p+4, &el->length, 4);
719                 if (DOCONV()) {
720                         tdb_convert(p, 8);
721                 }
722                 /* the recovery area contains the old data, not the
723                    new data, so we have to call the original tdb_read
724                    method to get it */
725                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
726                         free(data);
727                         tdb->ecode = TDB_ERR_IO;
728                         return -1;
729                 }
730                 p += 8 + el->length;
731         }
732
733         /* and the tailer */
734         tailer = sizeof(*rec) + recovery_max_size;
735         memcpy(p, &tailer, 4);
736         CONVERT(p);
737
738         /* write the recovery data to the recovery area */
739         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
740                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
741                 free(data);
742                 tdb->ecode = TDB_ERR_IO;
743                 return -1;
744         }
745
746         /* as we don't have ordered writes, we have to sync the recovery
747            data before we update the magic to indicate that the recovery
748            data is present */
749         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
750                 free(data);
751                 return -1;
752         }
753
754         free(data);
755
756         magic = TDB_RECOVERY_MAGIC;
757         CONVERT(magic);
758
759         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
760
761         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
762                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
763                 tdb->ecode = TDB_ERR_IO;
764                 return -1;
765         }
766
767         /* ensure the recovery magic marker is on disk */
768         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
769                 return -1;
770         }
771
772         return 0;
773 }
774
775 /*
776   commit the current transaction
777 */
778 int tdb_transaction_commit(struct tdb_context *tdb)
779 {       
780         const struct tdb_methods *methods;
781         tdb_off_t magic_offset = 0;
782         uint32_t zero = 0;
783
784         if (tdb->transaction == NULL) {
785                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
786                 return -1;
787         }
788
789         if (tdb->transaction->transaction_error) {
790                 tdb->ecode = TDB_ERR_IO;
791                 tdb_transaction_cancel(tdb);
792                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
793                 return -1;
794         }
795
796         if (tdb->transaction->nesting != 0) {
797                 tdb->transaction->nesting--;
798                 return 0;
799         }               
800
801         /* check for a null transaction */
802         if (tdb->transaction->elements == NULL) {
803                 tdb_transaction_cancel(tdb);
804                 return 0;
805         }
806
807         methods = tdb->transaction->io_methods;
808         
809         /* if there are any locks pending then the caller has not
810            nested their locks properly, so fail the transaction */
811         if (tdb->num_locks || tdb->global_lock.count) {
812                 tdb->ecode = TDB_ERR_LOCK;
813                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
814                 tdb_transaction_cancel(tdb);
815                 return -1;
816         }
817
818         /* upgrade the main transaction lock region to a write lock */
819         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
820                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
821                 tdb->ecode = TDB_ERR_LOCK;
822                 tdb_transaction_cancel(tdb);
823                 return -1;
824         }
825
826         /* get the global lock - this prevents new users attaching to the database
827            during the commit */
828         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
829                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
830                 tdb->ecode = TDB_ERR_LOCK;
831                 tdb_transaction_cancel(tdb);
832                 return -1;
833         }
834
835         if (!(tdb->flags & TDB_NOSYNC)) {
836                 /* write the recovery data to the end of the file */
837                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
838                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
839                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
840                         tdb_transaction_cancel(tdb);
841                         return -1;
842                 }
843         }
844
845         /* expand the file to the new size if needed */
846         if (tdb->map_size != tdb->transaction->old_map_size) {
847                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
848                                              tdb->map_size - 
849                                              tdb->transaction->old_map_size) == -1) {
850                         tdb->ecode = TDB_ERR_IO;
851                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
852                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
853                         tdb_transaction_cancel(tdb);
854                         return -1;
855                 }
856                 tdb->map_size = tdb->transaction->old_map_size;
857                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
858         }
859
860         /* perform all the writes */
861         while (tdb->transaction->elements) {
862                 struct tdb_transaction_el *el = tdb->transaction->elements;
863
864                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
865                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
866                         
867                         /* we've overwritten part of the data and
868                            possibly expanded the file, so we need to
869                            run the crash recovery code */
870                         tdb->methods = methods;
871                         tdb_transaction_recover(tdb); 
872
873                         tdb_transaction_cancel(tdb);
874                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
875
876                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
877                         return -1;
878                 }
879                 tdb->transaction->elements = el->next;
880                 free(el->data); 
881                 free(el);
882         } 
883
884         if (!(tdb->flags & TDB_NOSYNC)) {
885                 /* ensure the new data is on disk */
886                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
887                         return -1;
888                 }
889
890                 /* remove the recovery marker */
891                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
892                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
893                         return -1;
894                 }
895
896                 /* ensure the recovery marker has been removed on disk */
897                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
898                         return -1;
899                 }
900         }
901
902         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
903
904         /*
905           TODO: maybe write to some dummy hdr field, or write to magic
906           offset without mmap, before the last sync, instead of the
907           utime() call
908         */
909
910         /* on some systems (like Linux 2.6.x) changes via mmap/msync
911            don't change the mtime of the file, this means the file may
912            not be backed up (as tdb rounding to block sizes means that
913            file size changes are quite rare too). The following forces
914            mtime changes when a transaction completes */
915 #ifdef HAVE_UTIME
916         utime(tdb->name, NULL);
917 #endif
918
919         /* use a transaction cancel to free memory and remove the
920            transaction locks */
921         tdb_transaction_cancel(tdb);
922         return 0;
923 }
924
925
926 /*
927   recover from an aborted transaction. Must be called with exclusive
928   database write access already established (including the global
929   lock to prevent new processes attaching)
930 */
931 int tdb_transaction_recover(struct tdb_context *tdb)
932 {
933         tdb_off_t recovery_head, recovery_eof;
934         unsigned char *data, *p;
935         uint32_t zero = 0;
936         struct list_struct rec;
937
938         /* find the recovery area */
939         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
940                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
941                 tdb->ecode = TDB_ERR_IO;
942                 return -1;
943         }
944
945         if (recovery_head == 0) {
946                 /* we have never allocated a recovery record */
947                 return 0;
948         }
949
950         /* read the recovery record */
951         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
952                                    sizeof(rec), DOCONV()) == -1) {
953                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
954                 tdb->ecode = TDB_ERR_IO;
955                 return -1;
956         }
957
958         if (rec.magic != TDB_RECOVERY_MAGIC) {
959                 /* there is no valid recovery data */
960                 return 0;
961         }
962
963         if (tdb->read_only) {
964                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
965                 tdb->ecode = TDB_ERR_CORRUPT;
966                 return -1;
967         }
968
969         recovery_eof = rec.key_len;
970
971         data = (unsigned char *)malloc(rec.data_len);
972         if (data == NULL) {
973                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
974                 tdb->ecode = TDB_ERR_OOM;
975                 return -1;
976         }
977
978         /* read the full recovery data */
979         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
980                                    rec.data_len, 0) == -1) {
981                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
982                 tdb->ecode = TDB_ERR_IO;
983                 return -1;
984         }
985
986         /* recover the file data */
987         p = data;
988         while (p+8 < data + rec.data_len) {
989                 uint32_t ofs, len;
990                 if (DOCONV()) {
991                         tdb_convert(p, 8);
992                 }
993                 memcpy(&ofs, p, 4);
994                 memcpy(&len, p+4, 4);
995
996                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
997                         free(data);
998                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
999                         tdb->ecode = TDB_ERR_IO;
1000                         return -1;
1001                 }
1002                 p += 8 + len;
1003         }
1004
1005         free(data);
1006
1007         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1008                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1009                 tdb->ecode = TDB_ERR_IO;
1010                 return -1;
1011         }
1012
1013         /* if the recovery area is after the recovered eof then remove it */
1014         if (recovery_eof <= recovery_head) {
1015                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1016                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1017                         tdb->ecode = TDB_ERR_IO;
1018                         return -1;                      
1019                 }
1020         }
1021
1022         /* remove the recovery magic */
1023         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1024                           &zero) == -1) {
1025                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1026                 tdb->ecode = TDB_ERR_IO;
1027                 return -1;                      
1028         }
1029         
1030         /* reduce the file size to the old size */
1031         tdb_munmap(tdb);
1032         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1033                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1034                 tdb->ecode = TDB_ERR_IO;
1035                 return -1;                      
1036         }
1037         tdb->map_size = recovery_eof;
1038         tdb_mmap(tdb);
1039
1040         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1041                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1042                 tdb->ecode = TDB_ERR_IO;
1043                 return -1;
1044         }
1045
1046         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1047                  recovery_eof));
1048
1049         /* all done */
1050         return 0;
1051 }