r10405: added transactions into tdb, and hook them into ldb. See my
[sfrench/samba-autobuild/.git] / source4 / lib / tdb / common / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 #include "tdb_private.h"
28
29 /*
30   transaction design:
31
32   - only allow a single transaction at a time per database. This makes
33     using the transaction API simpler, as otherwise the caller would
34     have to cope with temporary failures in transactions that conflict
35     with other current transactions
36
37   - keep the transaction recovery information in the same file as the
38     database, using a special 'transaction recovery' record pointed at
39     by the header. This removes the need for extra journal files as
40     used by some other databases
41
42   - dynamically allocate the transaction recovery record, re-using it
43     for subsequent transactions. If a larger record is needed then
44     tdb_free() the old record to place it on the normal tdb freelist
45     before allocating the new record
46
47   - during transactions, keep a linked list of all writes that have
48     been performed by intercepting all tdb_write() calls. The hooked
49     transaction versions of tdb_read() and tdb_write() check this
50     linked list and try to use the elements of the list in preference
51     to the real database.
52
53   - don't allow any locks to be held when a transaction starts,
54     otherwise we can end up with deadlock (plus lack of lock nesting
55     in posix locks would mean the lock is lost)
56
57   - if the caller gains a lock during the transaction but doesn't
58     release it then fail the commit
59
60   - allow for nested calls to tdb_transaction_start(), re-using the
61     existing transaction record. If the inner transaction is cancelled
62     then a subsequent commit will fail
63  
64   - keep a mirrored copy of the tdb hash chain heads to allow for the
65     fast hash heads scan on traverse, updating the mirrored copy in
66     the transaction version of tdb_write
67
68   - allow callers to mix transaction and non-transaction use of tdb,
69     although once a transaction is started then an exclusive lock is
70     gained until the transaction is committed or cancelled
71
72   - the commit strategy involves first saving away all modified data
73     into a linearised buffer in the transaction recovery area, then
74     marking the transaction recovery area with a magic value to
75     indicate a valid recovery record. In total 4 fsync/msync calls are
76     needed per commit to prevent race conditions. It might be possible
77     to reduce this to 3 or even 2 with some more work.
78
79   - check for a valid recovery record on open of the tdb, while the
80     global lock is held. Automatically recover from the transaction
81     recovery area if needed, then continue with the open as
82     usual. This allows for smooth crash recovery with no administrator
83     intervention.
84
85   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86     still available, but no transaction recovery area is used and no
87     fsync/msync calls are made.
88
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* we keep a mirrored copy of the tdb hash heads here so
97            tdb_next_hash_chain() can operate efficiently */
98         u32 *hash_heads;
99
100         /* the original io methods - used to do IOs to the real db */
101         const struct tdb_methods *io_methods;
102
103         /* the list of transaction elements. We use a doubly linked
104            list with a last pointer to allow us to keep the list
105            ordered, with first element at the front of the list. It
106            needs to be doubly linked as the read/write traversals need
107            to be backwards, while the commit needs to be forwards */
108         struct tdb_transaction_el {
109                 struct tdb_transaction_el *next, *prev;
110                 tdb_off_t offset;
111                 tdb_len_t length;
112                 unsigned char *data;
113         } *elements, *elements_last;
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* old file size before transaction */
126         tdb_len_t old_map_size;
127 };
128
129
130 /*
131   read while in a transaction. We need to check first if the data is in our list
132   of transaction elements, then if not do a real read
133 */
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
135                             tdb_len_t len, int cv)
136 {
137         struct tdb_transaction_el *el;
138
139         /* we need to walk the list backwards to get the most recent data */
140         for (el=tdb->transaction->elements_last;el;el=el->prev) {
141                 tdb_len_t partial;
142
143                 if (off+len <= el->offset) {
144                         continue;
145                 }
146                 if (off >= el->offset + el->length) {
147                         continue;
148                 }
149
150                 /* an overlapping read - needs to be split into up to
151                    2 reads and a memcpy */
152                 if (off < el->offset) {
153                         partial = el->offset - off;
154                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155                                 goto fail;
156                         }
157                         len -= partial;
158                         off += partial;
159                         buf = (void *)(partial + (char *)buf);
160                 }
161                 if (off + len <= el->offset + el->length) {
162                         partial = len;
163                 } else {
164                         partial = el->offset + el->length - off;
165                 }
166                 memcpy(buf, el->data + (off - el->offset), partial);
167                 if (cv) {
168                         tdb_convert(buf, len);
169                 }
170                 len -= partial;
171                 off += partial;
172                 buf = (void *)(partial + (char *)buf);
173                 
174                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177
178                 return 0;
179         }
180
181         /* its not in the transaction elements - do a real read */
182         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183
184 fail:
185         TDB_LOG((tdb, 0, "transaction_read: failed at off=%d len=%d\n", off, len));
186         tdb->ecode = TDB_ERR_IO;
187         tdb->transaction->transaction_error = 1;
188         return -1;
189 }
190
191
192 /*
193   write while in a transaction
194 */
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
196                              const void *buf, tdb_len_t len)
197 {
198         struct tdb_transaction_el *el;
199         
200         /* if the write is to a hash head, then update the transaction
201            hash heads */
202         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
203             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
204                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
205                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
206         }
207
208         /* first see if we can replace an existing entry */
209         for (el=tdb->transaction->elements_last;el;el=el->prev) {
210                 tdb_len_t partial;
211
212                 if (off+len <= el->offset) {
213                         continue;
214                 }
215                 if (off >= el->offset + el->length) {
216                         continue;
217                 }
218
219                 /* an overlapping write - needs to be split into up to
220                    2 writes and a memcpy */
221                 if (off < el->offset) {
222                         partial = el->offset - off;
223                         if (transaction_write(tdb, off, buf, partial) != 0) {
224                                 goto fail;
225                         }
226                         len -= partial;
227                         off += partial;
228                         buf = (const void *)(partial + (const char *)buf);
229                 }
230                 if (off + len <= el->offset + el->length) {
231                         partial = len;
232                 } else {
233                         partial = el->offset + el->length - off;
234                 }
235                 memcpy(el->data + (off - el->offset), buf, partial);
236                 len -= partial;
237                 off += partial;
238                 buf = (const void *)(partial + (const char *)buf);
239                 
240                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
241                         goto fail;
242                 }
243
244                 return 0;
245         }
246
247         /* add a new entry at the end of the list */
248         el = malloc(sizeof(*el));
249         if (el == NULL) {
250                 tdb->ecode = TDB_ERR_OOM;
251                 tdb->transaction->transaction_error = 1;                
252                 return -1;
253         }
254         el->next = NULL;
255         el->prev = tdb->transaction->elements_last;
256         el->offset = off;
257         el->length = len;
258         el->data = malloc(len);
259         if (el->data == NULL) {
260                 free(el);
261                 tdb->ecode = TDB_ERR_OOM;
262                 tdb->transaction->transaction_error = 1;                
263                 return -1;
264         }
265         if (buf) {
266                 memcpy(el->data, buf, len);
267         } else {
268                 memset(el->data, TDB_PAD_BYTE, len);
269         }
270         if (el->prev) {
271                 el->prev->next = el;
272         } else {
273                 tdb->transaction->elements = el;
274         }
275         tdb->transaction->elements_last = el;
276         return 0;
277
278 fail:
279         TDB_LOG((tdb, 0, "transaction_write: failed at off=%d len=%d\n", off, len));
280         tdb->ecode = TDB_ERR_IO;
281         tdb->transaction->transaction_error = 1;
282         return -1;
283 }
284
285 /*
286   accelerated hash chain head search, using the cached hash heads
287 */
288 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
289 {
290         u32 h = *chain;
291         for (;h < tdb->header.hash_size;h++) {
292                 /* the +1 takes account of the freelist */
293                 if (0 != tdb->transaction->hash_heads[h+1]) {
294                         break;
295                 }
296         }
297         (*chain) = h;
298 }
299
300 /*
301   out of bounds check during a transaction
302 */
303 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
304 {
305         if (len <= tdb->map_size) {
306                 return 0;
307         }
308         return TDB_ERRCODE(TDB_ERR_IO, -1);
309 }
310
311 /*
312   transaction version of tdb_expand().
313 */
314 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
315                                    tdb_off_t addition)
316 {
317         /* add a write to the transaction elements, so subsequent
318            reads see the zero data */
319         if (transaction_write(tdb, size, NULL, addition) != 0) {
320                 return -1;
321         }
322
323         return 0;
324 }
325
326 /*
327   brlock during a transaction - ignore them
328 */
329 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
330                        int rw_type, int lck_type, int probe)
331 {
332         return 0;
333 }
334
335 static const struct tdb_methods transaction_methods = {
336         .tdb_read        = transaction_read,
337         .tdb_write       = transaction_write,
338         .next_hash_chain = transaction_next_hash_chain,
339         .tdb_oob         = transaction_oob,
340         .tdb_expand_file = transaction_expand_file,
341         .tdb_brlock      = transaction_brlock
342 };
343
344
345 /*
346   start a tdb transaction. No token is returned, as only a single
347   transaction is allowed to be pending per tdb_context
348 */
349 int tdb_transaction_start(struct tdb_context *tdb)
350 {
351         /* some sanity checks */
352         if (tdb->read_only || (tdb->flags & TDB_INTERNAL)) {
353                 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
354                 tdb->ecode = TDB_ERR_EINVAL;
355                 return -1;
356         }
357
358         /* cope with nested tdb_transaction_start() calls */
359         if (tdb->transaction != NULL) {
360                 tdb->transaction->nesting++;
361                 TDB_LOG((tdb, 0, "tdb_transaction_start: nesting %d\n", 
362                          tdb->transaction->nesting));
363                 return 0;
364         }
365
366         if (tdb->num_locks != 0) {
367                 /* the caller must not have any locks when starting a
368                    transaction as otherwise we'll be screwed by lack
369                    of nested locks in posix */
370                 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction with locks held\n"));
371                 tdb->ecode = TDB_ERR_LOCK;
372                 return -1;
373         }
374
375         tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
376         if (tdb->transaction == NULL) {
377                 tdb->ecode = TDB_ERR_OOM;
378                 return -1;
379         }
380
381         /* get the transaction write lock. This is a blocking lock. As
382            discussed with Volker, there are a number of ways we could
383            make this async, which we will probably do in the future */
384         if (tdb_brlock_len(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
385                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get transaction lock\n"));
386                 tdb->ecode = TDB_ERR_LOCK;
387                 SAFE_FREE(tdb->transaction);
388                 return -1;
389         }
390         
391         /* get a write lock from the freelist to the end of file. It
392            would be much better to make this a read lock as it would
393            increase parallelism, but it could lead to deadlocks on
394            commit when a write lock needs to be taken. 
395
396            TODO: look at alternative locking strategies to allow this
397            to be a read lock 
398         */
399         if (tdb_brlock_len(tdb, FREELIST_TOP, F_WRLCK, F_SETLKW, 0, 0) == -1) {
400                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get hash locks\n"));
401                 tdb->ecode = TDB_ERR_LOCK;
402                 goto fail;
403         }
404
405         /* setup a copy of the hash table heads so the hash scan in
406            traverse can be fast */
407         tdb->transaction->hash_heads = calloc(tdb->header.hash_size+1, sizeof(tdb_off_t));
408         if (tdb->transaction->hash_heads == NULL) {
409                 tdb->ecode = TDB_ERR_OOM;
410                 goto fail;
411         }
412         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
413                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
414                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to read hash heads\n"));
415                 tdb->ecode = TDB_ERR_IO;
416                 goto fail;
417         }
418
419         /* make sure we know about any file expansions already done by
420            anyone else */
421         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
422         tdb->transaction->old_map_size = tdb->map_size;
423
424         /* finally hook the io methods, replacing them with
425            transaction specific methods */
426         tdb->transaction->io_methods = tdb->methods;
427         tdb->methods = &transaction_methods;
428
429         return 0;
430         
431 fail:
432         tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
433         tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
434         SAFE_FREE(tdb->transaction->hash_heads);
435         SAFE_FREE(tdb->transaction);
436         return -1;
437 }
438
439
440 /*
441   cancel the current transaction
442 */
443 int tdb_transaction_cancel(struct tdb_context *tdb)
444 {       
445         if (tdb->transaction == NULL) {
446                 TDB_LOG((tdb, 0, "tdb_transaction_cancel: no transaction\n"));
447                 return -1;
448         }
449
450         if (tdb->transaction->nesting != 0) {
451                 tdb->transaction->transaction_error = 1;
452                 tdb->transaction->nesting--;
453                 return 0;
454         }               
455
456         tdb->map_size = tdb->transaction->old_map_size;
457
458         /* free all the transaction elements */
459         while (tdb->transaction->elements) {
460                 struct tdb_transaction_el *el = tdb->transaction->elements;
461                 tdb->transaction->elements = el->next;
462                 free(el->data);
463                 free(el);
464         }
465
466         /* remove any locks created during the transaction */
467         if (tdb->num_locks != 0) {
468                 int h;
469                 for (h=0;h<tdb->header.hash_size+1;h++) {
470                         if (tdb->locked[h].count != 0) {
471                                 tdb_brlock_len(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
472                                 tdb->locked[h].count = 0;
473                         }
474                 }
475                 tdb->num_locks = 0;
476         }
477
478         /* restore the normal io methods */
479         tdb->methods = tdb->transaction->io_methods;
480
481         tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
482         tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
483         SAFE_FREE(tdb->transaction->hash_heads);
484         SAFE_FREE(tdb->transaction);
485         
486         return 0;
487 }
488
489 /*
490   sync to disk
491 */
492 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
493 {       
494         if (fsync(tdb->fd) != 0) {
495                 tdb->ecode = TDB_ERR_IO;
496                 TDB_LOG((tdb, 0, "tdb_transaction: fsync failed\n"));
497                 return -1;
498         }
499 #ifdef MS_SYNC
500         if (tdb->map_ptr) {
501                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
502                 if (msync(moffset + (char *)tdb->map_ptr, 
503                           length + (offset - moffset), MS_SYNC) != 0) {
504                         tdb->ecode = TDB_ERR_IO;
505                         TDB_LOG((tdb, 0, "tdb_transaction: msync failed\n"));
506                         return -1;
507                 }
508         }
509 #endif
510         return 0;
511 }
512
513
514 /*
515   work out how much space the linearised recovery data will consume
516 */
517 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
518 {
519         struct tdb_transaction_el *el;
520         tdb_len_t recovery_size = 0;
521
522         recovery_size = sizeof(u32);
523         for (el=tdb->transaction->elements;el;el=el->next) {
524                 if (el->offset >= tdb->transaction->old_map_size) {
525                         continue;
526                 }
527                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
528         }
529
530         return recovery_size;
531 }
532
533 /*
534   allocate the recovery area, or use an existing recovery area if it is
535   large enough
536 */
537 static int tdb_recovery_allocate(struct tdb_context *tdb, 
538                                  tdb_len_t *recovery_size,
539                                  tdb_off_t *recovery_offset,
540                                  tdb_len_t *recovery_max_size)
541 {
542         struct list_struct rec;
543         const struct tdb_methods *methods = tdb->transaction->io_methods;
544         tdb_off_t recovery_head;
545
546         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
547                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery head\n"));
548                 return -1;
549         }
550
551         rec.rec_len = 0;
552
553         if (recovery_head != 0 && 
554             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
555                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery record\n"));
556                 return -1;
557         }
558
559         *recovery_size = tdb_recovery_size(tdb);
560
561         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
562                 /* it fits in the existing area */
563                 *recovery_max_size = rec.rec_len;
564                 *recovery_offset = recovery_head;
565                 return 0;
566         }
567
568         /* we need to free up the old recovery area, then allocate a
569            new one at the end of the file. Note that we cannot use
570            tdb_allocate() to allocate the new one as that might return
571            us an area that is being currently used (as of the start of
572            the transaction) */
573         if (recovery_head != 0) {
574                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
575                         TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to free previous recovery area\n"));
576                         return -1;
577                 }
578         }
579
580         /* the tdb_free() call might have increased the recovery size */
581         *recovery_size = tdb_recovery_size(tdb);
582
583         /* round up to a multiple of page size */
584         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
585         *recovery_offset = tdb->map_size;
586         recovery_head = *recovery_offset;
587
588         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
589                                      (tdb->map_size - tdb->transaction->old_map_size) +
590                                      sizeof(rec) + *recovery_max_size) == -1) {
591                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to create recovery area\n"));
592                 return -1;
593         }
594
595         /* remap the file (if using mmap) */
596         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
597
598         /* we have to reset the old map size so that we don't try to expand the file
599            again in the transaction commit, which would destroy the recovery area */
600         tdb->transaction->old_map_size = tdb->map_size;
601
602         /* write the recovery header offset and sync - we can sync without a race here
603            as the magic ptr in the recovery record has not been set */
604         CONVERT(recovery_head);
605         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
606                                &recovery_head, sizeof(tdb_off_t)) == -1) {
607                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to write recovery head\n"));
608                 return -1;
609         }
610
611         return 0;
612 }
613
614
615 /*
616   setup the recovery data that will be used on a crash during commit
617 */
618 static int transaction_setup_recovery(struct tdb_context *tdb, 
619                                       tdb_off_t *magic_offset)
620 {
621         struct tdb_transaction_el *el;
622         tdb_len_t recovery_size;
623         unsigned char *data, *p;
624         const struct tdb_methods *methods = tdb->transaction->io_methods;
625         struct list_struct *rec;
626         tdb_off_t recovery_offset, recovery_max_size;
627         tdb_off_t old_map_size = tdb->transaction->old_map_size;
628         u32 magic;
629
630         /*
631           check that the recovery area has enough space
632         */
633         if (tdb_recovery_allocate(tdb, &recovery_size, 
634                                   &recovery_offset, &recovery_max_size) == -1) {
635                 return -1;
636         }
637
638         data = malloc(recovery_size + sizeof(*rec));
639         if (data == NULL) {
640                 tdb->ecode = TDB_ERR_OOM;
641                 return -1;
642         }
643
644         rec = (struct list_struct *)data;
645         memset(rec, 0, sizeof(*rec));
646
647         rec->magic    = 0;
648         rec->data_len = recovery_size;
649         rec->rec_len  = recovery_max_size;
650         rec->key_len  = old_map_size;
651         CONVERT(rec);
652
653         /* build the recovery data into a single blob to allow us to do a single
654            large write, which should be more efficient */
655         p = data + sizeof(*rec);
656         for (el=tdb->transaction->elements;el;el=el->next) {
657                 if (el->offset >= old_map_size) {
658                         continue;
659                 }
660                 if (el->offset + el->length > tdb->transaction->old_map_size) {
661                         TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction data over new region boundary\n"));
662                         free(data);
663                         tdb->ecode = TDB_ERR_CORRUPT;
664                         return -1;
665                 }
666                 ((u32 *)p)[0] = el->offset;
667                 ((u32 *)p)[1] = el->length;
668                 if (DOCONV()) {
669                         tdb_convert(p, 8);
670                 }
671                 /* the recovery area contains the old data, not the
672                    new data, so we have to call the original tdb_read
673                    method to get it */
674                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
675                         free(data);
676                         tdb->ecode = TDB_ERR_IO;
677                         return -1;
678                 }
679                 p += 8 + el->length;
680         }
681
682         /* and the tailer */
683         *(u32 *)p = sizeof(*rec) + recovery_max_size;
684         CONVERT(p);
685
686         /* write the recovery data to the recovery area */
687         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
688                 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery data\n"));
689                 free(data);
690                 tdb->ecode = TDB_ERR_IO;
691                 return -1;
692         }
693
694         /* as we don't have ordered writes, we have to sync the recovery
695            data before we update the magic to indicate that the recovery
696            data is present */
697         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
698                 free(data);
699                 return -1;
700         }
701
702         free(data);
703
704         magic = TDB_RECOVERY_MAGIC;
705         CONVERT(magic);
706
707         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
708
709         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
710                 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery magic\n"));
711                 tdb->ecode = TDB_ERR_IO;
712                 return -1;
713         }
714
715         /* ensure the recovery magic marker is on disk */
716         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
717                 return -1;
718         }
719
720         return 0;
721 }
722
723 /*
724   commit the current transaction
725 */
726 int tdb_transaction_commit(struct tdb_context *tdb)
727 {       
728         const struct tdb_methods *methods;
729         tdb_off_t magic_offset;
730         u32 zero = 0;
731
732         if (tdb->transaction == NULL) {
733                 TDB_LOG((tdb, 0, "tdb_transaction_commit: no transaction\n"));
734                 return -1;
735         }
736
737         if (tdb->transaction->transaction_error) {
738                 tdb->ecode = TDB_ERR_IO;
739                 tdb_transaction_cancel(tdb);
740                 TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction error pending\n"));
741                 return -1;
742         }
743
744         if (tdb->transaction->nesting != 0) {
745                 tdb->transaction->nesting--;
746                 return 0;
747         }               
748
749         /* check for a null transaction */
750         if (tdb->transaction->elements == NULL) {
751                 tdb_transaction_cancel(tdb);
752                 return 0;
753         }
754
755         methods = tdb->transaction->io_methods;
756         
757         /* if there are any locks pending then the caller has not
758            nested their locks properly, so fail the transaction */
759         if (tdb->num_locks) {
760                 tdb->ecode = TDB_ERR_LOCK;
761                 TDB_LOG((tdb, 0, "tdb_transaction_commit: locks pending on commit\n"));
762                 tdb_transaction_cancel(tdb);
763                 return -1;
764         }
765
766         /* get the global lock - this prevents new users attaching to the database
767            during the commit */
768         if (tdb_brlock_len(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
769                 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to get global lock\n"));
770                 tdb->ecode = TDB_ERR_LOCK;
771                 tdb_transaction_cancel(tdb);
772                 return -1;
773         }
774
775         if (!(tdb->flags & TDB_NOSYNC)) {
776                 /* write the recovery data to the end of the file */
777                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
778                         TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to setup recovery data\n"));
779                         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
780                         tdb_transaction_cancel(tdb);
781                         return -1;
782                 }
783         }
784
785         /* expand the file to the new size if needed */
786         if (tdb->map_size != tdb->transaction->old_map_size) {
787                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
788                                              tdb->map_size - 
789                                              tdb->transaction->old_map_size) == -1) {
790                         tdb->ecode = TDB_ERR_IO;
791                         TDB_LOG((tdb, 0, "tdb_transaction_commit: expansion failed\n"));
792                         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
793                         tdb_transaction_cancel(tdb);
794                         return -1;
795                 }
796                 tdb->map_size = tdb->transaction->old_map_size;
797                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
798         }
799
800         /* perform all the writes */
801         while (tdb->transaction->elements) {
802                 struct tdb_transaction_el *el = tdb->transaction->elements;
803
804                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
805                         TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed during commit\n"));
806                         
807                         /* we've overwritten part of the data and
808                            possibly expanded the file, so we need to
809                            run the crash recovery code */
810                         tdb->methods = methods;
811                         tdb_transaction_recover(tdb); 
812
813                         tdb_transaction_cancel(tdb);
814                         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
815
816                         TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed\n"));
817                         return -1;
818                 }
819                 tdb->transaction->elements = el->next;
820                 free(el->data); 
821                 free(el);
822         } 
823
824         if (!(tdb->flags & TDB_NOSYNC)) {
825                 /* ensure the new data is on disk */
826                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
827                         return -1;
828                 }
829
830                 /* remove the recovery marker */
831                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
832                         TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to remove recovery magic\n"));
833                         return -1;
834                 }
835
836                 /* ensure the recovery marker has been removed on disk */
837                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
838                         return -1;
839                 }
840         }
841
842         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
843
844         /* use a transaction cancel to free memory and remove the
845            transaction locks */
846         tdb_transaction_cancel(tdb);
847         return 0;
848 }
849
850
851 /*
852   recover from an aborted transaction. Must be called with exclusive
853   database write access already established (including the global
854   lock to prevent new processes attaching)
855 */
856 int tdb_transaction_recover(struct tdb_context *tdb)
857 {
858         tdb_off_t recovery_head, recovery_eof;
859         unsigned char *data, *p;
860         u32 zero = 0;
861         struct list_struct rec;
862
863         /* find the recovery area */
864         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
865                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery head\n"));
866                 tdb->ecode = TDB_ERR_IO;
867                 return -1;
868         }
869
870         if (recovery_head == 0) {
871                 /* we have never allocated a recovery record */
872                 return 0;
873         }
874
875         /* read the recovery record */
876         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
877                                    sizeof(rec), DOCONV()) == -1) {
878                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery record\n"));         
879                 tdb->ecode = TDB_ERR_IO;
880                 return -1;
881         }
882
883         if (rec.magic != TDB_RECOVERY_MAGIC) {
884                 /* there is no valid recovery data */
885                 return 0;
886         }
887
888         if (tdb->read_only) {
889                 TDB_LOG((tdb, 0, "tdb_transaction_recover: attempt to recover read only database\n"));
890                 tdb->ecode = TDB_ERR_CORRUPT;
891                 return -1;
892         }
893
894         recovery_eof = rec.key_len;
895
896         data = malloc(rec.data_len);
897         if (data == NULL) {
898                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to allocate recovery data\n"));               
899                 tdb->ecode = TDB_ERR_OOM;
900                 return -1;
901         }
902
903         /* read the full recovery data */
904         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
905                                    rec.data_len, 0) == -1) {
906                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery data\n"));           
907                 tdb->ecode = TDB_ERR_IO;
908                 return -1;
909         }
910
911         /* recover the file data */
912         p = data;
913         while (p+8 < data + rec.data_len) {
914                 u32 ofs, len;
915                 if (DOCONV()) {
916                         tdb_convert(p, 8);
917                 }
918                 ofs = ((u32 *)p)[0];
919                 len = ((u32 *)p)[1];
920
921                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
922                         free(data);
923                         TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
924                         tdb->ecode = TDB_ERR_IO;
925                         return -1;
926                 }
927                 p += 8 + len;
928         }
929
930         free(data);
931
932         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
933                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync recovery\n"));
934                 tdb->ecode = TDB_ERR_IO;
935                 return -1;
936         }
937
938         /* if the recovery area is after the recovered eof then remove it */
939         if (recovery_eof <= recovery_head) {
940                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
941                         TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery head\n"));
942                         tdb->ecode = TDB_ERR_IO;
943                         return -1;                      
944                 }
945         }
946
947         /* remove the recovery magic */
948         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
949                           &zero) == -1) {
950                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery magic\n"));
951                 tdb->ecode = TDB_ERR_IO;
952                 return -1;                      
953         }
954         
955         /* reduce the file size to the old size */
956         tdb_munmap(tdb);
957         if (ftruncate(tdb->fd, recovery_eof) != 0) {
958                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to reduce to recovery size\n"));
959                 tdb->ecode = TDB_ERR_IO;
960                 return -1;                      
961         }
962         tdb->map_size = recovery_eof;
963         tdb_mmap(tdb);
964
965         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
966                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync2 recovery\n"));
967                 tdb->ecode = TDB_ERR_IO;
968                 return -1;
969         }
970
971         TDB_LOG((tdb, 0, "tdb_transaction_recover: recovered %d byte database\n", 
972                  recovery_eof));
973
974         /* all done */
975         return 0;
976 }