r19401: make tdb_lockall() much more efficient, and add a tdb_lockall_read()
[ira/wip.git] / source / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 #include "tdb_private.h"
28
29 /*
30   transaction design:
31
32   - only allow a single transaction at a time per database. This makes
33     using the transaction API simpler, as otherwise the caller would
34     have to cope with temporary failures in transactions that conflict
35     with other current transactions
36
37   - keep the transaction recovery information in the same file as the
38     database, using a special 'transaction recovery' record pointed at
39     by the header. This removes the need for extra journal files as
40     used by some other databases
41
42   - dynamically allocate the transaction recovery record, re-using it
43     for subsequent transactions. If a larger record is needed then
44     tdb_free() the old record to place it on the normal tdb freelist
45     before allocating the new record
46
47   - during transactions, keep a linked list of all writes that have
48     been performed by intercepting all tdb_write() calls. The hooked
49     transaction versions of tdb_read() and tdb_write() check this
50     linked list and try to use the elements of the list in preference
51     to the real database.
52
53   - don't allow any locks to be held when a transaction starts,
54     otherwise we can end up with deadlock (plus lack of lock nesting
55     in posix locks would mean the lock is lost)
56
57   - if the caller gains a lock during the transaction but doesn't
58     release it then fail the commit
59
60   - allow for nested calls to tdb_transaction_start(), re-using the
61     existing transaction record. If the inner transaction is cancelled
62     then a subsequent commit will fail
63  
64   - keep a mirrored copy of the tdb hash chain heads to allow for the
65     fast hash heads scan on traverse, updating the mirrored copy in
66     the transaction version of tdb_write
67
68   - allow callers to mix transaction and non-transaction use of tdb,
69     although once a transaction is started then an exclusive lock is
70     gained until the transaction is committed or cancelled
71
72   - the commit strategy involves first saving away all modified data
73     into a linearised buffer in the transaction recovery area, then
74     marking the transaction recovery area with a magic value to
75     indicate a valid recovery record. In total 4 fsync/msync calls are
76     needed per commit to prevent race conditions. It might be possible
77     to reduce this to 3 or even 2 with some more work.
78
79   - check for a valid recovery record on open of the tdb, while the
80     global lock is held. Automatically recover from the transaction
81     recovery area if needed, then continue with the open as
82     usual. This allows for smooth crash recovery with no administrator
83     intervention.
84
85   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86     still available, but no transaction recovery area is used and no
87     fsync/msync calls are made.
88
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* we keep a mirrored copy of the tdb hash heads here so
97            tdb_next_hash_chain() can operate efficiently */
98         u32 *hash_heads;
99
100         /* the original io methods - used to do IOs to the real db */
101         const struct tdb_methods *io_methods;
102
103         /* the list of transaction elements. We use a doubly linked
104            list with a last pointer to allow us to keep the list
105            ordered, with first element at the front of the list. It
106            needs to be doubly linked as the read/write traversals need
107            to be backwards, while the commit needs to be forwards */
108         struct tdb_transaction_el {
109                 struct tdb_transaction_el *next, *prev;
110                 tdb_off_t offset;
111                 tdb_len_t length;
112                 unsigned char *data;
113         } *elements, *elements_last;
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* old file size before transaction */
126         tdb_len_t old_map_size;
127 };
128
129
130 /*
131   read while in a transaction. We need to check first if the data is in our list
132   of transaction elements, then if not do a real read
133 */
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
135                             tdb_len_t len, int cv)
136 {
137         struct tdb_transaction_el *el;
138
139         /* we need to walk the list backwards to get the most recent data */
140         for (el=tdb->transaction->elements_last;el;el=el->prev) {
141                 tdb_len_t partial;
142
143                 if (off+len <= el->offset) {
144                         continue;
145                 }
146                 if (off >= el->offset + el->length) {
147                         continue;
148                 }
149
150                 /* an overlapping read - needs to be split into up to
151                    2 reads and a memcpy */
152                 if (off < el->offset) {
153                         partial = el->offset - off;
154                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155                                 goto fail;
156                         }
157                         len -= partial;
158                         off += partial;
159                         buf = (void *)(partial + (char *)buf);
160                 }
161                 if (off + len <= el->offset + el->length) {
162                         partial = len;
163                 } else {
164                         partial = el->offset + el->length - off;
165                 }
166                 memcpy(buf, el->data + (off - el->offset), partial);
167                 if (cv) {
168                         tdb_convert(buf, len);
169                 }
170                 len -= partial;
171                 off += partial;
172                 buf = (void *)(partial + (char *)buf);
173                 
174                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177
178                 return 0;
179         }
180
181         /* its not in the transaction elements - do a real read */
182         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183
184 fail:
185         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_read: failed at off=%d len=%d\n", off, len));
186         tdb->ecode = TDB_ERR_IO;
187         tdb->transaction->transaction_error = 1;
188         return -1;
189 }
190
191
192 /*
193   write while in a transaction
194 */
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
196                              const void *buf, tdb_len_t len)
197 {
198         struct tdb_transaction_el *el, *best_el=NULL;
199
200         if (len == 0) {
201                 return 0;
202         }
203         
204         /* if the write is to a hash head, then update the transaction
205            hash heads */
206         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
210         }
211
212         /* first see if we can replace an existing entry */
213         for (el=tdb->transaction->elements_last;el;el=el->prev) {
214                 tdb_len_t partial;
215
216                 if (best_el == NULL && off == el->offset+el->length) {
217                         best_el = el;
218                 }
219
220                 if (off+len <= el->offset) {
221                         continue;
222                 }
223                 if (off >= el->offset + el->length) {
224                         continue;
225                 }
226
227                 /* an overlapping write - needs to be split into up to
228                    2 writes and a memcpy */
229                 if (off < el->offset) {
230                         partial = el->offset - off;
231                         if (transaction_write(tdb, off, buf, partial) != 0) {
232                                 goto fail;
233                         }
234                         len -= partial;
235                         off += partial;
236                         buf = (const void *)(partial + (const char *)buf);
237                 }
238                 if (off + len <= el->offset + el->length) {
239                         partial = len;
240                 } else {
241                         partial = el->offset + el->length - off;
242                 }
243                 memcpy(el->data + (off - el->offset), buf, partial);
244                 len -= partial;
245                 off += partial;
246                 buf = (const void *)(partial + (const char *)buf);
247                 
248                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
249                         goto fail;
250                 }
251
252                 return 0;
253         }
254
255         /* see if we can append the new entry to an existing entry */
256         if (best_el && best_el->offset + best_el->length == off && 
257             (off+len < tdb->transaction->old_map_size ||
258              off > tdb->transaction->old_map_size)) {
259                 unsigned char *data = best_el->data;
260                 el = best_el;
261                 el->data = realloc(el->data, el->length + len);
262                 if (el->data == NULL) {
263                         tdb->ecode = TDB_ERR_OOM;
264                         tdb->transaction->transaction_error = 1;
265                         el->data = data;
266                         return -1;
267                 }
268                 if (buf) {
269                         memcpy(el->data + el->length, buf, len);
270                 } else {
271                         memset(el->data + el->length, TDB_PAD_BYTE, len);
272                 }
273                 el->length += len;
274                 return 0;
275         }
276
277         /* add a new entry at the end of the list */
278         el = malloc(sizeof(*el));
279         if (el == NULL) {
280                 tdb->ecode = TDB_ERR_OOM;
281                 tdb->transaction->transaction_error = 1;                
282                 return -1;
283         }
284         el->next = NULL;
285         el->prev = tdb->transaction->elements_last;
286         el->offset = off;
287         el->length = len;
288         el->data = malloc(len);
289         if (el->data == NULL) {
290                 free(el);
291                 tdb->ecode = TDB_ERR_OOM;
292                 tdb->transaction->transaction_error = 1;                
293                 return -1;
294         }
295         if (buf) {
296                 memcpy(el->data, buf, len);
297         } else {
298                 memset(el->data, TDB_PAD_BYTE, len);
299         }
300         if (el->prev) {
301                 el->prev->next = el;
302         } else {
303                 tdb->transaction->elements = el;
304         }
305         tdb->transaction->elements_last = el;
306         return 0;
307
308 fail:
309         TDB_LOG((tdb, TDB_DEBUG_FATAL, "transaction_write: failed at off=%d len=%d\n", off, len));
310         tdb->ecode = TDB_ERR_IO;
311         tdb->transaction->transaction_error = 1;
312         return -1;
313 }
314
315 /*
316   accelerated hash chain head search, using the cached hash heads
317 */
318 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
319 {
320         u32 h = *chain;
321         for (;h < tdb->header.hash_size;h++) {
322                 /* the +1 takes account of the freelist */
323                 if (0 != tdb->transaction->hash_heads[h+1]) {
324                         break;
325                 }
326         }
327         (*chain) = h;
328 }
329
330 /*
331   out of bounds check during a transaction
332 */
333 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
334 {
335         if (len <= tdb->map_size) {
336                 return 0;
337         }
338         return TDB_ERRCODE(TDB_ERR_IO, -1);
339 }
340
341 /*
342   transaction version of tdb_expand().
343 */
344 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
345                                    tdb_off_t addition)
346 {
347         /* add a write to the transaction elements, so subsequent
348            reads see the zero data */
349         if (transaction_write(tdb, size, NULL, addition) != 0) {
350                 return -1;
351         }
352
353         return 0;
354 }
355
356 /*
357   brlock during a transaction - ignore them
358 */
359 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
360                        int rw_type, int lck_type, int probe, size_t len)
361 {
362         return 0;
363 }
364
365 static const struct tdb_methods transaction_methods = {
366         transaction_read,
367         transaction_write,
368         transaction_next_hash_chain,
369         transaction_oob,
370         transaction_expand_file,
371         transaction_brlock
372 };
373
374
375 /*
376   start a tdb transaction. No token is returned, as only a single
377   transaction is allowed to be pending per tdb_context
378 */
379 int tdb_transaction_start(struct tdb_context *tdb)
380 {
381         /* some sanity checks */
382         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
383                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
384                 tdb->ecode = TDB_ERR_EINVAL;
385                 return -1;
386         }
387
388         /* cope with nested tdb_transaction_start() calls */
389         if (tdb->transaction != NULL) {
390                 tdb->transaction->nesting++;
391                 TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_start: nesting %d\n", 
392                          tdb->transaction->nesting));
393                 return 0;
394         }
395
396         if (tdb->num_locks != 0 || tdb->global_lock.count) {
397                 /* the caller must not have any locks when starting a
398                    transaction as otherwise we'll be screwed by lack
399                    of nested locks in posix */
400                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction with locks held\n"));
401                 tdb->ecode = TDB_ERR_LOCK;
402                 return -1;
403         }
404
405         if (tdb->travlocks.next != NULL) {
406                 /* you cannot use transactions inside a traverse (although you can use
407                    traverse inside a transaction) as otherwise you can end up with
408                    deadlock */
409                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
410                 tdb->ecode = TDB_ERR_LOCK;
411                 return -1;
412         }
413
414         tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
415         if (tdb->transaction == NULL) {
416                 tdb->ecode = TDB_ERR_OOM;
417                 return -1;
418         }
419
420         /* get the transaction write lock. This is a blocking lock. As
421            discussed with Volker, there are a number of ways we could
422            make this async, which we will probably do in the future */
423         if (tdb_brlock(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
424                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get transaction lock\n"));
425                 tdb->ecode = TDB_ERR_LOCK;
426                 SAFE_FREE(tdb->transaction);
427                 return -1;
428         }
429         
430         /* get a read lock from the freelist to the end of file. This
431            is upgraded to a write lock during the commit */
432         if (tdb_brlock(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
433                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to get hash locks\n"));
434                 tdb->ecode = TDB_ERR_LOCK;
435                 goto fail;
436         }
437
438         /* setup a copy of the hash table heads so the hash scan in
439            traverse can be fast */
440         tdb->transaction->hash_heads = calloc(tdb->header.hash_size+1, sizeof(tdb_off_t));
441         if (tdb->transaction->hash_heads == NULL) {
442                 tdb->ecode = TDB_ERR_OOM;
443                 goto fail;
444         }
445         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
446                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
447                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to read hash heads\n"));
448                 tdb->ecode = TDB_ERR_IO;
449                 goto fail;
450         }
451
452         /* make sure we know about any file expansions already done by
453            anyone else */
454         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
455         tdb->transaction->old_map_size = tdb->map_size;
456
457         /* finally hook the io methods, replacing them with
458            transaction specific methods */
459         tdb->transaction->io_methods = tdb->methods;
460         tdb->methods = &transaction_methods;
461
462         /* by calling this transaction write here, we ensure that we don't grow the
463            transaction linked list due to hash table updates */
464         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
465                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
466                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_start: failed to prime hash table\n"));
467                 tdb->ecode = TDB_ERR_IO;
468                 goto fail;
469         }
470
471         return 0;
472         
473 fail:
474         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
475         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
476         SAFE_FREE(tdb->transaction->hash_heads);
477         SAFE_FREE(tdb->transaction);
478         return -1;
479 }
480
481
482 /*
483   cancel the current transaction
484 */
485 int tdb_transaction_cancel(struct tdb_context *tdb)
486 {       
487         if (tdb->transaction == NULL) {
488                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_cancel: no transaction\n"));
489                 return -1;
490         }
491
492         if (tdb->transaction->nesting != 0) {
493                 tdb->transaction->transaction_error = 1;
494                 tdb->transaction->nesting--;
495                 return 0;
496         }               
497
498         tdb->map_size = tdb->transaction->old_map_size;
499
500         /* free all the transaction elements */
501         while (tdb->transaction->elements) {
502                 struct tdb_transaction_el *el = tdb->transaction->elements;
503                 tdb->transaction->elements = el->next;
504                 free(el->data);
505                 free(el);
506         }
507
508         /* remove any global lock created during the transaction */
509         if (tdb->global_lock.count != 0) {
510                 tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
511                 tdb->global_lock.count = 0;
512         }
513
514         /* remove any locks created during the transaction */
515         if (tdb->num_locks != 0) {
516                 int h;
517                 for (h=0;h<tdb->header.hash_size+1;h++) {
518                         if (tdb->locked[h].count != 0) {
519                                 tdb_brlock(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
520                                 tdb->locked[h].count = 0;
521                         }
522                 }
523                 tdb->num_locks = 0;
524         }
525
526         /* restore the normal io methods */
527         tdb->methods = tdb->transaction->io_methods;
528
529         tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
530         tdb_brlock(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
531         SAFE_FREE(tdb->transaction->hash_heads);
532         SAFE_FREE(tdb->transaction);
533         
534         return 0;
535 }
536
537 /*
538   sync to disk
539 */
540 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
541 {       
542         if (fsync(tdb->fd) != 0) {
543                 tdb->ecode = TDB_ERR_IO;
544                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: fsync failed\n"));
545                 return -1;
546         }
547 #ifdef MS_SYNC
548         if (tdb->map_ptr) {
549                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
550                 if (msync(moffset + (char *)tdb->map_ptr, 
551                           length + (offset - moffset), MS_SYNC) != 0) {
552                         tdb->ecode = TDB_ERR_IO;
553                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction: msync failed - %s\n",
554                                  strerror(errno)));
555                         return -1;
556                 }
557         }
558 #endif
559         return 0;
560 }
561
562
563 /*
564   work out how much space the linearised recovery data will consume
565 */
566 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
567 {
568         struct tdb_transaction_el *el;
569         tdb_len_t recovery_size = 0;
570
571         recovery_size = sizeof(u32);
572         for (el=tdb->transaction->elements;el;el=el->next) {
573                 if (el->offset >= tdb->transaction->old_map_size) {
574                         continue;
575                 }
576                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
577         }
578
579         return recovery_size;
580 }
581
582 /*
583   allocate the recovery area, or use an existing recovery area if it is
584   large enough
585 */
586 static int tdb_recovery_allocate(struct tdb_context *tdb, 
587                                  tdb_len_t *recovery_size,
588                                  tdb_off_t *recovery_offset,
589                                  tdb_len_t *recovery_max_size)
590 {
591         struct list_struct rec;
592         const struct tdb_methods *methods = tdb->transaction->io_methods;
593         tdb_off_t recovery_head;
594
595         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
596                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery head\n"));
597                 return -1;
598         }
599
600         rec.rec_len = 0;
601
602         if (recovery_head != 0 && 
603             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
604                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to read recovery record\n"));
605                 return -1;
606         }
607
608         *recovery_size = tdb_recovery_size(tdb);
609
610         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
611                 /* it fits in the existing area */
612                 *recovery_max_size = rec.rec_len;
613                 *recovery_offset = recovery_head;
614                 return 0;
615         }
616
617         /* we need to free up the old recovery area, then allocate a
618            new one at the end of the file. Note that we cannot use
619            tdb_allocate() to allocate the new one as that might return
620            us an area that is being currently used (as of the start of
621            the transaction) */
622         if (recovery_head != 0) {
623                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
624                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to free previous recovery area\n"));
625                         return -1;
626                 }
627         }
628
629         /* the tdb_free() call might have increased the recovery size */
630         *recovery_size = tdb_recovery_size(tdb);
631
632         /* round up to a multiple of page size */
633         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
634         *recovery_offset = tdb->map_size;
635         recovery_head = *recovery_offset;
636
637         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
638                                      (tdb->map_size - tdb->transaction->old_map_size) +
639                                      sizeof(rec) + *recovery_max_size) == -1) {
640                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to create recovery area\n"));
641                 return -1;
642         }
643
644         /* remap the file (if using mmap) */
645         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
646
647         /* we have to reset the old map size so that we don't try to expand the file
648            again in the transaction commit, which would destroy the recovery area */
649         tdb->transaction->old_map_size = tdb->map_size;
650
651         /* write the recovery header offset and sync - we can sync without a race here
652            as the magic ptr in the recovery record has not been set */
653         CONVERT(recovery_head);
654         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
655                                &recovery_head, sizeof(tdb_off_t)) == -1) {
656                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_recovery_allocate: failed to write recovery head\n"));
657                 return -1;
658         }
659
660         return 0;
661 }
662
663
664 /*
665   setup the recovery data that will be used on a crash during commit
666 */
667 static int transaction_setup_recovery(struct tdb_context *tdb, 
668                                       tdb_off_t *magic_offset)
669 {
670         struct tdb_transaction_el *el;
671         tdb_len_t recovery_size;
672         unsigned char *data, *p;
673         const struct tdb_methods *methods = tdb->transaction->io_methods;
674         struct list_struct *rec;
675         tdb_off_t recovery_offset, recovery_max_size;
676         tdb_off_t old_map_size = tdb->transaction->old_map_size;
677         u32 magic, tailer;
678
679         /*
680           check that the recovery area has enough space
681         */
682         if (tdb_recovery_allocate(tdb, &recovery_size, 
683                                   &recovery_offset, &recovery_max_size) == -1) {
684                 return -1;
685         }
686
687         data = malloc(recovery_size + sizeof(*rec));
688         if (data == NULL) {
689                 tdb->ecode = TDB_ERR_OOM;
690                 return -1;
691         }
692
693         rec = (struct list_struct *)data;
694         memset(rec, 0, sizeof(*rec));
695
696         rec->magic    = 0;
697         rec->data_len = recovery_size;
698         rec->rec_len  = recovery_max_size;
699         rec->key_len  = old_map_size;
700         CONVERT(rec);
701
702         /* build the recovery data into a single blob to allow us to do a single
703            large write, which should be more efficient */
704         p = data + sizeof(*rec);
705         for (el=tdb->transaction->elements;el;el=el->next) {
706                 if (el->offset >= old_map_size) {
707                         continue;
708                 }
709                 if (el->offset + el->length > tdb->transaction->old_map_size) {
710                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: transaction data over new region boundary\n"));
711                         free(data);
712                         tdb->ecode = TDB_ERR_CORRUPT;
713                         return -1;
714                 }
715                 memcpy(p, &el->offset, 4);
716                 memcpy(p+4, &el->length, 4);
717                 if (DOCONV()) {
718                         tdb_convert(p, 8);
719                 }
720                 /* the recovery area contains the old data, not the
721                    new data, so we have to call the original tdb_read
722                    method to get it */
723                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
724                         free(data);
725                         tdb->ecode = TDB_ERR_IO;
726                         return -1;
727                 }
728                 p += 8 + el->length;
729         }
730
731         /* and the tailer */
732         tailer = sizeof(*rec) + recovery_max_size;
733         memcpy(p, &tailer, 4);
734         CONVERT(p);
735
736         /* write the recovery data to the recovery area */
737         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
738                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery data\n"));
739                 free(data);
740                 tdb->ecode = TDB_ERR_IO;
741                 return -1;
742         }
743
744         /* as we don't have ordered writes, we have to sync the recovery
745            data before we update the magic to indicate that the recovery
746            data is present */
747         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
748                 free(data);
749                 return -1;
750         }
751
752         free(data);
753
754         magic = TDB_RECOVERY_MAGIC;
755         CONVERT(magic);
756
757         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
758
759         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
760                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_setup_recovery: failed to write recovery magic\n"));
761                 tdb->ecode = TDB_ERR_IO;
762                 return -1;
763         }
764
765         /* ensure the recovery magic marker is on disk */
766         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
767                 return -1;
768         }
769
770         return 0;
771 }
772
773 /*
774   commit the current transaction
775 */
776 int tdb_transaction_commit(struct tdb_context *tdb)
777 {       
778         const struct tdb_methods *methods;
779         tdb_off_t magic_offset = 0;
780         u32 zero = 0;
781
782         if (tdb->transaction == NULL) {
783                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: no transaction\n"));
784                 return -1;
785         }
786
787         if (tdb->transaction->transaction_error) {
788                 tdb->ecode = TDB_ERR_IO;
789                 tdb_transaction_cancel(tdb);
790                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: transaction error pending\n"));
791                 return -1;
792         }
793
794         if (tdb->transaction->nesting != 0) {
795                 tdb->transaction->nesting--;
796                 return 0;
797         }               
798
799         /* check for a null transaction */
800         if (tdb->transaction->elements == NULL) {
801                 tdb_transaction_cancel(tdb);
802                 return 0;
803         }
804
805         methods = tdb->transaction->io_methods;
806         
807         /* if there are any locks pending then the caller has not
808            nested their locks properly, so fail the transaction */
809         if (tdb->num_locks || tdb->global_lock.count) {
810                 tdb->ecode = TDB_ERR_LOCK;
811                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: locks pending on commit\n"));
812                 tdb_transaction_cancel(tdb);
813                 return -1;
814         }
815
816         /* upgrade the main transaction lock region to a write lock */
817         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
818                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_start: failed to upgrade hash locks\n"));
819                 tdb->ecode = TDB_ERR_LOCK;
820                 tdb_transaction_cancel(tdb);
821                 return -1;
822         }
823
824         /* get the global lock - this prevents new users attaching to the database
825            during the commit */
826         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
827                 TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_transaction_commit: failed to get global lock\n"));
828                 tdb->ecode = TDB_ERR_LOCK;
829                 tdb_transaction_cancel(tdb);
830                 return -1;
831         }
832
833         if (!(tdb->flags & TDB_NOSYNC)) {
834                 /* write the recovery data to the end of the file */
835                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
836                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to setup recovery data\n"));
837                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
838                         tdb_transaction_cancel(tdb);
839                         return -1;
840                 }
841         }
842
843         /* expand the file to the new size if needed */
844         if (tdb->map_size != tdb->transaction->old_map_size) {
845                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
846                                              tdb->map_size - 
847                                              tdb->transaction->old_map_size) == -1) {
848                         tdb->ecode = TDB_ERR_IO;
849                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: expansion failed\n"));
850                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
851                         tdb_transaction_cancel(tdb);
852                         return -1;
853                 }
854                 tdb->map_size = tdb->transaction->old_map_size;
855                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
856         }
857
858         /* perform all the writes */
859         while (tdb->transaction->elements) {
860                 struct tdb_transaction_el *el = tdb->transaction->elements;
861
862                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
863                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed during commit\n"));
864                         
865                         /* we've overwritten part of the data and
866                            possibly expanded the file, so we need to
867                            run the crash recovery code */
868                         tdb->methods = methods;
869                         tdb_transaction_recover(tdb); 
870
871                         tdb_transaction_cancel(tdb);
872                         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
873
874                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
875                         return -1;
876                 }
877                 tdb->transaction->elements = el->next;
878                 free(el->data); 
879                 free(el);
880         } 
881
882         if (!(tdb->flags & TDB_NOSYNC)) {
883                 /* ensure the new data is on disk */
884                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
885                         return -1;
886                 }
887
888                 /* remove the recovery marker */
889                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
890                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: failed to remove recovery magic\n"));
891                         return -1;
892                 }
893
894                 /* ensure the recovery marker has been removed on disk */
895                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
896                         return -1;
897                 }
898         }
899
900         tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
901
902         /*
903           TODO: maybe write to some dummy hdr field, or write to magic
904           offset without mmap, before the last sync, instead of the
905           utime() call
906         */
907
908         /* on some systems (like Linux 2.6.x) changes via mmap/msync
909            don't change the mtime of the file, this means the file may
910            not be backed up (as tdb rounding to block sizes means that
911            file size changes are quite rare too). The following forces
912            mtime changes when a transaction completes */
913 #ifdef HAVE_UTIME
914         utime(tdb->name, NULL);
915 #endif
916
917         /* use a transaction cancel to free memory and remove the
918            transaction locks */
919         tdb_transaction_cancel(tdb);
920         return 0;
921 }
922
923
924 /*
925   recover from an aborted transaction. Must be called with exclusive
926   database write access already established (including the global
927   lock to prevent new processes attaching)
928 */
929 int tdb_transaction_recover(struct tdb_context *tdb)
930 {
931         tdb_off_t recovery_head, recovery_eof;
932         unsigned char *data, *p;
933         u32 zero = 0;
934         struct list_struct rec;
935
936         /* find the recovery area */
937         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
938                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery head\n"));
939                 tdb->ecode = TDB_ERR_IO;
940                 return -1;
941         }
942
943         if (recovery_head == 0) {
944                 /* we have never allocated a recovery record */
945                 return 0;
946         }
947
948         /* read the recovery record */
949         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
950                                    sizeof(rec), DOCONV()) == -1) {
951                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery record\n"));           
952                 tdb->ecode = TDB_ERR_IO;
953                 return -1;
954         }
955
956         if (rec.magic != TDB_RECOVERY_MAGIC) {
957                 /* there is no valid recovery data */
958                 return 0;
959         }
960
961         if (tdb->read_only) {
962                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: attempt to recover read only database\n"));
963                 tdb->ecode = TDB_ERR_CORRUPT;
964                 return -1;
965         }
966
967         recovery_eof = rec.key_len;
968
969         data = malloc(rec.data_len);
970         if (data == NULL) {
971                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to allocate recovery data\n"));         
972                 tdb->ecode = TDB_ERR_OOM;
973                 return -1;
974         }
975
976         /* read the full recovery data */
977         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
978                                    rec.data_len, 0) == -1) {
979                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to read recovery data\n"));             
980                 tdb->ecode = TDB_ERR_IO;
981                 return -1;
982         }
983
984         /* recover the file data */
985         p = data;
986         while (p+8 < data + rec.data_len) {
987                 u32 ofs, len;
988                 if (DOCONV()) {
989                         tdb_convert(p, 8);
990                 }
991                 memcpy(&ofs, p, 4);
992                 memcpy(&len, p+4, 4);
993
994                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
995                         free(data);
996                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
997                         tdb->ecode = TDB_ERR_IO;
998                         return -1;
999                 }
1000                 p += 8 + len;
1001         }
1002
1003         free(data);
1004
1005         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1006                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync recovery\n"));
1007                 tdb->ecode = TDB_ERR_IO;
1008                 return -1;
1009         }
1010
1011         /* if the recovery area is after the recovered eof then remove it */
1012         if (recovery_eof <= recovery_head) {
1013                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1014                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery head\n"));
1015                         tdb->ecode = TDB_ERR_IO;
1016                         return -1;                      
1017                 }
1018         }
1019
1020         /* remove the recovery magic */
1021         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1022                           &zero) == -1) {
1023                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to remove recovery magic\n"));
1024                 tdb->ecode = TDB_ERR_IO;
1025                 return -1;                      
1026         }
1027         
1028         /* reduce the file size to the old size */
1029         tdb_munmap(tdb);
1030         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1031                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1032                 tdb->ecode = TDB_ERR_IO;
1033                 return -1;                      
1034         }
1035         tdb->map_size = recovery_eof;
1036         tdb_mmap(tdb);
1037
1038         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1039                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_recover: failed to sync2 recovery\n"));
1040                 tdb->ecode = TDB_ERR_IO;
1041                 return -1;
1042         }
1043
1044         TDB_LOG((tdb, TDB_DEBUG_TRACE, "tdb_transaction_recover: recovered %d byte database\n", 
1045                  recovery_eof));
1046
1047         /* all done */
1048         return 0;
1049 }