tdb: cleanup: rename global_lock to allrecord_lock.
[samba.git] / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1,
63                        TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
64                 return;
65         }
66
67         tdb_increment_seqnum_nonblock(tdb);
68
69         tdb_brunlock(tdb, F_WRLCK, TDB_SEQNUM_OFS, 1);
70 }
71
72 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
73 {
74         return memcmp(data.dptr, key.dptr, data.dsize);
75 }
76
77 /* Returns 0 on fail.  On success, return offset of record, and fills
78    in rec */
79 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
80                         struct tdb_record *r)
81 {
82         tdb_off_t rec_ptr;
83         
84         /* read in the hash top */
85         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
86                 return 0;
87
88         /* keep looking until we find the right record */
89         while (rec_ptr) {
90                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
91                         return 0;
92
93                 if (!TDB_DEAD(r) && hash==r->full_hash
94                     && key.dsize==r->key_len
95                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
96                                       r->key_len, tdb_key_compare,
97                                       NULL) == 0) {
98                         return rec_ptr;
99                 }
100                 /* detect tight infinite loop */
101                 if (rec_ptr == r->next) {
102                         tdb->ecode = TDB_ERR_CORRUPT;
103                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
104                         return 0;
105                 }
106                 rec_ptr = r->next;
107         }
108         tdb->ecode = TDB_ERR_NOEXIST;
109         return 0;
110 }
111
112 /* As tdb_find, but if you succeed, keep the lock */
113 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
114                            struct tdb_record *rec)
115 {
116         uint32_t rec_ptr;
117
118         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
119                 return 0;
120         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
121                 tdb_unlock(tdb, BUCKET(hash), locktype);
122         return rec_ptr;
123 }
124
125 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
126
127 /* update an entry in place - this only works if the new data size
128    is <= the old data size and the key exists.
129    on failure return -1.
130 */
131 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
132 {
133         struct tdb_record rec;
134         tdb_off_t rec_ptr;
135
136         /* find entry */
137         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
138                 return -1;
139
140         /* it could be an exact duplicate of what is there - this is
141          * surprisingly common (eg. with a ldb re-index). */
142         if (rec.key_len == key.dsize && 
143             rec.data_len == dbuf.dsize &&
144             rec.full_hash == hash) {
145                 TDB_DATA data = _tdb_fetch(tdb, key);
146                 if (data.dsize == dbuf.dsize &&
147                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
148                         if (data.dptr) {
149                                 free(data.dptr);
150                         }
151                         return 0;
152                 }
153                 if (data.dptr) {
154                         free(data.dptr);
155                 }
156         }
157          
158
159         /* must be long enough key, data and tailer */
160         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
161                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
162                 return -1;
163         }
164
165         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
166                       dbuf.dptr, dbuf.dsize) == -1)
167                 return -1;
168
169         if (dbuf.dsize != rec.data_len) {
170                 /* update size */
171                 rec.data_len = dbuf.dsize;
172                 return tdb_rec_write(tdb, rec_ptr, &rec);
173         }
174  
175         return 0;
176 }
177
178 /* find an entry in the database given a key */
179 /* If an entry doesn't exist tdb_err will be set to
180  * TDB_ERR_NOEXIST. If a key has no data attached
181  * then the TDB_DATA will have zero length but
182  * a non-zero pointer
183  */
184 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
185 {
186         tdb_off_t rec_ptr;
187         struct tdb_record rec;
188         TDB_DATA ret;
189         uint32_t hash;
190
191         /* find which hash bucket it is in */
192         hash = tdb->hash_fn(&key);
193         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
194                 return tdb_null;
195
196         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
197                                   rec.data_len);
198         ret.dsize = rec.data_len;
199         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
200         return ret;
201 }
202
203 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
204 {
205         TDB_DATA ret = _tdb_fetch(tdb, key);
206
207         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
208         return ret;
209 }
210
211 /*
212  * Find an entry in the database and hand the record's data to a parsing
213  * function. The parsing function is executed under the chain read lock, so it
214  * should be fast and should not block on other syscalls.
215  *
216  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
217  *
218  * For mmapped tdb's that do not have a transaction open it points the parsing
219  * function directly at the mmap area, it avoids the malloc/memcpy in this
220  * case. If a transaction is open or no mmap is available, it has to do
221  * malloc/read/parse/free.
222  *
223  * This is interesting for all readers of potentially large data structures in
224  * the tdb records, ldb indexes being one example.
225  */
226
227 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
228                      int (*parser)(TDB_DATA key, TDB_DATA data,
229                                    void *private_data),
230                      void *private_data)
231 {
232         tdb_off_t rec_ptr;
233         struct tdb_record rec;
234         int ret;
235         uint32_t hash;
236
237         /* find which hash bucket it is in */
238         hash = tdb->hash_fn(&key);
239
240         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
241                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
242                 tdb->ecode = TDB_ERR_NOEXIST;
243                 return 0;
244         }
245         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
246
247         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
248                              rec.data_len, parser, private_data);
249
250         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
251
252         return ret;
253 }
254
255 /* check if an entry in the database exists 
256
257    note that 1 is returned if the key is found and 0 is returned if not found
258    this doesn't match the conventions in the rest of this module, but is
259    compatible with gdbm
260 */
261 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
262 {
263         struct tdb_record rec;
264         
265         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
266                 return 0;
267         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
268         return 1;
269 }
270
271 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
272 {
273         uint32_t hash = tdb->hash_fn(&key);
274         int ret;
275
276         ret = tdb_exists_hash(tdb, key, hash);
277         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
278         return ret;
279 }
280
281 /* actually delete an entry in the database given the offset */
282 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
283 {
284         tdb_off_t last_ptr, i;
285         struct tdb_record lastrec;
286
287         if (tdb->read_only || tdb->traverse_read) return -1;
288
289         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
290             tdb_write_lock_record(tdb, rec_ptr) == -1) {
291                 /* Someone traversing here: mark it as dead */
292                 rec->magic = TDB_DEAD_MAGIC;
293                 return tdb_rec_write(tdb, rec_ptr, rec);
294         }
295         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
296                 return -1;
297
298         /* find previous record in hash chain */
299         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
300                 return -1;
301         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
302                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
303                         return -1;
304
305         /* unlink it: next ptr is at start of record. */
306         if (last_ptr == 0)
307                 last_ptr = TDB_HASH_TOP(rec->full_hash);
308         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
309                 return -1;
310
311         /* recover the space */
312         if (tdb_free(tdb, rec_ptr, rec) == -1)
313                 return -1;
314         return 0;
315 }
316
317 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
318 {
319         int res = 0;
320         tdb_off_t rec_ptr;
321         struct tdb_record rec;
322         
323         /* read in the hash top */
324         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
325                 return 0;
326
327         while (rec_ptr) {
328                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
329                         return 0;
330
331                 if (rec.magic == TDB_DEAD_MAGIC) {
332                         res += 1;
333                 }
334                 rec_ptr = rec.next;
335         }
336         return res;
337 }
338
339 /*
340  * Purge all DEAD records from a hash chain
341  */
342 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
343 {
344         int res = -1;
345         struct tdb_record rec;
346         tdb_off_t rec_ptr;
347
348         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
349                 return -1;
350         }
351         
352         /* read in the hash top */
353         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
354                 goto fail;
355
356         while (rec_ptr) {
357                 tdb_off_t next;
358
359                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
360                         goto fail;
361                 }
362
363                 next = rec.next;
364
365                 if (rec.magic == TDB_DEAD_MAGIC
366                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
367                         goto fail;
368                 }
369                 rec_ptr = next;
370         }
371         res = 0;
372  fail:
373         tdb_unlock(tdb, -1, F_WRLCK);
374         return res;
375 }
376
377 /* delete an entry in the database given a key */
378 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
379 {
380         tdb_off_t rec_ptr;
381         struct tdb_record rec;
382         int ret;
383
384         if (tdb->max_dead_records != 0) {
385
386                 /*
387                  * Allow for some dead records per hash chain, mainly for
388                  * tdb's with a very high create/delete rate like locking.tdb.
389                  */
390
391                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
392                         return -1;
393
394                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
395                         /*
396                          * Don't let the per-chain freelist grow too large,
397                          * delete all existing dead records
398                          */
399                         tdb_purge_dead(tdb, hash);
400                 }
401
402                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
403                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
404                         return -1;
405                 }
406
407                 /*
408                  * Just mark the record as dead.
409                  */
410                 rec.magic = TDB_DEAD_MAGIC;
411                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
412         }
413         else {
414                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
415                                                    &rec)))
416                         return -1;
417
418                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
419         }
420
421         if (ret == 0) {
422                 tdb_increment_seqnum(tdb);
423         }
424
425         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
426                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
427         return ret;
428 }
429
430 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
431 {
432         uint32_t hash = tdb->hash_fn(&key);
433         int ret;
434
435         ret = tdb_delete_hash(tdb, key, hash);
436         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
437         return ret;
438 }
439
440 /*
441  * See if we have a dead record around with enough space
442  */
443 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
444                                struct tdb_record *r, tdb_len_t length)
445 {
446         tdb_off_t rec_ptr;
447         
448         /* read in the hash top */
449         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
450                 return 0;
451
452         /* keep looking until we find the right record */
453         while (rec_ptr) {
454                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
455                         return 0;
456
457                 if (TDB_DEAD(r) && r->rec_len >= length) {
458                         /*
459                          * First fit for simple coding, TODO: change to best
460                          * fit
461                          */
462                         return rec_ptr;
463                 }
464                 rec_ptr = r->next;
465         }
466         return 0;
467 }
468
469 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
470                        TDB_DATA dbuf, int flag, uint32_t hash)
471 {
472         struct tdb_record rec;
473         tdb_off_t rec_ptr;
474         char *p = NULL;
475         int ret = -1;
476
477         /* check for it existing, on insert. */
478         if (flag == TDB_INSERT) {
479                 if (tdb_exists_hash(tdb, key, hash)) {
480                         tdb->ecode = TDB_ERR_EXISTS;
481                         goto fail;
482                 }
483         } else {
484                 /* first try in-place update, on modify or replace. */
485                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
486                         goto done;
487                 }
488                 if (tdb->ecode == TDB_ERR_NOEXIST &&
489                     flag == TDB_MODIFY) {
490                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
491                          we should fail the store */
492                         goto fail;
493                 }
494         }
495         /* reset the error code potentially set by the tdb_update() */
496         tdb->ecode = TDB_SUCCESS;
497
498         /* delete any existing record - if it doesn't exist we don't
499            care.  Doing this first reduces fragmentation, and avoids
500            coalescing with `allocated' block before it's updated. */
501         if (flag != TDB_INSERT)
502                 tdb_delete_hash(tdb, key, hash);
503
504         /* Copy key+value *before* allocating free space in case malloc
505            fails and we are left with a dead spot in the tdb. */
506
507         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
508                 tdb->ecode = TDB_ERR_OOM;
509                 goto fail;
510         }
511
512         memcpy(p, key.dptr, key.dsize);
513         if (dbuf.dsize)
514                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
515
516         if (tdb->max_dead_records != 0) {
517                 /*
518                  * Allow for some dead records per hash chain, look if we can
519                  * find one that can hold the new record. We need enough space
520                  * for key, data and tailer. If we find one, we don't have to
521                  * consult the central freelist.
522                  */
523                 rec_ptr = tdb_find_dead(
524                         tdb, hash, &rec,
525                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
526
527                 if (rec_ptr != 0) {
528                         rec.key_len = key.dsize;
529                         rec.data_len = dbuf.dsize;
530                         rec.full_hash = hash;
531                         rec.magic = TDB_MAGIC;
532                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
533                             || tdb->methods->tdb_write(
534                                     tdb, rec_ptr + sizeof(rec),
535                                     p, key.dsize + dbuf.dsize) == -1) {
536                                 goto fail;
537                         }
538                         goto done;
539                 }
540         }
541
542         /*
543          * We have to allocate some space from the freelist, so this means we
544          * have to lock it. Use the chance to purge all the DEAD records from
545          * the hash chain under the freelist lock.
546          */
547
548         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
549                 goto fail;
550         }
551
552         if ((tdb->max_dead_records != 0)
553             && (tdb_purge_dead(tdb, hash) == -1)) {
554                 tdb_unlock(tdb, -1, F_WRLCK);
555                 goto fail;
556         }
557
558         /* we have to allocate some space */
559         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
560
561         tdb_unlock(tdb, -1, F_WRLCK);
562
563         if (rec_ptr == 0) {
564                 goto fail;
565         }
566
567         /* Read hash top into next ptr */
568         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
569                 goto fail;
570
571         rec.key_len = key.dsize;
572         rec.data_len = dbuf.dsize;
573         rec.full_hash = hash;
574         rec.magic = TDB_MAGIC;
575
576         /* write out and point the top of the hash chain at it */
577         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
578             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
579             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
580                 /* Need to tdb_unallocate() here */
581                 goto fail;
582         }
583
584  done:
585         ret = 0;
586  fail:
587         if (ret == 0) {
588                 tdb_increment_seqnum(tdb);
589         }
590
591         SAFE_FREE(p); 
592         return ret;
593 }
594
595 /* store an element in the database, replacing any existing element
596    with the same key
597
598    return 0 on success, -1 on failure
599 */
600 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
601 {
602         uint32_t hash;
603         int ret;
604
605         if (tdb->read_only || tdb->traverse_read) {
606                 tdb->ecode = TDB_ERR_RDONLY;
607                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
608                 return -1;
609         }
610
611         /* find which hash bucket it is in */
612         hash = tdb->hash_fn(&key);
613         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
614                 return -1;
615
616         ret = _tdb_store(tdb, key, dbuf, flag, hash);
617         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
618         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
619         return ret;
620 }
621
622 /* Append to an entry. Create if not exist. */
623 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
624 {
625         uint32_t hash;
626         TDB_DATA dbuf;
627         int ret = -1;
628
629         /* find which hash bucket it is in */
630         hash = tdb->hash_fn(&key);
631         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
632                 return -1;
633
634         dbuf = _tdb_fetch(tdb, key);
635
636         if (dbuf.dptr == NULL) {
637                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
638         } else {
639                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
640                 unsigned char *new_dptr;
641
642                 /* realloc '0' is special: don't do that. */
643                 if (new_len == 0)
644                         new_len = 1;
645                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
646                 if (new_dptr == NULL) {
647                         free(dbuf.dptr);
648                 }
649                 dbuf.dptr = new_dptr;
650         }
651
652         if (dbuf.dptr == NULL) {
653                 tdb->ecode = TDB_ERR_OOM;
654                 goto failed;
655         }
656
657         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
658         dbuf.dsize += new_dbuf.dsize;
659
660         ret = _tdb_store(tdb, key, dbuf, 0, hash);
661         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
662         
663 failed:
664         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
665         SAFE_FREE(dbuf.dptr);
666         return ret;
667 }
668
669
670 /*
671   return the name of the current tdb file
672   useful for external logging functions
673 */
674 const char *tdb_name(struct tdb_context *tdb)
675 {
676         return tdb->name;
677 }
678
679 /*
680   return the underlying file descriptor being used by tdb, or -1
681   useful for external routines that want to check the device/inode
682   of the fd
683 */
684 int tdb_fd(struct tdb_context *tdb)
685 {
686         return tdb->fd;
687 }
688
689 /*
690   return the current logging function
691   useful for external tdb routines that wish to log tdb errors
692 */
693 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
694 {
695         return tdb->log.log_fn;
696 }
697
698
699 /*
700   get the tdb sequence number. Only makes sense if the writers opened
701   with TDB_SEQNUM set. Note that this sequence number will wrap quite
702   quickly, so it should only be used for a 'has something changed'
703   test, not for code that relies on the count of the number of changes
704   made. If you want a counter then use a tdb record.
705
706   The aim of this sequence number is to allow for a very lightweight
707   test of a possible tdb change.
708 */
709 int tdb_get_seqnum(struct tdb_context *tdb)
710 {
711         tdb_off_t seqnum=0;
712
713         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
714         return seqnum;
715 }
716
717 int tdb_hash_size(struct tdb_context *tdb)
718 {
719         return tdb->header.hash_size;
720 }
721
722 size_t tdb_map_size(struct tdb_context *tdb)
723 {
724         return tdb->map_size;
725 }
726
727 int tdb_get_flags(struct tdb_context *tdb)
728 {
729         return tdb->flags;
730 }
731
732 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
733 {
734         if ((flags & TDB_ALLOW_NESTING) &&
735             (flags & TDB_DISALLOW_NESTING)) {
736                 tdb->ecode = TDB_ERR_NESTING;
737                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
738                         "allow_nesting and disallow_nesting are not allowed together!"));
739                 return;
740         }
741
742         if (flags & TDB_ALLOW_NESTING) {
743                 tdb->flags &= ~TDB_DISALLOW_NESTING;
744         }
745         if (flags & TDB_DISALLOW_NESTING) {
746                 tdb->flags &= ~TDB_ALLOW_NESTING;
747         }
748
749         tdb->flags |= flags;
750 }
751
752 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
753 {
754         if ((flags & TDB_ALLOW_NESTING) &&
755             (flags & TDB_DISALLOW_NESTING)) {
756                 tdb->ecode = TDB_ERR_NESTING;
757                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
758                         "allow_nesting and disallow_nesting are not allowed together!"));
759                 return;
760         }
761
762         if (flags & TDB_ALLOW_NESTING) {
763                 tdb->flags |= TDB_DISALLOW_NESTING;
764         }
765         if (flags & TDB_DISALLOW_NESTING) {
766                 tdb->flags |= TDB_ALLOW_NESTING;
767         }
768
769         tdb->flags &= ~flags;
770 }
771
772
773 /*
774   enable sequence number handling on an open tdb
775 */
776 void tdb_enable_seqnum(struct tdb_context *tdb)
777 {
778         tdb->flags |= TDB_SEQNUM;
779 }
780
781
782 /*
783   add a region of the file to the freelist. Length is the size of the region in bytes, 
784   which includes the free list header that needs to be added
785  */
786 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
787 {
788         struct tdb_record rec;
789         if (length <= sizeof(rec)) {
790                 /* the region is not worth adding */
791                 return 0;
792         }
793         if (length + offset > tdb->map_size) {
794                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
795                 return -1;              
796         }
797         memset(&rec,'\0',sizeof(rec));
798         rec.rec_len = length - sizeof(rec);
799         if (tdb_free(tdb, offset, &rec) == -1) {
800                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
801                 return -1;
802         }
803         return 0;
804 }
805
806 /*
807   wipe the entire database, deleting all records. This can be done
808   very fast by using a allrecord lock. The entire data portion of the
809   file becomes a single entry in the freelist.
810
811   This code carefully steps around the recovery area, leaving it alone
812  */
813 int tdb_wipe_all(struct tdb_context *tdb)
814 {
815         int i;
816         tdb_off_t offset = 0;
817         ssize_t data_len;
818         tdb_off_t recovery_head;
819         tdb_len_t recovery_size = 0;
820
821         if (tdb_lockall(tdb) != 0) {
822                 return -1;
823         }
824
825         tdb_trace(tdb, "tdb_wipe_all");
826
827         /* see if the tdb has a recovery area, and remember its size
828            if so. We don't want to lose this as otherwise each
829            tdb_wipe_all() in a transaction will increase the size of
830            the tdb by the size of the recovery area */
831         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
832                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
833                 goto failed;
834         }
835
836         if (recovery_head != 0) {
837                 struct tdb_record rec;
838                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
839                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
840                         return -1;
841                 }       
842                 recovery_size = rec.rec_len + sizeof(rec);
843         }
844
845         /* wipe the hashes */
846         for (i=0;i<tdb->header.hash_size;i++) {
847                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
848                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
849                         goto failed;
850                 }
851         }
852
853         /* wipe the freelist */
854         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
855                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
856                 goto failed;
857         }
858
859         /* add all the rest of the file to the freelist, possibly leaving a gap 
860            for the recovery area */
861         if (recovery_size == 0) {
862                 /* the simple case - the whole file can be used as a freelist */
863                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
864                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
865                         goto failed;
866                 }
867         } else {
868                 /* we need to add two freelist entries - one on either
869                    side of the recovery area 
870
871                    Note that we cannot shift the recovery area during
872                    this operation. Only the transaction.c code may
873                    move the recovery area or we risk subtle data
874                    corruption
875                 */
876                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
877                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
878                         goto failed;
879                 }
880                 /* and the 2nd free list entry after the recovery area - if any */
881                 data_len = tdb->map_size - (recovery_head+recovery_size);
882                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
883                         goto failed;
884                 }
885         }
886
887         if (tdb_unlockall(tdb) != 0) {
888                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
889                 goto failed;
890         }
891
892         return 0;
893
894 failed:
895         tdb_unlockall(tdb);
896         return -1;
897 }
898
899 struct traverse_state {
900         bool error;
901         struct tdb_context *dest_db;
902 };
903
904 /*
905   traverse function for repacking
906  */
907 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
908 {
909         struct traverse_state *state = (struct traverse_state *)private_data;
910         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
911                 state->error = true;
912                 return -1;
913         }
914         return 0;
915 }
916
917 /*
918   repack a tdb
919  */
920 int tdb_repack(struct tdb_context *tdb)
921 {
922         struct tdb_context *tmp_db;
923         struct traverse_state state;
924
925         tdb_trace(tdb, "tdb_repack");
926
927         if (tdb_transaction_start(tdb) != 0) {
928                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
929                 return -1;
930         }
931
932         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
933         if (tmp_db == NULL) {
934                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
935                 tdb_transaction_cancel(tdb);
936                 return -1;
937         }
938
939         state.error = false;
940         state.dest_db = tmp_db;
941
942         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
943                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
944                 tdb_transaction_cancel(tdb);
945                 tdb_close(tmp_db);
946                 return -1;              
947         }
948
949         if (state.error) {
950                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
951                 tdb_transaction_cancel(tdb);
952                 tdb_close(tmp_db);
953                 return -1;
954         }
955
956         if (tdb_wipe_all(tdb) != 0) {
957                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
958                 tdb_transaction_cancel(tdb);
959                 tdb_close(tmp_db);
960                 return -1;
961         }
962
963         state.error = false;
964         state.dest_db = tdb;
965
966         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
967                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
968                 tdb_transaction_cancel(tdb);
969                 tdb_close(tmp_db);
970                 return -1;              
971         }
972
973         if (state.error) {
974                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
975                 tdb_transaction_cancel(tdb);
976                 tdb_close(tmp_db);
977                 return -1;
978         }
979
980         tdb_close(tmp_db);
981
982         if (tdb_transaction_commit(tdb) != 0) {
983                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
984                 return -1;
985         }
986
987         return 0;
988 }
989
990 #ifdef TDB_TRACE
991 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
992 {
993         if (write(tdb->tracefd, str, strlen(str)) != strlen(str)) {
994                 close(tdb->tracefd);
995                 tdb->tracefd = -1;
996         }
997 }
998
999 static void tdb_trace_start(struct tdb_context *tdb)
1000 {
1001         tdb_off_t seqnum=0;
1002         char msg[sizeof(tdb_off_t) * 4 + 1];
1003
1004         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1005         snprintf(msg, sizeof(msg), "%u ", seqnum);
1006         tdb_trace_write(tdb, msg);
1007 }
1008
1009 static void tdb_trace_end(struct tdb_context *tdb)
1010 {
1011         tdb_trace_write(tdb, "\n");
1012 }
1013
1014 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1015 {
1016         char msg[sizeof(ret) * 4 + 4];
1017         snprintf(msg, sizeof(msg), " = %i\n", ret);
1018         tdb_trace_write(tdb, msg);
1019 }
1020
1021 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1022 {
1023         char msg[20 + rec.dsize*2], *p;
1024         unsigned int i;
1025
1026         /* We differentiate zero-length records from non-existent ones. */
1027         if (rec.dptr == NULL) {
1028                 tdb_trace_write(tdb, " NULL");
1029                 return;
1030         }
1031
1032         /* snprintf here is purely cargo-cult programming. */
1033         p = msg;
1034         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1035         for (i = 0; i < rec.dsize; i++)
1036                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1037
1038         tdb_trace_write(tdb, msg);
1039 }
1040
1041 void tdb_trace(struct tdb_context *tdb, const char *op)
1042 {
1043         tdb_trace_start(tdb);
1044         tdb_trace_write(tdb, op);
1045         tdb_trace_end(tdb);
1046 }
1047
1048 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1049 {
1050         char msg[sizeof(tdb_off_t) * 4 + 1];
1051
1052         snprintf(msg, sizeof(msg), "%u ", seqnum);
1053         tdb_trace_write(tdb, msg);
1054         tdb_trace_write(tdb, op);
1055         tdb_trace_end(tdb);
1056 }
1057
1058 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1059                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1060 {
1061         char msg[128];
1062
1063         snprintf(msg, sizeof(msg),
1064                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1065         tdb_trace_start(tdb);
1066         tdb_trace_write(tdb, msg);
1067         tdb_trace_end(tdb);
1068 }
1069
1070 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1071 {
1072         tdb_trace_start(tdb);
1073         tdb_trace_write(tdb, op);
1074         tdb_trace_end_ret(tdb, ret);
1075 }
1076
1077 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1078 {
1079         tdb_trace_start(tdb);
1080         tdb_trace_write(tdb, op);
1081         tdb_trace_write(tdb, " =");
1082         tdb_trace_record(tdb, ret);
1083         tdb_trace_end(tdb);
1084 }
1085
1086 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1087                     TDB_DATA rec)
1088 {
1089         tdb_trace_start(tdb);
1090         tdb_trace_write(tdb, op);
1091         tdb_trace_record(tdb, rec);
1092         tdb_trace_end(tdb);
1093 }
1094
1095 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1096                         TDB_DATA rec, int ret)
1097 {
1098         tdb_trace_start(tdb);
1099         tdb_trace_write(tdb, op);
1100         tdb_trace_record(tdb, rec);
1101         tdb_trace_end_ret(tdb, ret);
1102 }
1103
1104 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1105                            TDB_DATA rec, TDB_DATA ret)
1106 {
1107         tdb_trace_start(tdb);
1108         tdb_trace_write(tdb, op);
1109         tdb_trace_record(tdb, rec);
1110         tdb_trace_write(tdb, " =");
1111         tdb_trace_record(tdb, ret);
1112         tdb_trace_end(tdb);
1113 }
1114
1115 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1116                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1117                              int ret)
1118 {
1119         char msg[1 + sizeof(ret) * 4];
1120
1121         snprintf(msg, sizeof(msg), " %#x", flag);
1122         tdb_trace_start(tdb);
1123         tdb_trace_write(tdb, op);
1124         tdb_trace_record(tdb, rec1);
1125         tdb_trace_record(tdb, rec2);
1126         tdb_trace_write(tdb, msg);
1127         tdb_trace_end_ret(tdb, ret);
1128 }
1129
1130 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1131                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1132 {
1133         tdb_trace_start(tdb);
1134         tdb_trace_write(tdb, op);
1135         tdb_trace_record(tdb, rec1);
1136         tdb_trace_record(tdb, rec2);
1137         tdb_trace_write(tdb, " =");
1138         tdb_trace_record(tdb, ret);
1139         tdb_trace_end(tdb);
1140 }
1141 #endif