r23798: updated old Temple Place FSF addresses to new URL
[nivanova/samba-autobuild/.git] / source3 / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 3 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, see <http://www.gnu.org/licenses/>.
24 */
25
26 #include "tdb_private.h"
27
28 /*
29   transaction design:
30
31   - only allow a single transaction at a time per database. This makes
32     using the transaction API simpler, as otherwise the caller would
33     have to cope with temporary failures in transactions that conflict
34     with other current transactions
35
36   - keep the transaction recovery information in the same file as the
37     database, using a special 'transaction recovery' record pointed at
38     by the header. This removes the need for extra journal files as
39     used by some other databases
40
41   - dynamically allocated the transaction recover record, re-using it
42     for subsequent transactions. If a larger record is needed then
43     tdb_free() the old record to place it on the normal tdb freelist
44     before allocating the new record
45
46   - during transactions, keep a linked list of writes all that have
47     been performed by intercepting all tdb_write() calls. The hooked
48     transaction versions of tdb_read() and tdb_write() check this
49     linked list and try to use the elements of the list in preference
50     to the real database.
51
52   - don't allow any locks to be held when a transaction starts,
53     otherwise we can end up with deadlock (plus lack of lock nesting
54     in posix locks would mean the lock is lost)
55
56   - if the caller gains a lock during the transaction but doesn't
57     release it then fail the commit
58
59   - allow for nested calls to tdb_transaction_start(), re-using the
60     existing transaction record. If the inner transaction is cancelled
61     then a subsequent commit will fail
62  
63   - keep a mirrored copy of the tdb hash chain heads to allow for the
64     fast hash heads scan on traverse, updating the mirrored copy in
65     the transaction version of tdb_write
66
67   - allow callers to mix transaction and non-transaction use of tdb,
68     although once a transaction is started then an exclusive lock is
69     gained until the transaction is committed or cancelled
70
71   - the commit stategy involves first saving away all modified data
72     into a linearised buffer in the transaction recovery area, then
73     marking the transaction recovery area with a magic value to
74     indicate a valid recovery record. In total 4 fsync/msync calls are
75     needed per commit to prevent race conditions. It might be possible
76     to reduce this to 3 or even 2 with some more work.
77
78   - check for a valid recovery record on open of the tdb, while the
79     global lock is held. Automatically recover from the transaction
80     recovery area if needed, then continue with the open as
81     usual. This allows for smooth crash recovery with no administrator
82     intervention.
83
84   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
85     still available, but no transaction recovery area is used and no
86     fsync/msync calls are made.
87
88 */
89
90 struct tdb_transaction_el {
91         struct tdb_transaction_el *next, *prev;
92         tdb_off_t offset;
93         tdb_len_t length;
94         unsigned char *data;
95 };
96
97 /*
98   hold the context of any current transaction
99 */
100 struct tdb_transaction {
101         /* we keep a mirrored copy of the tdb hash heads here so
102            tdb_next_hash_chain() can operate efficiently */
103         u32 *hash_heads;
104
105         /* the original io methods - used to do IOs to the real db */
106         const struct tdb_methods *io_methods;
107
108         /* the list of transaction elements. We use a doubly linked
109            list with a last pointer to allow us to keep the list
110            ordered, with first element at the front of the list. It
111            needs to be doubly linked as the read/write traversals need
112            to be backwards, while the commit needs to be forwards */
113         struct tdb_transaction_el *elements, *elements_last;
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* old file size before transaction */
126         tdb_len_t old_map_size;
127 };
128
129
130 /*
131   read while in a transaction. We need to check first if the data is in our list
132   of transaction elements, then if not do a real read
133 */
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
135                             tdb_len_t len, int cv)
136 {
137         struct tdb_transaction_el *el;
138
139         /* we need to walk the list backwards to get the most recent data */
140         for (el=tdb->transaction->elements_last;el;el=el->prev) {
141                 tdb_len_t partial;
142
143                 if (off+len <= el->offset) {
144                         continue;
145                 }
146                 if (off >= el->offset + el->length) {
147                         continue;
148                 }
149
150                 /* an overlapping read - needs to be split into up to
151                    2 reads and a memcpy */
152                 if (off < el->offset) {
153                         partial = el->offset - off;
154                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155                                 goto fail;
156                         }
157                         len -= partial;
158                         off += partial;
159                         buf = (void *)(partial + (char *)buf);
160                 }
161                 if (off + len <= el->offset + el->length) {
162                         partial = len;
163                 } else {
164                         partial = el->offset + el->length - off;
165                 }
166                 memcpy(buf, el->data + (off - el->offset), partial);
167                 if (cv) {
168                         tdb_convert(buf, len);
169                 }
170                 len -= partial;
171                 off += partial;
172                 buf = (void *)(partial + (char *)buf);
173                 
174                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177
178                 return 0;
179         }
180
181         /* its not in the transaction elements - do a real read */
182         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183
184 fail:
185         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
186         tdb->ecode = TDB_ERR_IO;
187         tdb->transaction->transaction_error = 1;
188         return -1;
189 }
190
191
192 /*
193   write while in a transaction
194 */
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
196                              const void *buf, tdb_len_t len)
197 {
198         struct tdb_transaction_el *el, *best_el=NULL;
199
200         if (len == 0) {
201                 return 0;
202         }
203         
204         /* if the write is to a hash head, then update the transaction
205            hash heads */
206         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
210         }
211
212         /* first see if we can replace an existing entry */
213         for (el=tdb->transaction->elements_last;el;el=el->prev) {
214                 tdb_len_t partial;
215
216                 if (best_el == NULL && off == el->offset+el->length) {
217                         best_el = el;
218                 }
219
220                 if (off+len <= el->offset) {
221                         continue;
222                 }
223                 if (off >= el->offset + el->length) {
224                         continue;
225                 }
226
227                 /* an overlapping write - needs to be split into up to
228                    2 writes and a memcpy */
229                 if (off < el->offset) {
230                         partial = el->offset - off;
231                         if (transaction_write(tdb, off, buf, partial) != 0) {
232                                 goto fail;
233                         }
234                         len -= partial;
235                         off += partial;
236                         buf = (const void *)(partial + (const char *)buf);
237                 }
238                 if (off + len <= el->offset + el->length) {
239                         partial = len;
240                 } else {
241                         partial = el->offset + el->length - off;
242                 }
243                 memcpy(el->data + (off - el->offset), buf, partial);
244                 len -= partial;
245                 off += partial;
246                 buf = (const void *)(partial + (const char *)buf);
247                 
248                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
249                         goto fail;
250                 }
251
252                 return 0;
253         }
254
255         /* see if we can append the new entry to an existing entry */
256         if (best_el && best_el->offset + best_el->length == off && 
257             (off+len < tdb->transaction->old_map_size ||
258              off > tdb->transaction->old_map_size)) {
259                 unsigned char *data = best_el->data;
260                 el = best_el;
261                 el->data = (unsigned char *)realloc(el->data,
262                                                     el->length + len);
263                 if (el->data == NULL) {
264                         tdb->ecode = TDB_ERR_OOM;
265                         tdb->transaction->transaction_error = 1;
266                         el->data = data;
267                         return -1;
268                 }
269                 if (buf) {
270                         memcpy(el->data + el->length, buf, len);
271                 } else {
272                         memset(el->data + el->length, TDB_PAD_BYTE, len);
273                 }
274                 el->length += len;
275                 return 0;
276         }
277
278         /* add a new entry at the end of the list */
279         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
280         if (el == NULL) {
281                 tdb->ecode = TDB_ERR_OOM;
282                 tdb->transaction->transaction_error = 1;                
283                 return -1;
284         }
285         el->next = NULL;
286         el->prev = tdb->transaction->elements_last;
287         el->offset = off;
288         el->length = len;
289         el->data = (unsigned char *)malloc(len);
290         if (el->data == NULL) {
291                 free(el);
292                 tdb->ecode = TDB_ERR_OOM;
293                 tdb->transaction->transaction_error = 1;                
294                 return -1;
295         }
296         if (buf) {
297                 memcpy(el->data, buf, len);
298         } else {
299                 memset(el->data, TDB_PAD_BYTE, len);
300         }
301         if (el->prev) {
302                 el->prev->next = el;
303         } else {
304                 tdb->transaction->elements = el;
305         }
306         tdb->transaction->elements_last = el;
307         return 0;
308
309 fail:
310         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
311         tdb->ecode = TDB_ERR_IO;
312         tdb->transaction->transaction_error = 1;
313         return -1;
314 }
315
316 /*
317   accelerated hash chain head search, using the cached hash heads
318 */
319 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
320 {
321         u32 h = *chain;
322         for (;h < tdb->header.hash_size;h++) {
323                 /* the +1 takes account of the freelist */
324                 if (0 != tdb->transaction->hash_heads[h+1]) {
325                         break;
326                 }
327         }
328         (*chain) = h;
329 }
330
331 /*
332   out of bounds check during a transaction
333 */
334 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
335 {
336         if (len <= tdb->map_size) {
337                 return 0;
338         }
339         return TDB_ERRCODE(TDB_ERR_IO, -1);
340 }
341
342 /*
343   transaction version of tdb_expand().
344 */
345 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
346                                    tdb_off_t addition)
347 {
348         /* add a write to the transaction elements, so subsequent
349            reads see the zero data */
350         if (transaction_write(tdb, size, NULL, addition) != 0) {
351                 return -1;
352         }
353
354         return 0;
355 }
356
357 /*
358   brlock during a transaction - ignore them
359 */
360 static int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
361                               int rw_type, int lck_type, int probe, size_t len)
362 {
363         return 0;
364 }
365
366 static const struct tdb_methods transaction_methods = {
367         transaction_read,
368         transaction_write,
369         transaction_next_hash_chain,
370         transaction_oob,
371         transaction_expand_file,
372         transaction_brlock
373 };
374
375
376 /*
377   start a tdb transaction. No token is returned, as only a single
378   transaction is allowed to be pending per tdb_context
379 */
380 int tdb_transaction_start(struct tdb_context *tdb)
381 {
382         /* some sanity checks */
383         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
384                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
385                 tdb->ecode = TDB_ERR_EINVAL;
386                 return -1;
387         }
388
389         /* cope with nested tdb_transaction_start() calls */
390         if (tdb->transaction != NULL) {
391                 tdb->transaction->nesting++;
392                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
393                          tdb->transaction->nesting));
394                 return 0;
395         }
396
397         if (tdb->num_locks != 0 || tdb->global_lock.count) {
398                 /* the caller must not have any locks when starting a
399                    transaction as otherwise we'll be screwed by lack
400                    of nested locks in posix */
401                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
402                 tdb->ecode = TDB_ERR_LOCK;
403                 return -1;
404         }
405
406         if (tdb->travlocks.next != NULL) {
407                 /* you cannot use transactions inside a traverse (although you can use
408                    traverse inside a transaction) as otherwise you can end up with
409                    deadlock */
410                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
411                 tdb->ecode = TDB_ERR_LOCK;
412                 return -1;
413         }
414
415         tdb->transaction = (struct tdb_transaction *)
416                 calloc(sizeof(struct tdb_transaction), 1);
417         if (tdb->transaction == NULL) {
418                 tdb->ecode = TDB_ERR_OOM;
419                 return -1;
420         }
421
422         /* get the transaction write lock. This is a blocking lock. As
423            discussed with Volker, there are a number of ways we could
424            make this async, which we will probably do in the future */
425         if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
426                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
427                 tdb->ecode = TDB_ERR_LOCK;
428                 SAFE_FREE(tdb->transaction);
429                 return -1;
430         }
431         
432         /* get a read lock from the freelist to the end of file. This
433            is upgraded to a write lock during the commit */
434         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
435                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
436                 tdb->ecode = TDB_ERR_LOCK;
437                 goto fail;
438         }
439
440         /* setup a copy of the hash table heads so the hash scan in
441            traverse can be fast */
442         tdb->transaction->hash_heads = (u32 *)
443                 calloc(tdb->header.hash_size+1, sizeof(u32));
444         if (tdb->transaction->hash_heads == NULL) {
445                 tdb->ecode = TDB_ERR_OOM;
446                 goto fail;
447         }
448         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
449                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
450                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
451                 tdb->ecode = TDB_ERR_IO;
452                 goto fail;
453         }
454
455         /* make sure we know about any file expansions already done by
456            anyone else */
457         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
458         tdb->transaction->old_map_size = tdb->map_size;
459
460         /* finally hook the io methods, replacing them with
461            transaction specific methods */
462         tdb->transaction->io_methods = tdb->methods;
463         tdb->methods = &transaction_methods;
464
465         /* by calling this transaction write here, we ensure that we don't grow the
466            transaction linked list due to hash table updates */
467         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
468                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
469                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
470                 tdb->ecode = TDB_ERR_IO;
471                 goto fail;
472         }
473
474         return 0;
475         
476 fail:
477         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
478         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
479         SAFE_FREE(tdb->transaction->hash_heads);
480         SAFE_FREE(tdb->transaction);
481         return -1;
482 }
483
484
485 /*
486   cancel the current transaction
487 */
488 int tdb_transaction_cancel(struct tdb_context *tdb)
489 {       
490         if (tdb->transaction == NULL) {
491                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
492                 return -1;
493         }
494
495         if (tdb->transaction->nesting != 0) {
496                 tdb->transaction->transaction_error = 1;
497                 tdb->transaction->nesting--;
498                 return 0;
499         }               
500
501         tdb->map_size = tdb->transaction->old_map_size;
502
503         /* free all the transaction elements */
504         while (tdb->transaction->elements) {
505                 struct tdb_transaction_el *el = tdb->transaction->elements;
506                 tdb->transaction->elements = el->next;
507                 free(el->data);
508                 free(el);
509         }
510
511         /* remove any global lock created during the transaction */
512         if (tdb->global_lock.count != 0) {
513                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
514                 tdb->global_lock.count = 0;
515         }
516
517         /* remove any locks created during the transaction */
518         if (tdb->num_locks != 0) {
519                 int i;
520                 for (i=0;i<tdb->num_lockrecs;i++) {
521                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
522                                    F_UNLCK,F_SETLKW, 0, 1);
523                 }
524                 tdb->num_locks = 0;
525                 tdb->num_lockrecs = 0;
526                 SAFE_FREE(tdb->lockrecs);
527         }
528
529         /* restore the normal io methods */
530         tdb->methods = tdb->transaction->io_methods;
531
532         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
533         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
534         SAFE_FREE(tdb->transaction->hash_heads);
535         SAFE_FREE(tdb->transaction);
536         
537         return 0;
538 }
539
540 /*
541   sync to disk
542 */
543 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
544 {       
545         if (fsync(tdb->fd) != 0) {
546                 tdb->ecode = TDB_ERR_IO;
547                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
548                 return -1;
549         }
550 #ifdef MS_SYNC
551         if (tdb->map_ptr) {
552                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
553                 if (msync(moffset + (char *)tdb->map_ptr, 
554                           length + (offset - moffset), MS_SYNC) != 0) {
555                         tdb->ecode = TDB_ERR_IO;
556                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
557                                  strerror(errno)));
558                         return -1;
559                 }
560         }
561 #endif
562         return 0;
563 }
564
565
566 /*
567   work out how much space the linearised recovery data will consume
568 */
569 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
570 {
571         struct tdb_transaction_el *el;
572         tdb_len_t recovery_size = 0;
573
574         recovery_size = sizeof(u32);
575         for (el=tdb->transaction->elements;el;el=el->next) {
576                 if (el->offset >= tdb->transaction->old_map_size) {
577                         continue;
578                 }
579                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
580         }
581
582         return recovery_size;
583 }
584
585 /*
586   allocate the recovery area, or use an existing recovery area if it is
587   large enough
588 */
589 static int tdb_recovery_allocate(struct tdb_context *tdb, 
590                                  tdb_len_t *recovery_size,
591                                  tdb_off_t *recovery_offset,
592                                  tdb_len_t *recovery_max_size)
593 {
594         struct list_struct rec;
595         const struct tdb_methods *methods = tdb->transaction->io_methods;
596         tdb_off_t recovery_head;
597
598         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
599                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
600                 return -1;
601         }
602
603         rec.rec_len = 0;
604
605         if (recovery_head != 0 && 
606             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
607                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
608                 return -1;
609         }
610
611         *recovery_size = tdb_recovery_size(tdb);
612
613         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
614                 /* it fits in the existing area */
615                 *recovery_max_size = rec.rec_len;
616                 *recovery_offset = recovery_head;
617                 return 0;
618         }
619
620         /* we need to free up the old recovery area, then allocate a
621            new one at the end of the file. Note that we cannot use
622            tdb_allocate() to allocate the new one as that might return
623            us an area that is being currently used (as of the start of
624            the transaction) */
625         if (recovery_head != 0) {
626                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
627                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
628                         return -1;
629                 }
630         }
631
632         /* the tdb_free() call might have increased the recovery size */
633         *recovery_size = tdb_recovery_size(tdb);
634
635         /* round up to a multiple of page size */
636         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
637         *recovery_offset = tdb->map_size;
638         recovery_head = *recovery_offset;
639
640         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
641                                      (tdb->map_size - tdb->transaction->old_map_size) +
642                                      sizeof(rec) + *recovery_max_size) == -1) {
643                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
644                 return -1;
645         }
646
647         /* remap the file (if using mmap) */
648         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
649
650         /* we have to reset the old map size so that we don't try to expand the file
651            again in the transaction commit, which would destroy the recovery area */
652         tdb->transaction->old_map_size = tdb->map_size;
653
654         /* write the recovery header offset and sync - we can sync without a race here
655            as the magic ptr in the recovery record has not been set */
656         CONVERT(recovery_head);
657         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
658                                &recovery_head, sizeof(tdb_off_t)) == -1) {
659                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
660                 return -1;
661         }
662
663         return 0;
664 }
665
666
667 /*
668   setup the recovery data that will be used on a crash during commit
669 */
670 static int transaction_setup_recovery(struct tdb_context *tdb, 
671                                       tdb_off_t *magic_offset)
672 {
673         struct tdb_transaction_el *el;
674         tdb_len_t recovery_size;
675         unsigned char *data, *p;
676         const struct tdb_methods *methods = tdb->transaction->io_methods;
677         struct list_struct *rec;
678         tdb_off_t recovery_offset, recovery_max_size;
679         tdb_off_t old_map_size = tdb->transaction->old_map_size;
680         u32 magic, tailer;
681
682         /*
683           check that the recovery area has enough space
684         */
685         if (tdb_recovery_allocate(tdb, &recovery_size, 
686                                   &recovery_offset, &recovery_max_size) == -1) {
687                 return -1;
688         }
689
690         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
691         if (data == NULL) {
692                 tdb->ecode = TDB_ERR_OOM;
693                 return -1;
694         }
695
696         rec = (struct list_struct *)data;
697         memset(rec, 0, sizeof(*rec));
698
699         rec->magic    = 0;
700         rec->data_len = recovery_size;
701         rec->rec_len  = recovery_max_size;
702         rec->key_len  = old_map_size;
703         CONVERT(rec);
704
705         /* build the recovery data into a single blob to allow us to do a single
706            large write, which should be more efficient */
707         p = data + sizeof(*rec);
708         for (el=tdb->transaction->elements;el;el=el->next) {
709                 if (el->offset >= old_map_size) {
710                         continue;
711                 }
712                 if (el->offset + el->length > tdb->transaction->old_map_size) {
713                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
714                         free(data);
715                         tdb->ecode = TDB_ERR_CORRUPT;
716                         return -1;
717                 }
718                 memcpy(p, &el->offset, 4);
719                 memcpy(p+4, &el->length, 4);
720                 if (DOCONV()) {
721                         tdb_convert(p, 8);
722                 }
723                 /* the recovery area contains the old data, not the
724                    new data, so we have to call the original tdb_read
725                    method to get it */
726                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
727                         free(data);
728                         tdb->ecode = TDB_ERR_IO;
729                         return -1;
730                 }
731                 p += 8 + el->length;
732         }
733
734         /* and the tailer */
735         tailer = sizeof(*rec) + recovery_max_size;
736         memcpy(p, &tailer, 4);
737         CONVERT(p);
738
739         /* write the recovery data to the recovery area */
740         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
741                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
742                 free(data);
743                 tdb->ecode = TDB_ERR_IO;
744                 return -1;
745         }
746
747         /* as we don't have ordered writes, we have to sync the recovery
748            data before we update the magic to indicate that the recovery
749            data is present */
750         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
751                 free(data);
752                 return -1;
753         }
754
755         free(data);
756
757         magic = TDB_RECOVERY_MAGIC;
758         CONVERT(magic);
759
760         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
761
762         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
763                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
764                 tdb->ecode = TDB_ERR_IO;
765                 return -1;
766         }
767
768         /* ensure the recovery magic marker is on disk */
769         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
770                 return -1;
771         }
772
773         return 0;
774 }
775
776 /*
777   commit the current transaction
778 */
779 int tdb_transaction_commit(struct tdb_context *tdb)
780 {       
781         const struct tdb_methods *methods;
782         tdb_off_t magic_offset = 0;
783         u32 zero = 0;
784
785         if (tdb->transaction == NULL) {
786                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
787                 return -1;
788         }
789
790         if (tdb->transaction->transaction_error) {
791                 tdb->ecode = TDB_ERR_IO;
792                 tdb_transaction_cancel(tdb);
793                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
794                 return -1;
795         }
796
797         if (tdb->transaction->nesting != 0) {
798                 tdb->transaction->nesting--;
799                 return 0;
800         }               
801
802         /* check for a null transaction */
803         if (tdb->transaction->elements == NULL) {
804                 tdb_transaction_cancel(tdb);
805                 return 0;
806         }
807
808         methods = tdb->transaction->io_methods;
809         
810         /* if there are any locks pending then the caller has not
811            nested their locks properly, so fail the transaction */
812         if (tdb->num_locks || tdb->global_lock.count) {
813                 tdb->ecode = TDB_ERR_LOCK;
814                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
815                 tdb_transaction_cancel(tdb);
816                 return -1;
817         }
818
819         /* upgrade the main transaction lock region to a write lock */
820         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
821                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
822                 tdb->ecode = TDB_ERR_LOCK;
823                 tdb_transaction_cancel(tdb);
824                 return -1;
825         }
826
827         /* get the global lock - this prevents new users attaching to the database
828            during the commit */
829         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
830                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
831                 tdb->ecode = TDB_ERR_LOCK;
832                 tdb_transaction_cancel(tdb);
833                 return -1;
834         }
835
836         if (!(tdb->flags & TDB_NOSYNC)) {
837                 /* write the recovery data to the end of the file */
838                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
839                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
840                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
841                         tdb_transaction_cancel(tdb);
842                         return -1;
843                 }
844         }
845
846         /* expand the file to the new size if needed */
847         if (tdb->map_size != tdb->transaction->old_map_size) {
848                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
849                                              tdb->map_size - 
850                                              tdb->transaction->old_map_size) == -1) {
851                         tdb->ecode = TDB_ERR_IO;
852                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
853                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
854                         tdb_transaction_cancel(tdb);
855                         return -1;
856                 }
857                 tdb->map_size = tdb->transaction->old_map_size;
858                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
859         }
860
861         /* perform all the writes */
862         while (tdb->transaction->elements) {
863                 struct tdb_transaction_el *el = tdb->transaction->elements;
864
865                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
866                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
867                         
868                         /* we've overwritten part of the data and
869                            possibly expanded the file, so we need to
870                            run the crash recovery code */
871                         tdb->methods = methods;
872                         tdb_transaction_recover(tdb); 
873
874                         tdb_transaction_cancel(tdb);
875                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
876
877                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
878                         return -1;
879                 }
880                 tdb->transaction->elements = el->next;
881                 free(el->data); 
882                 free(el);
883         } 
884
885         if (!(tdb->flags & TDB_NOSYNC)) {
886                 /* ensure the new data is on disk */
887                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
888                         return -1;
889                 }
890
891                 /* remove the recovery marker */
892                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
893                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
894                         return -1;
895                 }
896
897                 /* ensure the recovery marker has been removed on disk */
898                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
899                         return -1;
900                 }
901         }
902
903         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
904
905         /*
906           TODO: maybe write to some dummy hdr field, or write to magic
907           offset without mmap, before the last sync, instead of the
908           utime() call
909         */
910
911         /* on some systems (like Linux 2.6.x) changes via mmap/msync
912            don't change the mtime of the file, this means the file may
913            not be backed up (as tdb rounding to block sizes means that
914            file size changes are quite rare too). The following forces
915            mtime changes when a transaction completes */
916 #ifdef HAVE_UTIME
917         utime(tdb->name, NULL);
918 #endif
919
920         /* use a transaction cancel to free memory and remove the
921            transaction locks */
922         tdb_transaction_cancel(tdb);
923         return 0;
924 }
925
926
927 /*
928   recover from an aborted transaction. Must be called with exclusive
929   database write access already established (including the global
930   lock to prevent new processes attaching)
931 */
932 int tdb_transaction_recover(struct tdb_context *tdb)
933 {
934         tdb_off_t recovery_head, recovery_eof;
935         unsigned char *data, *p;
936         u32 zero = 0;
937         struct list_struct rec;
938
939         /* find the recovery area */
940         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
941                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
942                 tdb->ecode = TDB_ERR_IO;
943                 return -1;
944         }
945
946         if (recovery_head == 0) {
947                 /* we have never allocated a recovery record */
948                 return 0;
949         }
950
951         /* read the recovery record */
952         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
953                                    sizeof(rec), DOCONV()) == -1) {
954                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
955                 tdb->ecode = TDB_ERR_IO;
956                 return -1;
957         }
958
959         if (rec.magic != TDB_RECOVERY_MAGIC) {
960                 /* there is no valid recovery data */
961                 return 0;
962         }
963
964         if (tdb->read_only) {
965                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
966                 tdb->ecode = TDB_ERR_CORRUPT;
967                 return -1;
968         }
969
970         recovery_eof = rec.key_len;
971
972         data = (unsigned char *)malloc(rec.data_len);
973         if (data == NULL) {
974                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
975                 tdb->ecode = TDB_ERR_OOM;
976                 return -1;
977         }
978
979         /* read the full recovery data */
980         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
981                                    rec.data_len, 0) == -1) {
982                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
983                 tdb->ecode = TDB_ERR_IO;
984                 return -1;
985         }
986
987         /* recover the file data */
988         p = data;
989         while (p+8 < data + rec.data_len) {
990                 u32 ofs, len;
991                 if (DOCONV()) {
992                         tdb_convert(p, 8);
993                 }
994                 memcpy(&ofs, p, 4);
995                 memcpy(&len, p+4, 4);
996
997                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
998                         free(data);
999                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
1000                         tdb->ecode = TDB_ERR_IO;
1001                         return -1;
1002                 }
1003                 p += 8 + len;
1004         }
1005
1006         free(data);
1007
1008         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1009                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1010                 tdb->ecode = TDB_ERR_IO;
1011                 return -1;
1012         }
1013
1014         /* if the recovery area is after the recovered eof then remove it */
1015         if (recovery_eof <= recovery_head) {
1016                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1017                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1018                         tdb->ecode = TDB_ERR_IO;
1019                         return -1;                      
1020                 }
1021         }
1022
1023         /* remove the recovery magic */
1024         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1025                           &zero) == -1) {
1026                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1027                 tdb->ecode = TDB_ERR_IO;
1028                 return -1;                      
1029         }
1030         
1031         /* reduce the file size to the old size */
1032         tdb_munmap(tdb);
1033         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1034                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1035                 tdb->ecode = TDB_ERR_IO;
1036                 return -1;                      
1037         }
1038         tdb->map_size = recovery_eof;
1039         tdb_mmap(tdb);
1040
1041         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1042                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1043                 tdb->ecode = TDB_ERR_IO;
1044                 return -1;
1045         }
1046
1047         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1048                  recovery_eof));
1049
1050         /* all done */
1051         return 0;
1052 }