5c94eb0afef991a8f87a4dbacfa414d747fee33c
[ira/wip.git] / source / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 #include "tdb_private.h"
28
29 /*
30   transaction design:
31
32   - only allow a single transaction at a time per database. This makes
33     using the transaction API simpler, as otherwise the caller would
34     have to cope with temporary failures in transactions that conflict
35     with other current transactions
36
37   - keep the transaction recovery information in the same file as the
38     database, using a special 'transaction recovery' record pointed at
39     by the header. This removes the need for extra journal files as
40     used by some other databases
41
42   - dynamically allocate the transaction recovery record, re-using it
43     for subsequent transactions. If a larger record is needed then
44     tdb_free() the old record to place it on the normal tdb freelist
45     before allocating the new record
46
47   - during transactions, keep a linked list of all writes that have
48     been performed by intercepting all tdb_write() calls. The hooked
49     transaction versions of tdb_read() and tdb_write() check this
50     linked list and try to use the elements of the list in preference
51     to the real database.
52
53   - don't allow any locks to be held when a transaction starts,
54     otherwise we can end up with deadlock (plus lack of lock nesting
55     in posix locks would mean the lock is lost)
56
57   - if the caller gains a lock during the transaction but doesn't
58     release it then fail the commit
59
60   - allow for nested calls to tdb_transaction_start(), re-using the
61     existing transaction record. If the inner transaction is cancelled
62     then a subsequent commit will fail
63  
64   - keep a mirrored copy of the tdb hash chain heads to allow for the
65     fast hash heads scan on traverse, updating the mirrored copy in
66     the transaction version of tdb_write
67
68   - allow callers to mix transaction and non-transaction use of tdb,
69     although once a transaction is started then an exclusive lock is
70     gained until the transaction is committed or cancelled
71
72   - the commit strategy involves first saving away all modified data
73     into a linearised buffer in the transaction recovery area, then
74     marking the transaction recovery area with a magic value to
75     indicate a valid recovery record. In total 4 fsync/msync calls are
76     needed per commit to prevent race conditions. It might be possible
77     to reduce this to 3 or even 2 with some more work.
78
79   - check for a valid recovery record on open of the tdb, while the
80     global lock is held. Automatically recover from the transaction
81     recovery area if needed, then continue with the open as
82     usual. This allows for smooth crash recovery with no administrator
83     intervention.
84
85   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86     still available, but no transaction recovery area is used and no
87     fsync/msync calls are made.
88
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* we keep a mirrored copy of the tdb hash heads here so
97            tdb_next_hash_chain() can operate efficiently */
98         u32 *hash_heads;
99
100         /* the original io methods - used to do IOs to the real db */
101         const struct tdb_methods *io_methods;
102
103         /* the list of transaction elements. We use a doubly linked
104            list with a last pointer to allow us to keep the list
105            ordered, with first element at the front of the list. It
106            needs to be doubly linked as the read/write traversals need
107            to be backwards, while the commit needs to be forwards */
108         struct tdb_transaction_el {
109                 struct tdb_transaction_el *next, *prev;
110                 tdb_off_t offset;
111                 tdb_len_t length;
112                 unsigned char *data;
113         } *elements, *elements_last;
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* old file size before transaction */
126         tdb_len_t old_map_size;
127 };
128
129
130 /*
131   read while in a transaction. We need to check first if the data is in our list
132   of transaction elements, then if not do a real read
133 */
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
135                             tdb_len_t len, int cv)
136 {
137         struct tdb_transaction_el *el;
138
139         /* we need to walk the list backwards to get the most recent data */
140         for (el=tdb->transaction->elements_last;el;el=el->prev) {
141                 tdb_len_t partial;
142
143                 if (off+len <= el->offset) {
144                         continue;
145                 }
146                 if (off >= el->offset + el->length) {
147                         continue;
148                 }
149
150                 /* an overlapping read - needs to be split into up to
151                    2 reads and a memcpy */
152                 if (off < el->offset) {
153                         partial = el->offset - off;
154                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155                                 goto fail;
156                         }
157                         len -= partial;
158                         off += partial;
159                         buf = (void *)(partial + (char *)buf);
160                 }
161                 if (off + len <= el->offset + el->length) {
162                         partial = len;
163                 } else {
164                         partial = el->offset + el->length - off;
165                 }
166                 memcpy(buf, el->data + (off - el->offset), partial);
167                 if (cv) {
168                         tdb_convert(buf, len);
169                 }
170                 len -= partial;
171                 off += partial;
172                 buf = (void *)(partial + (char *)buf);
173                 
174                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177
178                 return 0;
179         }
180
181         /* its not in the transaction elements - do a real read */
182         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183
184 fail:
185         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
186         tdb->ecode = TDB_ERR_IO;
187         tdb->transaction->transaction_error = 1;
188         return -1;
189 }
190
191
192 /*
193   write while in a transaction
194 */
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
196                              const void *buf, tdb_len_t len)
197 {
198         struct tdb_transaction_el *el, *best_el=NULL;
199
200         if (len == 0) {
201                 return 0;
202         }
203         
204         /* if the write is to a hash head, then update the transaction
205            hash heads */
206         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
210         }
211
212         /* first see if we can replace an existing entry */
213         for (el=tdb->transaction->elements_last;el;el=el->prev) {
214                 tdb_len_t partial;
215
216                 if (best_el == NULL && off == el->offset+el->length) {
217                         best_el = el;
218                 }
219
220                 if (off+len <= el->offset) {
221                         continue;
222                 }
223                 if (off >= el->offset + el->length) {
224                         continue;
225                 }
226
227                 /* an overlapping write - needs to be split into up to
228                    2 writes and a memcpy */
229                 if (off < el->offset) {
230                         partial = el->offset - off;
231                         if (transaction_write(tdb, off, buf, partial) != 0) {
232                                 goto fail;
233                         }
234                         len -= partial;
235                         off += partial;
236                         buf = (const void *)(partial + (const char *)buf);
237                 }
238                 if (off + len <= el->offset + el->length) {
239                         partial = len;
240                 } else {
241                         partial = el->offset + el->length - off;
242                 }
243                 memcpy(el->data + (off - el->offset), buf, partial);
244                 len -= partial;
245                 off += partial;
246                 buf = (const void *)(partial + (const char *)buf);
247                 
248                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
249                         goto fail;
250                 }
251
252                 return 0;
253         }
254
255         /* see if we can append the new entry to an existing entry */
256         if (best_el && best_el->offset + best_el->length == off && 
257             (off+len < tdb->transaction->old_map_size ||
258              off > tdb->transaction->old_map_size)) {
259                 unsigned char *data = best_el->data;
260                 el = best_el;
261                 el->data = (unsigned char *)realloc(el->data,
262                                                     el->length + len);
263                 if (el->data == NULL) {
264                         tdb->ecode = TDB_ERR_OOM;
265                         tdb->transaction->transaction_error = 1;
266                         el->data = data;
267                         return -1;
268                 }
269                 if (buf) {
270                         memcpy(el->data + el->length, buf, len);
271                 } else {
272                         memset(el->data + el->length, TDB_PAD_BYTE, len);
273                 }
274                 el->length += len;
275                 return 0;
276         }
277
278         /* add a new entry at the end of the list */
279         el = (struct tdb_transaction_el *)malloc(sizeof(*el));
280         if (el == NULL) {
281                 tdb->ecode = TDB_ERR_OOM;
282                 tdb->transaction->transaction_error = 1;                
283                 return -1;
284         }
285         el->next = NULL;
286         el->prev = tdb->transaction->elements_last;
287         el->offset = off;
288         el->length = len;
289         el->data = (unsigned char *)malloc(len);
290         if (el->data == NULL) {
291                 free(el);
292                 tdb->ecode = TDB_ERR_OOM;
293                 tdb->transaction->transaction_error = 1;                
294                 return -1;
295         }
296         if (buf) {
297                 memcpy(el->data, buf, len);
298         } else {
299                 memset(el->data, TDB_PAD_BYTE, len);
300         }
301         if (el->prev) {
302                 el->prev->next = el;
303         } else {
304                 tdb->transaction->elements = el;
305         }
306         tdb->transaction->elements_last = el;
307         return 0;
308
309 fail:
310         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
311         tdb->ecode = TDB_ERR_IO;
312         tdb->transaction->transaction_error = 1;
313         return -1;
314 }
315
316 /*
317   accelerated hash chain head search, using the cached hash heads
318 */
319 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
320 {
321         u32 h = *chain;
322         for (;h < tdb->header.hash_size;h++) {
323                 /* the +1 takes account of the freelist */
324                 if (0 != tdb->transaction->hash_heads[h+1]) {
325                         break;
326                 }
327         }
328         (*chain) = h;
329 }
330
331 /*
332   out of bounds check during a transaction
333 */
334 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
335 {
336         if (len <= tdb->map_size) {
337                 return 0;
338         }
339         return TDB_ERRCODE(TDB_ERR_IO, -1);
340 }
341
342 /*
343   transaction version of tdb_expand().
344 */
345 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
346                                    tdb_off_t addition)
347 {
348         /* add a write to the transaction elements, so subsequent
349            reads see the zero data */
350         if (transaction_write(tdb, size, NULL, addition) != 0) {
351                 return -1;
352         }
353
354         return 0;
355 }
356
357 /*
358   brlock during a transaction - ignore them
359 */
360 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
361                        int rw_type, int lck_type, int probe, size_t len)
362 {
363         return 0;
364 }
365
366 static const struct tdb_methods transaction_methods = {
367         transaction_read,
368         transaction_write,
369         transaction_next_hash_chain,
370         transaction_oob,
371         transaction_expand_file,
372         transaction_brlock
373 };
374
375
376 /*
377   start a tdb transaction. No token is returned, as only a single
378   transaction is allowed to be pending per tdb_context
379 */
380 int tdb_transaction_start(struct tdb_context *tdb)
381 {
382         /* some sanity checks */
383         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
384                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
385                 tdb->ecode = TDB_ERR_EINVAL;
386                 return -1;
387         }
388
389         /* cope with nested tdb_transaction_start() calls */
390         if (tdb->transaction != NULL) {
391                 tdb->transaction->nesting++;
392                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
393                          tdb->transaction->nesting));
394                 return 0;
395         }
396
397         if (tdb->num_locks != 0 || tdb->global_lock.count) {
398                 /* the caller must not have any locks when starting a
399                    transaction as otherwise we'll be screwed by lack
400                    of nested locks in posix */
401                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
402                 tdb->ecode = TDB_ERR_LOCK;
403                 return -1;
404         }
405
406         if (tdb->travlocks.next != NULL) {
407                 /* you cannot use transactions inside a traverse (although you can use
408                    traverse inside a transaction) as otherwise you can end up with
409                    deadlock */
410                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
411                 tdb->ecode = TDB_ERR_LOCK;
412                 return -1;
413         }
414
415         tdb->transaction = (struct tdb_transaction *)
416                 calloc(sizeof(struct tdb_transaction), 1);
417         if (tdb->transaction == NULL) {
418                 tdb->ecode = TDB_ERR_OOM;
419                 return -1;
420         }
421
422         /* get the transaction write lock. This is a blocking lock. As
423            discussed with Volker, there are a number of ways we could
424            make this async, which we will probably do in the future */
425         if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
426                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
427                 tdb->ecode = TDB_ERR_LOCK;
428                 SAFE_FREE(tdb->transaction);
429                 return -1;
430         }
431         
432         /* get a read lock from the freelist to the end of file. This
433            is upgraded to a write lock during the commit */
434         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
435                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
436                 tdb->ecode = TDB_ERR_LOCK;
437                 goto fail;
438         }
439
440         /* setup a copy of the hash table heads so the hash scan in
441            traverse can be fast */
442         tdb->transaction->hash_heads = (u32 *)
443                 calloc(tdb->header.hash_size+1, sizeof(u32));
444         if (tdb->transaction->hash_heads == NULL) {
445                 tdb->ecode = TDB_ERR_OOM;
446                 goto fail;
447         }
448         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
449                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
450                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
451                 tdb->ecode = TDB_ERR_IO;
452                 goto fail;
453         }
454
455         /* make sure we know about any file expansions already done by
456            anyone else */
457         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
458         tdb->transaction->old_map_size = tdb->map_size;
459
460         /* finally hook the io methods, replacing them with
461            transaction specific methods */
462         tdb->transaction->io_methods = tdb->methods;
463         tdb->methods = &transaction_methods;
464
465         /* by calling this transaction write here, we ensure that we don't grow the
466            transaction linked list due to hash table updates */
467         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
468                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
469                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
470                 tdb->ecode = TDB_ERR_IO;
471                 goto fail;
472         }
473
474         return 0;
475         
476 fail:
477         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
478         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
479         SAFE_FREE(tdb->transaction->hash_heads);
480         SAFE_FREE(tdb->transaction);
481         return -1;
482 }
483
484
485 /*
486   cancel the current transaction
487 */
488 int tdb_transaction_cancel(struct tdb_context *tdb)
489 {       
490         if (tdb->transaction == NULL) {
491                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
492                 return -1;
493         }
494
495         if (tdb->transaction->nesting != 0) {
496                 tdb->transaction->transaction_error = 1;
497                 tdb->transaction->nesting--;
498                 return 0;
499         }               
500
501         tdb->map_size = tdb->transaction->old_map_size;
502
503         /* free all the transaction elements */
504         while (tdb->transaction->elements) {
505                 struct tdb_transaction_el *el = tdb->transaction->elements;
506                 tdb->transaction->elements = el->next;
507                 free(el->data);
508                 free(el);
509         }
510
511         /* remove any global lock created during the transaction */
512         if (tdb->global_lock.count != 0) {
513                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
514                 tdb->global_lock.count = 0;
515         }
516
517         /* remove any locks created during the transaction */
518         if (tdb->num_locks != 0) {
519                 int i;
520                 for (i=0;i<tdb->num_lockrecs;i++) {
521                         tdb_brlock(tdb,FREELIST_TOP+4*tdb->lockrecs[i].list,
522                                    F_UNLCK,F_SETLKW, 0, 1);
523                 }
524                 tdb->num_locks = 0;
525         }
526
527         /* restore the normal io methods */
528         tdb->methods = tdb->transaction->io_methods;
529
530         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
531         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
532         SAFE_FREE(tdb->transaction->hash_heads);
533         SAFE_FREE(tdb->transaction);
534         
535         return 0;
536 }
537
538 /*
539   sync to disk
540 */
541 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
542 {       
543         if (fsync(tdb->fd) != 0) {
544                 tdb->ecode = TDB_ERR_IO;
545                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
546                 return -1;
547         }
548 #ifdef MS_SYNC
549         if (tdb->map_ptr) {
550                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
551                 if (msync(moffset + (char *)tdb->map_ptr, 
552                           length + (offset - moffset), MS_SYNC) != 0) {
553                         tdb->ecode = TDB_ERR_IO;
554                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
555                                  strerror(errno)));
556                         return -1;
557                 }
558         }
559 #endif
560         return 0;
561 }
562
563
564 /*
565   work out how much space the linearised recovery data will consume
566 */
567 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
568 {
569         struct tdb_transaction_el *el;
570         tdb_len_t recovery_size = 0;
571
572         recovery_size = sizeof(u32);
573         for (el=tdb->transaction->elements;el;el=el->next) {
574                 if (el->offset >= tdb->transaction->old_map_size) {
575                         continue;
576                 }
577                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
578         }
579
580         return recovery_size;
581 }
582
583 /*
584   allocate the recovery area, or use an existing recovery area if it is
585   large enough
586 */
587 static int tdb_recovery_allocate(struct tdb_context *tdb, 
588                                  tdb_len_t *recovery_size,
589                                  tdb_off_t *recovery_offset,
590                                  tdb_len_t *recovery_max_size)
591 {
592         struct list_struct rec;
593         const struct tdb_methods *methods = tdb->transaction->io_methods;
594         tdb_off_t recovery_head;
595
596         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
597                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
598                 return -1;
599         }
600
601         rec.rec_len = 0;
602
603         if (recovery_head != 0 && 
604             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
605                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
606                 return -1;
607         }
608
609         *recovery_size = tdb_recovery_size(tdb);
610
611         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
612                 /* it fits in the existing area */
613                 *recovery_max_size = rec.rec_len;
614                 *recovery_offset = recovery_head;
615                 return 0;
616         }
617
618         /* we need to free up the old recovery area, then allocate a
619            new one at the end of the file. Note that we cannot use
620            tdb_allocate() to allocate the new one as that might return
621            us an area that is being currently used (as of the start of
622            the transaction) */
623         if (recovery_head != 0) {
624                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
625                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
626                         return -1;
627                 }
628         }
629
630         /* the tdb_free() call might have increased the recovery size */
631         *recovery_size = tdb_recovery_size(tdb);
632
633         /* round up to a multiple of page size */
634         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
635         *recovery_offset = tdb->map_size;
636         recovery_head = *recovery_offset;
637
638         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
639                                      (tdb->map_size - tdb->transaction->old_map_size) +
640                                      sizeof(rec) + *recovery_max_size) == -1) {
641                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
642                 return -1;
643         }
644
645         /* remap the file (if using mmap) */
646         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
647
648         /* we have to reset the old map size so that we don't try to expand the file
649            again in the transaction commit, which would destroy the recovery area */
650         tdb->transaction->old_map_size = tdb->map_size;
651
652         /* write the recovery header offset and sync - we can sync without a race here
653            as the magic ptr in the recovery record has not been set */
654         CONVERT(recovery_head);
655         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
656                                &recovery_head, sizeof(tdb_off_t)) == -1) {
657                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
658                 return -1;
659         }
660
661         return 0;
662 }
663
664
665 /*
666   setup the recovery data that will be used on a crash during commit
667 */
668 static int transaction_setup_recovery(struct tdb_context *tdb, 
669                                       tdb_off_t *magic_offset)
670 {
671         struct tdb_transaction_el *el;
672         tdb_len_t recovery_size;
673         unsigned char *data, *p;
674         const struct tdb_methods *methods = tdb->transaction->io_methods;
675         struct list_struct *rec;
676         tdb_off_t recovery_offset, recovery_max_size;
677         tdb_off_t old_map_size = tdb->transaction->old_map_size;
678         u32 magic, tailer;
679
680         /*
681           check that the recovery area has enough space
682         */
683         if (tdb_recovery_allocate(tdb, &recovery_size, 
684                                   &recovery_offset, &recovery_max_size) == -1) {
685                 return -1;
686         }
687
688         data = (unsigned char *)malloc(recovery_size + sizeof(*rec));
689         if (data == NULL) {
690                 tdb->ecode = TDB_ERR_OOM;
691                 return -1;
692         }
693
694         rec = (struct list_struct *)data;
695         memset(rec, 0, sizeof(*rec));
696
697         rec->magic    = 0;
698         rec->data_len = recovery_size;
699         rec->rec_len  = recovery_max_size;
700         rec->key_len  = old_map_size;
701         CONVERT(rec);
702
703         /* build the recovery data into a single blob to allow us to do a single
704            large write, which should be more efficient */
705         p = data + sizeof(*rec);
706         for (el=tdb->transaction->elements;el;el=el->next) {
707                 if (el->offset >= old_map_size) {
708                         continue;
709                 }
710                 if (el->offset + el->length > tdb->transaction->old_map_size) {
711                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
712                         free(data);
713                         tdb->ecode = TDB_ERR_CORRUPT;
714                         return -1;
715                 }
716                 memcpy(p, &el->offset, 4);
717                 memcpy(p+4, &el->length, 4);
718                 if (DOCONV()) {
719                         tdb_convert(p, 8);
720                 }
721                 /* the recovery area contains the old data, not the
722                    new data, so we have to call the original tdb_read
723                    method to get it */
724                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
725                         free(data);
726                         tdb->ecode = TDB_ERR_IO;
727                         return -1;
728                 }
729                 p += 8 + el->length;
730         }
731
732         /* and the tailer */
733         tailer = sizeof(*rec) + recovery_max_size;
734         memcpy(p, &tailer, 4);
735         CONVERT(p);
736
737         /* write the recovery data to the recovery area */
738         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
739                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
740                 free(data);
741                 tdb->ecode = TDB_ERR_IO;
742                 return -1;
743         }
744
745         /* as we don't have ordered writes, we have to sync the recovery
746            data before we update the magic to indicate that the recovery
747            data is present */
748         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
749                 free(data);
750                 return -1;
751         }
752
753         free(data);
754
755         magic = TDB_RECOVERY_MAGIC;
756         CONVERT(magic);
757
758         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
759
760         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
761                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
762                 tdb->ecode = TDB_ERR_IO;
763                 return -1;
764         }
765
766         /* ensure the recovery magic marker is on disk */
767         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
768                 return -1;
769         }
770
771         return 0;
772 }
773
774 /*
775   commit the current transaction
776 */
777 int tdb_transaction_commit(struct tdb_context *tdb)
778 {       
779         const struct tdb_methods *methods;
780         tdb_off_t magic_offset = 0;
781         u32 zero = 0;
782
783         if (tdb->transaction == NULL) {
784                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
785                 return -1;
786         }
787
788         if (tdb->transaction->transaction_error) {
789                 tdb->ecode = TDB_ERR_IO;
790                 tdb_transaction_cancel(tdb);
791                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
792                 return -1;
793         }
794
795         if (tdb->transaction->nesting != 0) {
796                 tdb->transaction->nesting--;
797                 return 0;
798         }               
799
800         /* check for a null transaction */
801         if (tdb->transaction->elements == NULL) {
802                 tdb_transaction_cancel(tdb);
803                 return 0;
804         }
805
806         methods = tdb->transaction->io_methods;
807         
808         /* if there are any locks pending then the caller has not
809            nested their locks properly, so fail the transaction */
810         if (tdb->num_locks || tdb->global_lock.count) {
811                 tdb->ecode = TDB_ERR_LOCK;
812                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
813                 tdb_transaction_cancel(tdb);
814                 return -1;
815         }
816
817         /* upgrade the main transaction lock region to a write lock */
818         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
819                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
820                 tdb->ecode = TDB_ERR_LOCK;
821                 tdb_transaction_cancel(tdb);
822                 return -1;
823         }
824
825         /* get the global lock - this prevents new users attaching to the database
826            during the commit */
827         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
828                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
829                 tdb->ecode = TDB_ERR_LOCK;
830                 tdb_transaction_cancel(tdb);
831                 return -1;
832         }
833
834         if (!(tdb->flags & TDB_NOSYNC)) {
835                 /* write the recovery data to the end of the file */
836                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
837                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
838                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
839                         tdb_transaction_cancel(tdb);
840                         return -1;
841                 }
842         }
843
844         /* expand the file to the new size if needed */
845         if (tdb->map_size != tdb->transaction->old_map_size) {
846                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
847                                              tdb->map_size - 
848                                              tdb->transaction->old_map_size) == -1) {
849                         tdb->ecode = TDB_ERR_IO;
850                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
851                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
852                         tdb_transaction_cancel(tdb);
853                         return -1;
854                 }
855                 tdb->map_size = tdb->transaction->old_map_size;
856                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
857         }
858
859         /* perform all the writes */
860         while (tdb->transaction->elements) {
861                 struct tdb_transaction_el *el = tdb->transaction->elements;
862
863                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
864                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
865                         
866                         /* we've overwritten part of the data and
867                            possibly expanded the file, so we need to
868                            run the crash recovery code */
869                         tdb->methods = methods;
870                         tdb_transaction_recover(tdb); 
871
872                         tdb_transaction_cancel(tdb);
873                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
874
875                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
876                         return -1;
877                 }
878                 tdb->transaction->elements = el->next;
879                 free(el->data); 
880                 free(el);
881         } 
882
883         if (!(tdb->flags & TDB_NOSYNC)) {
884                 /* ensure the new data is on disk */
885                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
886                         return -1;
887                 }
888
889                 /* remove the recovery marker */
890                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
891                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
892                         return -1;
893                 }
894
895                 /* ensure the recovery marker has been removed on disk */
896                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
897                         return -1;
898                 }
899         }
900
901         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
902
903         /*
904           TODO: maybe write to some dummy hdr field, or write to magic
905           offset without mmap, before the last sync, instead of the
906           utime() call
907         */
908
909         /* on some systems (like Linux 2.6.x) changes via mmap/msync
910            don't change the mtime of the file, this means the file may
911            not be backed up (as tdb rounding to block sizes means that
912            file size changes are quite rare too). The following forces
913            mtime changes when a transaction completes */
914 #ifdef HAVE_UTIME
915         utime(tdb->name, NULL);
916 #endif
917
918         /* use a transaction cancel to free memory and remove the
919            transaction locks */
920         tdb_transaction_cancel(tdb);
921         return 0;
922 }
923
924
925 /*
926   recover from an aborted transaction. Must be called with exclusive
927   database write access already established (including the global
928   lock to prevent new processes attaching)
929 */
930 int tdb_transaction_recover(struct tdb_context *tdb)
931 {
932         tdb_off_t recovery_head, recovery_eof;
933         unsigned char *data, *p;
934         u32 zero = 0;
935         struct list_struct rec;
936
937         /* find the recovery area */
938         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
939                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
940                 tdb->ecode = TDB_ERR_IO;
941                 return -1;
942         }
943
944         if (recovery_head == 0) {
945                 /* we have never allocated a recovery record */
946                 return 0;
947         }
948
949         /* read the recovery record */
950         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
951                                    sizeof(rec), DOCONV()) == -1) {
952                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
953                 tdb->ecode = TDB_ERR_IO;
954                 return -1;
955         }
956
957         if (rec.magic != TDB_RECOVERY_MAGIC) {
958                 /* there is no valid recovery data */
959                 return 0;
960         }
961
962         if (tdb->read_only) {
963                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
964                 tdb->ecode = TDB_ERR_CORRUPT;
965                 return -1;
966         }
967
968         recovery_eof = rec.key_len;
969
970         data = (unsigned char *)malloc(rec.data_len);
971         if (data == NULL) {
972                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
973                 tdb->ecode = TDB_ERR_OOM;
974                 return -1;
975         }
976
977         /* read the full recovery data */
978         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
979                                    rec.data_len, 0) == -1) {
980                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
981                 tdb->ecode = TDB_ERR_IO;
982                 return -1;
983         }
984
985         /* recover the file data */
986         p = data;
987         while (p+8 < data + rec.data_len) {
988                 u32 ofs, len;
989                 if (DOCONV()) {
990                         tdb_convert(p, 8);
991                 }
992                 memcpy(&ofs, p, 4);
993                 memcpy(&len, p+4, 4);
994
995                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
996                         free(data);
997                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
998                         tdb->ecode = TDB_ERR_IO;
999                         return -1;
1000                 }
1001                 p += 8 + len;
1002         }
1003
1004         free(data);
1005
1006         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1007                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1008                 tdb->ecode = TDB_ERR_IO;
1009                 return -1;
1010         }
1011
1012         /* if the recovery area is after the recovered eof then remove it */
1013         if (recovery_eof <= recovery_head) {
1014                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1015                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1016                         tdb->ecode = TDB_ERR_IO;
1017                         return -1;                      
1018                 }
1019         }
1020
1021         /* remove the recovery magic */
1022         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1023                           &zero) == -1) {
1024                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1025                 tdb->ecode = TDB_ERR_IO;
1026                 return -1;                      
1027         }
1028         
1029         /* reduce the file size to the old size */
1030         tdb_munmap(tdb);
1031         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1032                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1033                 tdb->ecode = TDB_ERR_IO;
1034                 return -1;                      
1035         }
1036         tdb->map_size = recovery_eof;
1037         tdb_mmap(tdb);
1038
1039         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1040                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1041                 tdb->ecode = TDB_ERR_IO;
1042                 return -1;
1043         }
1044
1045         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1046                  recovery_eof));
1047
1048         /* all done */
1049         return 0;
1050 }