r22247: merge from samba4:
[sfrench/samba-autobuild/.git] / source / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 #include "tdb_private.h"
28
29 /*
30   transaction design:
31
32   - only allow a single transaction at a time per database. This makes
33     using the transaction API simpler, as otherwise the caller would
34     have to cope with temporary failures in transactions that conflict
35     with other current transactions
36
37   - keep the transaction recovery information in the same file as the
38     database, using a special 'transaction recovery' record pointed at
39     by the header. This removes the need for extra journal files as
40     used by some other databases
41
42   - dynamically allocate the transaction recovery record, re-using it
43     for subsequent transactions. If a larger record is needed then
44     tdb_free() the old record to place it on the normal tdb freelist
45     before allocating the new record
46
47   - during transactions, keep a linked list of all writes that have
48     been performed by intercepting all tdb_write() calls. The hooked
49     transaction versions of tdb_read() and tdb_write() check this
50     linked list and try to use the elements of the list in preference
51     to the real database.
52
53   - don't allow any locks to be held when a transaction starts,
54     otherwise we can end up with deadlock (plus lack of lock nesting
55     in posix locks would mean the lock is lost)
56
57   - if the caller gains a lock during the transaction but doesn't
58     release it then fail the commit
59
60   - allow for nested calls to tdb_transaction_start(), re-using the
61     existing transaction record. If the inner transaction is cancelled
62     then a subsequent commit will fail
63  
64   - keep a mirrored copy of the tdb hash chain heads to allow for the
65     fast hash heads scan on traverse, updating the mirrored copy in
66     the transaction version of tdb_write
67
68   - allow callers to mix transaction and non-transaction use of tdb,
69     although once a transaction is started then an exclusive lock is
70     gained until the transaction is committed or cancelled
71
72   - the commit strategy involves first saving away all modified data
73     into a linearised buffer in the transaction recovery area, then
74     marking the transaction recovery area with a magic value to
75     indicate a valid recovery record. In total 4 fsync/msync calls are
76     needed per commit to prevent race conditions. It might be possible
77     to reduce this to 3 or even 2 with some more work.
78
79   - check for a valid recovery record on open of the tdb, while the
80     global lock is held. Automatically recover from the transaction
81     recovery area if needed, then continue with the open as
82     usual. This allows for smooth crash recovery with no administrator
83     intervention.
84
85   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86     still available, but no transaction recovery area is used and no
87     fsync/msync calls are made.
88
89 */
90
91 struct tdb_transaction_el {
92         struct tdb_transaction_el *next, *prev;
93         tdb_off_t offset;
94         tdb_len_t length;
95         unsigned char *data;
96 };
97
98 /*
99   hold the context of any current transaction
100 */
101 struct tdb_transaction {
102         /* we keep a mirrored copy of the tdb hash heads here so
103            tdb_next_hash_chain() can operate efficiently */
104         u32 *hash_heads;
105
106         /* the original io methods - used to do IOs to the real db */
107         const struct tdb_methods *io_methods;
108
109         /* the list of transaction elements. We use a doubly linked
110            list with a last pointer to allow us to keep the list
111            ordered, with first element at the front of the list. It
112            needs to be doubly linked as the read/write traversals need
113            to be backwards, while the commit needs to be forwards */
114         struct tdb_transaction_el *elements, *elements_last;
115
116         /* non-zero when an internal transaction error has
117            occurred. All write operations will then fail until the
118            transaction is ended */
119         int transaction_error;
120
121         /* when inside a transaction we need to keep track of any
122            nested tdb_transaction_start() calls, as these are allowed,
123            but don't create a new transaction */
124         int nesting;
125
126         /* old file size before transaction */
127         tdb_len_t old_map_size;
128 };
129
130
131 /*
132   read while in a transaction. We need to check first if the data is in our list
133   of transaction elements, then if not do a real read
134 */
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
136                             tdb_len_t len, int cv)
137 {
138         struct tdb_transaction_el *el;
139
140         /* we need to walk the list backwards to get the most recent data */
141         for (el=tdb->transaction->elements_last;el;el=el->prev) {
142                 tdb_len_t partial;
143
144                 if (off+len <= el->offset) {
145                         continue;
146                 }
147                 if (off >= el->offset + el->length) {
148                         continue;
149                 }
150
151                 /* an overlapping read - needs to be split into up to
152                    2 reads and a memcpy */
153                 if (off < el->offset) {
154                         partial = el->offset - off;
155                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
156                                 goto fail;
157                         }
158                         len -= partial;
159                         off += partial;
160                         buf = (void *)(partial + (char *)buf);
161                 }
162                 if (off + len <= el->offset + el->length) {
163                         partial = len;
164                 } else {
165                         partial = el->offset + el->length - off;
166                 }
167                 memcpy(buf, el->data + (off - el->offset), partial);
168                 if (cv) {
169                         tdb_convert(buf, len);
170                 }
171                 len -= partial;
172                 off += partial;
173                 buf = (void *)(partial + (char *)buf);
174                 
175                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
176                         goto fail;
177                 }
178
179                 return 0;
180         }
181
182         /* its not in the transaction elements - do a real read */
183         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
184
185 fail:
186         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
187         tdb->ecode = TDB_ERR_IO;
188         tdb->transaction->transaction_error = 1;
189         return -1;
190 }
191
192
193 /*
194   write while in a transaction
195 */
196 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
197                              const void *buf, tdb_len_t len)
198 {
199         struct tdb_transaction_el *el, *best_el=NULL;
200
201         if (len == 0) {
202                 return 0;
203         }
204         
205         /* if the write is to a hash head, then update the transaction
206            hash heads */
207         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
208             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
209                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
210                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
211         }
212
213         /* first see if we can replace an existing entry */
214         for (el=tdb->transaction->elements_last;el;el=el->prev) {
215                 tdb_len_t partial;
216
217                 if (best_el == NULL && off == el->offset+el->length) {
218                         best_el = el;
219                 }
220
221                 if (off+len <= el->offset) {
222                         continue;
223                 }
224                 if (off >= el->offset + el->length) {
225                         continue;
226                 }
227
228                 /* an overlapping write - needs to be split into up to
229                    2 writes and a memcpy */
230                 if (off < el->offset) {
231                         partial = el->offset - off;
232                         if (transaction_write(tdb, off, buf, partial) != 0) {
233                                 goto fail;
234                         }
235                         len -= partial;
236                         off += partial;
237                         buf = (const void *)(partial + (const char *)buf);
238                 }
239                 if (off + len <= el->offset + el->length) {
240                         partial = len;
241                 } else {
242                         partial = el->offset + el->length - off;
243                 }
244                 memcpy(el->data + (off - el->offset), buf, partial);
245                 len -= partial;
246                 off += partial;
247                 buf = (const void *)(partial + (const char *)buf);
248                 
249                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
250                         goto fail;
251                 }
252
253                 return 0;
254         }
255
256         /* see if we can append the new entry to an existing entry */
257         if (best_el && best_el->offset + best_el->length == off && 
258             (off+len < tdb->transaction->old_map_size ||
259              off > tdb->transaction->old_map_size)) {
260                 unsigned char *data = best_el->data;
261                 el = best_el;
262                 el->data = (unsigned char *)realloc(el->data,
263                                                     el->length + len);
264                 if (el->data == NULL) {
265                         tdb->ecode = TDB_ERR_OOM;
266                         tdb->transaction->transaction_error = 1;
267                         el->data = data;
268                         return -1;
269                 }
270                 if (buf) {
271                         memcpy(el->data + el->length, buf, len);
272                 } else {
273                         memset(el->data + el->length, TDB_PAD_BYTE, len);
274                 }
275                 el->length += len;
276                 return 0;
277         }
278
279         /* add a new entry at the end of the list */
280         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
281         if (el == NULL) {
282                 tdb->ecode = TDB_ERR_OOM;
283                 tdb->transaction->transaction_error = 1;                
284                 return -1;
285         }
286         el->next = NULL;
287         el->prev = tdb->transaction->elements_last;
288         el->offset = off;
289         el->length = len;
290         el->data = (unsigned char *)malloc(len);
291         if (el->data == NULL) {
292                 free(el);
293                 tdb->ecode = TDB_ERR_OOM;
294                 tdb->transaction->transaction_error = 1;                
295                 return -1;
296         }
297         if (buf) {
298                 memcpy(el->data, buf, len);
299         } else {
300                 memset(el->data, TDB_PAD_BYTE, len);
301         }
302         if (el->prev) {
303                 el->prev->next = el;
304         } else {
305                 tdb->transaction->elements = el;
306         }
307         tdb->transaction->elements_last = el;
308         return 0;
309
310 fail:
311         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
312         tdb->ecode = TDB_ERR_IO;
313         tdb->transaction->transaction_error = 1;
314         return -1;
315 }
316
317 /*
318   accelerated hash chain head search, using the cached hash heads
319 */
320 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
321 {
322         u32 h = *chain;
323         for (;h < tdb->header.hash_size;h++) {
324                 /* the +1 takes account of the freelist */
325                 if (0 != tdb->transaction->hash_heads[h+1]) {
326                         break;
327                 }
328         }
329         (*chain) = h;
330 }
331
332 /*
333   out of bounds check during a transaction
334 */
335 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
336 {
337         if (len <= tdb->map_size) {
338                 return 0;
339         }
340         return TDB_ERRCODE(TDB_ERR_IO, -1);
341 }
342
343 /*
344   transaction version of tdb_expand().
345 */
346 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
347                                    tdb_off_t addition)
348 {
349         /* add a write to the transaction elements, so subsequent
350            reads see the zero data */
351         if (transaction_write(tdb, size, NULL, addition) != 0) {
352                 return -1;
353         }
354
355         return 0;
356 }
357
358 /*
359   brlock during a transaction - ignore them
360 */
361 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
362                        int rw_type, int lck_type, int probe, size_t len)
363 {
364         return 0;
365 }
366
367 static const struct tdb_methods transaction_methods = {
368         transaction_read,
369         transaction_write,
370         transaction_next_hash_chain,
371         transaction_oob,
372         transaction_expand_file,
373         transaction_brlock
374 };
375
376
377 /*
378   start a tdb transaction. No token is returned, as only a single
379   transaction is allowed to be pending per tdb_context
380 */
381 int tdb_transaction_start(struct tdb_context *tdb)
382 {
383         /* some sanity checks */
384         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
385                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
386                 tdb->ecode = TDB_ERR_EINVAL;
387                 return -1;
388         }
389
390         /* cope with nested tdb_transaction_start() calls */
391         if (tdb->transaction != NULL) {
392                 tdb->transaction->nesting++;
393                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
394                          tdb->transaction->nesting));
395                 return 0;
396         }
397
398         if (tdb->num_locks != 0 || tdb->global_lock.count) {
399                 /* the caller must not have any locks when starting a
400                    transaction as otherwise we'll be screwed by lack
401                    of nested locks in posix */
402                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
403                 tdb->ecode = TDB_ERR_LOCK;
404                 return -1;
405         }
406
407         if (tdb->travlocks.next != NULL) {
408                 /* you cannot use transactions inside a traverse (although you can use
409                    traverse inside a transaction) as otherwise you can end up with
410                    deadlock */
411                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
412                 tdb->ecode = TDB_ERR_LOCK;
413                 return -1;
414         }
415
416         tdb->transaction = (struct tdb_transaction *)
417                 calloc(sizeof(struct tdb_transaction), 1);
418         if (tdb->transaction == NULL) {
419                 tdb->ecode = TDB_ERR_OOM;
420                 return -1;
421         }
422
423         /* get the transaction write lock. This is a blocking lock. As
424            discussed with Volker, there are a number of ways we could
425            make this async, which we will probably do in the future */
426         if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
427                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
428                 tdb->ecode = TDB_ERR_LOCK;
429                 SAFE_FREE(tdb->transaction);
430                 return -1;
431         }
432         
433         /* get a read lock from the freelist to the end of file. This
434            is upgraded to a write lock during the commit */
435         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
436                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
437                 tdb->ecode = TDB_ERR_LOCK;
438                 goto fail;
439         }
440
441         /* setup a copy of the hash table heads so the hash scan in
442            traverse can be fast */
443         tdb->transaction->hash_heads = (u32 *)
444                 calloc(tdb->header.hash_size+1, sizeof(u32));
445         if (tdb->transaction->hash_heads == NULL) {
446                 tdb->ecode = TDB_ERR_OOM;
447                 goto fail;
448         }
449         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
450                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
451                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
452                 tdb->ecode = TDB_ERR_IO;
453                 goto fail;
454         }
455
456         /* make sure we know about any file expansions already done by
457            anyone else */
458         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
459         tdb->transaction->old_map_size = tdb->map_size;
460
461         /* finally hook the io methods, replacing them with
462            transaction specific methods */
463         tdb->transaction->io_methods = tdb->methods;
464         tdb->methods = &transaction_methods;
465
466         /* by calling this transaction write here, we ensure that we don't grow the
467            transaction linked list due to hash table updates */
468         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
469                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
470                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
471                 tdb->ecode = TDB_ERR_IO;
472                 goto fail;
473         }
474
475         return 0;
476         
477 fail:
478         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
479         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
480         SAFE_FREE(tdb->transaction->hash_heads);
481         SAFE_FREE(tdb->transaction);
482         return -1;
483 }
484
485
486 /*
487   cancel the current transaction
488 */
489 int tdb_transaction_cancel(struct tdb_context *tdb)
490 {       
491         if (tdb->transaction == NULL) {
492                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
493                 return -1;
494         }
495
496         if (tdb->transaction->nesting != 0) {
497                 tdb->transaction->transaction_error = 1;
498                 tdb->transaction->nesting--;
499                 return 0;
500         }               
501
502         tdb->map_size = tdb->transaction->old_map_size;
503
504         /* free all the transaction elements */
505         while (tdb->transaction->elements) {
506                 struct tdb_transaction_el *el = tdb->transaction->elements;
507                 tdb->transaction->elements = el->next;
508                 free(el->data);
509                 free(el);
510         }
511
512         /* remove any global lock created during the transaction */
513         if (tdb->global_lock.count != 0) {
514                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
515                 tdb->global_lock.count = 0;
516         }
517
518         /* remove any locks created during the transaction */
519         if (tdb->num_locks != 0) {
520                 int i;
521                 for (i=0;i<tdb->num_lockrecs;i++) {
522                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
523                                    F_UNLCK,F_SETLKW, 0, 1);
524                 }
525                 tdb->num_locks = 0;
526                 tdb->num_lockrecs = 0;
527                 SAFE_FREE(tdb->lockrecs);
528         }
529
530         /* restore the normal io methods */
531         tdb->methods = tdb->transaction->io_methods;
532
533         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
534         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
535         SAFE_FREE(tdb->transaction->hash_heads);
536         SAFE_FREE(tdb->transaction);
537         
538         return 0;
539 }
540
541 /*
542   sync to disk
543 */
544 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
545 {       
546         if (fsync(tdb->fd) != 0) {
547                 tdb->ecode = TDB_ERR_IO;
548                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
549                 return -1;
550         }
551 #ifdef MS_SYNC
552         if (tdb->map_ptr) {
553                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
554                 if (msync(moffset + (char *)tdb->map_ptr, 
555                           length + (offset - moffset), MS_SYNC) != 0) {
556                         tdb->ecode = TDB_ERR_IO;
557                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
558                                  strerror(errno)));
559                         return -1;
560                 }
561         }
562 #endif
563         return 0;
564 }
565
566
567 /*
568   work out how much space the linearised recovery data will consume
569 */
570 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
571 {
572         struct tdb_transaction_el *el;
573         tdb_len_t recovery_size = 0;
574
575         recovery_size = sizeof(u32);
576         for (el=tdb->transaction->elements;el;el=el->next) {
577                 if (el->offset >= tdb->transaction->old_map_size) {
578                         continue;
579                 }
580                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
581         }
582
583         return recovery_size;
584 }
585
586 /*
587   allocate the recovery area, or use an existing recovery area if it is
588   large enough
589 */
590 static int tdb_recovery_allocate(struct tdb_context *tdb, 
591                                  tdb_len_t *recovery_size,
592                                  tdb_off_t *recovery_offset,
593                                  tdb_len_t *recovery_max_size)
594 {
595         struct list_struct rec;
596         const struct tdb_methods *methods = tdb->transaction->io_methods;
597         tdb_off_t recovery_head;
598
599         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
600                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
601                 return -1;
602         }
603
604         rec.rec_len = 0;
605
606         if (recovery_head != 0 && 
607             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
608                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
609                 return -1;
610         }
611
612         *recovery_size = tdb_recovery_size(tdb);
613
614         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
615                 /* it fits in the existing area */
616                 *recovery_max_size = rec.rec_len;
617                 *recovery_offset = recovery_head;
618                 return 0;
619         }
620
621         /* we need to free up the old recovery area, then allocate a
622            new one at the end of the file. Note that we cannot use
623            tdb_allocate() to allocate the new one as that might return
624            us an area that is being currently used (as of the start of
625            the transaction) */
626         if (recovery_head != 0) {
627                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
628                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
629                         return -1;
630                 }
631         }
632
633         /* the tdb_free() call might have increased the recovery size */
634         *recovery_size = tdb_recovery_size(tdb);
635
636         /* round up to a multiple of page size */
637         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
638         *recovery_offset = tdb->map_size;
639         recovery_head = *recovery_offset;
640
641         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
642                                      (tdb->map_size - tdb->transaction->old_map_size) +
643                                      sizeof(rec) + *recovery_max_size) == -1) {
644                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
645                 return -1;
646         }
647
648         /* remap the file (if using mmap) */
649         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
650
651         /* we have to reset the old map size so that we don't try to expand the file
652            again in the transaction commit, which would destroy the recovery area */
653         tdb->transaction->old_map_size = tdb->map_size;
654
655         /* write the recovery header offset and sync - we can sync without a race here
656            as the magic ptr in the recovery record has not been set */
657         CONVERT(recovery_head);
658         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
659                                &recovery_head, sizeof(tdb_off_t)) == -1) {
660                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
661                 return -1;
662         }
663
664         return 0;
665 }
666
667
668 /*
669   setup the recovery data that will be used on a crash during commit
670 */
671 static int transaction_setup_recovery(struct tdb_context *tdb, 
672                                       tdb_off_t *magic_offset)
673 {
674         struct tdb_transaction_el *el;
675         tdb_len_t recovery_size;
676         unsigned char *data, *p;
677         const struct tdb_methods *methods = tdb->transaction->io_methods;
678         struct list_struct *rec;
679         tdb_off_t recovery_offset, recovery_max_size;
680         tdb_off_t old_map_size = tdb->transaction->old_map_size;
681         u32 magic, tailer;
682
683         /*
684           check that the recovery area has enough space
685         */
686         if (tdb_recovery_allocate(tdb, &recovery_size, 
687                                   &recovery_offset, &recovery_max_size) == -1) {
688                 return -1;
689         }
690
691         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
692         if (data == NULL) {
693                 tdb->ecode = TDB_ERR_OOM;
694                 return -1;
695         }
696
697         rec = (struct list_struct *)data;
698         memset(rec, 0, sizeof(*rec));
699
700         rec->magic    = 0;
701         rec->data_len = recovery_size;
702         rec->rec_len  = recovery_max_size;
703         rec->key_len  = old_map_size;
704         CONVERT(rec);
705
706         /* build the recovery data into a single blob to allow us to do a single
707            large write, which should be more efficient */
708         p = data + sizeof(*rec);
709         for (el=tdb->transaction->elements;el;el=el->next) {
710                 if (el->offset >= old_map_size) {
711                         continue;
712                 }
713                 if (el->offset + el->length > tdb->transaction->old_map_size) {
714                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
715                         free(data);
716                         tdb->ecode = TDB_ERR_CORRUPT;
717                         return -1;
718                 }
719                 memcpy(p, &el->offset, 4);
720                 memcpy(p+4, &el->length, 4);
721                 if (DOCONV()) {
722                         tdb_convert(p, 8);
723                 }
724                 /* the recovery area contains the old data, not the
725                    new data, so we have to call the original tdb_read
726                    method to get it */
727                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
728                         free(data);
729                         tdb->ecode = TDB_ERR_IO;
730                         return -1;
731                 }
732                 p += 8 + el->length;
733         }
734
735         /* and the tailer */
736         tailer = sizeof(*rec) + recovery_max_size;
737         memcpy(p, &tailer, 4);
738         CONVERT(p);
739
740         /* write the recovery data to the recovery area */
741         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
742                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
743                 free(data);
744                 tdb->ecode = TDB_ERR_IO;
745                 return -1;
746         }
747
748         /* as we don't have ordered writes, we have to sync the recovery
749            data before we update the magic to indicate that the recovery
750            data is present */
751         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
752                 free(data);
753                 return -1;
754         }
755
756         free(data);
757
758         magic = TDB_RECOVERY_MAGIC;
759         CONVERT(magic);
760
761         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
762
763         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
764                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
765                 tdb->ecode = TDB_ERR_IO;
766                 return -1;
767         }
768
769         /* ensure the recovery magic marker is on disk */
770         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
771                 return -1;
772         }
773
774         return 0;
775 }
776
777 /*
778   commit the current transaction
779 */
780 int tdb_transaction_commit(struct tdb_context *tdb)
781 {       
782         const struct tdb_methods *methods;
783         tdb_off_t magic_offset = 0;
784         u32 zero = 0;
785
786         if (tdb->transaction == NULL) {
787                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
788                 return -1;
789         }
790
791         if (tdb->transaction->transaction_error) {
792                 tdb->ecode = TDB_ERR_IO;
793                 tdb_transaction_cancel(tdb);
794                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
795                 return -1;
796         }
797
798         if (tdb->transaction->nesting != 0) {
799                 tdb->transaction->nesting--;
800                 return 0;
801         }               
802
803         /* check for a null transaction */
804         if (tdb->transaction->elements == NULL) {
805                 tdb_transaction_cancel(tdb);
806                 return 0;
807         }
808
809         methods = tdb->transaction->io_methods;
810         
811         /* if there are any locks pending then the caller has not
812            nested their locks properly, so fail the transaction */
813         if (tdb->num_locks || tdb->global_lock.count) {
814                 tdb->ecode = TDB_ERR_LOCK;
815                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
816                 tdb_transaction_cancel(tdb);
817                 return -1;
818         }
819
820         /* upgrade the main transaction lock region to a write lock */
821         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
822                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
823                 tdb->ecode = TDB_ERR_LOCK;
824                 tdb_transaction_cancel(tdb);
825                 return -1;
826         }
827
828         /* get the global lock - this prevents new users attaching to the database
829            during the commit */
830         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
831                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
832                 tdb->ecode = TDB_ERR_LOCK;
833                 tdb_transaction_cancel(tdb);
834                 return -1;
835         }
836
837         if (!(tdb->flags & TDB_NOSYNC)) {
838                 /* write the recovery data to the end of the file */
839                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
840                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
841                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
842                         tdb_transaction_cancel(tdb);
843                         return -1;
844                 }
845         }
846
847         /* expand the file to the new size if needed */
848         if (tdb->map_size != tdb->transaction->old_map_size) {
849                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
850                                              tdb->map_size - 
851                                              tdb->transaction->old_map_size) == -1) {
852                         tdb->ecode = TDB_ERR_IO;
853                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
854                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
855                         tdb_transaction_cancel(tdb);
856                         return -1;
857                 }
858                 tdb->map_size = tdb->transaction->old_map_size;
859                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
860         }
861
862         /* perform all the writes */
863         while (tdb->transaction->elements) {
864                 struct tdb_transaction_el *el = tdb->transaction->elements;
865
866                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
867                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
868                         
869                         /* we've overwritten part of the data and
870                            possibly expanded the file, so we need to
871                            run the crash recovery code */
872                         tdb->methods = methods;
873                         tdb_transaction_recover(tdb); 
874
875                         tdb_transaction_cancel(tdb);
876                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
877
878                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
879                         return -1;
880                 }
881                 tdb->transaction->elements = el->next;
882                 free(el->data); 
883                 free(el);
884         } 
885
886         if (!(tdb->flags & TDB_NOSYNC)) {
887                 /* ensure the new data is on disk */
888                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
889                         return -1;
890                 }
891
892                 /* remove the recovery marker */
893                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
894                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
895                         return -1;
896                 }
897
898                 /* ensure the recovery marker has been removed on disk */
899                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
900                         return -1;
901                 }
902         }
903
904         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
905
906         /*
907           TODO: maybe write to some dummy hdr field, or write to magic
908           offset without mmap, before the last sync, instead of the
909           utime() call
910         */
911
912         /* on some systems (like Linux 2.6.x) changes via mmap/msync
913            don't change the mtime of the file, this means the file may
914            not be backed up (as tdb rounding to block sizes means that
915            file size changes are quite rare too). The following forces
916            mtime changes when a transaction completes */
917 #ifdef HAVE_UTIME
918         utime(tdb->name, NULL);
919 #endif
920
921         /* use a transaction cancel to free memory and remove the
922            transaction locks */
923         tdb_transaction_cancel(tdb);
924         return 0;
925 }
926
927
928 /*
929   recover from an aborted transaction. Must be called with exclusive
930   database write access already established (including the global
931   lock to prevent new processes attaching)
932 */
933 int tdb_transaction_recover(struct tdb_context *tdb)
934 {
935         tdb_off_t recovery_head, recovery_eof;
936         unsigned char *data, *p;
937         u32 zero = 0;
938         struct list_struct rec;
939
940         /* find the recovery area */
941         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
942                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
943                 tdb->ecode = TDB_ERR_IO;
944                 return -1;
945         }
946
947         if (recovery_head == 0) {
948                 /* we have never allocated a recovery record */
949                 return 0;
950         }
951
952         /* read the recovery record */
953         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
954                                    sizeof(rec), DOCONV()) == -1) {
955                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
956                 tdb->ecode = TDB_ERR_IO;
957                 return -1;
958         }
959
960         if (rec.magic != TDB_RECOVERY_MAGIC) {
961                 /* there is no valid recovery data */
962                 return 0;
963         }
964
965         if (tdb->read_only) {
966                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
967                 tdb->ecode = TDB_ERR_CORRUPT;
968                 return -1;
969         }
970
971         recovery_eof = rec.key_len;
972
973         data = (unsigned char *)malloc(rec.data_len);
974         if (data == NULL) {
975                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
976                 tdb->ecode = TDB_ERR_OOM;
977                 return -1;
978         }
979
980         /* read the full recovery data */
981         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
982                                    rec.data_len, 0) == -1) {
983                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
984                 tdb->ecode = TDB_ERR_IO;
985                 return -1;
986         }
987
988         /* recover the file data */
989         p = data;
990         while (p+8 < data + rec.data_len) {
991                 u32 ofs, len;
992                 if (DOCONV()) {
993                         tdb_convert(p, 8);
994                 }
995                 memcpy(&ofs, p, 4);
996                 memcpy(&len, p+4, 4);
997
998                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
999                         free(data);
1000                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1001                         tdb->ecode = TDB_ERR_IO;
1002                         return -1;
1003                 }
1004                 p += 8 + len;
1005         }
1006
1007         free(data);
1008
1009         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1010                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1011                 tdb->ecode = TDB_ERR_IO;
1012                 return -1;
1013         }
1014
1015         /* if the recovery area is after the recovered eof then remove it */
1016         if (recovery_eof <= recovery_head) {
1017                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1018                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1019                         tdb->ecode = TDB_ERR_IO;
1020                         return -1;                      
1021                 }
1022         }
1023
1024         /* remove the recovery magic */
1025         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1026                           &zero) == -1) {
1027                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1028                 tdb->ecode = TDB_ERR_IO;
1029                 return -1;                      
1030         }
1031         
1032         /* reduce the file size to the old size */
1033         tdb_munmap(tdb);
1034         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1035                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1036                 tdb->ecode = TDB_ERR_IO;
1037                 return -1;                      
1038         }
1039         tdb->map_size = recovery_eof;
1040         tdb_mmap(tdb);
1041
1042         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1043                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1044                 tdb->ecode = TDB_ERR_IO;
1045                 return -1;
1046         }
1047
1048         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1049                  recovery_eof));
1050
1051         /* all done */
1052         return 0;
1053 }