d2688def04743ae4e03cc84489cf251bec30dc8b
[samba.git] / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
63                 return;
64         }
65
66         tdb_increment_seqnum_nonblock(tdb);
67
68         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
69 }
70
71 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
72 {
73         return memcmp(data.dptr, key.dptr, data.dsize);
74 }
75
76 /* Returns 0 on fail.  On success, return offset of record, and fills
77    in rec */
78 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
79                         struct tdb_record *r)
80 {
81         tdb_off_t rec_ptr;
82         
83         /* read in the hash top */
84         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
85                 return 0;
86
87         /* keep looking until we find the right record */
88         while (rec_ptr) {
89                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
90                         return 0;
91
92                 if (!TDB_DEAD(r) && hash==r->full_hash
93                     && key.dsize==r->key_len
94                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
95                                       r->key_len, tdb_key_compare,
96                                       NULL) == 0) {
97                         return rec_ptr;
98                 }
99                 /* detect tight infinite loop */
100                 if (rec_ptr == r->next) {
101                         tdb->ecode = TDB_ERR_CORRUPT;
102                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
103                         return 0;
104                 }
105                 rec_ptr = r->next;
106         }
107         tdb->ecode = TDB_ERR_NOEXIST;
108         return 0;
109 }
110
111 /* As tdb_find, but if you succeed, keep the lock */
112 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
113                            struct tdb_record *rec)
114 {
115         uint32_t rec_ptr;
116
117         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
118                 return 0;
119         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
120                 tdb_unlock(tdb, BUCKET(hash), locktype);
121         return rec_ptr;
122 }
123
124 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
125
126 /* update an entry in place - this only works if the new data size
127    is <= the old data size and the key exists.
128    on failure return -1.
129 */
130 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
131 {
132         struct tdb_record rec;
133         tdb_off_t rec_ptr;
134
135         /* find entry */
136         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
137                 return -1;
138
139         /* it could be an exact duplicate of what is there - this is
140          * surprisingly common (eg. with a ldb re-index). */
141         if (rec.key_len == key.dsize && 
142             rec.data_len == dbuf.dsize &&
143             rec.full_hash == hash) {
144                 TDB_DATA data = _tdb_fetch(tdb, key);
145                 if (data.dsize == dbuf.dsize &&
146                     memcmp(data.dptr, dbuf.dptr, data.dsize) == 0) {
147                         if (data.dptr) {
148                                 free(data.dptr);
149                         }
150                         return 0;
151                 }
152                 if (data.dptr) {
153                         free(data.dptr);
154                 }
155         }
156          
157
158         /* must be long enough key, data and tailer */
159         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
160                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
161                 return -1;
162         }
163
164         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
165                       dbuf.dptr, dbuf.dsize) == -1)
166                 return -1;
167
168         if (dbuf.dsize != rec.data_len) {
169                 /* update size */
170                 rec.data_len = dbuf.dsize;
171                 return tdb_rec_write(tdb, rec_ptr, &rec);
172         }
173  
174         return 0;
175 }
176
177 /* find an entry in the database given a key */
178 /* If an entry doesn't exist tdb_err will be set to
179  * TDB_ERR_NOEXIST. If a key has no data attached
180  * then the TDB_DATA will have zero length but
181  * a non-zero pointer
182  */
183 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
184 {
185         tdb_off_t rec_ptr;
186         struct tdb_record rec;
187         TDB_DATA ret;
188         uint32_t hash;
189
190         /* find which hash bucket it is in */
191         hash = tdb->hash_fn(&key);
192         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
193                 return tdb_null;
194
195         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
196                                   rec.data_len);
197         ret.dsize = rec.data_len;
198         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
199         return ret;
200 }
201
202 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
203 {
204         TDB_DATA ret = _tdb_fetch(tdb, key);
205
206         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
207         return ret;
208 }
209
210 /*
211  * Find an entry in the database and hand the record's data to a parsing
212  * function. The parsing function is executed under the chain read lock, so it
213  * should be fast and should not block on other syscalls.
214  *
215  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
216  *
217  * For mmapped tdb's that do not have a transaction open it points the parsing
218  * function directly at the mmap area, it avoids the malloc/memcpy in this
219  * case. If a transaction is open or no mmap is available, it has to do
220  * malloc/read/parse/free.
221  *
222  * This is interesting for all readers of potentially large data structures in
223  * the tdb records, ldb indexes being one example.
224  */
225
226 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
227                      int (*parser)(TDB_DATA key, TDB_DATA data,
228                                    void *private_data),
229                      void *private_data)
230 {
231         tdb_off_t rec_ptr;
232         struct tdb_record rec;
233         int ret;
234         uint32_t hash;
235
236         /* find which hash bucket it is in */
237         hash = tdb->hash_fn(&key);
238
239         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
240                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
241                 tdb->ecode = TDB_ERR_NOEXIST;
242                 return 0;
243         }
244         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
245
246         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
247                              rec.data_len, parser, private_data);
248
249         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
250
251         return ret;
252 }
253
254 /* check if an entry in the database exists 
255
256    note that 1 is returned if the key is found and 0 is returned if not found
257    this doesn't match the conventions in the rest of this module, but is
258    compatible with gdbm
259 */
260 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
261 {
262         struct tdb_record rec;
263         
264         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
265                 return 0;
266         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
267         return 1;
268 }
269
270 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
271 {
272         uint32_t hash = tdb->hash_fn(&key);
273         int ret;
274
275         ret = tdb_exists_hash(tdb, key, hash);
276         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
277         return ret;
278 }
279
280 /* actually delete an entry in the database given the offset */
281 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
282 {
283         tdb_off_t last_ptr, i;
284         struct tdb_record lastrec;
285
286         if (tdb->read_only || tdb->traverse_read) return -1;
287
288         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
289             tdb_write_lock_record(tdb, rec_ptr) == -1) {
290                 /* Someone traversing here: mark it as dead */
291                 rec->magic = TDB_DEAD_MAGIC;
292                 return tdb_rec_write(tdb, rec_ptr, rec);
293         }
294         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
295                 return -1;
296
297         /* find previous record in hash chain */
298         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
299                 return -1;
300         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
301                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
302                         return -1;
303
304         /* unlink it: next ptr is at start of record. */
305         if (last_ptr == 0)
306                 last_ptr = TDB_HASH_TOP(rec->full_hash);
307         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
308                 return -1;
309
310         /* recover the space */
311         if (tdb_free(tdb, rec_ptr, rec) == -1)
312                 return -1;
313         return 0;
314 }
315
316 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
317 {
318         int res = 0;
319         tdb_off_t rec_ptr;
320         struct tdb_record rec;
321         
322         /* read in the hash top */
323         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
324                 return 0;
325
326         while (rec_ptr) {
327                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
328                         return 0;
329
330                 if (rec.magic == TDB_DEAD_MAGIC) {
331                         res += 1;
332                 }
333                 rec_ptr = rec.next;
334         }
335         return res;
336 }
337
338 /*
339  * Purge all DEAD records from a hash chain
340  */
341 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
342 {
343         int res = -1;
344         struct tdb_record rec;
345         tdb_off_t rec_ptr;
346
347         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
348                 return -1;
349         }
350         
351         /* read in the hash top */
352         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
353                 goto fail;
354
355         while (rec_ptr) {
356                 tdb_off_t next;
357
358                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
359                         goto fail;
360                 }
361
362                 next = rec.next;
363
364                 if (rec.magic == TDB_DEAD_MAGIC
365                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
366                         goto fail;
367                 }
368                 rec_ptr = next;
369         }
370         res = 0;
371  fail:
372         tdb_unlock(tdb, -1, F_WRLCK);
373         return res;
374 }
375
376 /* delete an entry in the database given a key */
377 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
378 {
379         tdb_off_t rec_ptr;
380         struct tdb_record rec;
381         int ret;
382
383         if (tdb->max_dead_records != 0) {
384
385                 /*
386                  * Allow for some dead records per hash chain, mainly for
387                  * tdb's with a very high create/delete rate like locking.tdb.
388                  */
389
390                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
391                         return -1;
392
393                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
394                         /*
395                          * Don't let the per-chain freelist grow too large,
396                          * delete all existing dead records
397                          */
398                         tdb_purge_dead(tdb, hash);
399                 }
400
401                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
402                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
403                         return -1;
404                 }
405
406                 /*
407                  * Just mark the record as dead.
408                  */
409                 rec.magic = TDB_DEAD_MAGIC;
410                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
411         }
412         else {
413                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
414                                                    &rec)))
415                         return -1;
416
417                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
418         }
419
420         if (ret == 0) {
421                 tdb_increment_seqnum(tdb);
422         }
423
424         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
425                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
426         return ret;
427 }
428
429 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
430 {
431         uint32_t hash = tdb->hash_fn(&key);
432         int ret;
433
434         ret = tdb_delete_hash(tdb, key, hash);
435         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
436         return ret;
437 }
438
439 /*
440  * See if we have a dead record around with enough space
441  */
442 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
443                                struct tdb_record *r, tdb_len_t length)
444 {
445         tdb_off_t rec_ptr;
446         
447         /* read in the hash top */
448         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
449                 return 0;
450
451         /* keep looking until we find the right record */
452         while (rec_ptr) {
453                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
454                         return 0;
455
456                 if (TDB_DEAD(r) && r->rec_len >= length) {
457                         /*
458                          * First fit for simple coding, TODO: change to best
459                          * fit
460                          */
461                         return rec_ptr;
462                 }
463                 rec_ptr = r->next;
464         }
465         return 0;
466 }
467
468 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
469                        TDB_DATA dbuf, int flag, uint32_t hash)
470 {
471         struct tdb_record rec;
472         tdb_off_t rec_ptr;
473         char *p = NULL;
474         int ret = -1;
475
476         /* check for it existing, on insert. */
477         if (flag == TDB_INSERT) {
478                 if (tdb_exists_hash(tdb, key, hash)) {
479                         tdb->ecode = TDB_ERR_EXISTS;
480                         goto fail;
481                 }
482         } else {
483                 /* first try in-place update, on modify or replace. */
484                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
485                         goto done;
486                 }
487                 if (tdb->ecode == TDB_ERR_NOEXIST &&
488                     flag == TDB_MODIFY) {
489                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
490                          we should fail the store */
491                         goto fail;
492                 }
493         }
494         /* reset the error code potentially set by the tdb_update() */
495         tdb->ecode = TDB_SUCCESS;
496
497         /* delete any existing record - if it doesn't exist we don't
498            care.  Doing this first reduces fragmentation, and avoids
499            coalescing with `allocated' block before it's updated. */
500         if (flag != TDB_INSERT)
501                 tdb_delete_hash(tdb, key, hash);
502
503         /* Copy key+value *before* allocating free space in case malloc
504            fails and we are left with a dead spot in the tdb. */
505
506         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
507                 tdb->ecode = TDB_ERR_OOM;
508                 goto fail;
509         }
510
511         memcpy(p, key.dptr, key.dsize);
512         if (dbuf.dsize)
513                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
514
515         if (tdb->max_dead_records != 0) {
516                 /*
517                  * Allow for some dead records per hash chain, look if we can
518                  * find one that can hold the new record. We need enough space
519                  * for key, data and tailer. If we find one, we don't have to
520                  * consult the central freelist.
521                  */
522                 rec_ptr = tdb_find_dead(
523                         tdb, hash, &rec,
524                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
525
526                 if (rec_ptr != 0) {
527                         rec.key_len = key.dsize;
528                         rec.data_len = dbuf.dsize;
529                         rec.full_hash = hash;
530                         rec.magic = TDB_MAGIC;
531                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
532                             || tdb->methods->tdb_write(
533                                     tdb, rec_ptr + sizeof(rec),
534                                     p, key.dsize + dbuf.dsize) == -1) {
535                                 goto fail;
536                         }
537                         goto done;
538                 }
539         }
540
541         /*
542          * We have to allocate some space from the freelist, so this means we
543          * have to lock it. Use the chance to purge all the DEAD records from
544          * the hash chain under the freelist lock.
545          */
546
547         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
548                 goto fail;
549         }
550
551         if ((tdb->max_dead_records != 0)
552             && (tdb_purge_dead(tdb, hash) == -1)) {
553                 tdb_unlock(tdb, -1, F_WRLCK);
554                 goto fail;
555         }
556
557         /* we have to allocate some space */
558         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
559
560         tdb_unlock(tdb, -1, F_WRLCK);
561
562         if (rec_ptr == 0) {
563                 goto fail;
564         }
565
566         /* Read hash top into next ptr */
567         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
568                 goto fail;
569
570         rec.key_len = key.dsize;
571         rec.data_len = dbuf.dsize;
572         rec.full_hash = hash;
573         rec.magic = TDB_MAGIC;
574
575         /* write out and point the top of the hash chain at it */
576         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
577             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
578             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
579                 /* Need to tdb_unallocate() here */
580                 goto fail;
581         }
582
583  done:
584         ret = 0;
585  fail:
586         if (ret == 0) {
587                 tdb_increment_seqnum(tdb);
588         }
589
590         SAFE_FREE(p); 
591         return ret;
592 }
593
594 /* store an element in the database, replacing any existing element
595    with the same key
596
597    return 0 on success, -1 on failure
598 */
599 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
600 {
601         uint32_t hash;
602         int ret;
603
604         if (tdb->read_only || tdb->traverse_read) {
605                 tdb->ecode = TDB_ERR_RDONLY;
606                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
607                 return -1;
608         }
609
610         /* find which hash bucket it is in */
611         hash = tdb->hash_fn(&key);
612         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
613                 return -1;
614
615         ret = _tdb_store(tdb, key, dbuf, flag, hash);
616         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
617         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
618         return ret;
619 }
620
621 /* Append to an entry. Create if not exist. */
622 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
623 {
624         uint32_t hash;
625         TDB_DATA dbuf;
626         int ret = -1;
627
628         /* find which hash bucket it is in */
629         hash = tdb->hash_fn(&key);
630         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
631                 return -1;
632
633         dbuf = _tdb_fetch(tdb, key);
634
635         if (dbuf.dptr == NULL) {
636                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
637         } else {
638                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
639                 unsigned char *new_dptr;
640
641                 /* realloc '0' is special: don't do that. */
642                 if (new_len == 0)
643                         new_len = 1;
644                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
645                 if (new_dptr == NULL) {
646                         free(dbuf.dptr);
647                 }
648                 dbuf.dptr = new_dptr;
649         }
650
651         if (dbuf.dptr == NULL) {
652                 tdb->ecode = TDB_ERR_OOM;
653                 goto failed;
654         }
655
656         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
657         dbuf.dsize += new_dbuf.dsize;
658
659         ret = _tdb_store(tdb, key, dbuf, 0, hash);
660         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
661         
662 failed:
663         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
664         SAFE_FREE(dbuf.dptr);
665         return ret;
666 }
667
668
669 /*
670   return the name of the current tdb file
671   useful for external logging functions
672 */
673 const char *tdb_name(struct tdb_context *tdb)
674 {
675         return tdb->name;
676 }
677
678 /*
679   return the underlying file descriptor being used by tdb, or -1
680   useful for external routines that want to check the device/inode
681   of the fd
682 */
683 int tdb_fd(struct tdb_context *tdb)
684 {
685         return tdb->fd;
686 }
687
688 /*
689   return the current logging function
690   useful for external tdb routines that wish to log tdb errors
691 */
692 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
693 {
694         return tdb->log.log_fn;
695 }
696
697
698 /*
699   get the tdb sequence number. Only makes sense if the writers opened
700   with TDB_SEQNUM set. Note that this sequence number will wrap quite
701   quickly, so it should only be used for a 'has something changed'
702   test, not for code that relies on the count of the number of changes
703   made. If you want a counter then use a tdb record.
704
705   The aim of this sequence number is to allow for a very lightweight
706   test of a possible tdb change.
707 */
708 int tdb_get_seqnum(struct tdb_context *tdb)
709 {
710         tdb_off_t seqnum=0;
711
712         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
713         return seqnum;
714 }
715
716 int tdb_hash_size(struct tdb_context *tdb)
717 {
718         return tdb->header.hash_size;
719 }
720
721 size_t tdb_map_size(struct tdb_context *tdb)
722 {
723         return tdb->map_size;
724 }
725
726 int tdb_get_flags(struct tdb_context *tdb)
727 {
728         return tdb->flags;
729 }
730
731 void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
732 {
733         if ((flags & TDB_ALLOW_NESTING) &&
734             (flags & TDB_DISALLOW_NESTING)) {
735                 tdb->ecode = TDB_ERR_NESTING;
736                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
737                         "allow_nesting and disallow_nesting are not allowed together!"));
738                 return;
739         }
740
741         if (flags & TDB_ALLOW_NESTING) {
742                 tdb->flags &= ~TDB_DISALLOW_NESTING;
743         }
744         if (flags & TDB_DISALLOW_NESTING) {
745                 tdb->flags &= ~TDB_ALLOW_NESTING;
746         }
747
748         tdb->flags |= flags;
749 }
750
751 void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
752 {
753         if ((flags & TDB_ALLOW_NESTING) &&
754             (flags & TDB_DISALLOW_NESTING)) {
755                 tdb->ecode = TDB_ERR_NESTING;
756                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
757                         "allow_nesting and disallow_nesting are not allowed together!"));
758                 return;
759         }
760
761         if (flags & TDB_ALLOW_NESTING) {
762                 tdb->flags |= TDB_DISALLOW_NESTING;
763         }
764         if (flags & TDB_DISALLOW_NESTING) {
765                 tdb->flags |= TDB_ALLOW_NESTING;
766         }
767
768         tdb->flags &= ~flags;
769 }
770
771
772 /*
773   enable sequence number handling on an open tdb
774 */
775 void tdb_enable_seqnum(struct tdb_context *tdb)
776 {
777         tdb->flags |= TDB_SEQNUM;
778 }
779
780
781 /*
782   add a region of the file to the freelist. Length is the size of the region in bytes, 
783   which includes the free list header that needs to be added
784  */
785 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
786 {
787         struct tdb_record rec;
788         if (length <= sizeof(rec)) {
789                 /* the region is not worth adding */
790                 return 0;
791         }
792         if (length + offset > tdb->map_size) {
793                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
794                 return -1;              
795         }
796         memset(&rec,'\0',sizeof(rec));
797         rec.rec_len = length - sizeof(rec);
798         if (tdb_free(tdb, offset, &rec) == -1) {
799                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
800                 return -1;
801         }
802         return 0;
803 }
804
805 /*
806   wipe the entire database, deleting all records. This can be done
807   very fast by using a global lock. The entire data portion of the
808   file becomes a single entry in the freelist.
809
810   This code carefully steps around the recovery area, leaving it alone
811  */
812 int tdb_wipe_all(struct tdb_context *tdb)
813 {
814         int i;
815         tdb_off_t offset = 0;
816         ssize_t data_len;
817         tdb_off_t recovery_head;
818         tdb_len_t recovery_size = 0;
819
820         if (tdb_lockall(tdb) != 0) {
821                 return -1;
822         }
823
824         tdb_trace(tdb, "tdb_wipe_all");
825
826         /* see if the tdb has a recovery area, and remember its size
827            if so. We don't want to lose this as otherwise each
828            tdb_wipe_all() in a transaction will increase the size of
829            the tdb by the size of the recovery area */
830         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
831                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
832                 goto failed;
833         }
834
835         if (recovery_head != 0) {
836                 struct tdb_record rec;
837                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
838                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
839                         return -1;
840                 }       
841                 recovery_size = rec.rec_len + sizeof(rec);
842         }
843
844         /* wipe the hashes */
845         for (i=0;i<tdb->header.hash_size;i++) {
846                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
847                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
848                         goto failed;
849                 }
850         }
851
852         /* wipe the freelist */
853         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
854                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
855                 goto failed;
856         }
857
858         /* add all the rest of the file to the freelist, possibly leaving a gap 
859            for the recovery area */
860         if (recovery_size == 0) {
861                 /* the simple case - the whole file can be used as a freelist */
862                 data_len = (tdb->map_size - TDB_DATA_START(tdb->header.hash_size));
863                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
864                         goto failed;
865                 }
866         } else {
867                 /* we need to add two freelist entries - one on either
868                    side of the recovery area 
869
870                    Note that we cannot shift the recovery area during
871                    this operation. Only the transaction.c code may
872                    move the recovery area or we risk subtle data
873                    corruption
874                 */
875                 data_len = (recovery_head - TDB_DATA_START(tdb->header.hash_size));
876                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->header.hash_size), data_len) != 0) {
877                         goto failed;
878                 }
879                 /* and the 2nd free list entry after the recovery area - if any */
880                 data_len = tdb->map_size - (recovery_head+recovery_size);
881                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
882                         goto failed;
883                 }
884         }
885
886         if (tdb_unlockall(tdb) != 0) {
887                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
888                 goto failed;
889         }
890
891         return 0;
892
893 failed:
894         tdb_unlockall(tdb);
895         return -1;
896 }
897
898 struct traverse_state {
899         bool error;
900         struct tdb_context *dest_db;
901 };
902
903 /*
904   traverse function for repacking
905  */
906 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
907 {
908         struct traverse_state *state = (struct traverse_state *)private_data;
909         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
910                 state->error = true;
911                 return -1;
912         }
913         return 0;
914 }
915
916 /*
917   repack a tdb
918  */
919 int tdb_repack(struct tdb_context *tdb)
920 {
921         struct tdb_context *tmp_db;
922         struct traverse_state state;
923
924         tdb_trace(tdb, "tdb_repack");
925
926         if (tdb_transaction_start(tdb) != 0) {
927                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
928                 return -1;
929         }
930
931         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
932         if (tmp_db == NULL) {
933                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
934                 tdb_transaction_cancel(tdb);
935                 return -1;
936         }
937
938         state.error = false;
939         state.dest_db = tmp_db;
940
941         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
942                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
943                 tdb_transaction_cancel(tdb);
944                 tdb_close(tmp_db);
945                 return -1;              
946         }
947
948         if (state.error) {
949                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
950                 tdb_transaction_cancel(tdb);
951                 tdb_close(tmp_db);
952                 return -1;
953         }
954
955         if (tdb_wipe_all(tdb) != 0) {
956                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
957                 tdb_transaction_cancel(tdb);
958                 tdb_close(tmp_db);
959                 return -1;
960         }
961
962         state.error = false;
963         state.dest_db = tdb;
964
965         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
966                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
967                 tdb_transaction_cancel(tdb);
968                 tdb_close(tmp_db);
969                 return -1;              
970         }
971
972         if (state.error) {
973                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
974                 tdb_transaction_cancel(tdb);
975                 tdb_close(tmp_db);
976                 return -1;
977         }
978
979         tdb_close(tmp_db);
980
981         if (tdb_transaction_commit(tdb) != 0) {
982                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
983                 return -1;
984         }
985
986         return 0;
987 }
988
989 #ifdef TDB_TRACE
990 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
991 {
992         if (write(tdb->tracefd, str, strlen(str)) != strlen(str)) {
993                 close(tdb->tracefd);
994                 tdb->tracefd = -1;
995         }
996 }
997
998 static void tdb_trace_start(struct tdb_context *tdb)
999 {
1000         tdb_off_t seqnum=0;
1001         char msg[sizeof(tdb_off_t) * 4 + 1];
1002
1003         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1004         snprintf(msg, sizeof(msg), "%u ", seqnum);
1005         tdb_trace_write(tdb, msg);
1006 }
1007
1008 static void tdb_trace_end(struct tdb_context *tdb)
1009 {
1010         tdb_trace_write(tdb, "\n");
1011 }
1012
1013 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1014 {
1015         char msg[sizeof(ret) * 4 + 4];
1016         snprintf(msg, sizeof(msg), " = %i\n", ret);
1017         tdb_trace_write(tdb, msg);
1018 }
1019
1020 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1021 {
1022         char msg[20 + rec.dsize*2], *p;
1023         unsigned int i;
1024
1025         /* We differentiate zero-length records from non-existent ones. */
1026         if (rec.dptr == NULL) {
1027                 tdb_trace_write(tdb, " NULL");
1028                 return;
1029         }
1030
1031         /* snprintf here is purely cargo-cult programming. */
1032         p = msg;
1033         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1034         for (i = 0; i < rec.dsize; i++)
1035                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1036
1037         tdb_trace_write(tdb, msg);
1038 }
1039
1040 void tdb_trace(struct tdb_context *tdb, const char *op)
1041 {
1042         tdb_trace_start(tdb);
1043         tdb_trace_write(tdb, op);
1044         tdb_trace_end(tdb);
1045 }
1046
1047 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1048 {
1049         char msg[sizeof(tdb_off_t) * 4 + 1];
1050
1051         snprintf(msg, sizeof(msg), "%u ", seqnum);
1052         tdb_trace_write(tdb, msg);
1053         tdb_trace_write(tdb, op);
1054         tdb_trace_end(tdb);
1055 }
1056
1057 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1058                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1059 {
1060         char msg[128];
1061
1062         snprintf(msg, sizeof(msg),
1063                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1064         tdb_trace_start(tdb);
1065         tdb_trace_write(tdb, msg);
1066         tdb_trace_end(tdb);
1067 }
1068
1069 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1070 {
1071         tdb_trace_start(tdb);
1072         tdb_trace_write(tdb, op);
1073         tdb_trace_end_ret(tdb, ret);
1074 }
1075
1076 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1077 {
1078         tdb_trace_start(tdb);
1079         tdb_trace_write(tdb, op);
1080         tdb_trace_write(tdb, " =");
1081         tdb_trace_record(tdb, ret);
1082         tdb_trace_end(tdb);
1083 }
1084
1085 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1086                     TDB_DATA rec)
1087 {
1088         tdb_trace_start(tdb);
1089         tdb_trace_write(tdb, op);
1090         tdb_trace_record(tdb, rec);
1091         tdb_trace_end(tdb);
1092 }
1093
1094 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1095                         TDB_DATA rec, int ret)
1096 {
1097         tdb_trace_start(tdb);
1098         tdb_trace_write(tdb, op);
1099         tdb_trace_record(tdb, rec);
1100         tdb_trace_end_ret(tdb, ret);
1101 }
1102
1103 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1104                            TDB_DATA rec, TDB_DATA ret)
1105 {
1106         tdb_trace_start(tdb);
1107         tdb_trace_write(tdb, op);
1108         tdb_trace_record(tdb, rec);
1109         tdb_trace_write(tdb, " =");
1110         tdb_trace_record(tdb, ret);
1111         tdb_trace_end(tdb);
1112 }
1113
1114 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1115                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1116                              int ret)
1117 {
1118         char msg[1 + sizeof(ret) * 4];
1119
1120         snprintf(msg, sizeof(msg), " %#x", flag);
1121         tdb_trace_start(tdb);
1122         tdb_trace_write(tdb, op);
1123         tdb_trace_record(tdb, rec1);
1124         tdb_trace_record(tdb, rec2);
1125         tdb_trace_write(tdb, msg);
1126         tdb_trace_end_ret(tdb, ret);
1127 }
1128
1129 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1130                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1131 {
1132         tdb_trace_start(tdb);
1133         tdb_trace_write(tdb, op);
1134         tdb_trace_record(tdb, rec1);
1135         tdb_trace_record(tdb, rec2);
1136         tdb_trace_write(tdb, " =");
1137         tdb_trace_record(tdb, ret);
1138         tdb_trace_end(tdb);
1139 }
1140 #endif