r16945: Sync trunk -> 3.0 for 3.0.24 code. Still need
[ira/wip.git] / source3 / tdb / transaction.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              2005
7
8      ** NOTE! The following LGPL license applies to the tdb
9      ** library. This does NOT imply that all of Samba is released
10      ** under the LGPL
11    
12    This library is free software; you can redistribute it and/or
13    modify it under the terms of the GNU Lesser General Public
14    License as published by the Free Software Foundation; either
15    version 2 of the License, or (at your option) any later version.
16
17    This library is distributed in the hope that it will be useful,
18    but WITHOUT ANY WARRANTY; without even the implied warranty of
19    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20    Lesser General Public License for more details.
21
22    You should have received a copy of the GNU Lesser General Public
23    License along with this library; if not, write to the Free Software
24    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
25 */
26
27 #include "tdb_private.h"
28
29 /*
30   transaction design:
31
32   - only allow a single transaction at a time per database. This makes
33     using the transaction API simpler, as otherwise the caller would
34     have to cope with temporary failures in transactions that conflict
35     with other current transactions
36
37   - keep the transaction recovery information in the same file as the
38     database, using a special 'transaction recovery' record pointed at
39     by the header. This removes the need for extra journal files as
40     used by some other databases
41
42   - dynamically allocate the transaction recovery record, re-using it
43     for subsequent transactions. If a larger record is needed then
44     tdb_free() the old record to place it on the normal tdb freelist
45     before allocating the new record
46
47   - during transactions, keep a linked list of all writes that have
48     been performed by intercepting all tdb_write() calls. The hooked
49     transaction versions of tdb_read() and tdb_write() check this
50     linked list and try to use the elements of the list in preference
51     to the real database.
52
53   - don't allow any locks to be held when a transaction starts,
54     otherwise we can end up with deadlock (plus lack of lock nesting
55     in posix locks would mean the lock is lost)
56
57   - if the caller gains a lock during the transaction but doesn't
58     release it then fail the commit
59
60   - allow for nested calls to tdb_transaction_start(), re-using the
61     existing transaction record. If the inner transaction is cancelled
62     then a subsequent commit will fail
63  
64   - keep a mirrored copy of the tdb hash chain heads to allow for the
65     fast hash heads scan on traverse, updating the mirrored copy in
66     the transaction version of tdb_write
67
68   - allow callers to mix transaction and non-transaction use of tdb,
69     although once a transaction is started then an exclusive lock is
70     gained until the transaction is committed or cancelled
71
72   - the commit strategy involves first saving away all modified data
73     into a linearised buffer in the transaction recovery area, then
74     marking the transaction recovery area with a magic value to
75     indicate a valid recovery record. In total 4 fsync/msync calls are
76     needed per commit to prevent race conditions. It might be possible
77     to reduce this to 3 or even 2 with some more work.
78
79   - check for a valid recovery record on open of the tdb, while the
80     global lock is held. Automatically recover from the transaction
81     recovery area if needed, then continue with the open as
82     usual. This allows for smooth crash recovery with no administrator
83     intervention.
84
85   - if TDB_NOSYNC is passed to flags in tdb_open then transactions are
86     still available, but no transaction recovery area is used and no
87     fsync/msync calls are made.
88
89 */
90
91
92 /*
93   hold the context of any current transaction
94 */
95 struct tdb_transaction {
96         /* we keep a mirrored copy of the tdb hash heads here so
97            tdb_next_hash_chain() can operate efficiently */
98         u32 *hash_heads;
99
100         /* the original io methods - used to do IOs to the real db */
101         const struct tdb_methods *io_methods;
102
103         /* the list of transaction elements. We use a doubly linked
104            list with a last pointer to allow us to keep the list
105            ordered, with first element at the front of the list. It
106            needs to be doubly linked as the read/write traversals need
107            to be backwards, while the commit needs to be forwards */
108         struct tdb_transaction_el {
109                 struct tdb_transaction_el *next, *prev;
110                 tdb_off_t offset;
111                 tdb_len_t length;
112                 unsigned char *data;
113         } *elements, *elements_last;
114
115         /* non-zero when an internal transaction error has
116            occurred. All write operations will then fail until the
117            transaction is ended */
118         int transaction_error;
119
120         /* when inside a transaction we need to keep track of any
121            nested tdb_transaction_start() calls, as these are allowed,
122            but don't create a new transaction */
123         int nesting;
124
125         /* old file size before transaction */
126         tdb_len_t old_map_size;
127 };
128
129
130 /*
131   read while in a transaction. We need to check first if the data is in our list
132   of transaction elements, then if not do a real read
133 */
134 static int transaction_read(struct tdb_context *tdb, tdb_off_t off, void *buf, 
135                             tdb_len_t len, int cv)
136 {
137         struct tdb_transaction_el *el;
138
139         /* we need to walk the list backwards to get the most recent data */
140         for (el=tdb->transaction->elements_last;el;el=el->prev) {
141                 tdb_len_t partial;
142
143                 if (off+len <= el->offset) {
144                         continue;
145                 }
146                 if (off >= el->offset + el->length) {
147                         continue;
148                 }
149
150                 /* an overlapping read - needs to be split into up to
151                    2 reads and a memcpy */
152                 if (off < el->offset) {
153                         partial = el->offset - off;
154                         if (transaction_read(tdb, off, buf, partial, cv) != 0) {
155                                 goto fail;
156                         }
157                         len -= partial;
158                         off += partial;
159                         buf = (void *)(partial + (char *)buf);
160                 }
161                 if (off + len <= el->offset + el->length) {
162                         partial = len;
163                 } else {
164                         partial = el->offset + el->length - off;
165                 }
166                 memcpy(buf, el->data + (off - el->offset), partial);
167                 if (cv) {
168                         tdb_convert(buf, len);
169                 }
170                 len -= partial;
171                 off += partial;
172                 buf = (void *)(partial + (char *)buf);
173                 
174                 if (len != 0 && transaction_read(tdb, off, buf, len, cv) != 0) {
175                         goto fail;
176                 }
177
178                 return 0;
179         }
180
181         /* its not in the transaction elements - do a real read */
182         return tdb->transaction->io_methods->tdb_read(tdb, off, buf, len, cv);
183
184 fail:
185         TDB_LOG((tdb, 0, "transaction_read: failed at off=%d len=%d\n", off, len));
186         tdb->ecode = TDB_ERR_IO;
187         tdb->transaction->transaction_error = 1;
188         return -1;
189 }
190
191
192 /*
193   write while in a transaction
194 */
195 static int transaction_write(struct tdb_context *tdb, tdb_off_t off, 
196                              const void *buf, tdb_len_t len)
197 {
198         struct tdb_transaction_el *el, *best_el=NULL;
199
200         if (len == 0) {
201                 return 0;
202         }
203         
204         /* if the write is to a hash head, then update the transaction
205            hash heads */
206         if (len == sizeof(tdb_off_t) && off >= FREELIST_TOP &&
207             off < FREELIST_TOP+TDB_HASHTABLE_SIZE(tdb)) {
208                 u32 chain = (off-FREELIST_TOP) / sizeof(tdb_off_t);
209                 memcpy(&tdb->transaction->hash_heads[chain], buf, len);
210         }
211
212         /* first see if we can replace an existing entry */
213         for (el=tdb->transaction->elements_last;el;el=el->prev) {
214                 tdb_len_t partial;
215
216                 if (best_el == NULL && off == el->offset+el->length) {
217                         best_el = el;
218                 }
219
220                 if (off+len <= el->offset) {
221                         continue;
222                 }
223                 if (off >= el->offset + el->length) {
224                         continue;
225                 }
226
227                 /* an overlapping write - needs to be split into up to
228                    2 writes and a memcpy */
229                 if (off < el->offset) {
230                         partial = el->offset - off;
231                         if (transaction_write(tdb, off, buf, partial) != 0) {
232                                 goto fail;
233                         }
234                         len -= partial;
235                         off += partial;
236                         buf = (const void *)(partial + (const char *)buf);
237                 }
238                 if (off + len <= el->offset + el->length) {
239                         partial = len;
240                 } else {
241                         partial = el->offset + el->length - off;
242                 }
243                 memcpy(el->data + (off - el->offset), buf, partial);
244                 len -= partial;
245                 off += partial;
246                 buf = (const void *)(partial + (const char *)buf);
247                 
248                 if (len != 0 && transaction_write(tdb, off, buf, len) != 0) {
249                         goto fail;
250                 }
251
252                 return 0;
253         }
254
255         /* see if we can append the new entry to an existing entry */
256         if (best_el && best_el->offset + best_el->length == off && 
257             (off+len < tdb->transaction->old_map_size ||
258              off > tdb->transaction->old_map_size)) {
259                 unsigned char *data = best_el->data;
260                 el = best_el;
261                 el->data = realloc(el->data, el->length + len);
262                 if (el->data == NULL) {
263                         tdb->ecode = TDB_ERR_OOM;
264                         tdb->transaction->transaction_error = 1;
265                         el->data = data;
266                         return -1;
267                 }
268                 if (buf) {
269                         memcpy(el->data + el->length, buf, len);
270                 } else {
271                         memset(el->data + el->length, TDB_PAD_BYTE, len);
272                 }
273                 el->length += len;
274                 return 0;
275         }
276
277         /* add a new entry at the end of the list */
278         el = malloc(sizeof(*el));
279         if (el == NULL) {
280                 tdb->ecode = TDB_ERR_OOM;
281                 tdb->transaction->transaction_error = 1;                
282                 return -1;
283         }
284         el->next = NULL;
285         el->prev = tdb->transaction->elements_last;
286         el->offset = off;
287         el->length = len;
288         el->data = malloc(len);
289         if (el->data == NULL) {
290                 free(el);
291                 tdb->ecode = TDB_ERR_OOM;
292                 tdb->transaction->transaction_error = 1;                
293                 return -1;
294         }
295         if (buf) {
296                 memcpy(el->data, buf, len);
297         } else {
298                 memset(el->data, TDB_PAD_BYTE, len);
299         }
300         if (el->prev) {
301                 el->prev->next = el;
302         } else {
303                 tdb->transaction->elements = el;
304         }
305         tdb->transaction->elements_last = el;
306         return 0;
307
308 fail:
309         TDB_LOG((tdb, 0, "transaction_write: failed at off=%d len=%d\n", off, len));
310         tdb->ecode = TDB_ERR_IO;
311         tdb->transaction->transaction_error = 1;
312         return -1;
313 }
314
315 /*
316   accelerated hash chain head search, using the cached hash heads
317 */
318 static void transaction_next_hash_chain(struct tdb_context *tdb, u32 *chain)
319 {
320         u32 h = *chain;
321         for (;h < tdb->header.hash_size;h++) {
322                 /* the +1 takes account of the freelist */
323                 if (0 != tdb->transaction->hash_heads[h+1]) {
324                         break;
325                 }
326         }
327         (*chain) = h;
328 }
329
330 /*
331   out of bounds check during a transaction
332 */
333 static int transaction_oob(struct tdb_context *tdb, tdb_off_t len, int probe)
334 {
335         if (len <= tdb->map_size) {
336                 return 0;
337         }
338         return TDB_ERRCODE(TDB_ERR_IO, -1);
339 }
340
341 /*
342   transaction version of tdb_expand().
343 */
344 static int transaction_expand_file(struct tdb_context *tdb, tdb_off_t size, 
345                                    tdb_off_t addition)
346 {
347         /* add a write to the transaction elements, so subsequent
348            reads see the zero data */
349         if (transaction_write(tdb, size, NULL, addition) != 0) {
350                 return -1;
351         }
352
353         return 0;
354 }
355
356 /*
357   brlock during a transaction - ignore them
358 */
359 int transaction_brlock(struct tdb_context *tdb, tdb_off_t offset, 
360                        int rw_type, int lck_type, int probe)
361 {
362         return 0;
363 }
364
365 static const struct tdb_methods transaction_methods = { /* io methods that tdb_transaction_start() installs in tdb->methods; entries are positional, so their order must match struct tdb_methods */
366         transaction_read, /* reads look in the pending transaction writes before the real db */
367         transaction_write, /* writes are recorded in the transaction element list */
368         transaction_next_hash_chain, /* hash chain scan using the mirrored hash heads */
369         transaction_oob, /* bounds check against the in-memory map size */
370         transaction_expand_file, /* expansion is recorded as a padding write */
371         transaction_brlock /* lock requests are ignored and report success */
372 };
373
374
375 /*
376   start a tdb transaction. No token is returned, as only a single
377   transaction is allowed to be pending per tdb_context
378 */
379 int tdb_transaction_start(struct tdb_context *tdb)
380 {
381         /* some sanity checks */
382         if (tdb->read_only || (tdb->flags & TDB_INTERNAL) || tdb->traverse_read) {
383                 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction on a read-only or internal db\n"));
384                 tdb->ecode = TDB_ERR_EINVAL;
385                 return -1;
386         }
387
388         /* cope with nested tdb_transaction_start() calls */
389         if (tdb->transaction != NULL) {
390                 tdb->transaction->nesting++;
391                 TDB_LOG((tdb, 0, "tdb_transaction_start: nesting %d\n", 
392                          tdb->transaction->nesting));
393                 return 0;
394         }
395
396         if (tdb->num_locks != 0) {
397                 /* the caller must not have any locks when starting a
398                    transaction as otherwise we'll be screwed by lack
399                    of nested locks in posix */
400                 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction with locks held\n"));
401                 tdb->ecode = TDB_ERR_LOCK;
402                 return -1;
403         }
404
405         if (tdb->travlocks.next != NULL) {
406                 /* you cannot use transactions inside a traverse (although you can use
407                    traverse inside a transaction) as otherwise you can end up with
408                    deadlock */
409                 TDB_LOG((tdb, 0, "tdb_transaction_start: cannot start a transaction within a traverse\n"));
410                 tdb->ecode = TDB_ERR_LOCK;
411                 return -1;
412         }
413
414         tdb->transaction = calloc(sizeof(struct tdb_transaction), 1);
415         if (tdb->transaction == NULL) {
416                 tdb->ecode = TDB_ERR_OOM;
417                 return -1;
418         }
419
420         /* get the transaction write lock. This is a blocking lock. As
421            discussed with Volker, there are a number of ways we could
422            make this async, which we will probably do in the future */
423         if (tdb_brlock_len(tdb, TRANSACTION_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
424                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get transaction lock\n"));
425                 tdb->ecode = TDB_ERR_LOCK;
426                 SAFE_FREE(tdb->transaction);
427                 return -1;
428         }
429         
430         /* get a read lock from the freelist to the end of file. This
431            is upgraded to a write lock during the commit */
432         if (tdb_brlock_len(tdb, FREELIST_TOP, F_RDLCK, F_SETLKW, 0, 0) == -1) {
433                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to get hash locks\n"));
434                 tdb->ecode = TDB_ERR_LOCK;
435                 goto fail;
436         }
437
438         /* setup a copy of the hash table heads so the hash scan in
439            traverse can be fast */
440         tdb->transaction->hash_heads = calloc(tdb->header.hash_size+1, sizeof(tdb_off_t));
441         if (tdb->transaction->hash_heads == NULL) {
442                 tdb->ecode = TDB_ERR_OOM;
443                 goto fail;
444         }
445         if (tdb->methods->tdb_read(tdb, FREELIST_TOP, tdb->transaction->hash_heads,
446                                    TDB_HASHTABLE_SIZE(tdb), 0) != 0) {
447                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to read hash heads\n"));
448                 tdb->ecode = TDB_ERR_IO;
449                 goto fail;
450         }
451
452         /* make sure we know about any file expansions already done by
453            anyone else */
454         tdb->methods->tdb_oob(tdb, tdb->map_size + 1, 1);
455         tdb->transaction->old_map_size = tdb->map_size;
456
457         /* finally hook the io methods, replacing them with
458            transaction specific methods */
459         tdb->transaction->io_methods = tdb->methods;
460         tdb->methods = &transaction_methods;
461
462         /* by calling this transaction write here, we ensure that we don't grow the
463            transaction linked list due to hash table updates */
464         if (transaction_write(tdb, FREELIST_TOP, tdb->transaction->hash_heads, 
465                               TDB_HASHTABLE_SIZE(tdb)) != 0) {
466                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to prime hash table\n"));
467                 tdb->ecode = TDB_ERR_IO;
468                 goto fail;
469         }
470
471         return 0;
472         
473 fail:
474         tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
475         tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
476         SAFE_FREE(tdb->transaction->hash_heads);
477         SAFE_FREE(tdb->transaction);
478         return -1;
479 }
480
481
482 /*
483   cancel the current transaction
484 */
485 int tdb_transaction_cancel(struct tdb_context *tdb)
486 {       
487         if (tdb->transaction == NULL) {
488                 TDB_LOG((tdb, 0, "tdb_transaction_cancel: no transaction\n"));
489                 return -1;
490         }
491
492         if (tdb->transaction->nesting != 0) {
493                 tdb->transaction->transaction_error = 1;
494                 tdb->transaction->nesting--;
495                 return 0;
496         }               
497
498         tdb->map_size = tdb->transaction->old_map_size;
499
500         /* free all the transaction elements */
501         while (tdb->transaction->elements) {
502                 struct tdb_transaction_el *el = tdb->transaction->elements;
503                 tdb->transaction->elements = el->next;
504                 free(el->data);
505                 free(el);
506         }
507
508         /* remove any locks created during the transaction */
509         if (tdb->num_locks != 0) {
510                 int h;
511                 for (h=0;h<tdb->header.hash_size+1;h++) {
512                         if (tdb->locked[h].count != 0) {
513                                 tdb_brlock_len(tdb,FREELIST_TOP+4*h,F_UNLCK,F_SETLKW, 0, 1);
514                                 tdb->locked[h].count = 0;
515                         }
516                 }
517                 tdb->num_locks = 0;
518         }
519
520         /* restore the normal io methods */
521         tdb->methods = tdb->transaction->io_methods;
522
523         tdb_brlock_len(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 0);
524         tdb_brlock_len(tdb, TRANSACTION_LOCK, F_UNLCK, F_SETLKW, 0, 1);
525         SAFE_FREE(tdb->transaction->hash_heads);
526         SAFE_FREE(tdb->transaction);
527         
528         return 0;
529 }
530
531 /*
532   sync to disk
533 */
534 static int transaction_sync(struct tdb_context *tdb, tdb_off_t offset, tdb_len_t length)
535 {       
536         if (fsync(tdb->fd) != 0) {
537                 tdb->ecode = TDB_ERR_IO;
538                 TDB_LOG((tdb, 0, "tdb_transaction: fsync failed\n"));
539                 return -1;
540         }
541 #ifdef MS_SYNC
542         if (tdb->map_ptr) {
543                 tdb_off_t moffset = offset & ~(tdb->page_size-1);
544                 if (msync(moffset + (char *)tdb->map_ptr, 
545                           length + (offset - moffset), MS_SYNC) != 0) {
546                         tdb->ecode = TDB_ERR_IO;
547                         TDB_LOG((tdb, 0, "tdb_transaction: msync failed - %s\n",
548                                  strerror(errno)));
549                         return -1;
550                 }
551         }
552 #endif
553         return 0;
554 }
555
556
557 /*
558   work out how much space the linearised recovery data will consume
559 */
560 static tdb_len_t tdb_recovery_size(struct tdb_context *tdb)
561 {
562         struct tdb_transaction_el *el;
563         tdb_len_t recovery_size = 0;
564
565         recovery_size = sizeof(u32);
566         for (el=tdb->transaction->elements;el;el=el->next) {
567                 if (el->offset >= tdb->transaction->old_map_size) {
568                         continue;
569                 }
570                 recovery_size += 2*sizeof(tdb_off_t) + el->length;
571         }
572
573         return recovery_size;
574 }
575
576 /*
577   allocate the recovery area, or use an existing recovery area if it is
578   large enough
579 */
580 static int tdb_recovery_allocate(struct tdb_context *tdb, 
581                                  tdb_len_t *recovery_size,
582                                  tdb_off_t *recovery_offset,
583                                  tdb_len_t *recovery_max_size)
584 {
585         struct list_struct rec;
586         const struct tdb_methods *methods = tdb->transaction->io_methods;
587         tdb_off_t recovery_head;
588
589         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
590                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery head\n"));
591                 return -1;
592         }
593
594         rec.rec_len = 0;
595
596         if (recovery_head != 0 && 
597             methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
598                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to read recovery record\n"));
599                 return -1;
600         }
601
602         *recovery_size = tdb_recovery_size(tdb);
603
604         if (recovery_head != 0 && *recovery_size <= rec.rec_len) {
605                 /* it fits in the existing area */
606                 *recovery_max_size = rec.rec_len;
607                 *recovery_offset = recovery_head;
608                 return 0;
609         }
610
611         /* we need to free up the old recovery area, then allocate a
612            new one at the end of the file. Note that we cannot use
613            tdb_allocate() to allocate the new one as that might return
614            us an area that is being currently used (as of the start of
615            the transaction) */
616         if (recovery_head != 0) {
617                 if (tdb_free(tdb, recovery_head, &rec) == -1) {
618                         TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to free previous recovery area\n"));
619                         return -1;
620                 }
621         }
622
623         /* the tdb_free() call might have increased the recovery size */
624         *recovery_size = tdb_recovery_size(tdb);
625
626         /* round up to a multiple of page size */
627         *recovery_max_size = TDB_ALIGN(sizeof(rec) + *recovery_size, tdb->page_size) - sizeof(rec);
628         *recovery_offset = tdb->map_size;
629         recovery_head = *recovery_offset;
630
631         if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
632                                      (tdb->map_size - tdb->transaction->old_map_size) +
633                                      sizeof(rec) + *recovery_max_size) == -1) {
634                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to create recovery area\n"));
635                 return -1;
636         }
637
638         /* remap the file (if using mmap) */
639         methods->tdb_oob(tdb, tdb->map_size + 1, 1);
640
641         /* we have to reset the old map size so that we don't try to expand the file
642            again in the transaction commit, which would destroy the recovery area */
643         tdb->transaction->old_map_size = tdb->map_size;
644
645         /* write the recovery header offset and sync - we can sync without a race here
646            as the magic ptr in the recovery record has not been set */
647         CONVERT(recovery_head);
648         if (methods->tdb_write(tdb, TDB_RECOVERY_HEAD, 
649                                &recovery_head, sizeof(tdb_off_t)) == -1) {
650                 TDB_LOG((tdb, 0, "tdb_recovery_allocate: failed to write recovery head\n"));
651                 return -1;
652         }
653
654         return 0;
655 }
656
657
658 /*
659   setup the recovery data that will be used on a crash during commit
660 */
661 static int transaction_setup_recovery(struct tdb_context *tdb, 
662                                       tdb_off_t *magic_offset)
663 {
664         struct tdb_transaction_el *el;
665         tdb_len_t recovery_size;
666         unsigned char *data, *p;
667         const struct tdb_methods *methods = tdb->transaction->io_methods;
668         struct list_struct *rec;
669         tdb_off_t recovery_offset, recovery_max_size;
670         tdb_off_t old_map_size = tdb->transaction->old_map_size;
671         u32 magic, tailer;
672
673         /*
674           check that the recovery area has enough space
675         */
676         if (tdb_recovery_allocate(tdb, &recovery_size, 
677                                   &recovery_offset, &recovery_max_size) == -1) {
678                 return -1;
679         }
680
681         data = malloc(recovery_size + sizeof(*rec));
682         if (data == NULL) {
683                 tdb->ecode = TDB_ERR_OOM;
684                 return -1;
685         }
686
687         rec = (struct list_struct *)data;
688         memset(rec, 0, sizeof(*rec));
689
690         rec->magic    = 0;
691         rec->data_len = recovery_size;
692         rec->rec_len  = recovery_max_size;
693         rec->key_len  = old_map_size;
694         CONVERT(rec);
695
696         /* build the recovery data into a single blob to allow us to do a single
697            large write, which should be more efficient */
698         p = data + sizeof(*rec);
699         for (el=tdb->transaction->elements;el;el=el->next) {
700                 if (el->offset >= old_map_size) {
701                         continue;
702                 }
703                 if (el->offset + el->length > tdb->transaction->old_map_size) {
704                         TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction data over new region boundary\n"));
705                         free(data);
706                         tdb->ecode = TDB_ERR_CORRUPT;
707                         return -1;
708                 }
709                 memcpy(p, &el->offset, 4);
710                 memcpy(p+4, &el->length, 4);
711                 if (DOCONV()) {
712                         tdb_convert(p, 8);
713                 }
714                 /* the recovery area contains the old data, not the
715                    new data, so we have to call the original tdb_read
716                    method to get it */
717                 if (methods->tdb_read(tdb, el->offset, p + 8, el->length, 0) != 0) {
718                         free(data);
719                         tdb->ecode = TDB_ERR_IO;
720                         return -1;
721                 }
722                 p += 8 + el->length;
723         }
724
725         /* and the tailer */
726         tailer = sizeof(*rec) + recovery_max_size;
727         memcpy(p, &tailer, 4);
728         CONVERT(p);
729
730         /* write the recovery data to the recovery area */
731         if (methods->tdb_write(tdb, recovery_offset, data, sizeof(*rec) + recovery_size) == -1) {
732                 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery data\n"));
733                 free(data);
734                 tdb->ecode = TDB_ERR_IO;
735                 return -1;
736         }
737
738         /* as we don't have ordered writes, we have to sync the recovery
739            data before we update the magic to indicate that the recovery
740            data is present */
741         if (transaction_sync(tdb, recovery_offset, sizeof(*rec) + recovery_size) == -1) {
742                 free(data);
743                 return -1;
744         }
745
746         free(data);
747
748         magic = TDB_RECOVERY_MAGIC;
749         CONVERT(magic);
750
751         *magic_offset = recovery_offset + offsetof(struct list_struct, magic);
752
753         if (methods->tdb_write(tdb, *magic_offset, &magic, sizeof(magic)) == -1) {
754                 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to write recovery magic\n"));
755                 tdb->ecode = TDB_ERR_IO;
756                 return -1;
757         }
758
759         /* ensure the recovery magic marker is on disk */
760         if (transaction_sync(tdb, *magic_offset, sizeof(magic)) == -1) {
761                 return -1;
762         }
763
764         return 0;
765 }
766
767 /*
768   commit the current transaction
769 */
770 int tdb_transaction_commit(struct tdb_context *tdb)
771 {       
772         const struct tdb_methods *methods;
773         tdb_off_t magic_offset = 0;
774         u32 zero = 0;
775
776         if (tdb->transaction == NULL) {
777                 TDB_LOG((tdb, 0, "tdb_transaction_commit: no transaction\n"));
778                 return -1;
779         }
780
781         if (tdb->transaction->transaction_error) {
782                 tdb->ecode = TDB_ERR_IO;
783                 tdb_transaction_cancel(tdb);
784                 TDB_LOG((tdb, 0, "tdb_transaction_commit: transaction error pending\n"));
785                 return -1;
786         }
787
788         if (tdb->transaction->nesting != 0) {
789                 tdb->transaction->nesting--;
790                 return 0;
791         }               
792
793         /* check for a null transaction */
794         if (tdb->transaction->elements == NULL) {
795                 tdb_transaction_cancel(tdb);
796                 return 0;
797         }
798
799         methods = tdb->transaction->io_methods;
800         
801         /* if there are any locks pending then the caller has not
802            nested their locks properly, so fail the transaction */
803         if (tdb->num_locks) {
804                 tdb->ecode = TDB_ERR_LOCK;
805                 TDB_LOG((tdb, 0, "tdb_transaction_commit: locks pending on commit\n"));
806                 tdb_transaction_cancel(tdb);
807                 return -1;
808         }
809
810         /* upgrade the main transaction lock region to a write lock */
811         if (tdb_brlock_upgrade(tdb, FREELIST_TOP, 0) == -1) {
812                 TDB_LOG((tdb, 0, "tdb_transaction_start: failed to upgrade hash locks\n"));
813                 tdb->ecode = TDB_ERR_LOCK;
814                 tdb_transaction_cancel(tdb);
815                 return -1;
816         }
817
818         /* get the global lock - this prevents new users attaching to the database
819            during the commit */
820         if (tdb_brlock_len(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0, 1) == -1) {
821                 TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to get global lock\n"));
822                 tdb->ecode = TDB_ERR_LOCK;
823                 tdb_transaction_cancel(tdb);
824                 return -1;
825         }
826
827         if (!(tdb->flags & TDB_NOSYNC)) {
828                 /* write the recovery data to the end of the file */
829                 if (transaction_setup_recovery(tdb, &magic_offset) == -1) {
830                         TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to setup recovery data\n"));
831                         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
832                         tdb_transaction_cancel(tdb);
833                         return -1;
834                 }
835         }
836
837         /* expand the file to the new size if needed */
838         if (tdb->map_size != tdb->transaction->old_map_size) {
839                 if (methods->tdb_expand_file(tdb, tdb->transaction->old_map_size, 
840                                              tdb->map_size - 
841                                              tdb->transaction->old_map_size) == -1) {
842                         tdb->ecode = TDB_ERR_IO;
843                         TDB_LOG((tdb, 0, "tdb_transaction_commit: expansion failed\n"));
844                         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
845                         tdb_transaction_cancel(tdb);
846                         return -1;
847                 }
848                 tdb->map_size = tdb->transaction->old_map_size;
849                 methods->tdb_oob(tdb, tdb->map_size + 1, 1);
850         }
851
852         /* perform all the writes */
853         while (tdb->transaction->elements) {
854                 struct tdb_transaction_el *el = tdb->transaction->elements;
855
856                 if (methods->tdb_write(tdb, el->offset, el->data, el->length) == -1) {
857                         TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed during commit\n"));
858                         
859                         /* we've overwritten part of the data and
860                            possibly expanded the file, so we need to
861                            run the crash recovery code */
862                         tdb->methods = methods;
863                         tdb_transaction_recover(tdb); 
864
865                         tdb_transaction_cancel(tdb);
866                         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
867
868                         TDB_LOG((tdb, 0, "tdb_transaction_commit: write failed\n"));
869                         return -1;
870                 }
871                 tdb->transaction->elements = el->next;
872                 free(el->data); 
873                 free(el);
874         } 
875
876         if (!(tdb->flags & TDB_NOSYNC)) {
877                 /* ensure the new data is on disk */
878                 if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
879                         return -1;
880                 }
881
882                 /* remove the recovery marker */
883                 if (methods->tdb_write(tdb, magic_offset, &zero, 4) == -1) {
884                         TDB_LOG((tdb, 0, "tdb_transaction_commit: failed to remove recovery magic\n"));
885                         return -1;
886                 }
887
888                 /* ensure the recovery marker has been removed on disk */
889                 if (transaction_sync(tdb, magic_offset, 4) == -1) {
890                         return -1;
891                 }
892         }
893
894         tdb_brlock_len(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
895
896         /*
897           TODO: maybe write to some dummy hdr field, or write to magic
898           offset without mmap, before the last sync, instead of the
899           utime() call
900         */
901
902         /* on some systems (like Linux 2.6.x) changes via mmap/msync
903            don't change the mtime of the file, this means the file may
904            not be backed up (as tdb rounding to block sizes means that
905            file size changes are quite rare too). The following forces
906            mtime changes when a transaction completes */
907 #ifdef HAVE_UTIME
908         utime(tdb->name, NULL);
909 #endif
910
911         /* use a transaction cancel to free memory and remove the
912            transaction locks */
913         tdb_transaction_cancel(tdb);
914         return 0;
915 }
916
917
918 /*
919   recover from an aborted transaction. Must be called with exclusive
920   database write access already established (including the global
921   lock to prevent new processes attaching)
922 */
923 int tdb_transaction_recover(struct tdb_context *tdb)
924 {
925         tdb_off_t recovery_head, recovery_eof;
926         unsigned char *data, *p;
927         u32 zero = 0;
928         struct list_struct rec;
929
930         /* find the recovery area */
931         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
932                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery head\n"));
933                 tdb->ecode = TDB_ERR_IO;
934                 return -1;
935         }
936
937         if (recovery_head == 0) {
938                 /* we have never allocated a recovery record */
939                 return 0;
940         }
941
942         /* read the recovery record */
943         if (tdb->methods->tdb_read(tdb, recovery_head, &rec, 
944                                    sizeof(rec), DOCONV()) == -1) {
945                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery record\n"));         
946                 tdb->ecode = TDB_ERR_IO;
947                 return -1;
948         }
949
950         if (rec.magic != TDB_RECOVERY_MAGIC) {
951                 /* there is no valid recovery data */
952                 return 0;
953         }
954
955         if (tdb->read_only) {
956                 TDB_LOG((tdb, 0, "tdb_transaction_recover: attempt to recover read only database\n"));
957                 tdb->ecode = TDB_ERR_CORRUPT;
958                 return -1;
959         }
960
961         recovery_eof = rec.key_len;
962
963         data = malloc(rec.data_len);
964         if (data == NULL) {
965                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to allocate recovery data\n"));               
966                 tdb->ecode = TDB_ERR_OOM;
967                 return -1;
968         }
969
970         /* read the full recovery data */
971         if (tdb->methods->tdb_read(tdb, recovery_head + sizeof(rec), data,
972                                    rec.data_len, 0) == -1) {
973                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to read recovery data\n"));           
974                 tdb->ecode = TDB_ERR_IO;
975                 return -1;
976         }
977
978         /* recover the file data */
979         p = data;
980         while (p+8 < data + rec.data_len) {
981                 u32 ofs, len;
982                 if (DOCONV()) {
983                         tdb_convert(p, 8);
984                 }
985                 memcpy(&ofs, p, 4);
986                 memcpy(&len, p+4, 4);
987
988                 if (tdb->methods->tdb_write(tdb, ofs, p+8, len) == -1) {
989                         free(data);
990                         TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to recover %d bytes at offset %d\n", len, ofs));
991                         tdb->ecode = TDB_ERR_IO;
992                         return -1;
993                 }
994                 p += 8 + len;
995         }
996
997         free(data);
998
999         if (transaction_sync(tdb, 0, tdb->map_size) == -1) {
1000                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync recovery\n"));
1001                 tdb->ecode = TDB_ERR_IO;
1002                 return -1;
1003         }
1004
1005         /* if the recovery area is after the recovered eof then remove it */
1006         if (recovery_eof <= recovery_head) {
1007                 if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &zero) == -1) {
1008                         TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery head\n"));
1009                         tdb->ecode = TDB_ERR_IO;
1010                         return -1;                      
1011                 }
1012         }
1013
1014         /* remove the recovery magic */
1015         if (tdb_ofs_write(tdb, recovery_head + offsetof(struct list_struct, magic), 
1016                           &zero) == -1) {
1017                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to remove recovery magic\n"));
1018                 tdb->ecode = TDB_ERR_IO;
1019                 return -1;                      
1020         }
1021         
1022         /* reduce the file size to the old size */
1023         tdb_munmap(tdb);
1024         if (ftruncate(tdb->fd, recovery_eof) != 0) {
1025                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to reduce to recovery size\n"));
1026                 tdb->ecode = TDB_ERR_IO;
1027                 return -1;                      
1028         }
1029         tdb->map_size = recovery_eof;
1030         tdb_mmap(tdb);
1031
1032         if (transaction_sync(tdb, 0, recovery_eof) == -1) {
1033                 TDB_LOG((tdb, 0, "tdb_transaction_recover: failed to sync2 recovery\n"));
1034                 tdb->ecode = TDB_ERR_IO;
1035                 return -1;
1036         }
1037
1038         TDB_LOG((tdb, 0, "tdb_transaction_recover: recovered %d byte database\n", 
1039                  recovery_eof));
1040
1041         /* all done */
1042         return 0;
1043 }