imported the tdb_repack() code from CTDB
[kai/samba.git] / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
63                 return;
64         }
65
66         tdb_increment_seqnum_nonblock(tdb);
67
68         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
69 }
70
71 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
72 {
73         return memcmp(data.dptr, key.dptr, data.dsize);
74 }
75
76 /* Returns 0 on fail.  On success, return offset of record, and fills
77    in rec */
78 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
79                         struct list_struct *r)
80 {
81         tdb_off_t rec_ptr;
82         
83         /* read in the hash top */
84         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
85                 return 0;
86
87         /* keep looking until we find the right record */
88         while (rec_ptr) {
89                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
90                         return 0;
91
92                 if (!TDB_DEAD(r) && hash==r->full_hash
93                     && key.dsize==r->key_len
94                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
95                                       r->key_len, tdb_key_compare,
96                                       NULL) == 0) {
97                         return rec_ptr;
98                 }
99                 rec_ptr = r->next;
100         }
101         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
102 }
103
104 /* As tdb_find, but if you succeed, keep the lock */
105 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
106                            struct list_struct *rec)
107 {
108         uint32_t rec_ptr;
109
110         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
111                 return 0;
112         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
113                 tdb_unlock(tdb, BUCKET(hash), locktype);
114         return rec_ptr;
115 }
116
117
118 /* update an entry in place - this only works if the new data size
119    is <= the old data size and the key exists.
120    on failure return -1.
121 */
122 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
123 {
124         struct list_struct rec;
125         tdb_off_t rec_ptr;
126
127         /* find entry */
128         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
129                 return -1;
130
131         /* must be long enough key, data and tailer */
132         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
133                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
134                 return -1;
135         }
136
137         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
138                       dbuf.dptr, dbuf.dsize) == -1)
139                 return -1;
140
141         if (dbuf.dsize != rec.data_len) {
142                 /* update size */
143                 rec.data_len = dbuf.dsize;
144                 return tdb_rec_write(tdb, rec_ptr, &rec);
145         }
146  
147         return 0;
148 }
149
150 /* find an entry in the database given a key */
151 /* If an entry doesn't exist tdb_err will be set to
152  * TDB_ERR_NOEXIST. If a key has no data attached
153  * then the TDB_DATA will have zero length but
154  * a non-zero pointer
155  */
156 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
157 {
158         tdb_off_t rec_ptr;
159         struct list_struct rec;
160         TDB_DATA ret;
161         uint32_t hash;
162
163         /* find which hash bucket it is in */
164         hash = tdb->hash_fn(&key);
165         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
166                 return tdb_null;
167
168         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
169                                   rec.data_len);
170         ret.dsize = rec.data_len;
171         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
172         return ret;
173 }
174
175 /*
176  * Find an entry in the database and hand the record's data to a parsing
177  * function. The parsing function is executed under the chain read lock, so it
178  * should be fast and should not block on other syscalls.
179  *
180  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
181  *
182  * For mmapped tdb's that do not have a transaction open it points the parsing
183  * function directly at the mmap area, it avoids the malloc/memcpy in this
184  * case. If a transaction is open or no mmap is available, it has to do
185  * malloc/read/parse/free.
186  *
187  * This is interesting for all readers of potentially large data structures in
188  * the tdb records, ldb indexes being one example.
189  */
190
191 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
192                      int (*parser)(TDB_DATA key, TDB_DATA data,
193                                    void *private_data),
194                      void *private_data)
195 {
196         tdb_off_t rec_ptr;
197         struct list_struct rec;
198         int ret;
199         uint32_t hash;
200
201         /* find which hash bucket it is in */
202         hash = tdb->hash_fn(&key);
203
204         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
205                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
206         }
207
208         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
209                              rec.data_len, parser, private_data);
210
211         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
212
213         return ret;
214 }
215
216 /* check if an entry in the database exists 
217
218    note that 1 is returned if the key is found and 0 is returned if not found
219    this doesn't match the conventions in the rest of this module, but is
220    compatible with gdbm
221 */
222 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
223 {
224         struct list_struct rec;
225         
226         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
227                 return 0;
228         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
229         return 1;
230 }
231
232 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
233 {
234         uint32_t hash = tdb->hash_fn(&key);
235         return tdb_exists_hash(tdb, key, hash);
236 }
237
238 /* actually delete an entry in the database given the offset */
239 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec)
240 {
241         tdb_off_t last_ptr, i;
242         struct list_struct lastrec;
243
244         if (tdb->read_only || tdb->traverse_read) return -1;
245
246         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
247             tdb_write_lock_record(tdb, rec_ptr) == -1) {
248                 /* Someone traversing here: mark it as dead */
249                 rec->magic = TDB_DEAD_MAGIC;
250                 return tdb_rec_write(tdb, rec_ptr, rec);
251         }
252         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
253                 return -1;
254
255         /* find previous record in hash chain */
256         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
257                 return -1;
258         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
259                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
260                         return -1;
261
262         /* unlink it: next ptr is at start of record. */
263         if (last_ptr == 0)
264                 last_ptr = TDB_HASH_TOP(rec->full_hash);
265         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
266                 return -1;
267
268         /* recover the space */
269         if (tdb_free(tdb, rec_ptr, rec) == -1)
270                 return -1;
271         return 0;
272 }
273
274 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
275 {
276         int res = 0;
277         tdb_off_t rec_ptr;
278         struct list_struct rec;
279         
280         /* read in the hash top */
281         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
282                 return 0;
283
284         while (rec_ptr) {
285                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
286                         return 0;
287
288                 if (rec.magic == TDB_DEAD_MAGIC) {
289                         res += 1;
290                 }
291                 rec_ptr = rec.next;
292         }
293         return res;
294 }
295
296 /*
297  * Purge all DEAD records from a hash chain
298  */
299 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
300 {
301         int res = -1;
302         struct list_struct rec;
303         tdb_off_t rec_ptr;
304
305         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
306                 return -1;
307         }
308         
309         /* read in the hash top */
310         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
311                 goto fail;
312
313         while (rec_ptr) {
314                 tdb_off_t next;
315
316                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
317                         goto fail;
318                 }
319
320                 next = rec.next;
321
322                 if (rec.magic == TDB_DEAD_MAGIC
323                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
324                         goto fail;
325                 }
326                 rec_ptr = next;
327         }
328         res = 0;
329  fail:
330         tdb_unlock(tdb, -1, F_WRLCK);
331         return res;
332 }
333
334 /* delete an entry in the database given a key */
335 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
336 {
337         tdb_off_t rec_ptr;
338         struct list_struct rec;
339         int ret;
340
341         if (tdb->max_dead_records != 0) {
342
343                 /*
344                  * Allow for some dead records per hash chain, mainly for
345                  * tdb's with a very high create/delete rate like locking.tdb.
346                  */
347
348                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
349                         return -1;
350
351                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
352                         /*
353                          * Don't let the per-chain freelist grow too large,
354                          * delete all existing dead records
355                          */
356                         tdb_purge_dead(tdb, hash);
357                 }
358
359                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
360                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
361                         return -1;
362                 }
363
364                 /*
365                  * Just mark the record as dead.
366                  */
367                 rec.magic = TDB_DEAD_MAGIC;
368                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
369         }
370         else {
371                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
372                                                    &rec)))
373                         return -1;
374
375                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
376         }
377
378         if (ret == 0) {
379                 tdb_increment_seqnum(tdb);
380         }
381
382         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
383                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
384         return ret;
385 }
386
387 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
388 {
389         uint32_t hash = tdb->hash_fn(&key);
390         return tdb_delete_hash(tdb, key, hash);
391 }
392
393 /*
394  * See if we have a dead record around with enough space
395  */
396 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
397                                struct list_struct *r, tdb_len_t length)
398 {
399         tdb_off_t rec_ptr;
400         
401         /* read in the hash top */
402         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
403                 return 0;
404
405         /* keep looking until we find the right record */
406         while (rec_ptr) {
407                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
408                         return 0;
409
410                 if (TDB_DEAD(r) && r->rec_len >= length) {
411                         /*
412                          * First fit for simple coding, TODO: change to best
413                          * fit
414                          */
415                         return rec_ptr;
416                 }
417                 rec_ptr = r->next;
418         }
419         return 0;
420 }
421
422 /* store an element in the database, replacing any existing element
423    with the same key 
424
425    return 0 on success, -1 on failure
426 */
427 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
428 {
429         struct list_struct rec;
430         uint32_t hash;
431         tdb_off_t rec_ptr;
432         char *p = NULL;
433         int ret = -1;
434
435         if (tdb->read_only || tdb->traverse_read) {
436                 tdb->ecode = TDB_ERR_RDONLY;
437                 return -1;
438         }
439
440         /* find which hash bucket it is in */
441         hash = tdb->hash_fn(&key);
442         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
443                 return -1;
444
445         /* check for it existing, on insert. */
446         if (flag == TDB_INSERT) {
447                 if (tdb_exists_hash(tdb, key, hash)) {
448                         tdb->ecode = TDB_ERR_EXISTS;
449                         goto fail;
450                 }
451         } else {
452                 /* first try in-place update, on modify or replace. */
453                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
454                         goto done;
455                 }
456                 if (tdb->ecode == TDB_ERR_NOEXIST &&
457                     flag == TDB_MODIFY) {
458                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
459                          we should fail the store */
460                         goto fail;
461                 }
462         }
463         /* reset the error code potentially set by the tdb_update() */
464         tdb->ecode = TDB_SUCCESS;
465
466         /* delete any existing record - if it doesn't exist we don't
467            care.  Doing this first reduces fragmentation, and avoids
468            coalescing with `allocated' block before it's updated. */
469         if (flag != TDB_INSERT)
470                 tdb_delete_hash(tdb, key, hash);
471
472         /* Copy key+value *before* allocating free space in case malloc
473            fails and we are left with a dead spot in the tdb. */
474
475         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
476                 tdb->ecode = TDB_ERR_OOM;
477                 goto fail;
478         }
479
480         memcpy(p, key.dptr, key.dsize);
481         if (dbuf.dsize)
482                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
483
484         if (tdb->max_dead_records != 0) {
485                 /*
486                  * Allow for some dead records per hash chain, look if we can
487                  * find one that can hold the new record. We need enough space
488                  * for key, data and tailer. If we find one, we don't have to
489                  * consult the central freelist.
490                  */
491                 rec_ptr = tdb_find_dead(
492                         tdb, hash, &rec,
493                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
494
495                 if (rec_ptr != 0) {
496                         rec.key_len = key.dsize;
497                         rec.data_len = dbuf.dsize;
498                         rec.full_hash = hash;
499                         rec.magic = TDB_MAGIC;
500                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
501                             || tdb->methods->tdb_write(
502                                     tdb, rec_ptr + sizeof(rec),
503                                     p, key.dsize + dbuf.dsize) == -1) {
504                                 goto fail;
505                         }
506                         goto done;
507                 }
508         }
509
510         /*
511          * We have to allocate some space from the freelist, so this means we
512          * have to lock it. Use the chance to purge all the DEAD records from
513          * the hash chain under the freelist lock.
514          */
515
516         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
517                 goto fail;
518         }
519
520         if ((tdb->max_dead_records != 0)
521             && (tdb_purge_dead(tdb, hash) == -1)) {
522                 tdb_unlock(tdb, -1, F_WRLCK);
523                 goto fail;
524         }
525
526         /* we have to allocate some space */
527         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
528
529         tdb_unlock(tdb, -1, F_WRLCK);
530
531         if (rec_ptr == 0) {
532                 goto fail;
533         }
534
535         /* Read hash top into next ptr */
536         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
537                 goto fail;
538
539         rec.key_len = key.dsize;
540         rec.data_len = dbuf.dsize;
541         rec.full_hash = hash;
542         rec.magic = TDB_MAGIC;
543
544         /* write out and point the top of the hash chain at it */
545         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
546             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
547             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
548                 /* Need to tdb_unallocate() here */
549                 goto fail;
550         }
551
552  done:
553         ret = 0;
554  fail:
555         if (ret == 0) {
556                 tdb_increment_seqnum(tdb);
557         }
558
559         SAFE_FREE(p); 
560         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
561         return ret;
562 }
563
564
565 /* Append to an entry. Create if not exist. */
566 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
567 {
568         uint32_t hash;
569         TDB_DATA dbuf;
570         int ret = -1;
571
572         /* find which hash bucket it is in */
573         hash = tdb->hash_fn(&key);
574         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
575                 return -1;
576
577         dbuf = tdb_fetch(tdb, key);
578
579         if (dbuf.dptr == NULL) {
580                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
581         } else {
582                 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
583                                                      dbuf.dsize + new_dbuf.dsize);
584                 if (new_dptr == NULL) {
585                         free(dbuf.dptr);
586                 }
587                 dbuf.dptr = new_dptr;
588         }
589
590         if (dbuf.dptr == NULL) {
591                 tdb->ecode = TDB_ERR_OOM;
592                 goto failed;
593         }
594
595         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
596         dbuf.dsize += new_dbuf.dsize;
597
598         ret = tdb_store(tdb, key, dbuf, 0);
599         
600 failed:
601         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
602         SAFE_FREE(dbuf.dptr);
603         return ret;
604 }
605
606
607 /*
608   return the name of the current tdb file
609   useful for external logging functions
610 */
611 const char *tdb_name(struct tdb_context *tdb)
612 {
613         return tdb->name;
614 }
615
616 /*
617   return the underlying file descriptor being used by tdb, or -1
618   useful for external routines that want to check the device/inode
619   of the fd
620 */
621 int tdb_fd(struct tdb_context *tdb)
622 {
623         return tdb->fd;
624 }
625
626 /*
627   return the current logging function
628   useful for external tdb routines that wish to log tdb errors
629 */
630 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
631 {
632         return tdb->log.log_fn;
633 }
634
635
636 /*
637   get the tdb sequence number. Only makes sense if the writers opened
638   with TDB_SEQNUM set. Note that this sequence number will wrap quite
639   quickly, so it should only be used for a 'has something changed'
640   test, not for code that relies on the count of the number of changes
641   made. If you want a counter then use a tdb record.
642
643   The aim of this sequence number is to allow for a very lightweight
644   test of a possible tdb change.
645 */
646 int tdb_get_seqnum(struct tdb_context *tdb)
647 {
648         tdb_off_t seqnum=0;
649
650         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
651         return seqnum;
652 }
653
654 int tdb_hash_size(struct tdb_context *tdb)
655 {
656         return tdb->header.hash_size;
657 }
658
659 size_t tdb_map_size(struct tdb_context *tdb)
660 {
661         return tdb->map_size;
662 }
663
664 int tdb_get_flags(struct tdb_context *tdb)
665 {
666         return tdb->flags;
667 }
668
669 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
670 {
671         tdb->flags |= flags;
672 }
673
674 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
675 {
676         tdb->flags &= ~flags;
677 }
678
679
680 /*
681   enable sequence number handling on an open tdb
682 */
683 void tdb_enable_seqnum(struct tdb_context *tdb)
684 {
685         tdb->flags |= TDB_SEQNUM;
686 }
687
688
689 /*
690   add a region of the file to the freelist. Length is the size of the region in bytes, 
691   which includes the free list header that needs to be added
692  */
693 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
694 {
695         struct list_struct rec;
696         if (length <= sizeof(rec)) {
697                 /* the region is not worth adding */
698                 return 0;
699         }
700         if (length + offset > tdb->map_size) {
701                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
702                 return -1;              
703         }
704         memset(&rec,'\0',sizeof(rec));
705         rec.rec_len = length - sizeof(rec);
706         if (tdb_free(tdb, offset, &rec) == -1) {
707                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
708                 return -1;
709         }
710         return 0;
711 }
712
713 /*
714   wipe the entire database, deleting all records. This can be done
715   very fast by using a global lock. The entire data portion of the
716   file becomes a single entry in the freelist.
717
718   This code carefully steps around the recovery area, leaving it alone
719  */
720 int tdb_wipe_all(struct tdb_context *tdb)
721 {
722         int i;
723         tdb_off_t offset = 0;
724         ssize_t data_len;
725         tdb_off_t recovery_head;
726         tdb_len_t recovery_size = 0;
727
728         if (tdb_lockall(tdb) != 0) {
729                 return -1;
730         }
731
732         /* see if the tdb has a recovery area, and remember its size
733            if so. We don't want to lose this as otherwise each
734            tdb_wipe_all() in a transaction will increase the size of
735            the tdb by the size of the recovery area */
736         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
737                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
738                 goto failed;
739         }
740
741         if (recovery_head != 0) {
742                 struct list_struct rec;
743                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
744                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
745                         return -1;
746                 }       
747                 recovery_size = rec.rec_len + sizeof(rec);
748         }
749
750         /* wipe the hashes */
751         for (i=0;i<tdb->header.hash_size;i++) {
752                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
753                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
754                         goto failed;
755                 }
756         }
757
758         /* wipe the freelist */
759         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
760                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
761                 goto failed;
762         }
763
764         /* add all the rest of the file to the freelist, possibly leaving a gap 
765            for the recovery area */
766         if (recovery_size == 0) {
767                 /* the simple case - the whole file can be used as a freelist */
768                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
769                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
770                         goto failed;
771                 }
772         } else {
773                 /* we need to add two freelist entries - one on either
774                    side of the recovery area 
775
776                    Note that we cannot shift the recovery area during
777                    this operation. Only the transaction.c code may
778                    move the recovery area or we risk subtle data
779                    corruption
780                 */
781                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
782                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
783                         goto failed;
784                 }
785                 /* and the 2nd free list entry after the recovery area - if any */
786                 data_len = tdb->map_size - (recovery_head+recovery_size);
787                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
788                         goto failed;
789                 }
790         }
791
792         if (tdb_unlockall(tdb) != 0) {
793                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
794                 goto failed;
795         }
796
797         return 0;
798
799 failed:
800         tdb_unlockall(tdb);
801         return -1;
802 }
803
804 struct traverse_state {
805         bool error;
806         struct tdb_context *dest_db;
807 };
808
809 /*
810   traverse function for repacking
811  */
812 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
813 {
814         struct traverse_state *state = (struct traverse_state *)private;
815         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
816                 state->error = true;
817                 return -1;
818         }
819         return 0;
820 }
821
822 /*
823   repack a tdb
824  */
825 int tdb_repack(struct tdb_context *tdb)
826 {
827         struct tdb_context *tmp_db;
828         struct traverse_state state;
829
830         if (tdb_transaction_start(tdb) != 0) {
831                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
832                 return -1;
833         }
834
835         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
836         if (tmp_db == NULL) {
837                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
838                 tdb_transaction_cancel(tdb);
839                 return -1;
840         }
841
842         state.error = false;
843         state.dest_db = tmp_db;
844
845         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
846                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
847                 tdb_transaction_cancel(tdb);
848                 tdb_close(tmp_db);
849                 return -1;              
850         }
851
852         if (state.error) {
853                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
854                 tdb_transaction_cancel(tdb);
855                 tdb_close(tmp_db);
856                 return -1;
857         }
858
859         if (tdb_wipe_all(tdb) != 0) {
860                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
861                 tdb_transaction_cancel(tdb);
862                 tdb_close(tmp_db);
863                 return -1;
864         }
865
866         state.error = false;
867         state.dest_db = tdb;
868
869         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
870                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
871                 tdb_transaction_cancel(tdb);
872                 tdb_close(tmp_db);
873                 return -1;              
874         }
875
876         if (state.error) {
877                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
878                 tdb_transaction_cancel(tdb);
879                 tdb_close(tmp_db);
880                 return -1;
881         }
882
883         tdb_close(tmp_db);
884
885         if (tdb_transaction_commit(tdb) != 0) {
886                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
887                 return -1;
888         }
889
890         return 0;
891 }