merged changes from v3-2-test
[ira/wip.git] / source / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
63                 return;
64         }
65
66         tdb_increment_seqnum_nonblock(tdb);
67
68         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
69 }
70
71 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
72 {
73         return memcmp(data.dptr, key.dptr, data.dsize);
74 }
75
76 /* Returns 0 on fail.  On success, return offset of record, and fills
77    in rec */
78 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
79                         struct list_struct *r)
80 {
81         tdb_off_t rec_ptr;
82         
83         /* read in the hash top */
84         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
85                 return 0;
86
87         /* keep looking until we find the right record */
88         while (rec_ptr) {
89                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
90                         return 0;
91
92                 if (!TDB_DEAD(r) && hash==r->full_hash
93                     && key.dsize==r->key_len
94                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
95                                       r->key_len, tdb_key_compare,
96                                       NULL) == 0) {
97                         return rec_ptr;
98                 }
99                 rec_ptr = r->next;
100         }
101         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
102 }
103
104 /* As tdb_find, but if you succeed, keep the lock */
105 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
106                            struct list_struct *rec)
107 {
108         uint32_t rec_ptr;
109
110         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
111                 return 0;
112         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
113                 tdb_unlock(tdb, BUCKET(hash), locktype);
114         return rec_ptr;
115 }
116
117
118 /* update an entry in place - this only works if the new data size
119    is <= the old data size and the key exists.
120    on failure return -1.
121 */
122 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
123 {
124         struct list_struct rec;
125         tdb_off_t rec_ptr;
126
127         /* find entry */
128         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
129                 return -1;
130
131         /* must be long enough key, data and tailer */
132         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
133                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
134                 return -1;
135         }
136
137         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
138                       dbuf.dptr, dbuf.dsize) == -1)
139                 return -1;
140
141         if (dbuf.dsize != rec.data_len) {
142                 /* update size */
143                 rec.data_len = dbuf.dsize;
144                 return tdb_rec_write(tdb, rec_ptr, &rec);
145         }
146  
147         return 0;
148 }
149
150 /* find an entry in the database given a key */
151 /* If an entry doesn't exist tdb_err will be set to
152  * TDB_ERR_NOEXIST. If a key has no data attached
153  * then the TDB_DATA will have zero length but
154  * a non-zero pointer
155  */
156 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
157 {
158         tdb_off_t rec_ptr;
159         struct list_struct rec;
160         TDB_DATA ret;
161         uint32_t hash;
162
163         /* find which hash bucket it is in */
164         hash = tdb->hash_fn(&key);
165         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
166                 return tdb_null;
167
168         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
169                                   rec.data_len);
170         ret.dsize = rec.data_len;
171         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
172         return ret;
173 }
174
175 /*
176  * Find an entry in the database and hand the record's data to a parsing
177  * function. The parsing function is executed under the chain read lock, so it
178  * should be fast and should not block on other syscalls.
179  *
180  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
181  *
182  * For mmapped tdb's that do not have a transaction open it points the parsing
183  * function directly at the mmap area, it avoids the malloc/memcpy in this
184  * case. If a transaction is open or no mmap is available, it has to do
185  * malloc/read/parse/free.
186  *
187  * This is interesting for all readers of potentially large data structures in
188  * the tdb records, ldb indexes being one example.
189  */
190
191 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
192                      int (*parser)(TDB_DATA key, TDB_DATA data,
193                                    void *private_data),
194                      void *private_data)
195 {
196         tdb_off_t rec_ptr;
197         struct list_struct rec;
198         int ret;
199         uint32_t hash;
200
201         /* find which hash bucket it is in */
202         hash = tdb->hash_fn(&key);
203
204         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
205                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
206         }
207
208         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
209                              rec.data_len, parser, private_data);
210
211         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
212
213         return ret;
214 }
215
216 /* check if an entry in the database exists 
217
218    note that 1 is returned if the key is found and 0 is returned if not found
219    this doesn't match the conventions in the rest of this module, but is
220    compatible with gdbm
221 */
222 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
223 {
224         struct list_struct rec;
225         
226         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
227                 return 0;
228         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
229         return 1;
230 }
231
232 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
233 {
234         uint32_t hash = tdb->hash_fn(&key);
235         return tdb_exists_hash(tdb, key, hash);
236 }
237
238 /* actually delete an entry in the database given the offset */
239 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct *rec)
240 {
241         tdb_off_t last_ptr, i;
242         struct list_struct lastrec;
243
244         if (tdb->read_only || tdb->traverse_read) return -1;
245
246         if (tdb->traverse_write != 0 || 
247             tdb_write_lock_record(tdb, rec_ptr) == -1) {
248                 /* Someone traversing here: mark it as dead */
249                 rec->magic = TDB_DEAD_MAGIC;
250                 return tdb_rec_write(tdb, rec_ptr, rec);
251         }
252         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
253                 return -1;
254
255         /* find previous record in hash chain */
256         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
257                 return -1;
258         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
259                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
260                         return -1;
261
262         /* unlink it: next ptr is at start of record. */
263         if (last_ptr == 0)
264                 last_ptr = TDB_HASH_TOP(rec->full_hash);
265         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
266                 return -1;
267
268         /* recover the space */
269         if (tdb_free(tdb, rec_ptr, rec) == -1)
270                 return -1;
271         return 0;
272 }
273
274 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
275 {
276         int res = 0;
277         tdb_off_t rec_ptr;
278         struct list_struct rec;
279         
280         /* read in the hash top */
281         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
282                 return 0;
283
284         while (rec_ptr) {
285                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
286                         return 0;
287
288                 if (rec.magic == TDB_DEAD_MAGIC) {
289                         res += 1;
290                 }
291                 rec_ptr = rec.next;
292         }
293         return res;
294 }
295
296 /*
297  * Purge all DEAD records from a hash chain
298  */
299 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
300 {
301         int res = -1;
302         struct list_struct rec;
303         tdb_off_t rec_ptr;
304
305         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
306                 return -1;
307         }
308         
309         /* read in the hash top */
310         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
311                 goto fail;
312
313         while (rec_ptr) {
314                 tdb_off_t next;
315
316                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
317                         goto fail;
318                 }
319
320                 next = rec.next;
321
322                 if (rec.magic == TDB_DEAD_MAGIC
323                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
324                         goto fail;
325                 }
326                 rec_ptr = next;
327         }
328         res = 0;
329  fail:
330         tdb_unlock(tdb, -1, F_WRLCK);
331         return res;
332 }
333
334 /* delete an entry in the database given a key */
335 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
336 {
337         tdb_off_t rec_ptr;
338         struct list_struct rec;
339         int ret;
340
341         if (tdb->max_dead_records != 0) {
342
343                 /*
344                  * Allow for some dead records per hash chain, mainly for
345                  * tdb's with a very high create/delete rate like locking.tdb.
346                  */
347
348                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
349                         return -1;
350
351                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
352                         /*
353                          * Don't let the per-chain freelist grow too large,
354                          * delete all existing dead records
355                          */
356                         tdb_purge_dead(tdb, hash);
357                 }
358
359                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
360                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
361                         return -1;
362                 }
363
364                 /*
365                  * Just mark the record as dead.
366                  */
367                 rec.magic = TDB_DEAD_MAGIC;
368                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
369         }
370         else {
371                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
372                                                    &rec)))
373                         return -1;
374
375                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
376         }
377
378         if (ret == 0) {
379                 tdb_increment_seqnum(tdb);
380         }
381
382         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
383                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
384         return ret;
385 }
386
387 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
388 {
389         uint32_t hash = tdb->hash_fn(&key);
390         return tdb_delete_hash(tdb, key, hash);
391 }
392
393 /*
394  * See if we have a dead record around with enough space
395  */
396 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
397                                struct list_struct *r, tdb_len_t length)
398 {
399         tdb_off_t rec_ptr;
400         
401         /* read in the hash top */
402         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
403                 return 0;
404
405         /* keep looking until we find the right record */
406         while (rec_ptr) {
407                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
408                         return 0;
409
410                 if (TDB_DEAD(r) && r->rec_len >= length) {
411                         /*
412                          * First fit for simple coding, TODO: change to best
413                          * fit
414                          */
415                         return rec_ptr;
416                 }
417                 rec_ptr = r->next;
418         }
419         return 0;
420 }
421
422 /* store an element in the database, replacing any existing element
423    with the same key 
424
425    return 0 on success, -1 on failure
426 */
427 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
428 {
429         struct list_struct rec;
430         uint32_t hash;
431         tdb_off_t rec_ptr;
432         char *p = NULL;
433         int ret = -1;
434
435         if (tdb->read_only || tdb->traverse_read) {
436                 tdb->ecode = TDB_ERR_RDONLY;
437                 return -1;
438         }
439
440         /* find which hash bucket it is in */
441         hash = tdb->hash_fn(&key);
442         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
443                 return -1;
444
445         /* check for it existing, on insert. */
446         if (flag == TDB_INSERT) {
447                 if (tdb_exists_hash(tdb, key, hash)) {
448                         tdb->ecode = TDB_ERR_EXISTS;
449                         goto fail;
450                 }
451         } else {
452                 /* first try in-place update, on modify or replace. */
453                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
454                         goto done;
455                 }
456                 if (tdb->ecode == TDB_ERR_NOEXIST &&
457                     flag == TDB_MODIFY) {
458                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
459                          we should fail the store */
460                         goto fail;
461                 }
462         }
463         /* reset the error code potentially set by the tdb_update() */
464         tdb->ecode = TDB_SUCCESS;
465
466         /* delete any existing record - if it doesn't exist we don't
467            care.  Doing this first reduces fragmentation, and avoids
468            coalescing with `allocated' block before it's updated. */
469         if (flag != TDB_INSERT)
470                 tdb_delete_hash(tdb, key, hash);
471
472         /* Copy key+value *before* allocating free space in case malloc
473            fails and we are left with a dead spot in the tdb. */
474
475         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
476                 tdb->ecode = TDB_ERR_OOM;
477                 goto fail;
478         }
479
480         memcpy(p, key.dptr, key.dsize);
481         if (dbuf.dsize)
482                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
483
484         if (tdb->max_dead_records != 0) {
485                 /*
486                  * Allow for some dead records per hash chain, look if we can
487                  * find one that can hold the new record. We need enough space
488                  * for key, data and tailer. If we find one, we don't have to
489                  * consult the central freelist.
490                  */
491                 rec_ptr = tdb_find_dead(
492                         tdb, hash, &rec,
493                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
494
495                 if (rec_ptr != 0) {
496                         rec.key_len = key.dsize;
497                         rec.data_len = dbuf.dsize;
498                         rec.full_hash = hash;
499                         rec.magic = TDB_MAGIC;
500                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
501                             || tdb->methods->tdb_write(
502                                     tdb, rec_ptr + sizeof(rec),
503                                     p, key.dsize + dbuf.dsize) == -1) {
504                                 goto fail;
505                         }
506                         goto done;
507                 }
508         }
509
510         /*
511          * We have to allocate some space from the freelist, so this means we
512          * have to lock it. Use the chance to purge all the DEAD records from
513          * the hash chain under the freelist lock.
514          */
515
516         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
517                 goto fail;
518         }
519
520         if ((tdb->max_dead_records != 0)
521             && (tdb_purge_dead(tdb, hash) == -1)) {
522                 tdb_unlock(tdb, -1, F_WRLCK);
523                 goto fail;
524         }
525
526         /* we have to allocate some space */
527         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
528
529         tdb_unlock(tdb, -1, F_WRLCK);
530
531         if (rec_ptr == 0) {
532                 goto fail;
533         }
534
535         /* Read hash top into next ptr */
536         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
537                 goto fail;
538
539         rec.key_len = key.dsize;
540         rec.data_len = dbuf.dsize;
541         rec.full_hash = hash;
542         rec.magic = TDB_MAGIC;
543
544         /* write out and point the top of the hash chain at it */
545         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
546             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
547             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
548                 /* Need to tdb_unallocate() here */
549                 goto fail;
550         }
551
552  done:
553         ret = 0;
554  fail:
555         if (ret == 0) {
556                 tdb_increment_seqnum(tdb);
557         }
558
559         SAFE_FREE(p); 
560         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
561         return ret;
562 }
563
564
565 /* Append to an entry. Create if not exist. */
566 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
567 {
568         uint32_t hash;
569         TDB_DATA dbuf;
570         int ret = -1;
571
572         /* find which hash bucket it is in */
573         hash = tdb->hash_fn(&key);
574         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
575                 return -1;
576
577         dbuf = tdb_fetch(tdb, key);
578
579         if (dbuf.dptr == NULL) {
580                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
581         } else {
582                 unsigned char *new_dptr = (unsigned char *)realloc(dbuf.dptr,
583                                                      dbuf.dsize + new_dbuf.dsize);
584                 if (new_dptr == NULL) {
585                         free(dbuf.dptr);
586                 }
587                 dbuf.dptr = new_dptr;
588         }
589
590         if (dbuf.dptr == NULL) {
591                 tdb->ecode = TDB_ERR_OOM;
592                 goto failed;
593         }
594
595         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
596         dbuf.dsize += new_dbuf.dsize;
597
598         ret = tdb_store(tdb, key, dbuf, 0);
599         
600 failed:
601         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
602         SAFE_FREE(dbuf.dptr);
603         return ret;
604 }
605
606
607 /*
608   return the name of the current tdb file
609   useful for external logging functions
610 */
611 const char *tdb_name(struct tdb_context *tdb)
612 {
613         return tdb->name;
614 }
615
616 /*
617   return the underlying file descriptor being used by tdb, or -1
618   useful for external routines that want to check the device/inode
619   of the fd
620 */
621 int tdb_fd(struct tdb_context *tdb)
622 {
623         return tdb->fd;
624 }
625
626 /*
627   return the current logging function
628   useful for external tdb routines that wish to log tdb errors
629 */
630 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
631 {
632         return tdb->log.log_fn;
633 }
634
635
636 /*
637   get the tdb sequence number. Only makes sense if the writers opened
638   with TDB_SEQNUM set. Note that this sequence number will wrap quite
639   quickly, so it should only be used for a 'has something changed'
640   test, not for code that relies on the count of the number of changes
641   made. If you want a counter then use a tdb record.
642
643   The aim of this sequence number is to allow for a very lightweight
644   test of a possible tdb change.
645 */
646 int tdb_get_seqnum(struct tdb_context *tdb)
647 {
648         tdb_off_t seqnum=0;
649
650         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
651         return seqnum;
652 }
653
654 int tdb_hash_size(struct tdb_context *tdb)
655 {
656         return tdb->header.hash_size;
657 }
658
659 size_t tdb_map_size(struct tdb_context *tdb)
660 {
661         return tdb->map_size;
662 }
663
664 int tdb_get_flags(struct tdb_context *tdb)
665 {
666         return tdb->flags;
667 }
668
669 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
670 {
671         tdb->flags |= flags;
672 }
673
674 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
675 {
676         tdb->flags &= ~flags;
677 }
678
679
680 /*
681   enable sequence number handling on an open tdb
682 */
683 void tdb_enable_seqnum(struct tdb_context *tdb)
684 {
685         tdb->flags |= TDB_SEQNUM;
686 }
687
688
689 /*
690   wipe the entire database, deleting all records. This can be done
691   very fast by using a global lock. The entire data portion of the
692   file becomes a single entry in the freelist.
693  */
694 int tdb_wipe_all(struct tdb_context *tdb)
695 {
696         int i;
697         tdb_off_t offset = 0;
698         ssize_t data_len;
699
700         if (tdb_lockall(tdb) != 0) {
701                 return -1;
702         }
703
704         /* wipe the hashes */
705         for (i=0;i<tdb->header.hash_size;i++) {
706                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
707                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
708                         goto failed;
709                 }
710         }
711
712         /* wipe the freelist */
713         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
714                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
715                 goto failed;
716         }
717
718         if (tdb_ofs_write(tdb, TDB_RECOVERY_HEAD, &offset) == -1) {
719                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write recovery head\n"));
720                 goto failed;            
721         }
722
723         /* add all the rest of the file to the freelist */
724         data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size)) - sizeof(struct list_struct);
725         if (data_len > 0) {
726                 struct list_struct rec;
727                 memset(&rec,'\0',sizeof(rec));
728                 rec.rec_len = data_len;
729                 if (tdb_free(tdb, TDB_DATA_START(tdb->header.hash_size), &rec) == -1) {
730                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to add free record\n"));
731                         goto failed;
732                 }
733         }
734
735         if (tdb_unlockall(tdb) != 0) {
736                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
737                 goto failed;
738         }
739
740         return 0;
741
742 failed:
743         tdb_unlockall(tdb);
744         return -1;
745 }