f8a788f8573620d942f71fb736293ae57489afd8
[ira/wip.git] / source / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 #include "tdb_private.h"
28
29 /*
30   transaction design:
31
32   - only allow a single transaction at a time per database. This makes
33     using the transaction API simpler, as otherwise the caller would
34     have to cope with temporary failures in transactions that conflict
35     with other current transactions
36
37   - keep the transaction recovery information in the same file as the
38     database, using a special 'transaction recovery' record pointed at
39     by the header. This removes the need for extra journal files as
40     used by some other databases
41
42   - dynamically allocated the transaction recover record, re-using it
43     for subsequent transactions. If a larger record is needed then
44     tdb_free() the old record to place it on the normal tdb freelist
45     before allocating the new record
46
47   - during transactions, keep a linked list of writes all that have
48     been performed by intercepting all tdb_write() calls. The hooked
49     transaction versions of tdb_read() and tdb_write() check this
50     linked list and try to use the elements of the list in preference
51     to the real database.
52
53   - don't allow any locks to be held when a transaction starts,
54     otherwise we can end up with deadlock (plus lack of lock nesting
55     in posix locks would mean the lock is lost)
56
57   - if the caller gains a lock during the transaction but doesn't
58     release it then fail the commit
59
60   - allow for nested calls to tdb_transaction_start(), re-using the
61     existing transaction record. If the inner transaction is cancelled
62     then a subsequent commit will fail
63  
64   - keep a mirrored copy of the tdb hash chain heads to allow for the
65     fast hash heads scan on traverse, updating the mirrored copy in
66     the transaction version of tdb_write
67
68   - allow callers to mix transaction and non-transaction use of tdb,
69     although once a transaction is started then an exclusive lock is
70     gained until the transaction is committed or cancelled
71
72   - the commit stategy involves first saving away all modified data
73     into a linearised buffer in the transaction recovery area, then
74     marking the transaction recovery area with a magic value to
75     indicate a valid recovery record. In total 4 fsync/msync calls are
76     needed per commit to prevent race conditions. It might be possible
77     to reduce this to 3 or even 2 with some more work.
78
79   - check for a valid recovery record on open of the tdb, while the
80     global lock is held. Automatically recover from the transaction
81     recovery area if needed, then continue with the open as
82     usual. This allows for smooth crash recovery with no administrator
83     intervention.
84
85   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86     still available, but no transaction recovery area is used and no
87     fsync/msync calls are made.
88
89 */
90
91 struct tdb_transaction_el {
92         struct tdb_transaction_el *next, *prev;
93         tdb_off_t offset;
94         tdb_len_t length;
95         unsigned char *data;
96 };
97
98 /*
99   hold the context of any current transaction
100 */
101 struct tdb_transaction {
102         /* we keep a mirrored copy of the tdb hash heads here so
103            tdb_next_hash_chain() can operate efficiently */
104         u32 *hash_heads;
105
106         /* the original io methods - used to do IOs to the real db */
107         const struct tdb_methods *io_methods;
108
109         /* the list of transaction elements. We use a doubly linked
110            list with a last pointer to allow us to keep the list
111            ordered, with first element at the front of the list. It
112            needs to be doubly linked as the read/write traversals need
113            to be backwards, while the commit needs to be forwards */
114         struct tdb_transaction_el *elements, *elements_last;
115
116         /* non-zero when an internal transaction error has
117            occurred. All write operations will then fail until the
118            transaction is ended */
119         int transaction_error;
120
121         /* when inside a transaction we need to keep track of any
122            nested tdb_transaction_start() calls, as these are allowed,
123            but don't create a new transaction */
124         int nesting;
125
126         /* old file size before transaction */
127         tdb_len_t old_map_size;
128 };
129
130
131 /*
132   read while in a transaction. We need to check first if the data is in our list
133   of transaction elements, then if not do a real read
134 */
135 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
136                             tdb_len_t len, int cv)
137 {
138         struct tdb_transaction_el *el;
139
140         /* we need to walk the list backwards to get the most recent data */
141         for (el=tdb->transaction->elements_last;el;el=el->prev) {
142                 tdb_len_t partial;
143
144                 if (off+len <= el->offset) {
145                         continue;
146                 }
147                 if (off >= el->offset + el->length) {
148                         continue;
149                 }
150
151                 /* an overlapping read - needs to be split into up to
152                    2 reads and a memcpy */
153                 if (off < el->offset) {
154                         partial = el->offset - off;
155                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
156                                 goto fail;
157                         }
158                         len -= partial;
159                         off += partial;
160                         buf = (void *)(partial + (char *)buf);
161                 }
162                 if (off + len <= el->offset + el->length) {
163                         partial = len;
164                 } else {
165                         partial = el->offset + el->length - off;
166                 }
167                 memcpy(buf, el->data + (off - el->offset), partial);
168                 if (cv) {
169                         tdb_convert(buf, len);
170                 }
171                 len -= partial;
172                 off += partial;
173                 buf = (void *)(partial + (char *)buf);
174                 
175                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
176                         goto fail;
177                 }
178
179                 return 0;
180         }
181
182         /* its not in the transaction elements - do a real read */
183         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
184
185 fail:
186         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
187         tdb->ecode = TDB_ERR_IO;
188         tdb->transaction->transaction_error = 1;
189         return -1;
190 }
191
192
193 /*
194   write while in a transaction
195 */
196 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
197                              const void *buf, tdb_len_t len)
198 {
199         struct tdb_transaction_el *el, *best_el=NULL;
200
201         if (len == 0) {
202                 return 0;
203         }
204         
205         /* if the write is to a hash head, then update the transaction
206            hash heads */
207         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
208             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
209                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
210                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
211         }
212
213         /* first see if we can replace an existing entry */
214         for (el=tdb->transaction->elements_last;el;el=el->prev) {
215                 tdb_len_t partial;
216
217                 if (best_el == NULL && off == el->offset+el->length) {
218                         best_el = el;
219                 }
220
221                 if (off+len <= el->offset) {
222                         continue;
223                 }
224                 if (off >= el->offset + el->length) {
225                         continue;
226                 }
227
228                 /* an overlapping write - needs to be split into up to
229                    2 writes and a memcpy */
230                 if (off < el->offset) {
231                         partial = el->offset - off;
232                         if (transaction_write(tdb, off, buf, partial) != 0) {
233                                 goto fail;
234                         }
235                         len -= partial;
236                         off += partial;
237                         buf = (const void *)(partial + (const char *)buf);
238                 }
239                 if (off + len <= el->offset + el->length) {
240                         partial = len;
241                 } else {
242                         partial = el->offset + el->length - off;
243                 }
244                 memcpy(el->data + (off - el->offset), buf, partial);
245                 len -= partial;
246                 off += partial;
247                 buf = (const void *)(partial + (const char *)buf);
248                 
249                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
250                         goto fail;
251                 }
252
253                 return 0;
254         }
255
256         /* see if we can append the new entry to an existing entry */
257         if (best_el && best_el->offset + best_el->length == off && 
258             (off+len < tdb->transaction->old_map_size ||
259              off > tdb->transaction->old_map_size)) {
260                 unsigned char *data = best_el->data;
261                 el = best_el;
262                 el->data = (unsigned char *)realloc(el->data,
263                                                     el->length + len);
264                 if (el->data == NULL) {
265                         tdb->ecode = TDB_ERR_OOM;
266                         tdb->transaction->transaction_error = 1;
267                         el->data = data;
268                         return -1;
269                 }
270                 if (buf) {
271                         memcpy(el->data + el->length, buf, len);
272                 } else {
273                         memset(el->data + el->length, TDB_PAD_BYTE, len);
274                 }
275                 el->length += len;
276                 return 0;
277         }
278
279         /* add a new entry at the end of the list */
280         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
281         if (el == NULL) {
282                 tdb->ecode = TDB_ERR_OOM;
283                 tdb->transaction->transaction_error = 1;                
284                 return -1;
285         }
286         el->next = NULL;
287         el->prev = tdb->transaction->elements_last;
288         el->offset = off;
289         el->length = len;
290         el->data = (unsigned char *)malloc(len);
291         if (el->data == NULL) {
292                 free(el);
293                 tdb->ecode = TDB_ERR_OOM;
294                 tdb->transaction->transaction_error = 1;                
295                 return -1;
296         }
297         if (buf) {
298                 memcpy(el->data, buf, len);
299         } else {
300                 memset(el->data, TDB_PAD_BYTE, len);
301         }
302         if (el->prev) {
303                 el->prev->next = el;
304         } else {
305                 tdb->transaction->elements = el;
306         }
307         tdb->transaction->elements_last = el;
308         return 0;
309
310 fail:
311         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
312         tdb->ecode = TDB_ERR_IO;
313         tdb->transaction->transaction_error = 1;
314         return -1;
315 }
316
317 /*
318   accelerated hash chain head search, using the cached hash heads
319 */
320 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
321 {
322         u32 h = *chain;
323         for (;h < tdb->header.hash_size;h++) {
324                 /* the +1 takes account of the freelist */
325                 if (0 != tdb->transaction->hash_heads[h+1]) {
326                         break;
327                 }
328         }
329         (*chain) = h;
330 }
331
332 /*
333   out of bounds check during a transaction
334 */
335 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
336 {
337         if (len <= tdb->map_size) {
338                 return 0;
339         }
340         return TDB_ERRCODE(TDB_ERR_IO, -1);
341 }
342
343 /*
344   transaction version of tdb_expand().
345 */
346 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
347                                    tdb_off_t addition)
348 {
349         /* add a write to the transaction elements, so subsequent
350            reads see the zero data */
351         if (transaction_write(tdb, size, NULL, addition) != 0) {
352                 return -1;
353         }
354
355         return 0;
356 }
357
358 /*
359   brlock during a transaction - ignore them
360 */
361 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
362                               int rw_type, int lck_type, int probe, size_t len)
363 {
364         return 0;
365 }
366
367 static const struct tdb_methods transaction_methods = {
368         transaction_read,
369         transaction_write,
370         transaction_next_hash_chain,
371         transaction_oob,
372         transaction_expand_file,
373         transaction_brlock
374 };
375
376
377 /*
378   start a tdb transaction. No token is returned, as only a single
379   transaction is allowed to be pending per tdb_context
380 */
381 int tdb_transaction_start(struct tdb_context *tdb)
382 {
383         /* some sanity checks */
384         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
385                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
386                 tdb->ecode = TDB_ERR_EINVAL;
387                 return -1;
388         }
389
390         /* cope with nested tdb_transaction_start() calls */
391         if (tdb->transaction != NULL) {
392                 tdb->transaction->nesting++;
393                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
394                          tdb->transaction->nesting));
395                 return 0;
396         }
397
398         if (tdb->num_locks != 0 || tdb->global_lock.count) {
399                 /* the caller must not have any locks when starting a
400                    transaction as otherwise we'll be screwed by lack
401                    of nested locks in posix */
402                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
403                 tdb->ecode = TDB_ERR_LOCK;
404                 return -1;
405         }
406
407         if (tdb->travlocks.next != NULL) {
408                 /* you cannot use transactions inside a traverse (although you can use
409                    traverse inside a transaction) as otherwise you can end up with
410                    deadlock */
411                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
412                 tdb->ecode = TDB_ERR_LOCK;
413                 return -1;
414         }
415
416         tdb->transaction = (struct tdb_transaction *)
417                 calloc(sizeof(struct tdb_transaction), 1);
418         if (tdb->transaction == NULL) {
419                 tdb->ecode = TDB_ERR_OOM;
420                 return -1;
421         }
422
423         /* get the transaction write lock. This is a blocking lock. As
424            discussed with Volker, there are a number of ways we could
425            make this async, which we will probably do in the future */
426         if (tdb_transaction_lock(tdb, F_WRLCK) == -1) {
427                 SAFE_FREE(tdb->transaction);
428                 return -1;
429         }
430         
431         /* get a read lock from the freelist to the end of file. This
432            is upgraded to a write lock during the commit */
433         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
434                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
435                 tdb->ecode = TDB_ERR_LOCK;
436                 goto fail;
437         }
438
439         /* setup a copy of the hash table heads so the hash scan in
440            traverse can be fast */
441         tdb->transaction->hash_heads = (u32 *)
442                 calloc(tdb->header.hash_size+1, sizeof(u32));
443         if (tdb->transaction->hash_heads == NULL) {
444                 tdb->ecode = TDB_ERR_OOM;
445                 goto fail;
446         }
447         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
448                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
449                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
450                 tdb->ecode = TDB_ERR_IO;
451                 goto fail;
452         }
453
454         /* make sure we know about any file expansions already done by
455            anyone else */
456         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
457         tdb->transaction->old_map_size = tdb->map_size;
458
459         /* finally hook the io methods, replacing them with
460            transaction specific methods */
461         tdb->transaction->io_methods = tdb->methods;
462         tdb->methods = &transaction_methods;
463
464         /* by calling this transaction write here, we ensure that we don't grow the
465            transaction linked list due to hash table updates */
466         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
467                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
468                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
469                 tdb->ecode = TDB_ERR_IO;
470                 tdb->methods = tdb->transaction->io_methods;
471                 goto fail;
472         }
473
474         return 0;
475         
476 fail:
477         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
478         tdb_transaction_unlock(tdb);
479         SAFE_FREE(tdb->transaction->hash_heads);
480         SAFE_FREE(tdb->transaction);
481         return -1;
482 }
483
484
485 /*
486   cancel the current transaction
487 */
488 int tdb_transaction_cancel(struct tdb_context *tdb)
489 {       
490         if (tdb->transaction == NULL) {
491                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
492                 return -1;
493         }
494
495         if (tdb->transaction->nesting != 0) {
496                 tdb->transaction->transaction_error = 1;
497                 tdb->transaction->nesting--;
498                 return 0;
499         }               
500
501         tdb->map_size = tdb->transaction->old_map_size;
502
503         /* free all the transaction elements */
504         while (tdb->transaction->elements) {
505                 struct tdb_transaction_el *el = tdb->transaction->elements;
506                 tdb->transaction->elements = el->next;
507                 free(el->data);
508                 free(el);
509         }
510
511         /* remove any global lock created during the transaction */
512         if (tdb->global_lock.count != 0) {
513                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
514                 tdb->global_lock.count = 0;
515         }
516
517         /* remove any locks created during the transaction */
518         if (tdb->num_locks != 0) {
519                 int i;
520                 for (i=0;i<tdb->num_lockrecs;i++) {
521                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
522                                    F_UNLCK,F_SETLKW, 0, 1);
523                 }
524                 tdb->num_locks = 0;
525                 tdb->num_lockrecs = 0;
526                 SAFE_FREE(tdb->lockrecs);
527         }
528
529         /* restore the normal io methods */
530         tdb->methods = tdb->transaction->io_methods;
531
532         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
533         tdb_transaction_unlock(tdb);
534         SAFE_FREE(tdb->transaction->hash_heads);
535         SAFE_FREE(tdb->transaction);
536         
537         return 0;
538 }
539
540 /*
541   sync to disk
542 */
543 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
544 {       
545         if (fsync(tdb->fd) != 0) {
546                 tdb->ecode = TDB_ERR_IO;
547                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
548                 return -1;
549         }
550 #ifdef MS_SYNC
551         if (tdb->map_ptr) {
552                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
553                 if (msync(moffset + (char *)tdb->map_ptr, 
554                           length + (offset - moffset), MS_SYNC) != 0) {
555                         tdb->ecode = TDB_ERR_IO;
556                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
557                                  strerror(errno)));
558                         return -1;
559                 }
560         }
561 #endif
562         return 0;
563 }
564
565
566 /*
567   work out how much space the linearised recovery data will consume
568 */
569 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
570 {
571         struct tdb_transaction_el *el;
572         tdb_len_t recovery_size = 0;
573
574         recovery_size = sizeof(u32);
575         for (el=tdb->transaction->elements;el;el=el->next) {
576                 if (el->offset >= tdb->transaction->old_map_size) {
577                         continue;
578                 }
579                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
580         }
581
582         return recovery_size;
583 }
584
585 /*
586   allocate the recovery area, or use an existing recovery area if it is
587   large enough
588 */
589 static int tdb_recovery_allocate(struct tdb_context *tdb, 
590                                  tdb_len_t *recovery_size,
591                                  tdb_off_t *recovery_offset,
592                                  tdb_len_t *recovery_max_size)
593 {
594         struct list_struct rec;
595         const struct tdb_methods *methods = tdb->transaction->io_methods;
596         tdb_off_t recovery_head;
597
598         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
599                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
600                 return -1;
601         }
602
603         rec.rec_len = 0;
604
605         if (recovery_head != 0 && 
606             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
607                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
608                 return -1;
609         }
610
611         *recovery_size = tdb_recovery_size(tdb);
612
613         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
614                 /* it fits in the existing area */
615                 *recovery_max_size = rec.rec_len;
616                 *recovery_offset = recovery_head;
617                 return 0;
618         }
619
620         /* we need to free up the old recovery area, then allocate a
621            new one at the end of the file. Note that we cannot use
622            tdb_allocate() to allocate the new one as that might return
623            us an area that is being currently used (as of the start of
624            the transaction) */
625         if (recovery_head != 0) {
626                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
627                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
628                         return -1;
629                 }
630         }
631
632         /* the tdb_free() call might have increased the recovery size */
633         *recovery_size = tdb_recovery_size(tdb);
634
635         /* round up to a multiple of page size */
636         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
637         *recovery_offset = tdb->map_size;
638         recovery_head = *recovery_offset;
639
640         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
641                                      (tdb->map_size - tdb->transaction->old_map_size) +
642                                      sizeof(rec) + *recovery_max_size) == -1) {
643                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
644                 return -1;
645         }
646
647         /* remap the file (if using mmap) */
648         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
649
650         /* we have to reset the old map size so that we don't try to expand the file
651            again in the transaction commit, which would destroy the recovery area */
652         tdb->transaction->old_map_size = tdb->map_size;
653
654         /* write the recovery header offset and sync - we can sync without a race here
655            as the magic ptr in the recovery record has not been set */
656         CONVERT(recovery_head);
657         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
658                                &recovery_head, sizeof(tdb_off_t)) == -1) {
659                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
660                 return -1;
661         }
662
663         return 0;
664 }
665
666
667 /*
668   setup the recovery data that will be used on a crash during commit
669 */
670 static int transaction_setup_recovery(struct tdb_context *tdb, 
671                                       tdb_off_t *magic_offset)
672 {
673         struct tdb_transaction_el *el;
674         tdb_len_t recovery_size;
675         unsigned char *data, *p;
676         const struct tdb_methods *methods = tdb->transaction->io_methods;
677         struct list_struct *rec;
678         tdb_off_t recovery_offset, recovery_max_size;
679         tdb_off_t old_map_size = tdb->transaction->old_map_size;
680         u32 magic, tailer;
681
682         /*
683           check that the recovery area has enough space
684         */
685         if (tdb_recovery_allocate(tdb, &recovery_size, 
686                                   &recovery_offset, &recovery_max_size) == -1) {
687                 return -1;
688         }
689
690         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
691         if (data == NULL) {
692                 tdb->ecode = TDB_ERR_OOM;
693                 return -1;
694         }
695
696         rec = (struct list_struct *)data;
697         memset(rec, 0, sizeof(*rec));
698
699         rec->magic    = 0;
700         rec->data_len = recovery_size;
701         rec->rec_len  = recovery_max_size;
702         rec->key_len  = old_map_size;
703         CONVERT(rec);
704
705         /* build the recovery data into a single blob to allow us to do a single
706            large write, which should be more efficient */
707         p = data + sizeof(*rec);
708         for (el=tdb->transaction->elements;el;el=el->next) {
709                 if (el->offset >= old_map_size) {
710                         continue;
711                 }
712                 if (el->offset + el->length > tdb->transaction->old_map_size) {
713                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
714                         free(data);
715                         tdb->ecode = TDB_ERR_CORRUPT;
716                         return -1;
717                 }
718                 memcpy(p, &el->offset, 4);
719                 memcpy(p+4, &el->length, 4);
720                 if (DOCONV()) {
721                         tdb_convert(p, 8);
722                 }
723                 /* the recovery area contains the old data, not the
724                    new data, so we have to call the original tdb_read
725                    method to get it */
726                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
727                         free(data);
728                         tdb->ecode = TDB_ERR_IO;
729                         return -1;
730                 }
731                 p += 8 + el->length;
732         }
733
734         /* and the tailer */
735         tailer = sizeof(*rec) + recovery_max_size;
736         memcpy(p, &tailer, 4);
737         CONVERT(p);
738
739         /* write the recovery data to the recovery area */
740         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
741                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
742                 free(data);
743                 tdb->ecode = TDB_ERR_IO;
744                 return -1;
745         }
746
747         /* as we don't have ordered writes, we have to sync the recovery
748            data before we update the magic to indicate that the recovery
749            data is present */
750         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
751                 free(data);
752                 return -1;
753         }
754
755         free(data);
756
757         magic = TDB_RECOVERY_MAGIC;
758         CONVERT(magic);
759
760         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
761
762         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
763                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
764                 tdb->ecode = TDB_ERR_IO;
765                 return -1;
766         }
767
768         /* ensure the recovery magic marker is on disk */
769         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
770                 return -1;
771         }
772
773         return 0;
774 }
775
776 /*
777   commit the current transaction
778 */
779 int tdb_transaction_commit(struct tdb_context *tdb)
780 {       
781         const struct tdb_methods *methods;
782         tdb_off_t magic_offset = 0;
783         u32 zero = 0;
784
785         if (tdb->transaction == NULL) {
786                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
787                 return -1;
788         }
789
790         if (tdb->transaction->transaction_error) {
791                 tdb->ecode = TDB_ERR_IO;
792                 tdb_transaction_cancel(tdb);
793                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
794                 return -1;
795         }
796
797         if (tdb->transaction->nesting != 0) {
798                 tdb->transaction->nesting--;
799                 return 0;
800         }               
801
802         /* check for a null transaction */
803         if (tdb->transaction->elements == NULL) {
804                 tdb_transaction_cancel(tdb);
805                 return 0;
806         }
807
808         methods = tdb->transaction->io_methods;
809         
810         /* if there are any locks pending then the caller has not
811            nested their locks properly, so fail the transaction */
812         if (tdb->num_locks || tdb->global_lock.count) {
813                 tdb->ecode = TDB_ERR_LOCK;
814                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
815                 tdb_transaction_cancel(tdb);
816                 return -1;
817         }
818
819         /* upgrade the main transaction lock region to a write lock */
820         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
821                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
822                 tdb->ecode = TDB_ERR_LOCK;
823                 tdb_transaction_cancel(tdb);
824                 return -1;
825         }
826
827         /* get the global lock - this prevents new users attaching to the database
828            during the commit */
829         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
830                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
831                 tdb->ecode = TDB_ERR_LOCK;
832                 tdb_transaction_cancel(tdb);
833                 return -1;
834         }
835
836         if (!(tdb->flags & TDB_NOSYNC)) {
837                 /* write the recovery data to the end of the file */
838                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
839                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
840                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
841                         tdb_transaction_cancel(tdb);
842                         return -1;
843                 }
844         }
845
846         /* expand the file to the new size if needed */
847         if (tdb->map_size != tdb->transaction->old_map_size) {
848                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
849                                              tdb->map_size - 
850                                              tdb->transaction->old_map_size) == -1) {
851                         tdb->ecode = TDB_ERR_IO;
852                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
853                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
854                         tdb_transaction_cancel(tdb);
855                         return -1;
856                 }
857                 tdb->map_size = tdb->transaction->old_map_size;
858                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
859         }
860
861         /* perform all the writes */
862         while (tdb->transaction->elements) {
863                 struct tdb_transaction_el *el = tdb->transaction->elements;
864
865                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
866                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
867                         
868                         /* we've overwritten part of the data and
869                            possibly expanded the file, so we need to
870                            run the crash recovery code */
871                         tdb->methods = methods;
872                         tdb_transaction_recover(tdb); 
873
874                         tdb_transaction_cancel(tdb);
875                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
876
877                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
878                         return -1;
879                 }
880                 tdb->transaction->elements = el->next;
881                 free(el->data); 
882                 free(el);
883         } 
884
885         if (!(tdb->flags & TDB_NOSYNC)) {
886                 /* ensure the new data is on disk */
887                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
888                         return -1;
889                 }
890
891                 /* remove the recovery marker */
892                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
893                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
894                         return -1;
895                 }
896
897                 /* ensure the recovery marker has been removed on disk */
898                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
899                         return -1;
900                 }
901         }
902
903         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
904
905         /*
906           TODO: maybe write to some dummy hdr field, or write to magic
907           offset without mmap, before the last sync, instead of the
908           utime() call
909         */
910
911         /* on some systems (like Linux 2.6.x) changes via mmap/msync
912            don't change the mtime of the file, this means the file may
913            not be backed up (as tdb rounding to block sizes means that
914            file size changes are quite rare too). The following forces
915            mtime changes when a transaction completes */
916 #ifdef HAVE_UTIME
917         utime(tdb->name, NULL);
918 #endif
919
920         /* use a transaction cancel to free memory and remove the
921            transaction locks */
922         tdb_transaction_cancel(tdb);
923         return 0;
924 }
925
926
927 /*
928   recover from an aborted transaction. Must be called with exclusive
929   database write access already established (including the global
930   lock to prevent new processes attaching)
931 */
932 int tdb_transaction_recover(struct tdb_context *tdb)
933 {
934         tdb_off_t recovery_head, recovery_eof;
935         unsigned char *data, *p;
936         u32 zero = 0;
937         struct list_struct rec;
938
939         /* find the recovery area */
940         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
941                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
942                 tdb->ecode = TDB_ERR_IO;
943                 return -1;
944         }
945
946         if (recovery_head == 0) {
947                 /* we have never allocated a recovery record */
948                 return 0;
949         }
950
951         /* read the recovery record */
952         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
953                                    sizeof(rec), DOCONV()) == -1) {
954                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
955                 tdb->ecode = TDB_ERR_IO;
956                 return -1;
957         }
958
959         if (rec.magic != TDB_RECOVERY_MAGIC) {
960                 /* there is no valid recovery data */
961                 return 0;
962         }
963
964         if (tdb->read_only) {
965                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
966                 tdb->ecode = TDB_ERR_CORRUPT;
967                 return -1;
968         }
969
970         recovery_eof = rec.key_len;
971
972         data = (unsigned char *)malloc(rec.data_len);
973         if (data == NULL) {
974                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
975                 tdb->ecode = TDB_ERR_OOM;
976                 return -1;
977         }
978
979         /* read the full recovery data */
980         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
981                                    rec.data_len, 0) == -1) {
982                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
983                 tdb->ecode = TDB_ERR_IO;
984                 return -1;
985         }
986
987         /* recover the file data */
988         p = data;
989         while (p+8 < data + rec.data_len) {
990                 u32 ofs, len;
991                 if (DOCONV()) {
992                         tdb_convert(p, 8);
993                 }
994                 memcpy(&ofs, p, 4);
995                 memcpy(&len, p+4, 4);
996
997                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
998                         free(data);
999                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1000                         tdb->ecode = TDB_ERR_IO;
1001                         return -1;
1002                 }
1003                 p += 8 + len;
1004         }
1005
1006         free(data);
1007
1008         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1009                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1010                 tdb->ecode = TDB_ERR_IO;
1011                 return -1;
1012         }
1013
1014         /* if the recovery area is after the recovered eof then remove it */
1015         if (recovery_eof <= recovery_head) {
1016                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1017                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1018                         tdb->ecode = TDB_ERR_IO;
1019                         return -1;                      
1020                 }
1021         }
1022
1023         /* remove the recovery magic */
1024         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1025                           &zero) == -1) {
1026                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1027                 tdb->ecode = TDB_ERR_IO;
1028                 return -1;                      
1029         }
1030         
1031         /* reduce the file size to the old size */
1032         tdb_munmap(tdb);
1033         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1034                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1035                 tdb->ecode = TDB_ERR_IO;
1036                 return -1;                      
1037         }
1038         tdb->map_size = recovery_eof;
1039         tdb_mmap(tdb);
1040
1041         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1042                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1043                 tdb->ecode = TDB_ERR_IO;
1044                 return -1;
1045         }
1046
1047         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1048                  recovery_eof));
1049
1050         /* all done */
1051         return 0;
1052 }