4d8c5fc59c9952056fa177c75ce93922276b45d8
[ira/wip.git] / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
63                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
64                 return;
65         }
66
67         tdb_increment_seqnum_nonblock(tdb);
68
69         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
70 }
71
72 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
73 {
74         return memcmp(data.dptr, key.dptr, data.dsize);
75 }
76
77 /* Returns 0 on fail.  On success, return offset of record, and fills
78    in rec */
79 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
80                         struct tdb_record *r)
81 {
82         tdb_off_t rec_ptr;
83
84         /* read in the hash top */
85         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
86                 return 0;
87
88         /* keep looking until we find the right record */
89         while (rec_ptr) {
90                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
91                         return 0;
92
93                 if (!TDB_DEAD(r) && hash==r->full_hash
94                     && key.dsize==r->key_len
95                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
96                                       r->key_len, tdb_key_compare,
97                                       NULL) == 0) {
98                         return rec_ptr;
99                 }
100                 /* detect tight infinite loop */
101                 if (rec_ptr == r->next) {
102                         tdb->ecode = TDB_ERR_CORRUPT;
103                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
104                         return 0;
105                 }
106                 rec_ptr = r->next;
107         }
108         tdb->ecode = TDB_ERR_NOEXIST;
109         return 0;
110 }
111
112 /* As tdb_find, but if you succeed, keep the lock */
113 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
114                            struct tdb_record *rec)
115 {
116         uint32_t rec_ptr;
117
118         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
119                 return 0;
120         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
121                 tdb_unlock(tdb, BUCKET(hash), locktype);
122         return rec_ptr;
123 }
124
125 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
126
127 /* update an entry in place - this only works if the new data size
128    is <= the old data size and the key exists.
129    on failure return -1.
130 */
131 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
132 {
133         struct tdb_record rec;
134         tdb_off_t rec_ptr;
135
136         /* find entry */
137         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
138                 return -1;
139
140         /* it could be an exact duplicate of what is there - this is
141          * surprisingly common (eg. with a ldb re-index). */
142         if (rec.key_len == key.dsize && 
143             rec.data_len == dbuf.dsize &&
144             rec.full_hash == hash) {
145                 TDB_DATA data = _tdb_fetch(tdb, key);
146                 if (data.dsize == dbuf.dsize &&
147                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
148                         if (data.dptr) {
149                                 free(data.dptr);
150                         }
151                         return 0;
152                 }
153                 if (data.dptr) {
154                         free(data.dptr);
155                 }
156         }
157
158         /* must be long enough key, data and tailer */
159         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
160                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
161                 return -1;
162         }
163
164         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
165                       dbuf.dptr, dbuf.dsize) == -1)
166                 return -1;
167
168         if (dbuf.dsize != rec.data_len) {
169                 /* update size */
170                 rec.data_len = dbuf.dsize;
171                 return tdb_rec_write(tdb, rec_ptr, &rec);
172         }
173
174         return 0;
175 }
176
177 /* find an entry in the database given a key */
178 /* If an entry doesn't exist tdb_err will be set to
179  * TDB_ERR_NOEXIST. If a key has no data attached
180  * then the TDB_DATA will have zero length but
181  * a non-zero pointer
182  */
183 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
184 {
185         tdb_off_t rec_ptr;
186         struct tdb_record rec;
187         TDB_DATA ret;
188         uint32_t hash;
189
190         /* find which hash bucket it is in */
191         hash = tdb->hash_fn(&key);
192         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
193                 return tdb_null;
194
195         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
196                                   rec.data_len);
197         ret.dsize = rec.data_len;
198         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
199         return ret;
200 }
201
202 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
203 {
204         TDB_DATA ret = _tdb_fetch(tdb, key);
205
206         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
207         return ret;
208 }
209
210 /*
211  * Find an entry in the database and hand the record's data to a parsing
212  * function. The parsing function is executed under the chain read lock, so it
213  * should be fast and should not block on other syscalls.
214  *
215  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
216  *
217  * For mmapped tdb's that do not have a transaction open it points the parsing
218  * function directly at the mmap area, it avoids the malloc/memcpy in this
219  * case. If a transaction is open or no mmap is available, it has to do
220  * malloc/read/parse/free.
221  *
222  * This is interesting for all readers of potentially large data structures in
223  * the tdb records, ldb indexes being one example.
224  *
225  * Return -1 if the record was not found.
226  */
227
228 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
229                      int (*parser)(TDB_DATA key, TDB_DATA data,
230                                    void *private_data),
231                      void *private_data)
232 {
233         tdb_off_t rec_ptr;
234         struct tdb_record rec;
235         int ret;
236         uint32_t hash;
237
238         /* find which hash bucket it is in */
239         hash = tdb->hash_fn(&key);
240
241         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
242                 /* record not found */
243                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
244                 tdb->ecode = TDB_ERR_NOEXIST;
245                 return -1;
246         }
247         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
248
249         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
250                              rec.data_len, parser, private_data);
251
252         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
253
254         return ret;
255 }
256
257 /* check if an entry in the database exists 
258
259    note that 1 is returned if the key is found and 0 is returned if not found
260    this doesn't match the conventions in the rest of this module, but is
261    compatible with gdbm
262 */
263 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
264 {
265         struct tdb_record rec;
266
267         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
268                 return 0;
269         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
270         return 1;
271 }
272
273 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
274 {
275         uint32_t hash = tdb->hash_fn(&key);
276         int ret;
277
278         ret = tdb_exists_hash(tdb, key, hash);
279         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
280         return ret;
281 }
282
283 /* actually delete an entry in the database given the offset */
284 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
285 {
286         tdb_off_t last_ptr, i;
287         struct tdb_record lastrec;
288
289         if (tdb->read_only || tdb->traverse_read) return -1;
290
291         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
292             tdb_write_lock_record(tdb, rec_ptr) == -1) {
293                 /* Someone traversing here: mark it as dead */
294                 rec->magic = TDB_DEAD_MAGIC;
295                 return tdb_rec_write(tdb, rec_ptr, rec);
296         }
297         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
298                 return -1;
299
300         /* find previous record in hash chain */
301         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
302                 return -1;
303         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
304                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
305                         return -1;
306
307         /* unlink it: next ptr is at start of record. */
308         if (last_ptr == 0)
309                 last_ptr = TDB_HASH_TOP(rec->full_hash);
310         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
311                 return -1;
312
313         /* recover the space */
314         if (tdb_free(tdb, rec_ptr, rec) == -1)
315                 return -1;
316         return 0;
317 }
318
319 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
320 {
321         int res = 0;
322         tdb_off_t rec_ptr;
323         struct tdb_record rec;
324
325         /* read in the hash top */
326         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
327                 return 0;
328
329         while (rec_ptr) {
330                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
331                         return 0;
332
333                 if (rec.magic == TDB_DEAD_MAGIC) {
334                         res += 1;
335                 }
336                 rec_ptr = rec.next;
337         }
338         return res;
339 }
340
341 /*
342  * Purge all DEAD records from a hash chain
343  */
344 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
345 {
346         int res = -1;
347         struct tdb_record rec;
348         tdb_off_t rec_ptr;
349
350         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
351                 return -1;
352         }
353
354         /* read in the hash top */
355         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
356                 goto fail;
357
358         while (rec_ptr) {
359                 tdb_off_t next;
360
361                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
362                         goto fail;
363                 }
364
365                 next = rec.next;
366
367                 if (rec.magic == TDB_DEAD_MAGIC
368                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
369                         goto fail;
370                 }
371                 rec_ptr = next;
372         }
373         res = 0;
374  fail:
375         tdb_unlock(tdb, -1, F_WRLCK);
376         return res;
377 }
378
379 /* delete an entry in the database given a key */
380 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
381 {
382         tdb_off_t rec_ptr;
383         struct tdb_record rec;
384         int ret;
385
386         if (tdb->max_dead_records != 0) {
387
388                 /*
389                  * Allow for some dead records per hash chain, mainly for
390                  * tdb's with a very high create/delete rate like locking.tdb.
391                  */
392
393                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
394                         return -1;
395
396                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
397                         /*
398                          * Don't let the per-chain freelist grow too large,
399                          * delete all existing dead records
400                          */
401                         tdb_purge_dead(tdb, hash);
402                 }
403
404                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
405                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
406                         return -1;
407                 }
408
409                 /*
410                  * Just mark the record as dead.
411                  */
412                 rec.magic = TDB_DEAD_MAGIC;
413                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
414         }
415         else {
416                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
417                                                    &rec)))
418                         return -1;
419
420                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
421         }
422
423         if (ret == 0) {
424                 tdb_increment_seqnum(tdb);
425         }
426
427         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
428                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
429         return ret;
430 }
431
432 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
433 {
434         uint32_t hash = tdb->hash_fn(&key);
435         int ret;
436
437         ret = tdb_delete_hash(tdb, key, hash);
438         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
439         return ret;
440 }
441
442 /*
443  * See if we have a dead record around with enough space
444  */
445 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
446                                struct tdb_record *r, tdb_len_t length)
447 {
448         tdb_off_t rec_ptr;
449
450         /* read in the hash top */
451         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
452                 return 0;
453
454         /* keep looking until we find the right record */
455         while (rec_ptr) {
456                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
457                         return 0;
458
459                 if (TDB_DEAD(r) && r->rec_len >= length) {
460                         /*
461                          * First fit for simple coding, TODO: change to best
462                          * fit
463                          */
464                         return rec_ptr;
465                 }
466                 rec_ptr = r->next;
467         }
468         return 0;
469 }
470
471 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
472                        TDB_DATA dbuf, int flag, uint32_t hash)
473 {
474         struct tdb_record rec;
475         tdb_off_t rec_ptr;
476         char *p = NULL;
477         int ret = -1;
478
479         /* check for it existing, on insert. */
480         if (flag == TDB_INSERT) {
481                 if (tdb_exists_hash(tdb, key, hash)) {
482                         tdb->ecode = TDB_ERR_EXISTS;
483                         goto fail;
484                 }
485         } else {
486                 /* first try in-place update, on modify or replace. */
487                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
488                         goto done;
489                 }
490                 if (tdb->ecode == TDB_ERR_NOEXIST &&
491                     flag == TDB_MODIFY) {
492                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
493                          we should fail the store */
494                         goto fail;
495                 }
496         }
497         /* reset the error code potentially set by the tdb_update() */
498         tdb->ecode = TDB_SUCCESS;
499
500         /* delete any existing record - if it doesn't exist we don't
501            care.  Doing this first reduces fragmentation, and avoids
502            coalescing with `allocated' block before it's updated. */
503         if (flag != TDB_INSERT)
504                 tdb_delete_hash(tdb, key, hash);
505
506         /* Copy key+value *before* allocating free space in case malloc
507            fails and we are left with a dead spot in the tdb. */
508
509         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
510                 tdb->ecode = TDB_ERR_OOM;
511                 goto fail;
512         }
513
514         memcpy(p, key.dptr, key.dsize);
515         if (dbuf.dsize)
516                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
517
518         if (tdb->max_dead_records != 0) {
519                 /*
520                  * Allow for some dead records per hash chain, look if we can
521                  * find one that can hold the new record. We need enough space
522                  * for key, data and tailer. If we find one, we don't have to
523                  * consult the central freelist.
524                  */
525                 rec_ptr = tdb_find_dead(
526                         tdb, hash, &rec,
527                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
528
529                 if (rec_ptr != 0) {
530                         rec.key_len = key.dsize;
531                         rec.data_len = dbuf.dsize;
532                         rec.full_hash = hash;
533                         rec.magic = TDB_MAGIC;
534                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
535                             || tdb->methods->tdb_write(
536                                     tdb, rec_ptr + sizeof(rec),
537                                     p, key.dsize + dbuf.dsize) == -1) {
538                                 goto fail;
539                         }
540                         goto done;
541                 }
542         }
543
544         /*
545          * We have to allocate some space from the freelist, so this means we
546          * have to lock it. Use the chance to purge all the DEAD records from
547          * the hash chain under the freelist lock.
548          */
549
550         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
551                 goto fail;
552         }
553
554         if ((tdb->max_dead_records != 0)
555             && (tdb_purge_dead(tdb, hash) == -1)) {
556                 tdb_unlock(tdb, -1, F_WRLCK);
557                 goto fail;
558         }
559
560         /* we have to allocate some space */
561         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
562
563         tdb_unlock(tdb, -1, F_WRLCK);
564
565         if (rec_ptr == 0) {
566                 goto fail;
567         }
568
569         /* Read hash top into next ptr */
570         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
571                 goto fail;
572
573         rec.key_len = key.dsize;
574         rec.data_len = dbuf.dsize;
575         rec.full_hash = hash;
576         rec.magic = TDB_MAGIC;
577
578         /* write out and point the top of the hash chain at it */
579         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
580             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
581             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
582                 /* Need to tdb_unallocate() here */
583                 goto fail;
584         }
585
586  done:
587         ret = 0;
588  fail:
589         if (ret == 0) {
590                 tdb_increment_seqnum(tdb);
591         }
592
593         SAFE_FREE(p); 
594         return ret;
595 }
596
597 /* store an element in the database, replacing any existing element
598    with the same key
599
600    return 0 on success, -1 on failure
601 */
602 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
603 {
604         uint32_t hash;
605         int ret;
606
607         if (tdb->read_only || tdb->traverse_read) {
608                 tdb->ecode = TDB_ERR_RDONLY;
609                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
610                 return -1;
611         }
612
613         /* find which hash bucket it is in */
614         hash = tdb->hash_fn(&key);
615         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
616                 return -1;
617
618         ret = _tdb_store(tdb, key, dbuf, flag, hash);
619         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
620         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
621         return ret;
622 }
623
624 /* Append to an entry. Create if not exist. */
625 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
626 {
627         uint32_t hash;
628         TDB_DATA dbuf;
629         int ret = -1;
630
631         /* find which hash bucket it is in */
632         hash = tdb->hash_fn(&key);
633         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
634                 return -1;
635
636         dbuf = _tdb_fetch(tdb, key);
637
638         if (dbuf.dptr == NULL) {
639                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
640         } else {
641                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
642                 unsigned char *new_dptr;
643
644                 /* realloc '0' is special: don't do that. */
645                 if (new_len == 0)
646                         new_len = 1;
647                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
648                 if (new_dptr == NULL) {
649                         free(dbuf.dptr);
650                 }
651                 dbuf.dptr = new_dptr;
652         }
653
654         if (dbuf.dptr == NULL) {
655                 tdb->ecode = TDB_ERR_OOM;
656                 goto failed;
657         }
658
659         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
660         dbuf.dsize += new_dbuf.dsize;
661
662         ret = _tdb_store(tdb, key, dbuf, 0, hash);
663         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
664
665 failed:
666         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
667         SAFE_FREE(dbuf.dptr);
668         return ret;
669 }
670
671
672 /*
673   return the name of the current tdb file
674   useful for external logging functions
675 */
676 const char *tdb_name(struct tdb_context *tdb)
677 {
678         return tdb->name;
679 }
680
681 /*
682   return the underlying file descriptor being used by tdb, or -1
683   useful for external routines that want to check the device/inode
684   of the fd
685 */
686 int tdb_fd(struct tdb_context *tdb)
687 {
688         return tdb->fd;
689 }
690
691 /*
692   return the current logging function
693   useful for external tdb routines that wish to log tdb errors
694 */
695 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
696 {
697         return tdb->log.log_fn;
698 }
699
700
701 /*
702   get the tdb sequence number. Only makes sense if the writers opened
703   with TDB_SEQNUM set. Note that this sequence number will wrap quite
704   quickly, so it should only be used for a 'has something changed'
705   test, not for code that relies on the count of the number of changes
706   made. If you want a counter then use a tdb record.
707
708   The aim of this sequence number is to allow for a very lightweight
709   test of a possible tdb change.
710 */
711 int tdb_get_seqnum(struct tdb_context *tdb)
712 {
713         tdb_off_t seqnum=0;
714
715         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
716         return seqnum;
717 }
718
719 int tdb_hash_size(struct tdb_context *tdb)
720 {
721         return tdb->header.hash_size;
722 }
723
724 size_t tdb_map_size(struct tdb_context *tdb)
725 {
726         return tdb->map_size;
727 }
728
729 int tdb_get_flags(struct tdb_context *tdb)
730 {
731         return tdb->flags;
732 }
733
734 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
735 {
736         if ((flags & TDB_ALLOW_NESTING) &&
737             (flags & TDB_DISALLOW_NESTING)) {
738                 tdb->ecode = TDB_ERR_NESTING;
739                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
740                         "allow_nesting and disallow_nesting are not allowed together!"));
741                 return;
742         }
743
744         if (flags & TDB_ALLOW_NESTING) {
745                 tdb->flags &= ~TDB_DISALLOW_NESTING;
746         }
747         if (flags & TDB_DISALLOW_NESTING) {
748                 tdb->flags &= ~TDB_ALLOW_NESTING;
749         }
750
751         tdb->flags |= flags;
752 }
753
754 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
755 {
756         if ((flags & TDB_ALLOW_NESTING) &&
757             (flags & TDB_DISALLOW_NESTING)) {
758                 tdb->ecode = TDB_ERR_NESTING;
759                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
760                         "allow_nesting and disallow_nesting are not allowed together!"));
761                 return;
762         }
763
764         if (flags & TDB_ALLOW_NESTING) {
765                 tdb->flags |= TDB_DISALLOW_NESTING;
766         }
767         if (flags & TDB_DISALLOW_NESTING) {
768                 tdb->flags |= TDB_ALLOW_NESTING;
769         }
770
771         tdb->flags &= ~flags;
772 }
773
774
775 /*
776   enable sequence number handling on an open tdb
777 */
778 void tdb_enable_seqnum(struct tdb_context *tdb)
779 {
780         tdb->flags |= TDB_SEQNUM;
781 }
782
783
784 /*
785   add a region of the file to the freelist. Length is the size of the region in bytes, 
786   which includes the free list header that needs to be added
787  */
788 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
789 {
790         struct tdb_record rec;
791         if (length <= sizeof(rec)) {
792                 /* the region is not worth adding */
793                 return 0;
794         }
795         if (length + offset > tdb->map_size) {
796                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
797                 return -1;              
798         }
799         memset(&rec,'\0',sizeof(rec));
800         rec.rec_len = length - sizeof(rec);
801         if (tdb_free(tdb, offset, &rec) == -1) {
802                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
803                 return -1;
804         }
805         return 0;
806 }
807
808 /*
809   wipe the entire database, deleting all records. This can be done
810   very fast by using a allrecord lock. The entire data portion of the
811   file becomes a single entry in the freelist.
812
813   This code carefully steps around the recovery area, leaving it alone
814  */
815 int tdb_wipe_all(struct tdb_context *tdb)
816 {
817         int i;
818         tdb_off_t offset = 0;
819         ssize_t data_len;
820         tdb_off_t recovery_head;
821         tdb_len_t recovery_size = 0;
822
823         if (tdb_lockall(tdb) != 0) {
824                 return -1;
825         }
826
827         tdb_trace(tdb, "tdb_wipe_all");
828
829         /* see if the tdb has a recovery area, and remember its size
830            if so. We don't want to lose this as otherwise each
831            tdb_wipe_all() in a transaction will increase the size of
832            the tdb by the size of the recovery area */
833         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
834                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
835                 goto failed;
836         }
837
838         if (recovery_head != 0) {
839                 struct tdb_record rec;
840                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
841                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
842                         return -1;
843                 }       
844                 recovery_size = rec.rec_len + sizeof(rec);
845         }
846
847         /* wipe the hashes */
848         for (i=0;i<tdb->header.hash_size;i++) {
849                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
850                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
851                         goto failed;
852                 }
853         }
854
855         /* wipe the freelist */
856         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
857                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
858                 goto failed;
859         }
860
861         /* add all the rest of the file to the freelist, possibly leaving a gap 
862            for the recovery area */
863         if (recovery_size == 0) {
864                 /* the simple case - the whole file can be used as a freelist */
865                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
866                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
867                         goto failed;
868                 }
869         } else {
870                 /* we need to add two freelist entries - one on either
871                    side of the recovery area 
872
873                    Note that we cannot shift the recovery area during
874                    this operation. Only the transaction.c code may
875                    move the recovery area or we risk subtle data
876                    corruption
877                 */
878                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
879                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
880                         goto failed;
881                 }
882                 /* and the 2nd free list entry after the recovery area - if any */
883                 data_len = tdb->map_size - (recovery_head+recovery_size);
884                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
885                         goto failed;
886                 }
887         }
888
889         if (tdb_unlockall(tdb) != 0) {
890                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
891                 goto failed;
892         }
893
894         return 0;
895
896 failed:
897         tdb_unlockall(tdb);
898         return -1;
899 }
900
901 struct traverse_state {
902         bool error;
903         struct tdb_context *dest_db;
904 };
905
906 /*
907   traverse function for repacking
908  */
909 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
910 {
911         struct traverse_state *state = (struct traverse_state *)private_data;
912         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
913                 state->error = true;
914                 return -1;
915         }
916         return 0;
917 }
918
919 /*
920   repack a tdb
921  */
922 int tdb_repack(struct tdb_context *tdb)
923 {
924         struct tdb_context *tmp_db;
925         struct traverse_state state;
926
927         tdb_trace(tdb, "tdb_repack");
928
929         if (tdb_transaction_start(tdb) != 0) {
930                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
931                 return -1;
932         }
933
934         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
935         if (tmp_db == NULL) {
936                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
937                 tdb_transaction_cancel(tdb);
938                 return -1;
939         }
940
941         state.error = false;
942         state.dest_db = tmp_db;
943
944         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
945                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
946                 tdb_transaction_cancel(tdb);
947                 tdb_close(tmp_db);
948                 return -1;              
949         }
950
951         if (state.error) {
952                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
953                 tdb_transaction_cancel(tdb);
954                 tdb_close(tmp_db);
955                 return -1;
956         }
957
958         if (tdb_wipe_all(tdb) != 0) {
959                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
960                 tdb_transaction_cancel(tdb);
961                 tdb_close(tmp_db);
962                 return -1;
963         }
964
965         state.error = false;
966         state.dest_db = tdb;
967
968         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
969                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
970                 tdb_transaction_cancel(tdb);
971                 tdb_close(tmp_db);
972                 return -1;              
973         }
974
975         if (state.error) {
976                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
977                 tdb_transaction_cancel(tdb);
978                 tdb_close(tmp_db);
979                 return -1;
980         }
981
982         tdb_close(tmp_db);
983
984         if (tdb_transaction_commit(tdb) != 0) {
985                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
986                 return -1;
987         }
988
989         return 0;
990 }
991
992 /* Even on files, we can get partial writes due to signals. */
993 bool tdb_write_all(int fd, const void *buf, size_t count)
994 {
995         while (count) {
996                 size_t ret;
997                 ret = write(fd, buf, count);
998                 if (ret < 0)
999                         return false;
1000                 buf = (const char *)buf + ret;
1001                 count -= ret;
1002         }
1003         return true;
1004 }
1005
1006 #ifdef TDB_TRACE
1007 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1008 {
1009         if (!tdb_write_alltdb->tracefd, str, strlen(str)) {
1010                 close(tdb->tracefd);
1011                 tdb->tracefd = -1;
1012         }
1013 }
1014
1015 static void tdb_trace_start(struct tdb_context *tdb)
1016 {
1017         tdb_off_t seqnum=0;
1018         char msg[sizeof(tdb_off_t) * 4 + 1];
1019
1020         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1021         snprintf(msg, sizeof(msg), "%u ", seqnum);
1022         tdb_trace_write(tdb, msg);
1023 }
1024
1025 static void tdb_trace_end(struct tdb_context *tdb)
1026 {
1027         tdb_trace_write(tdb, "\n");
1028 }
1029
1030 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1031 {
1032         char msg[sizeof(ret) * 4 + 4];
1033         snprintf(msg, sizeof(msg), " = %i\n", ret);
1034         tdb_trace_write(tdb, msg);
1035 }
1036
1037 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1038 {
1039         char msg[20 + rec.dsize*2], *p;
1040         unsigned int i;
1041
1042         /* We differentiate zero-length records from non-existent ones. */
1043         if (rec.dptr == NULL) {
1044                 tdb_trace_write(tdb, " NULL");
1045                 return;
1046         }
1047
1048         /* snprintf here is purely cargo-cult programming. */
1049         p = msg;
1050         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1051         for (i = 0; i < rec.dsize; i++)
1052                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1053
1054         tdb_trace_write(tdb, msg);
1055 }
1056
1057 void tdb_trace(struct tdb_context *tdb, const char *op)
1058 {
1059         tdb_trace_start(tdb);
1060         tdb_trace_write(tdb, op);
1061         tdb_trace_end(tdb);
1062 }
1063
1064 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1065 {
1066         char msg[sizeof(tdb_off_t) * 4 + 1];
1067
1068         snprintf(msg, sizeof(msg), "%u ", seqnum);
1069         tdb_trace_write(tdb, msg);
1070         tdb_trace_write(tdb, op);
1071         tdb_trace_end(tdb);
1072 }
1073
1074 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1075                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1076 {
1077         char msg[128];
1078
1079         snprintf(msg, sizeof(msg),
1080                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1081         tdb_trace_start(tdb);
1082         tdb_trace_write(tdb, msg);
1083         tdb_trace_end(tdb);
1084 }
1085
1086 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1087 {
1088         tdb_trace_start(tdb);
1089         tdb_trace_write(tdb, op);
1090         tdb_trace_end_ret(tdb, ret);
1091 }
1092
1093 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1094 {
1095         tdb_trace_start(tdb);
1096         tdb_trace_write(tdb, op);
1097         tdb_trace_write(tdb, " =");
1098         tdb_trace_record(tdb, ret);
1099         tdb_trace_end(tdb);
1100 }
1101
1102 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1103                     TDB_DATA rec)
1104 {
1105         tdb_trace_start(tdb);
1106         tdb_trace_write(tdb, op);
1107         tdb_trace_record(tdb, rec);
1108         tdb_trace_end(tdb);
1109 }
1110
1111 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1112                         TDB_DATA rec, int ret)
1113 {
1114         tdb_trace_start(tdb);
1115         tdb_trace_write(tdb, op);
1116         tdb_trace_record(tdb, rec);
1117         tdb_trace_end_ret(tdb, ret);
1118 }
1119
1120 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1121                            TDB_DATA rec, TDB_DATA ret)
1122 {
1123         tdb_trace_start(tdb);
1124         tdb_trace_write(tdb, op);
1125         tdb_trace_record(tdb, rec);
1126         tdb_trace_write(tdb, " =");
1127         tdb_trace_record(tdb, ret);
1128         tdb_trace_end(tdb);
1129 }
1130
1131 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1132                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1133                              int ret)
1134 {
1135         char msg[1 + sizeof(ret) * 4];
1136
1137         snprintf(msg, sizeof(msg), " %#x", flag);
1138         tdb_trace_start(tdb);
1139         tdb_trace_write(tdb, op);
1140         tdb_trace_record(tdb, rec1);
1141         tdb_trace_record(tdb, rec2);
1142         tdb_trace_write(tdb, msg);
1143         tdb_trace_end_ret(tdb, ret);
1144 }
1145
1146 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1147                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1148 {
1149         tdb_trace_start(tdb);
1150         tdb_trace_write(tdb, op);
1151         tdb_trace_record(tdb, rec1);
1152         tdb_trace_record(tdb, rec2);
1153         tdb_trace_write(tdb, " =");
1154         tdb_trace_record(tdb, ret);
1155         tdb_trace_end(tdb);
1156 }
1157 #endif