s4-ldb: don't allow modifies outside a transaction.
[ira/wip.git] / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
63                 return;
64         }
65
66         tdb_increment_seqnum_nonblock(tdb);
67
68         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
69 }
70
71 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
72 {
73         return memcmp(data.dptr, key.dptr, data.dsize);
74 }
75
76 /* Returns 0 on fail.  On success, return offset of record, and fills
77    in rec */
78 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
79                         struct tdb_record *r)
80 {
81         tdb_off_t rec_ptr;
82         
83         /* read in the hash top */
84         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
85                 return 0;
86
87         /* keep looking until we find the right record */
88         while (rec_ptr) {
89                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
90                         return 0;
91
92                 if (!TDB_DEAD(r) && hash==r->full_hash
93                     && key.dsize==r->key_len
94                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
95                                       r->key_len, tdb_key_compare,
96                                       NULL) == 0) {
97                         return rec_ptr;
98                 }
99                 /* detect tight infinite loop */
100                 if (rec_ptr == r->next) {
101                         tdb->ecode = TDB_ERR_CORRUPT;
102                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
103                         return 0;
104                 }
105                 rec_ptr = r->next;
106         }
107         tdb->ecode = TDB_ERR_NOEXIST;
108         return 0;
109 }
110
111 /* As tdb_find, but if you succeed, keep the lock */
112 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
113                            struct tdb_record *rec)
114 {
115         uint32_t rec_ptr;
116
117         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
118                 return 0;
119         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
120                 tdb_unlock(tdb, BUCKET(hash), locktype);
121         return rec_ptr;
122 }
123
124
125 /* update an entry in place - this only works if the new data size
126    is <= the old data size and the key exists.
127    on failure return -1.
128 */
129 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
130 {
131         struct tdb_record rec;
132         tdb_off_t rec_ptr;
133
134         /* find entry */
135         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
136                 return -1;
137
138         /* must be long enough key, data and tailer */
139         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
140                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
141                 return -1;
142         }
143
144         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
145                       dbuf.dptr, dbuf.dsize) == -1)
146                 return -1;
147
148         if (dbuf.dsize != rec.data_len) {
149                 /* update size */
150                 rec.data_len = dbuf.dsize;
151                 return tdb_rec_write(tdb, rec_ptr, &rec);
152         }
153  
154         return 0;
155 }
156
157 /* find an entry in the database given a key */
158 /* If an entry doesn't exist tdb_err will be set to
159  * TDB_ERR_NOEXIST. If a key has no data attached
160  * then the TDB_DATA will have zero length but
161  * a non-zero pointer
162  */
163 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
164 {
165         tdb_off_t rec_ptr;
166         struct tdb_record rec;
167         TDB_DATA ret;
168         uint32_t hash;
169
170         /* find which hash bucket it is in */
171         hash = tdb->hash_fn(&key);
172         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
173                 return tdb_null;
174
175         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
176                                   rec.data_len);
177         ret.dsize = rec.data_len;
178         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
179         return ret;
180 }
181
182 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
183 {
184         TDB_DATA ret = _tdb_fetch(tdb, key);
185
186         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
187         return ret;
188 }
189
190 /*
191  * Find an entry in the database and hand the record's data to a parsing
192  * function. The parsing function is executed under the chain read lock, so it
193  * should be fast and should not block on other syscalls.
194  *
195  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
196  *
197  * For mmapped tdb's that do not have a transaction open it points the parsing
198  * function directly at the mmap area, it avoids the malloc/memcpy in this
199  * case. If a transaction is open or no mmap is available, it has to do
200  * malloc/read/parse/free.
201  *
202  * This is interesting for all readers of potentially large data structures in
203  * the tdb records, ldb indexes being one example.
204  */
205
206 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
207                      int (*parser)(TDB_DATA key, TDB_DATA data,
208                                    void *private_data),
209                      void *private_data)
210 {
211         tdb_off_t rec_ptr;
212         struct tdb_record rec;
213         int ret;
214         uint32_t hash;
215
216         /* find which hash bucket it is in */
217         hash = tdb->hash_fn(&key);
218
219         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
220                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
221                 tdb->ecode = TDB_ERR_NOEXIST;
222                 return 0;
223         }
224         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
225
226         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
227                              rec.data_len, parser, private_data);
228
229         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
230
231         return ret;
232 }
233
234 /* check if an entry in the database exists 
235
236    note that 1 is returned if the key is found and 0 is returned if not found
237    this doesn't match the conventions in the rest of this module, but is
238    compatible with gdbm
239 */
240 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
241 {
242         struct tdb_record rec;
243         
244         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
245                 return 0;
246         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
247         return 1;
248 }
249
250 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
251 {
252         uint32_t hash = tdb->hash_fn(&key);
253         int ret;
254
255         ret = tdb_exists_hash(tdb, key, hash);
256         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
257         return ret;
258 }
259
260 /* actually delete an entry in the database given the offset */
261 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
262 {
263         tdb_off_t last_ptr, i;
264         struct tdb_record lastrec;
265
266         if (tdb->read_only || tdb->traverse_read) return -1;
267
268         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
269             tdb_write_lock_record(tdb, rec_ptr) == -1) {
270                 /* Someone traversing here: mark it as dead */
271                 rec->magic = TDB_DEAD_MAGIC;
272                 return tdb_rec_write(tdb, rec_ptr, rec);
273         }
274         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
275                 return -1;
276
277         /* find previous record in hash chain */
278         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
279                 return -1;
280         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
281                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
282                         return -1;
283
284         /* unlink it: next ptr is at start of record. */
285         if (last_ptr == 0)
286                 last_ptr = TDB_HASH_TOP(rec->full_hash);
287         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
288                 return -1;
289
290         /* recover the space */
291         if (tdb_free(tdb, rec_ptr, rec) == -1)
292                 return -1;
293         return 0;
294 }
295
296 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
297 {
298         int res = 0;
299         tdb_off_t rec_ptr;
300         struct tdb_record rec;
301         
302         /* read in the hash top */
303         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
304                 return 0;
305
306         while (rec_ptr) {
307                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
308                         return 0;
309
310                 if (rec.magic == TDB_DEAD_MAGIC) {
311                         res += 1;
312                 }
313                 rec_ptr = rec.next;
314         }
315         return res;
316 }
317
318 /*
319  * Purge all DEAD records from a hash chain
320  */
321 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
322 {
323         int res = -1;
324         struct tdb_record rec;
325         tdb_off_t rec_ptr;
326
327         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
328                 return -1;
329         }
330         
331         /* read in the hash top */
332         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
333                 goto fail;
334
335         while (rec_ptr) {
336                 tdb_off_t next;
337
338                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
339                         goto fail;
340                 }
341
342                 next = rec.next;
343
344                 if (rec.magic == TDB_DEAD_MAGIC
345                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
346                         goto fail;
347                 }
348                 rec_ptr = next;
349         }
350         res = 0;
351  fail:
352         tdb_unlock(tdb, -1, F_WRLCK);
353         return res;
354 }
355
356 /* delete an entry in the database given a key */
357 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
358 {
359         tdb_off_t rec_ptr;
360         struct tdb_record rec;
361         int ret;
362
363         if (tdb->max_dead_records != 0) {
364
365                 /*
366                  * Allow for some dead records per hash chain, mainly for
367                  * tdb's with a very high create/delete rate like locking.tdb.
368                  */
369
370                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
371                         return -1;
372
373                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
374                         /*
375                          * Don't let the per-chain freelist grow too large,
376                          * delete all existing dead records
377                          */
378                         tdb_purge_dead(tdb, hash);
379                 }
380
381                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
382                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
383                         return -1;
384                 }
385
386                 /*
387                  * Just mark the record as dead.
388                  */
389                 rec.magic = TDB_DEAD_MAGIC;
390                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
391         }
392         else {
393                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
394                                                    &rec)))
395                         return -1;
396
397                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
398         }
399
400         if (ret == 0) {
401                 tdb_increment_seqnum(tdb);
402         }
403
404         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
405                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
406         return ret;
407 }
408
409 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
410 {
411         uint32_t hash = tdb->hash_fn(&key);
412         int ret;
413
414         ret = tdb_delete_hash(tdb, key, hash);
415         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
416         return ret;
417 }
418
419 /*
420  * See if we have a dead record around with enough space
421  */
422 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
423                                struct tdb_record *r, tdb_len_t length)
424 {
425         tdb_off_t rec_ptr;
426         
427         /* read in the hash top */
428         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
429                 return 0;
430
431         /* keep looking until we find the right record */
432         while (rec_ptr) {
433                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
434                         return 0;
435
436                 if (TDB_DEAD(r) && r->rec_len >= length) {
437                         /*
438                          * First fit for simple coding, TODO: change to best
439                          * fit
440                          */
441                         return rec_ptr;
442                 }
443                 rec_ptr = r->next;
444         }
445         return 0;
446 }
447
448 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
449                        TDB_DATA dbuf, int flag, uint32_t hash)
450 {
451         struct tdb_record rec;
452         tdb_off_t rec_ptr;
453         char *p = NULL;
454         int ret = -1;
455
456         /* check for it existing, on insert. */
457         if (flag == TDB_INSERT) {
458                 if (tdb_exists_hash(tdb, key, hash)) {
459                         tdb->ecode = TDB_ERR_EXISTS;
460                         goto fail;
461                 }
462         } else {
463                 /* first try in-place update, on modify or replace. */
464                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
465                         goto done;
466                 }
467                 if (tdb->ecode == TDB_ERR_NOEXIST &&
468                     flag == TDB_MODIFY) {
469                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
470                          we should fail the store */
471                         goto fail;
472                 }
473         }
474         /* reset the error code potentially set by the tdb_update() */
475         tdb->ecode = TDB_SUCCESS;
476
477         /* delete any existing record - if it doesn't exist we don't
478            care.  Doing this first reduces fragmentation, and avoids
479            coalescing with `allocated' block before it's updated. */
480         if (flag != TDB_INSERT)
481                 tdb_delete_hash(tdb, key, hash);
482
483         /* Copy key+value *before* allocating free space in case malloc
484            fails and we are left with a dead spot in the tdb. */
485
486         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
487                 tdb->ecode = TDB_ERR_OOM;
488                 goto fail;
489         }
490
491         memcpy(p, key.dptr, key.dsize);
492         if (dbuf.dsize)
493                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
494
495         if (tdb->max_dead_records != 0) {
496                 /*
497                  * Allow for some dead records per hash chain, look if we can
498                  * find one that can hold the new record. We need enough space
499                  * for key, data and tailer. If we find one, we don't have to
500                  * consult the central freelist.
501                  */
502                 rec_ptr = tdb_find_dead(
503                         tdb, hash, &rec,
504                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
505
506                 if (rec_ptr != 0) {
507                         rec.key_len = key.dsize;
508                         rec.data_len = dbuf.dsize;
509                         rec.full_hash = hash;
510                         rec.magic = TDB_MAGIC;
511                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
512                             || tdb->methods->tdb_write(
513                                     tdb, rec_ptr + sizeof(rec),
514                                     p, key.dsize + dbuf.dsize) == -1) {
515                                 goto fail;
516                         }
517                         goto done;
518                 }
519         }
520
521         /*
522          * We have to allocate some space from the freelist, so this means we
523          * have to lock it. Use the chance to purge all the DEAD records from
524          * the hash chain under the freelist lock.
525          */
526
527         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
528                 goto fail;
529         }
530
531         if ((tdb->max_dead_records != 0)
532             && (tdb_purge_dead(tdb, hash) == -1)) {
533                 tdb_unlock(tdb, -1, F_WRLCK);
534                 goto fail;
535         }
536
537         /* we have to allocate some space */
538         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
539
540         tdb_unlock(tdb, -1, F_WRLCK);
541
542         if (rec_ptr == 0) {
543                 goto fail;
544         }
545
546         /* Read hash top into next ptr */
547         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
548                 goto fail;
549
550         rec.key_len = key.dsize;
551         rec.data_len = dbuf.dsize;
552         rec.full_hash = hash;
553         rec.magic = TDB_MAGIC;
554
555         /* write out and point the top of the hash chain at it */
556         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
557             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
558             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
559                 /* Need to tdb_unallocate() here */
560                 goto fail;
561         }
562
563  done:
564         ret = 0;
565  fail:
566         if (ret == 0) {
567                 tdb_increment_seqnum(tdb);
568         }
569
570         SAFE_FREE(p); 
571         return ret;
572 }
573
574 /* store an element in the database, replacing any existing element
575    with the same key
576
577    return 0 on success, -1 on failure
578 */
579 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
580 {
581         uint32_t hash;
582         int ret;
583
584         if (tdb->read_only || tdb->traverse_read) {
585                 tdb->ecode = TDB_ERR_RDONLY;
586                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
587                 return -1;
588         }
589
590         /* find which hash bucket it is in */
591         hash = tdb->hash_fn(&key);
592         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
593                 return -1;
594
595         ret = _tdb_store(tdb, key, dbuf, flag, hash);
596         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
597         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
598         return ret;
599 }
600
601 /* Append to an entry. Create if not exist. */
602 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
603 {
604         uint32_t hash;
605         TDB_DATA dbuf;
606         int ret = -1;
607
608         /* find which hash bucket it is in */
609         hash = tdb->hash_fn(&key);
610         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
611                 return -1;
612
613         dbuf = _tdb_fetch(tdb, key);
614
615         if (dbuf.dptr == NULL) {
616                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
617         } else {
618                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
619                 unsigned char *new_dptr;
620
621                 /* realloc '0' is special: don't do that. */
622                 if (new_len == 0)
623                         new_len = 1;
624                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
625                 if (new_dptr == NULL) {
626                         free(dbuf.dptr);
627                 }
628                 dbuf.dptr = new_dptr;
629         }
630
631         if (dbuf.dptr == NULL) {
632                 tdb->ecode = TDB_ERR_OOM;
633                 goto failed;
634         }
635
636         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
637         dbuf.dsize += new_dbuf.dsize;
638
639         ret = _tdb_store(tdb, key, dbuf, 0, hash);
640         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
641         
642 failed:
643         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
644         SAFE_FREE(dbuf.dptr);
645         return ret;
646 }
647
648
649 /*
650   return the name of the current tdb file
651   useful for external logging functions
652 */
653 const char *tdb_name(struct tdb_context *tdb)
654 {
655         return tdb->name;
656 }
657
658 /*
659   return the underlying file descriptor being used by tdb, or -1
660   useful for external routines that want to check the device/inode
661   of the fd
662 */
663 int tdb_fd(struct tdb_context *tdb)
664 {
665         return tdb->fd;
666 }
667
668 /*
669   return the current logging function
670   useful for external tdb routines that wish to log tdb errors
671 */
672 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
673 {
674         return tdb->log.log_fn;
675 }
676
677
678 /*
679   get the tdb sequence number. Only makes sense if the writers opened
680   with TDB_SEQNUM set. Note that this sequence number will wrap quite
681   quickly, so it should only be used for a 'has something changed'
682   test, not for code that relies on the count of the number of changes
683   made. If you want a counter then use a tdb record.
684
685   The aim of this sequence number is to allow for a very lightweight
686   test of a possible tdb change.
687 */
688 int tdb_get_seqnum(struct tdb_context *tdb)
689 {
690         tdb_off_t seqnum=0;
691
692         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
693         return seqnum;
694 }
695
696 int tdb_hash_size(struct tdb_context *tdb)
697 {
698         return tdb->header.hash_size;
699 }
700
701 size_t tdb_map_size(struct tdb_context *tdb)
702 {
703         return tdb->map_size;
704 }
705
706 int tdb_get_flags(struct tdb_context *tdb)
707 {
708         return tdb->flags;
709 }
710
711 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
712 {
713         tdb->flags |= flags;
714 }
715
716 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
717 {
718         tdb->flags &= ~flags;
719 }
720
721
722 /*
723   enable sequence number handling on an open tdb
724 */
725 void tdb_enable_seqnum(struct tdb_context *tdb)
726 {
727         tdb->flags |= TDB_SEQNUM;
728 }
729
730
731 /*
732   add a region of the file to the freelist. Length is the size of the region in bytes, 
733   which includes the free list header that needs to be added
734  */
735 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
736 {
737         struct tdb_record rec;
738         if (length <= sizeof(rec)) {
739                 /* the region is not worth adding */
740                 return 0;
741         }
742         if (length + offset > tdb->map_size) {
743                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
744                 return -1;              
745         }
746         memset(&rec,'\0',sizeof(rec));
747         rec.rec_len = length - sizeof(rec);
748         if (tdb_free(tdb, offset, &rec) == -1) {
749                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
750                 return -1;
751         }
752         return 0;
753 }
754
755 /*
756   wipe the entire database, deleting all records. This can be done
757   very fast by using a global lock. The entire data portion of the
758   file becomes a single entry in the freelist.
759
760   This code carefully steps around the recovery area, leaving it alone
761  */
762 int tdb_wipe_all(struct tdb_context *tdb)
763 {
764         int i;
765         tdb_off_t offset = 0;
766         ssize_t data_len;
767         tdb_off_t recovery_head;
768         tdb_len_t recovery_size = 0;
769
770         if (tdb_lockall(tdb) != 0) {
771                 return -1;
772         }
773
774         tdb_trace(tdb, "tdb_wipe_all");
775
776         /* see if the tdb has a recovery area, and remember its size
777            if so. We don't want to lose this as otherwise each
778            tdb_wipe_all() in a transaction will increase the size of
779            the tdb by the size of the recovery area */
780         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
781                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
782                 goto failed;
783         }
784
785         if (recovery_head != 0) {
786                 struct tdb_record rec;
787                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
788                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
789                         return -1;
790                 }       
791                 recovery_size = rec.rec_len + sizeof(rec);
792         }
793
794         /* wipe the hashes */
795         for (i=0;i<tdb->header.hash_size;i++) {
796                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
797                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
798                         goto failed;
799                 }
800         }
801
802         /* wipe the freelist */
803         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
804                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
805                 goto failed;
806         }
807
808         /* add all the rest of the file to the freelist, possibly leaving a gap 
809            for the recovery area */
810         if (recovery_size == 0) {
811                 /* the simple case - the whole file can be used as a freelist */
812                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
813                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
814                         goto failed;
815                 }
816         } else {
817                 /* we need to add two freelist entries - one on either
818                    side of the recovery area 
819
820                    Note that we cannot shift the recovery area during
821                    this operation. Only the transaction.c code may
822                    move the recovery area or we risk subtle data
823                    corruption
824                 */
825                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
826                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
827                         goto failed;
828                 }
829                 /* and the 2nd free list entry after the recovery area - if any */
830                 data_len = tdb->map_size - (recovery_head+recovery_size);
831                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
832                         goto failed;
833                 }
834         }
835
836         if (tdb_unlockall(tdb) != 0) {
837                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
838                 goto failed;
839         }
840
841         return 0;
842
843 failed:
844         tdb_unlockall(tdb);
845         return -1;
846 }
847
848 struct traverse_state {
849         bool error;
850         struct tdb_context *dest_db;
851 };
852
853 /*
854   traverse function for repacking
855  */
856 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
857 {
858         struct traverse_state *state = (struct traverse_state *)private_data;
859         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
860                 state->error = true;
861                 return -1;
862         }
863         return 0;
864 }
865
866 /*
867   repack a tdb
868  */
869 int tdb_repack(struct tdb_context *tdb)
870 {
871         struct tdb_context *tmp_db;
872         struct traverse_state state;
873
874         tdb_trace(tdb, "tdb_repack");
875
876         if (tdb_transaction_start(tdb) != 0) {
877                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
878                 return -1;
879         }
880
881         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
882         if (tmp_db == NULL) {
883                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
884                 tdb_transaction_cancel(tdb);
885                 return -1;
886         }
887
888         state.error = false;
889         state.dest_db = tmp_db;
890
891         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
892                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
893                 tdb_transaction_cancel(tdb);
894                 tdb_close(tmp_db);
895                 return -1;              
896         }
897
898         if (state.error) {
899                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
900                 tdb_transaction_cancel(tdb);
901                 tdb_close(tmp_db);
902                 return -1;
903         }
904
905         if (tdb_wipe_all(tdb) != 0) {
906                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
907                 tdb_transaction_cancel(tdb);
908                 tdb_close(tmp_db);
909                 return -1;
910         }
911
912         state.error = false;
913         state.dest_db = tdb;
914
915         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
916                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
917                 tdb_transaction_cancel(tdb);
918                 tdb_close(tmp_db);
919                 return -1;              
920         }
921
922         if (state.error) {
923                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
924                 tdb_transaction_cancel(tdb);
925                 tdb_close(tmp_db);
926                 return -1;
927         }
928
929         tdb_close(tmp_db);
930
931         if (tdb_transaction_commit(tdb) != 0) {
932                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
933                 return -1;
934         }
935
936         return 0;
937 }
938
939 #ifdef TDB_TRACE
940 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
941 {
942         if (write(tdb->tracefd, str, strlen(str)) != strlen(str)) {
943                 close(tdb->tracefd);
944                 tdb->tracefd = -1;
945         }
946 }
947
948 static void tdb_trace_start(struct tdb_context *tdb)
949 {
950         tdb_off_t seqnum=0;
951         char msg[sizeof(tdb_off_t) * 4 + 1];
952
953         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
954         snprintf(msg, sizeof(msg), "%u ", seqnum);
955         tdb_trace_write(tdb, msg);
956 }
957
958 static void tdb_trace_end(struct tdb_context *tdb)
959 {
960         tdb_trace_write(tdb, "\n");
961 }
962
963 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
964 {
965         char msg[sizeof(ret) * 4 + 4];
966         snprintf(msg, sizeof(msg), " = %i\n", ret);
967         tdb_trace_write(tdb, msg);
968 }
969
970 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
971 {
972         char msg[20 + rec.dsize*2], *p;
973         unsigned int i;
974
975         /* We differentiate zero-length records from non-existent ones. */
976         if (rec.dptr == NULL) {
977                 tdb_trace_write(tdb, " NULL");
978                 return;
979         }
980
981         /* snprintf here is purely cargo-cult programming. */
982         p = msg;
983         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
984         for (i = 0; i < rec.dsize; i++)
985                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
986
987         tdb_trace_write(tdb, msg);
988 }
989
990 void tdb_trace(struct tdb_context *tdb, const char *op)
991 {
992         tdb_trace_start(tdb);
993         tdb_trace_write(tdb, op);
994         tdb_trace_end(tdb);
995 }
996
997 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
998 {
999         char msg[sizeof(tdb_off_t) * 4 + 1];
1000
1001         snprintf(msg, sizeof(msg), "%u ", seqnum);
1002         tdb_trace_write(tdb, msg);
1003         tdb_trace_write(tdb, op);
1004         tdb_trace_end(tdb);
1005 }
1006
1007 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1008                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1009 {
1010         char msg[128];
1011
1012         snprintf(msg, sizeof(msg),
1013                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1014         tdb_trace_start(tdb);
1015         tdb_trace_write(tdb, msg);
1016         tdb_trace_end(tdb);
1017 }
1018
1019 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1020 {
1021         tdb_trace_start(tdb);
1022         tdb_trace_write(tdb, op);
1023         tdb_trace_end_ret(tdb, ret);
1024 }
1025
1026 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1027 {
1028         tdb_trace_start(tdb);
1029         tdb_trace_write(tdb, op);
1030         tdb_trace_write(tdb, " =");
1031         tdb_trace_record(tdb, ret);
1032         tdb_trace_end(tdb);
1033 }
1034
1035 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1036                     TDB_DATA rec)
1037 {
1038         tdb_trace_start(tdb);
1039         tdb_trace_write(tdb, op);
1040         tdb_trace_record(tdb, rec);
1041         tdb_trace_end(tdb);
1042 }
1043
1044 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1045                         TDB_DATA rec, int ret)
1046 {
1047         tdb_trace_start(tdb);
1048         tdb_trace_write(tdb, op);
1049         tdb_trace_record(tdb, rec);
1050         tdb_trace_end_ret(tdb, ret);
1051 }
1052
1053 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1054                            TDB_DATA rec, TDB_DATA ret)
1055 {
1056         tdb_trace_start(tdb);
1057         tdb_trace_write(tdb, op);
1058         tdb_trace_record(tdb, rec);
1059         tdb_trace_write(tdb, " =");
1060         tdb_trace_record(tdb, ret);
1061         tdb_trace_end(tdb);
1062 }
1063
1064 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1065                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1066                              int ret)
1067 {
1068         char msg[1 + sizeof(ret) * 4];
1069
1070         snprintf(msg, sizeof(msg), " %#x", flag);
1071         tdb_trace_start(tdb);
1072         tdb_trace_write(tdb, op);
1073         tdb_trace_record(tdb, rec1);
1074         tdb_trace_record(tdb, rec2);
1075         tdb_trace_write(tdb, msg);
1076         tdb_trace_end_ret(tdb, ret);
1077 }
1078
1079 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1080                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1081 {
1082         tdb_trace_start(tdb);
1083         tdb_trace_write(tdb, op);
1084         tdb_trace_record(tdb, rec1);
1085         tdb_trace_record(tdb, rec2);
1086         tdb_trace_write(tdb, " =");
1087         tdb_trace_record(tdb, ret);
1088         tdb_trace_end(tdb);
1089 }
1090 #endif