9885d8cfc9dff95264f547d211f665f093e62497
[sharpe/samba-autobuild/.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
63                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
64                 return;
65         }
66
67         tdb_increment_seqnum_nonblock(tdb);
68
69         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
70 }
71
72 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
73 {
74         return memcmp(data.dptr, key.dptr, data.dsize);
75 }
76
77 /* Returns 0 on fail.  On success, return offset of record, and fills
78    in rec */
79 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
80                         struct tdb_record *r)
81 {
82         tdb_off_t rec_ptr;
83
84         /* read in the hash top */
85         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
86                 return 0;
87
88         /* keep looking until we find the right record */
89         while (rec_ptr) {
90                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
91                         return 0;
92
93                 if (!TDB_DEAD(r) && hash==r->full_hash
94                     && key.dsize==r->key_len
95                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
96                                       r->key_len, tdb_key_compare,
97                                       NULL) == 0) {
98                         return rec_ptr;
99                 }
100                 /* detect tight infinite loop */
101                 if (rec_ptr == r->next) {
102                         tdb->ecode = TDB_ERR_CORRUPT;
103                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
104                         return 0;
105                 }
106                 rec_ptr = r->next;
107         }
108         tdb->ecode = TDB_ERR_NOEXIST;
109         return 0;
110 }
111
112 /* As tdb_find, but if you succeed, keep the lock */
113 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
114                            struct tdb_record *rec)
115 {
116         uint32_t rec_ptr;
117
118         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
119                 return 0;
120         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
121                 tdb_unlock(tdb, BUCKET(hash), locktype);
122         return rec_ptr;
123 }
124
125 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
126
127 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
128 {
129         TDB_DATA *dbuf = (TDB_DATA *)private_data;
130
131         if (dbuf->dsize != data.dsize) {
132                 return -1;
133         }
134         if (memcmp(dbuf->dptr, data.dptr, data.dsize) != 0) {
135                 return -1;
136         }
137         return 0;
138 }
139
140 /* update an entry in place - this only works if the new data size
141    is <= the old data size and the key exists.
142    on failure return -1.
143 */
144 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
145 {
146         struct tdb_record rec;
147         tdb_off_t rec_ptr;
148
149         /* find entry */
150         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
151                 return -1;
152
153         /* it could be an exact duplicate of what is there - this is
154          * surprisingly common (eg. with a ldb re-index). */
155         if (rec.key_len == key.dsize &&
156             rec.data_len == dbuf.dsize &&
157             rec.full_hash == hash &&
158             tdb_parse_record(tdb, key, tdb_update_hash_cmp, &dbuf) == 0) {
159                 return 0;
160         }
161
162         /* must be long enough key, data and tailer */
163         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
164                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
165                 return -1;
166         }
167
168         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
169                       dbuf.dptr, dbuf.dsize) == -1)
170                 return -1;
171
172         if (dbuf.dsize != rec.data_len) {
173                 /* update size */
174                 rec.data_len = dbuf.dsize;
175                 return tdb_rec_write(tdb, rec_ptr, &rec);
176         }
177
178         return 0;
179 }
180
181 /* find an entry in the database given a key */
182 /* If an entry doesn't exist tdb_err will be set to
183  * TDB_ERR_NOEXIST. If a key has no data attached
184  * then the TDB_DATA will have zero length but
185  * a non-zero pointer
186  */
187 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
188 {
189         tdb_off_t rec_ptr;
190         struct tdb_record rec;
191         TDB_DATA ret;
192         uint32_t hash;
193
194         /* find which hash bucket it is in */
195         hash = tdb->hash_fn(&key);
196         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
197                 return tdb_null;
198
199         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
200                                   rec.data_len);
201         ret.dsize = rec.data_len;
202         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
203         return ret;
204 }
205
206 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
207 {
208         TDB_DATA ret = _tdb_fetch(tdb, key);
209
210         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
211         return ret;
212 }
213
214 /*
215  * Find an entry in the database and hand the record's data to a parsing
216  * function. The parsing function is executed under the chain read lock, so it
217  * should be fast and should not block on other syscalls.
218  *
219  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
220  *
221  * For mmapped tdb's that do not have a transaction open it points the parsing
222  * function directly at the mmap area, it avoids the malloc/memcpy in this
223  * case. If a transaction is open or no mmap is available, it has to do
224  * malloc/read/parse/free.
225  *
226  * This is interesting for all readers of potentially large data structures in
227  * the tdb records, ldb indexes being one example.
228  *
229  * Return -1 if the record was not found.
230  */
231
232 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
233                      int (*parser)(TDB_DATA key, TDB_DATA data,
234                                    void *private_data),
235                      void *private_data)
236 {
237         tdb_off_t rec_ptr;
238         struct tdb_record rec;
239         int ret;
240         uint32_t hash;
241
242         /* find which hash bucket it is in */
243         hash = tdb->hash_fn(&key);
244
245         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
246                 /* record not found */
247                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
248                 tdb->ecode = TDB_ERR_NOEXIST;
249                 return -1;
250         }
251         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
252
253         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
254                              rec.data_len, parser, private_data);
255
256         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
257
258         return ret;
259 }
260
261 /* check if an entry in the database exists
262
263    note that 1 is returned if the key is found and 0 is returned if not found
264    this doesn't match the conventions in the rest of this module, but is
265    compatible with gdbm
266 */
267 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
268 {
269         struct tdb_record rec;
270
271         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
272                 return 0;
273         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
274         return 1;
275 }
276
277 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
278 {
279         uint32_t hash = tdb->hash_fn(&key);
280         int ret;
281
282         ret = tdb_exists_hash(tdb, key, hash);
283         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
284         return ret;
285 }
286
287 /* actually delete an entry in the database given the offset */
288 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
289 {
290         tdb_off_t last_ptr, i;
291         struct tdb_record lastrec;
292
293         if (tdb->read_only || tdb->traverse_read) return -1;
294
295         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
296             tdb_write_lock_record(tdb, rec_ptr) == -1) {
297                 /* Someone traversing here: mark it as dead */
298                 rec->magic = TDB_DEAD_MAGIC;
299                 return tdb_rec_write(tdb, rec_ptr, rec);
300         }
301         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
302                 return -1;
303
304         /* find previous record in hash chain */
305         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
306                 return -1;
307         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
308                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
309                         return -1;
310
311         /* unlink it: next ptr is at start of record. */
312         if (last_ptr == 0)
313                 last_ptr = TDB_HASH_TOP(rec->full_hash);
314         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
315                 return -1;
316
317         /* recover the space */
318         if (tdb_free(tdb, rec_ptr, rec) == -1)
319                 return -1;
320         return 0;
321 }
322
323 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
324 {
325         int res = 0;
326         tdb_off_t rec_ptr;
327         struct tdb_record rec;
328
329         /* read in the hash top */
330         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
331                 return 0;
332
333         while (rec_ptr) {
334                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
335                         return 0;
336
337                 if (rec.magic == TDB_DEAD_MAGIC) {
338                         res += 1;
339                 }
340                 rec_ptr = rec.next;
341         }
342         return res;
343 }
344
345 /*
346  * Purge all DEAD records from a hash chain
347  */
348 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
349 {
350         int res = -1;
351         struct tdb_record rec;
352         tdb_off_t rec_ptr;
353
354         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
355                 /*
356                  * Don't block the freelist if not strictly necessary
357                  */
358                 return -1;
359         }
360
361         /* read in the hash top */
362         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
363                 goto fail;
364
365         while (rec_ptr) {
366                 tdb_off_t next;
367
368                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
369                         goto fail;
370                 }
371
372                 next = rec.next;
373
374                 if (rec.magic == TDB_DEAD_MAGIC
375                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
376                         goto fail;
377                 }
378                 rec_ptr = next;
379         }
380         res = 0;
381  fail:
382         tdb_unlock(tdb, -1, F_WRLCK);
383         return res;
384 }
385
386 /* delete an entry in the database given a key */
387 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
388 {
389         tdb_off_t rec_ptr;
390         struct tdb_record rec;
391         int ret;
392
393         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
394         if (rec_ptr == 0) {
395                 return -1;
396         }
397
398         if (tdb->max_dead_records != 0) {
399
400                 uint32_t magic = TDB_DEAD_MAGIC;
401
402                 /*
403                  * Allow for some dead records per hash chain, mainly for
404                  * tdb's with a very high create/delete rate like locking.tdb.
405                  */
406
407                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
408                         /*
409                          * Don't let the per-chain freelist grow too large,
410                          * delete all existing dead records
411                          */
412                         tdb_purge_dead(tdb, hash);
413                 }
414
415                 /*
416                  * Just mark the record as dead.
417                  */
418                 ret = tdb_ofs_write(
419                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
420                         &magic);
421         }
422         else {
423                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
424         }
425
426         if (ret == 0) {
427                 tdb_increment_seqnum(tdb);
428         }
429
430         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
431                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
432         return ret;
433 }
434
435 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
436 {
437         uint32_t hash = tdb->hash_fn(&key);
438         int ret;
439
440         ret = tdb_delete_hash(tdb, key, hash);
441         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
442         return ret;
443 }
444
445 /*
446  * See if we have a dead record around with enough space
447  */
448 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
449                         struct tdb_record *r, tdb_len_t length,
450                         tdb_off_t *p_last_ptr)
451 {
452         tdb_off_t rec_ptr, last_ptr;
453         tdb_off_t best_rec_ptr = 0;
454         tdb_off_t best_last_ptr = 0;
455         struct tdb_record best = { .rec_len = UINT32_MAX };
456
457         length += sizeof(tdb_off_t); /* tailer */
458
459         last_ptr = TDB_HASH_TOP(hash);
460
461         /* read in the hash top */
462         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
463                 return 0;
464
465         /* keep looking until we find the right record */
466         while (rec_ptr) {
467                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
468                         return 0;
469
470                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
471                     (r->rec_len < best.rec_len)) {
472                         best_rec_ptr = rec_ptr;
473                         best_last_ptr = last_ptr;
474                         best = *r;
475                 }
476                 last_ptr = rec_ptr;
477                 rec_ptr = r->next;
478         }
479
480         if (best.rec_len == UINT32_MAX) {
481                 return 0;
482         }
483
484         *r = best;
485         *p_last_ptr = best_last_ptr;
486         return best_rec_ptr;
487 }
488
489 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
490                        TDB_DATA dbuf, int flag, uint32_t hash)
491 {
492         struct tdb_record rec;
493         tdb_off_t rec_ptr;
494         int ret = -1;
495
496         /* check for it existing, on insert. */
497         if (flag == TDB_INSERT) {
498                 if (tdb_exists_hash(tdb, key, hash)) {
499                         tdb->ecode = TDB_ERR_EXISTS;
500                         goto fail;
501                 }
502         } else {
503                 /* first try in-place update, on modify or replace. */
504                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
505                         goto done;
506                 }
507                 if (tdb->ecode == TDB_ERR_NOEXIST &&
508                     flag == TDB_MODIFY) {
509                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
510                          we should fail the store */
511                         goto fail;
512                 }
513         }
514         /* reset the error code potentially set by the tdb_update_hash() */
515         tdb->ecode = TDB_SUCCESS;
516
517         /* delete any existing record - if it doesn't exist we don't
518            care.  Doing this first reduces fragmentation, and avoids
519            coalescing with `allocated' block before it's updated. */
520         if (flag != TDB_INSERT)
521                 tdb_delete_hash(tdb, key, hash);
522
523         /* we have to allocate some space */
524         rec_ptr = tdb_allocate(tdb, hash, key.dsize + dbuf.dsize, &rec);
525
526         if (rec_ptr == 0) {
527                 goto fail;
528         }
529
530         /* Read hash top into next ptr */
531         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
532                 goto fail;
533
534         rec.key_len = key.dsize;
535         rec.data_len = dbuf.dsize;
536         rec.full_hash = hash;
537         rec.magic = TDB_MAGIC;
538
539         /* write out and point the top of the hash chain at it */
540         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
541             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec),
542                                        key.dptr, key.dsize) == -1
543             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec)+key.dsize,
544                                        dbuf.dptr, dbuf.dsize) == -1
545             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
546                 /* Need to tdb_unallocate() here */
547                 goto fail;
548         }
549
550  done:
551         ret = 0;
552  fail:
553         if (ret == 0) {
554                 tdb_increment_seqnum(tdb);
555         }
556         return ret;
557 }
558
559 /* store an element in the database, replacing any existing element
560    with the same key
561
562    return 0 on success, -1 on failure
563 */
564 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
565 {
566         uint32_t hash;
567         int ret;
568
569         if (tdb->read_only || tdb->traverse_read) {
570                 tdb->ecode = TDB_ERR_RDONLY;
571                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
572                 return -1;
573         }
574
575         /* find which hash bucket it is in */
576         hash = tdb->hash_fn(&key);
577         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
578                 return -1;
579
580         ret = _tdb_store(tdb, key, dbuf, flag, hash);
581         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
582         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
583         return ret;
584 }
585
586 /* Append to an entry. Create if not exist. */
587 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
588 {
589         uint32_t hash;
590         TDB_DATA dbuf;
591         int ret = -1;
592
593         /* find which hash bucket it is in */
594         hash = tdb->hash_fn(&key);
595         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
596                 return -1;
597
598         dbuf = _tdb_fetch(tdb, key);
599
600         if (dbuf.dptr == NULL) {
601                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
602         } else {
603                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
604                 unsigned char *new_dptr;
605
606                 /* realloc '0' is special: don't do that. */
607                 if (new_len == 0)
608                         new_len = 1;
609                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
610                 if (new_dptr == NULL) {
611                         free(dbuf.dptr);
612                 }
613                 dbuf.dptr = new_dptr;
614         }
615
616         if (dbuf.dptr == NULL) {
617                 tdb->ecode = TDB_ERR_OOM;
618                 goto failed;
619         }
620
621         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
622         dbuf.dsize += new_dbuf.dsize;
623
624         ret = _tdb_store(tdb, key, dbuf, 0, hash);
625         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
626
627 failed:
628         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
629         SAFE_FREE(dbuf.dptr);
630         return ret;
631 }
632
633
634 /*
635   return the name of the current tdb file
636   useful for external logging functions
637 */
638 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
639 {
640         return tdb->name;
641 }
642
643 /*
644   return the underlying file descriptor being used by tdb, or -1
645   useful for external routines that want to check the device/inode
646   of the fd
647 */
648 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
649 {
650         return tdb->fd;
651 }
652
653 /*
654   return the current logging function
655   useful for external tdb routines that wish to log tdb errors
656 */
657 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
658 {
659         return tdb->log.log_fn;
660 }
661
662
663 /*
664   get the tdb sequence number. Only makes sense if the writers opened
665   with TDB_SEQNUM set. Note that this sequence number will wrap quite
666   quickly, so it should only be used for a 'has something changed'
667   test, not for code that relies on the count of the number of changes
668   made. If you want a counter then use a tdb record.
669
670   The aim of this sequence number is to allow for a very lightweight
671   test of a possible tdb change.
672 */
673 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
674 {
675         tdb_off_t seqnum=0;
676
677         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
678         return seqnum;
679 }
680
681 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
682 {
683         return tdb->hash_size;
684 }
685
686 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
687 {
688         return tdb->map_size;
689 }
690
691 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
692 {
693         return tdb->flags;
694 }
695
696 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
697 {
698         if ((flags & TDB_ALLOW_NESTING) &&
699             (flags & TDB_DISALLOW_NESTING)) {
700                 tdb->ecode = TDB_ERR_NESTING;
701                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
702                         "allow_nesting and disallow_nesting are not allowed together!"));
703                 return;
704         }
705
706         if (flags & TDB_ALLOW_NESTING) {
707                 tdb->flags &= ~TDB_DISALLOW_NESTING;
708         }
709         if (flags & TDB_DISALLOW_NESTING) {
710                 tdb->flags &= ~TDB_ALLOW_NESTING;
711         }
712
713         tdb->flags |= flags;
714 }
715
716 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
717 {
718         if ((flags & TDB_ALLOW_NESTING) &&
719             (flags & TDB_DISALLOW_NESTING)) {
720                 tdb->ecode = TDB_ERR_NESTING;
721                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
722                         "allow_nesting and disallow_nesting are not allowed together!"));
723                 return;
724         }
725
726         if ((flags & TDB_NOLOCK) &&
727             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
728             (tdb->mutexes == NULL)) {
729                 tdb->ecode = TDB_ERR_LOCK;
730                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
731                          "Can not remove NOLOCK flag on mutexed databases"));
732                 return;
733         }
734
735         if (flags & TDB_ALLOW_NESTING) {
736                 tdb->flags |= TDB_DISALLOW_NESTING;
737         }
738         if (flags & TDB_DISALLOW_NESTING) {
739                 tdb->flags |= TDB_ALLOW_NESTING;
740         }
741
742         tdb->flags &= ~flags;
743 }
744
745
746 /*
747   enable sequence number handling on an open tdb
748 */
749 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
750 {
751         tdb->flags |= TDB_SEQNUM;
752 }
753
754
755 /*
756   add a region of the file to the freelist. Length is the size of the region in bytes,
757   which includes the free list header that needs to be added
758  */
759 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
760 {
761         struct tdb_record rec;
762         if (length <= sizeof(rec)) {
763                 /* the region is not worth adding */
764                 return 0;
765         }
766         if (length + offset > tdb->map_size) {
767                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
768                 return -1;
769         }
770         memset(&rec,'\0',sizeof(rec));
771         rec.rec_len = length - sizeof(rec);
772         if (tdb_free(tdb, offset, &rec) == -1) {
773                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
774                 return -1;
775         }
776         return 0;
777 }
778
779 /*
780   wipe the entire database, deleting all records. This can be done
781   very fast by using a allrecord lock. The entire data portion of the
782   file becomes a single entry in the freelist.
783
784   This code carefully steps around the recovery area, leaving it alone
785  */
786 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
787 {
788         int i;
789         tdb_off_t offset = 0;
790         ssize_t data_len;
791         tdb_off_t recovery_head;
792         tdb_len_t recovery_size = 0;
793
794         if (tdb_lockall(tdb) != 0) {
795                 return -1;
796         }
797
798         tdb_trace(tdb, "tdb_wipe_all");
799
800         /* see if the tdb has a recovery area, and remember its size
801            if so. We don't want to lose this as otherwise each
802            tdb_wipe_all() in a transaction will increase the size of
803            the tdb by the size of the recovery area */
804         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
805                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
806                 goto failed;
807         }
808
809         if (recovery_head != 0) {
810                 struct tdb_record rec;
811                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
812                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
813                         return -1;
814                 }
815                 recovery_size = rec.rec_len + sizeof(rec);
816         }
817
818         /* wipe the hashes */
819         for (i=0;i<tdb->hash_size;i++) {
820                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
821                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
822                         goto failed;
823                 }
824         }
825
826         /* wipe the freelist */
827         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
828                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
829                 goto failed;
830         }
831
832         /* add all the rest of the file to the freelist, possibly leaving a gap
833            for the recovery area */
834         if (recovery_size == 0) {
835                 /* the simple case - the whole file can be used as a freelist */
836                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
837                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
838                         goto failed;
839                 }
840         } else {
841                 /* we need to add two freelist entries - one on either
842                    side of the recovery area
843
844                    Note that we cannot shift the recovery area during
845                    this operation. Only the transaction.c code may
846                    move the recovery area or we risk subtle data
847                    corruption
848                 */
849                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
850                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
851                         goto failed;
852                 }
853                 /* and the 2nd free list entry after the recovery area - if any */
854                 data_len = tdb->map_size - (recovery_head+recovery_size);
855                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
856                         goto failed;
857                 }
858         }
859
860         tdb_increment_seqnum_nonblock(tdb);
861
862         if (tdb_unlockall(tdb) != 0) {
863                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
864                 goto failed;
865         }
866
867         return 0;
868
869 failed:
870         tdb_unlockall(tdb);
871         return -1;
872 }
873
874 struct traverse_state {
875         bool error;
876         struct tdb_context *dest_db;
877 };
878
879 /*
880   traverse function for repacking
881  */
882 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
883 {
884         struct traverse_state *state = (struct traverse_state *)private_data;
885         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
886                 state->error = true;
887                 return -1;
888         }
889         return 0;
890 }
891
892 /*
893   repack a tdb
894  */
895 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
896 {
897         struct tdb_context *tmp_db;
898         struct traverse_state state;
899
900         tdb_trace(tdb, "tdb_repack");
901
902         if (tdb_transaction_start(tdb) != 0) {
903                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
904                 return -1;
905         }
906
907         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
908         if (tmp_db == NULL) {
909                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
910                 tdb_transaction_cancel(tdb);
911                 return -1;
912         }
913
914         state.error = false;
915         state.dest_db = tmp_db;
916
917         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
918                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
919                 tdb_transaction_cancel(tdb);
920                 tdb_close(tmp_db);
921                 return -1;
922         }
923
924         if (state.error) {
925                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
926                 tdb_transaction_cancel(tdb);
927                 tdb_close(tmp_db);
928                 return -1;
929         }
930
931         if (tdb_wipe_all(tdb) != 0) {
932                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
933                 tdb_transaction_cancel(tdb);
934                 tdb_close(tmp_db);
935                 return -1;
936         }
937
938         state.error = false;
939         state.dest_db = tdb;
940
941         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
942                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
943                 tdb_transaction_cancel(tdb);
944                 tdb_close(tmp_db);
945                 return -1;
946         }
947
948         if (state.error) {
949                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
950                 tdb_transaction_cancel(tdb);
951                 tdb_close(tmp_db);
952                 return -1;
953         }
954
955         tdb_close(tmp_db);
956
957         if (tdb_transaction_commit(tdb) != 0) {
958                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
959                 return -1;
960         }
961
962         return 0;
963 }
964
965 /* Even on files, we can get partial writes due to signals. */
966 bool tdb_write_all(int fd, const void *buf, size_t count)
967 {
968         while (count) {
969                 ssize_t ret;
970                 ret = write(fd, buf, count);
971                 if (ret < 0)
972                         return false;
973                 buf = (const char *)buf + ret;
974                 count -= ret;
975         }
976         return true;
977 }
978
979 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
980 {
981         tdb_off_t ret = a + b;
982
983         if ((ret < a) || (ret < b)) {
984                 return false;
985         }
986         *pret = ret;
987         return true;
988 }
989
990 #ifdef TDB_TRACE
991 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
992 {
993         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
994                 close(tdb->tracefd);
995                 tdb->tracefd = -1;
996         }
997 }
998
999 static void tdb_trace_start(struct tdb_context *tdb)
1000 {
1001         tdb_off_t seqnum=0;
1002         char msg[sizeof(tdb_off_t) * 4 + 1];
1003
1004         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1005         snprintf(msg, sizeof(msg), "%u ", seqnum);
1006         tdb_trace_write(tdb, msg);
1007 }
1008
1009 static void tdb_trace_end(struct tdb_context *tdb)
1010 {
1011         tdb_trace_write(tdb, "\n");
1012 }
1013
1014 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1015 {
1016         char msg[sizeof(ret) * 4 + 4];
1017         snprintf(msg, sizeof(msg), " = %i\n", ret);
1018         tdb_trace_write(tdb, msg);
1019 }
1020
1021 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1022 {
1023         char msg[20 + rec.dsize*2], *p;
1024         unsigned int i;
1025
1026         /* We differentiate zero-length records from non-existent ones. */
1027         if (rec.dptr == NULL) {
1028                 tdb_trace_write(tdb, " NULL");
1029                 return;
1030         }
1031
1032         /* snprintf here is purely cargo-cult programming. */
1033         p = msg;
1034         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1035         for (i = 0; i < rec.dsize; i++)
1036                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1037
1038         tdb_trace_write(tdb, msg);
1039 }
1040
1041 void tdb_trace(struct tdb_context *tdb, const char *op)
1042 {
1043         tdb_trace_start(tdb);
1044         tdb_trace_write(tdb, op);
1045         tdb_trace_end(tdb);
1046 }
1047
1048 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1049 {
1050         char msg[sizeof(tdb_off_t) * 4 + 1];
1051
1052         snprintf(msg, sizeof(msg), "%u ", seqnum);
1053         tdb_trace_write(tdb, msg);
1054         tdb_trace_write(tdb, op);
1055         tdb_trace_end(tdb);
1056 }
1057
1058 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1059                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1060 {
1061         char msg[128];
1062
1063         snprintf(msg, sizeof(msg),
1064                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1065         tdb_trace_start(tdb);
1066         tdb_trace_write(tdb, msg);
1067         tdb_trace_end(tdb);
1068 }
1069
1070 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1071 {
1072         tdb_trace_start(tdb);
1073         tdb_trace_write(tdb, op);
1074         tdb_trace_end_ret(tdb, ret);
1075 }
1076
1077 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1078 {
1079         tdb_trace_start(tdb);
1080         tdb_trace_write(tdb, op);
1081         tdb_trace_write(tdb, " =");
1082         tdb_trace_record(tdb, ret);
1083         tdb_trace_end(tdb);
1084 }
1085
1086 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1087                     TDB_DATA rec)
1088 {
1089         tdb_trace_start(tdb);
1090         tdb_trace_write(tdb, op);
1091         tdb_trace_record(tdb, rec);
1092         tdb_trace_end(tdb);
1093 }
1094
1095 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1096                         TDB_DATA rec, int ret)
1097 {
1098         tdb_trace_start(tdb);
1099         tdb_trace_write(tdb, op);
1100         tdb_trace_record(tdb, rec);
1101         tdb_trace_end_ret(tdb, ret);
1102 }
1103
1104 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1105                            TDB_DATA rec, TDB_DATA ret)
1106 {
1107         tdb_trace_start(tdb);
1108         tdb_trace_write(tdb, op);
1109         tdb_trace_record(tdb, rec);
1110         tdb_trace_write(tdb, " =");
1111         tdb_trace_record(tdb, ret);
1112         tdb_trace_end(tdb);
1113 }
1114
1115 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1116                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1117                              int ret)
1118 {
1119         char msg[1 + sizeof(ret) * 4];
1120
1121         snprintf(msg, sizeof(msg), " %#x", flag);
1122         tdb_trace_start(tdb);
1123         tdb_trace_write(tdb, op);
1124         tdb_trace_record(tdb, rec1);
1125         tdb_trace_record(tdb, rec2);
1126         tdb_trace_write(tdb, msg);
1127         tdb_trace_end_ret(tdb, ret);
1128 }
1129
1130 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1131                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1132 {
1133         tdb_trace_start(tdb);
1134         tdb_trace_write(tdb, op);
1135         tdb_trace_record(tdb, rec1);
1136         tdb_trace_record(tdb, rec2);
1137         tdb_trace_write(tdb, " =");
1138         tdb_trace_record(tdb, ret);
1139         tdb_trace_end(tdb);
1140 }
1141 #endif