tdb: Move adding tailer space to tdb_find_dead
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
63                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
64                 return;
65         }
66
67         tdb_increment_seqnum_nonblock(tdb);
68
69         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
70 }
71
72 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
73 {
74         return memcmp(data.dptr, key.dptr, data.dsize);
75 }
76
77 /* Returns 0 on fail.  On success, return offset of record, and fills
78    in rec */
79 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
80                         struct tdb_record *r)
81 {
82         tdb_off_t rec_ptr;
83
84         /* read in the hash top */
85         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
86                 return 0;
87
88         /* keep looking until we find the right record */
89         while (rec_ptr) {
90                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
91                         return 0;
92
93                 if (!TDB_DEAD(r) && hash==r->full_hash
94                     && key.dsize==r->key_len
95                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
96                                       r->key_len, tdb_key_compare,
97                                       NULL) == 0) {
98                         return rec_ptr;
99                 }
100                 /* detect tight infinite loop */
101                 if (rec_ptr == r->next) {
102                         tdb->ecode = TDB_ERR_CORRUPT;
103                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
104                         return 0;
105                 }
106                 rec_ptr = r->next;
107         }
108         tdb->ecode = TDB_ERR_NOEXIST;
109         return 0;
110 }
111
112 /* As tdb_find, but if you succeed, keep the lock */
113 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
114                            struct tdb_record *rec)
115 {
116         uint32_t rec_ptr;
117
118         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
119                 return 0;
120         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
121                 tdb_unlock(tdb, BUCKET(hash), locktype);
122         return rec_ptr;
123 }
124
125 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
126
127 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
128 {
129         TDB_DATA *dbuf = (TDB_DATA *)private_data;
130
131         if (dbuf->dsize != data.dsize) {
132                 return -1;
133         }
134         if (memcmp(dbuf->dptr, data.dptr, data.dsize) != 0) {
135                 return -1;
136         }
137         return 0;
138 }
139
140 /* update an entry in place - this only works if the new data size
141    is <= the old data size and the key exists.
142    on failure return -1.
143 */
144 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, TDB_DATA dbuf)
145 {
146         struct tdb_record rec;
147         tdb_off_t rec_ptr;
148
149         /* find entry */
150         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
151                 return -1;
152
153         /* it could be an exact duplicate of what is there - this is
154          * surprisingly common (eg. with a ldb re-index). */
155         if (rec.key_len == key.dsize &&
156             rec.data_len == dbuf.dsize &&
157             rec.full_hash == hash &&
158             tdb_parse_record(tdb, key, tdb_update_hash_cmp, &dbuf) == 0) {
159                 return 0;
160         }
161
162         /* must be long enough key, data and tailer */
163         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
164                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
165                 return -1;
166         }
167
168         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
169                       dbuf.dptr, dbuf.dsize) == -1)
170                 return -1;
171
172         if (dbuf.dsize != rec.data_len) {
173                 /* update size */
174                 rec.data_len = dbuf.dsize;
175                 return tdb_rec_write(tdb, rec_ptr, &rec);
176         }
177
178         return 0;
179 }
180
181 /* find an entry in the database given a key */
182 /* If an entry doesn't exist tdb_err will be set to
183  * TDB_ERR_NOEXIST. If a key has no data attached
184  * then the TDB_DATA will have zero length but
185  * a non-zero pointer
186  */
187 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
188 {
189         tdb_off_t rec_ptr;
190         struct tdb_record rec;
191         TDB_DATA ret;
192         uint32_t hash;
193
194         /* find which hash bucket it is in */
195         hash = tdb->hash_fn(&key);
196         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
197                 return tdb_null;
198
199         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
200                                   rec.data_len);
201         ret.dsize = rec.data_len;
202         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
203         return ret;
204 }
205
206 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
207 {
208         TDB_DATA ret = _tdb_fetch(tdb, key);
209
210         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
211         return ret;
212 }
213
214 /*
215  * Find an entry in the database and hand the record's data to a parsing
216  * function. The parsing function is executed under the chain read lock, so it
217  * should be fast and should not block on other syscalls.
218  *
219  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
220  *
221  * For mmapped tdb's that do not have a transaction open it points the parsing
222  * function directly at the mmap area, it avoids the malloc/memcpy in this
223  * case. If a transaction is open or no mmap is available, it has to do
224  * malloc/read/parse/free.
225  *
226  * This is interesting for all readers of potentially large data structures in
227  * the tdb records, ldb indexes being one example.
228  *
229  * Return -1 if the record was not found.
230  */
231
232 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
233                      int (*parser)(TDB_DATA key, TDB_DATA data,
234                                    void *private_data),
235                      void *private_data)
236 {
237         tdb_off_t rec_ptr;
238         struct tdb_record rec;
239         int ret;
240         uint32_t hash;
241
242         /* find which hash bucket it is in */
243         hash = tdb->hash_fn(&key);
244
245         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
246                 /* record not found */
247                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
248                 tdb->ecode = TDB_ERR_NOEXIST;
249                 return -1;
250         }
251         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
252
253         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
254                              rec.data_len, parser, private_data);
255
256         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
257
258         return ret;
259 }
260
261 /* check if an entry in the database exists
262
263    note that 1 is returned if the key is found and 0 is returned if not found
264    this doesn't match the conventions in the rest of this module, but is
265    compatible with gdbm
266 */
267 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
268 {
269         struct tdb_record rec;
270
271         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
272                 return 0;
273         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
274         return 1;
275 }
276
277 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
278 {
279         uint32_t hash = tdb->hash_fn(&key);
280         int ret;
281
282         ret = tdb_exists_hash(tdb, key, hash);
283         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
284         return ret;
285 }
286
287 /* actually delete an entry in the database given the offset */
288 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
289 {
290         tdb_off_t last_ptr, i;
291         struct tdb_record lastrec;
292
293         if (tdb->read_only || tdb->traverse_read) return -1;
294
295         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
296             tdb_write_lock_record(tdb, rec_ptr) == -1) {
297                 /* Someone traversing here: mark it as dead */
298                 rec->magic = TDB_DEAD_MAGIC;
299                 return tdb_rec_write(tdb, rec_ptr, rec);
300         }
301         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
302                 return -1;
303
304         /* find previous record in hash chain */
305         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
306                 return -1;
307         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
308                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
309                         return -1;
310
311         /* unlink it: next ptr is at start of record. */
312         if (last_ptr == 0)
313                 last_ptr = TDB_HASH_TOP(rec->full_hash);
314         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
315                 return -1;
316
317         /* recover the space */
318         if (tdb_free(tdb, rec_ptr, rec) == -1)
319                 return -1;
320         return 0;
321 }
322
323 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
324 {
325         int res = 0;
326         tdb_off_t rec_ptr;
327         struct tdb_record rec;
328
329         /* read in the hash top */
330         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
331                 return 0;
332
333         while (rec_ptr) {
334                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
335                         return 0;
336
337                 if (rec.magic == TDB_DEAD_MAGIC) {
338                         res += 1;
339                 }
340                 rec_ptr = rec.next;
341         }
342         return res;
343 }
344
345 /*
346  * Purge all DEAD records from a hash chain
347  */
348 static int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
349 {
350         int res = -1;
351         struct tdb_record rec;
352         tdb_off_t rec_ptr;
353
354         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
355                 /*
356                  * Don't block the freelist if not strictly necessary
357                  */
358                 return -1;
359         }
360
361         /* read in the hash top */
362         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
363                 goto fail;
364
365         while (rec_ptr) {
366                 tdb_off_t next;
367
368                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
369                         goto fail;
370                 }
371
372                 next = rec.next;
373
374                 if (rec.magic == TDB_DEAD_MAGIC
375                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
376                         goto fail;
377                 }
378                 rec_ptr = next;
379         }
380         res = 0;
381  fail:
382         tdb_unlock(tdb, -1, F_WRLCK);
383         return res;
384 }
385
386 /* delete an entry in the database given a key */
387 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
388 {
389         tdb_off_t rec_ptr;
390         struct tdb_record rec;
391         int ret;
392
393         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
394         if (rec_ptr == 0) {
395                 return -1;
396         }
397
398         if (tdb->max_dead_records != 0) {
399
400                 uint32_t magic = TDB_DEAD_MAGIC;
401
402                 /*
403                  * Allow for some dead records per hash chain, mainly for
404                  * tdb's with a very high create/delete rate like locking.tdb.
405                  */
406
407                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
408                         /*
409                          * Don't let the per-chain freelist grow too large,
410                          * delete all existing dead records
411                          */
412                         tdb_purge_dead(tdb, hash);
413                 }
414
415                 /*
416                  * Just mark the record as dead.
417                  */
418                 ret = tdb_ofs_write(
419                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
420                         &magic);
421         }
422         else {
423                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
424         }
425
426         if (ret == 0) {
427                 tdb_increment_seqnum(tdb);
428         }
429
430         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
431                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
432         return ret;
433 }
434
435 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
436 {
437         uint32_t hash = tdb->hash_fn(&key);
438         int ret;
439
440         ret = tdb_delete_hash(tdb, key, hash);
441         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
442         return ret;
443 }
444
445 /*
446  * See if we have a dead record around with enough space
447  */
448 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
449                                struct tdb_record *r, tdb_len_t length)
450 {
451         tdb_off_t rec_ptr;
452         tdb_off_t best_rec_ptr = 0;
453         struct tdb_record best = { .rec_len = UINT32_MAX };
454
455         length += sizeof(tdb_off_t); /* tailer */
456
457         /* read in the hash top */
458         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
459                 return 0;
460
461         /* keep looking until we find the right record */
462         while (rec_ptr) {
463                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
464                         return 0;
465
466                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
467                     (r->rec_len < best.rec_len)) {
468                         best_rec_ptr = rec_ptr;
469                         best = *r;
470                 }
471                 rec_ptr = r->next;
472         }
473
474         if (best.rec_len == UINT32_MAX) {
475                 return 0;
476         }
477
478         *r = best;
479         return best_rec_ptr;
480 }
481
482 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
483                        TDB_DATA dbuf, int flag, uint32_t hash)
484 {
485         struct tdb_record rec;
486         tdb_off_t rec_ptr;
487         int ret = -1;
488
489         /* check for it existing, on insert. */
490         if (flag == TDB_INSERT) {
491                 if (tdb_exists_hash(tdb, key, hash)) {
492                         tdb->ecode = TDB_ERR_EXISTS;
493                         goto fail;
494                 }
495         } else {
496                 /* first try in-place update, on modify or replace. */
497                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
498                         goto done;
499                 }
500                 if (tdb->ecode == TDB_ERR_NOEXIST &&
501                     flag == TDB_MODIFY) {
502                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
503                          we should fail the store */
504                         goto fail;
505                 }
506         }
507         /* reset the error code potentially set by the tdb_update() */
508         tdb->ecode = TDB_SUCCESS;
509
510         /* delete any existing record - if it doesn't exist we don't
511            care.  Doing this first reduces fragmentation, and avoids
512            coalescing with `allocated' block before it's updated. */
513         if (flag != TDB_INSERT)
514                 tdb_delete_hash(tdb, key, hash);
515
516         if (tdb->max_dead_records != 0) {
517                 /*
518                  * Allow for some dead records per hash chain, look if we can
519                  * find one that can hold the new record. We need enough space
520                  * for key, data and tailer. If we find one, we don't have to
521                  * consult the central freelist.
522                  */
523                 rec_ptr = tdb_find_dead(tdb, hash, &rec,
524                                         key.dsize + dbuf.dsize);
525
526                 if (rec_ptr != 0) {
527                         rec.key_len = key.dsize;
528                         rec.data_len = dbuf.dsize;
529                         rec.full_hash = hash;
530                         rec.magic = TDB_MAGIC;
531                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
532                             || tdb->methods->tdb_write(
533                                     tdb, rec_ptr + sizeof(rec),
534                                     key.dptr, key.dsize) == -1
535                             || tdb->methods->tdb_write(
536                                     tdb, rec_ptr + sizeof(rec) + key.dsize,
537                                     dbuf.dptr, dbuf.dsize) == -1) {
538                                 goto fail;
539                         }
540                         goto done;
541                 }
542         }
543
544         /*
545          * We have to allocate some space from the freelist, so this means we
546          * have to lock it. Use the chance to purge all the DEAD records from
547          * the hash chain under the freelist lock.
548          */
549
550         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
551                 goto fail;
552         }
553
554         if ((tdb->max_dead_records != 0)
555             && (tdb_purge_dead(tdb, hash) == -1)) {
556                 tdb_unlock(tdb, -1, F_WRLCK);
557                 goto fail;
558         }
559
560         /* we have to allocate some space */
561         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
562
563         tdb_unlock(tdb, -1, F_WRLCK);
564
565         if (rec_ptr == 0) {
566                 goto fail;
567         }
568
569         /* Read hash top into next ptr */
570         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
571                 goto fail;
572
573         rec.key_len = key.dsize;
574         rec.data_len = dbuf.dsize;
575         rec.full_hash = hash;
576         rec.magic = TDB_MAGIC;
577
578         /* write out and point the top of the hash chain at it */
579         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
580             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec),
581                                        key.dptr, key.dsize) == -1
582             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec)+key.dsize,
583                                        dbuf.dptr, dbuf.dsize) == -1
584             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
585                 /* Need to tdb_unallocate() here */
586                 goto fail;
587         }
588
589  done:
590         ret = 0;
591  fail:
592         if (ret == 0) {
593                 tdb_increment_seqnum(tdb);
594         }
595         return ret;
596 }
597
598 /* store an element in the database, replacing any existing element
599    with the same key
600
601    return 0 on success, -1 on failure
602 */
603 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
604 {
605         uint32_t hash;
606         int ret;
607
608         if (tdb->read_only || tdb->traverse_read) {
609                 tdb->ecode = TDB_ERR_RDONLY;
610                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
611                 return -1;
612         }
613
614         /* find which hash bucket it is in */
615         hash = tdb->hash_fn(&key);
616         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
617                 return -1;
618
619         ret = _tdb_store(tdb, key, dbuf, flag, hash);
620         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
621         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
622         return ret;
623 }
624
625 /* Append to an entry. Create if not exist. */
626 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
627 {
628         uint32_t hash;
629         TDB_DATA dbuf;
630         int ret = -1;
631
632         /* find which hash bucket it is in */
633         hash = tdb->hash_fn(&key);
634         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
635                 return -1;
636
637         dbuf = _tdb_fetch(tdb, key);
638
639         if (dbuf.dptr == NULL) {
640                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
641         } else {
642                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
643                 unsigned char *new_dptr;
644
645                 /* realloc '0' is special: don't do that. */
646                 if (new_len == 0)
647                         new_len = 1;
648                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
649                 if (new_dptr == NULL) {
650                         free(dbuf.dptr);
651                 }
652                 dbuf.dptr = new_dptr;
653         }
654
655         if (dbuf.dptr == NULL) {
656                 tdb->ecode = TDB_ERR_OOM;
657                 goto failed;
658         }
659
660         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
661         dbuf.dsize += new_dbuf.dsize;
662
663         ret = _tdb_store(tdb, key, dbuf, 0, hash);
664         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
665
666 failed:
667         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
668         SAFE_FREE(dbuf.dptr);
669         return ret;
670 }
671
672
673 /*
674   return the name of the current tdb file
675   useful for external logging functions
676 */
677 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
678 {
679         return tdb->name;
680 }
681
682 /*
683   return the underlying file descriptor being used by tdb, or -1
684   useful for external routines that want to check the device/inode
685   of the fd
686 */
687 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
688 {
689         return tdb->fd;
690 }
691
692 /*
693   return the current logging function
694   useful for external tdb routines that wish to log tdb errors
695 */
696 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
697 {
698         return tdb->log.log_fn;
699 }
700
701
702 /*
703   get the tdb sequence number. Only makes sense if the writers opened
704   with TDB_SEQNUM set. Note that this sequence number will wrap quite
705   quickly, so it should only be used for a 'has something changed'
706   test, not for code that relies on the count of the number of changes
707   made. If you want a counter then use a tdb record.
708
709   The aim of this sequence number is to allow for a very lightweight
710   test of a possible tdb change.
711 */
712 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
713 {
714         tdb_off_t seqnum=0;
715
716         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
717         return seqnum;
718 }
719
720 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
721 {
722         return tdb->hash_size;
723 }
724
725 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
726 {
727         return tdb->map_size;
728 }
729
730 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
731 {
732         return tdb->flags;
733 }
734
735 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
736 {
737         if ((flags & TDB_ALLOW_NESTING) &&
738             (flags & TDB_DISALLOW_NESTING)) {
739                 tdb->ecode = TDB_ERR_NESTING;
740                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
741                         "allow_nesting and disallow_nesting are not allowed together!"));
742                 return;
743         }
744
745         if (flags & TDB_ALLOW_NESTING) {
746                 tdb->flags &= ~TDB_DISALLOW_NESTING;
747         }
748         if (flags & TDB_DISALLOW_NESTING) {
749                 tdb->flags &= ~TDB_ALLOW_NESTING;
750         }
751
752         tdb->flags |= flags;
753 }
754
755 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
756 {
757         if ((flags & TDB_ALLOW_NESTING) &&
758             (flags & TDB_DISALLOW_NESTING)) {
759                 tdb->ecode = TDB_ERR_NESTING;
760                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
761                         "allow_nesting and disallow_nesting are not allowed together!"));
762                 return;
763         }
764
765         if (flags & TDB_ALLOW_NESTING) {
766                 tdb->flags |= TDB_DISALLOW_NESTING;
767         }
768         if (flags & TDB_DISALLOW_NESTING) {
769                 tdb->flags |= TDB_ALLOW_NESTING;
770         }
771
772         tdb->flags &= ~flags;
773 }
774
775
776 /*
777   enable sequence number handling on an open tdb
778 */
779 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
780 {
781         tdb->flags |= TDB_SEQNUM;
782 }
783
784
785 /*
786   add a region of the file to the freelist. Length is the size of the region in bytes,
787   which includes the free list header that needs to be added
788  */
789 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
790 {
791         struct tdb_record rec;
792         if (length <= sizeof(rec)) {
793                 /* the region is not worth adding */
794                 return 0;
795         }
796         if (length + offset > tdb->map_size) {
797                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
798                 return -1;
799         }
800         memset(&rec,'\0',sizeof(rec));
801         rec.rec_len = length - sizeof(rec);
802         if (tdb_free(tdb, offset, &rec) == -1) {
803                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
804                 return -1;
805         }
806         return 0;
807 }
808
809 /*
810   wipe the entire database, deleting all records. This can be done
811   very fast by using a allrecord lock. The entire data portion of the
812   file becomes a single entry in the freelist.
813
814   This code carefully steps around the recovery area, leaving it alone
815  */
816 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
817 {
818         int i;
819         tdb_off_t offset = 0;
820         ssize_t data_len;
821         tdb_off_t recovery_head;
822         tdb_len_t recovery_size = 0;
823
824         if (tdb_lockall(tdb) != 0) {
825                 return -1;
826         }
827
828         tdb_trace(tdb, "tdb_wipe_all");
829
830         /* see if the tdb has a recovery area, and remember its size
831            if so. We don't want to lose this as otherwise each
832            tdb_wipe_all() in a transaction will increase the size of
833            the tdb by the size of the recovery area */
834         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
835                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
836                 goto failed;
837         }
838
839         if (recovery_head != 0) {
840                 struct tdb_record rec;
841                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
842                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
843                         return -1;
844                 }
845                 recovery_size = rec.rec_len + sizeof(rec);
846         }
847
848         /* wipe the hashes */
849         for (i=0;i<tdb->hash_size;i++) {
850                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
851                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
852                         goto failed;
853                 }
854         }
855
856         /* wipe the freelist */
857         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
858                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
859                 goto failed;
860         }
861
862         /* add all the rest of the file to the freelist, possibly leaving a gap
863            for the recovery area */
864         if (recovery_size == 0) {
865                 /* the simple case - the whole file can be used as a freelist */
866                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
867                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
868                         goto failed;
869                 }
870         } else {
871                 /* we need to add two freelist entries - one on either
872                    side of the recovery area
873
874                    Note that we cannot shift the recovery area during
875                    this operation. Only the transaction.c code may
876                    move the recovery area or we risk subtle data
877                    corruption
878                 */
879                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
880                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
881                         goto failed;
882                 }
883                 /* and the 2nd free list entry after the recovery area - if any */
884                 data_len = tdb->map_size - (recovery_head+recovery_size);
885                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
886                         goto failed;
887                 }
888         }
889
890         tdb_increment_seqnum_nonblock(tdb);
891
892         if (tdb_unlockall(tdb) != 0) {
893                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
894                 goto failed;
895         }
896
897         return 0;
898
899 failed:
900         tdb_unlockall(tdb);
901         return -1;
902 }
903
904 struct traverse_state {
905         bool error;
906         struct tdb_context *dest_db;
907 };
908
909 /*
910   traverse function for repacking
911  */
912 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
913 {
914         struct traverse_state *state = (struct traverse_state *)private_data;
915         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
916                 state->error = true;
917                 return -1;
918         }
919         return 0;
920 }
921
922 /*
923   repack a tdb
924  */
925 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
926 {
927         struct tdb_context *tmp_db;
928         struct traverse_state state;
929
930         tdb_trace(tdb, "tdb_repack");
931
932         if (tdb_transaction_start(tdb) != 0) {
933                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
934                 return -1;
935         }
936
937         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
938         if (tmp_db == NULL) {
939                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
940                 tdb_transaction_cancel(tdb);
941                 return -1;
942         }
943
944         state.error = false;
945         state.dest_db = tmp_db;
946
947         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
948                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
949                 tdb_transaction_cancel(tdb);
950                 tdb_close(tmp_db);
951                 return -1;
952         }
953
954         if (state.error) {
955                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
956                 tdb_transaction_cancel(tdb);
957                 tdb_close(tmp_db);
958                 return -1;
959         }
960
961         if (tdb_wipe_all(tdb) != 0) {
962                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
963                 tdb_transaction_cancel(tdb);
964                 tdb_close(tmp_db);
965                 return -1;
966         }
967
968         state.error = false;
969         state.dest_db = tdb;
970
971         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
972                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
973                 tdb_transaction_cancel(tdb);
974                 tdb_close(tmp_db);
975                 return -1;
976         }
977
978         if (state.error) {
979                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
980                 tdb_transaction_cancel(tdb);
981                 tdb_close(tmp_db);
982                 return -1;
983         }
984
985         tdb_close(tmp_db);
986
987         if (tdb_transaction_commit(tdb) != 0) {
988                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
989                 return -1;
990         }
991
992         return 0;
993 }
994
995 /* Even on files, we can get partial writes due to signals. */
996 bool tdb_write_all(int fd, const void *buf, size_t count)
997 {
998         while (count) {
999                 ssize_t ret;
1000                 ret = write(fd, buf, count);
1001                 if (ret < 0)
1002                         return false;
1003                 buf = (const char *)buf + ret;
1004                 count -= ret;
1005         }
1006         return true;
1007 }
1008
1009 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1010 {
1011         tdb_off_t ret = a + b;
1012
1013         if ((ret < a) || (ret < b)) {
1014                 return false;
1015         }
1016         *pret = ret;
1017         return true;
1018 }
1019
1020 #ifdef TDB_TRACE
1021 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1022 {
1023         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1024                 close(tdb->tracefd);
1025                 tdb->tracefd = -1;
1026         }
1027 }
1028
1029 static void tdb_trace_start(struct tdb_context *tdb)
1030 {
1031         tdb_off_t seqnum=0;
1032         char msg[sizeof(tdb_off_t) * 4 + 1];
1033
1034         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1035         snprintf(msg, sizeof(msg), "%u ", seqnum);
1036         tdb_trace_write(tdb, msg);
1037 }
1038
1039 static void tdb_trace_end(struct tdb_context *tdb)
1040 {
1041         tdb_trace_write(tdb, "\n");
1042 }
1043
1044 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1045 {
1046         char msg[sizeof(ret) * 4 + 4];
1047         snprintf(msg, sizeof(msg), " = %i\n", ret);
1048         tdb_trace_write(tdb, msg);
1049 }
1050
1051 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1052 {
1053         char msg[20 + rec.dsize*2], *p;
1054         unsigned int i;
1055
1056         /* We differentiate zero-length records from non-existent ones. */
1057         if (rec.dptr == NULL) {
1058                 tdb_trace_write(tdb, " NULL");
1059                 return;
1060         }
1061
1062         /* snprintf here is purely cargo-cult programming. */
1063         p = msg;
1064         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1065         for (i = 0; i < rec.dsize; i++)
1066                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1067
1068         tdb_trace_write(tdb, msg);
1069 }
1070
1071 void tdb_trace(struct tdb_context *tdb, const char *op)
1072 {
1073         tdb_trace_start(tdb);
1074         tdb_trace_write(tdb, op);
1075         tdb_trace_end(tdb);
1076 }
1077
1078 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1079 {
1080         char msg[sizeof(tdb_off_t) * 4 + 1];
1081
1082         snprintf(msg, sizeof(msg), "%u ", seqnum);
1083         tdb_trace_write(tdb, msg);
1084         tdb_trace_write(tdb, op);
1085         tdb_trace_end(tdb);
1086 }
1087
1088 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1089                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1090 {
1091         char msg[128];
1092
1093         snprintf(msg, sizeof(msg),
1094                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1095         tdb_trace_start(tdb);
1096         tdb_trace_write(tdb, msg);
1097         tdb_trace_end(tdb);
1098 }
1099
1100 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1101 {
1102         tdb_trace_start(tdb);
1103         tdb_trace_write(tdb, op);
1104         tdb_trace_end_ret(tdb, ret);
1105 }
1106
1107 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1108 {
1109         tdb_trace_start(tdb);
1110         tdb_trace_write(tdb, op);
1111         tdb_trace_write(tdb, " =");
1112         tdb_trace_record(tdb, ret);
1113         tdb_trace_end(tdb);
1114 }
1115
1116 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1117                     TDB_DATA rec)
1118 {
1119         tdb_trace_start(tdb);
1120         tdb_trace_write(tdb, op);
1121         tdb_trace_record(tdb, rec);
1122         tdb_trace_end(tdb);
1123 }
1124
1125 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1126                         TDB_DATA rec, int ret)
1127 {
1128         tdb_trace_start(tdb);
1129         tdb_trace_write(tdb, op);
1130         tdb_trace_record(tdb, rec);
1131         tdb_trace_end_ret(tdb, ret);
1132 }
1133
1134 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1135                            TDB_DATA rec, TDB_DATA ret)
1136 {
1137         tdb_trace_start(tdb);
1138         tdb_trace_write(tdb, op);
1139         tdb_trace_record(tdb, rec);
1140         tdb_trace_write(tdb, " =");
1141         tdb_trace_record(tdb, ret);
1142         tdb_trace_end(tdb);
1143 }
1144
1145 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1146                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1147                              int ret)
1148 {
1149         char msg[1 + sizeof(ret) * 4];
1150
1151         snprintf(msg, sizeof(msg), " %#x", flag);
1152         tdb_trace_start(tdb);
1153         tdb_trace_write(tdb, op);
1154         tdb_trace_record(tdb, rec1);
1155         tdb_trace_record(tdb, rec2);
1156         tdb_trace_write(tdb, msg);
1157         tdb_trace_end_ret(tdb, ret);
1158 }
1159
1160 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1161                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1162 {
1163         tdb_trace_start(tdb);
1164         tdb_trace_write(tdb, op);
1165         tdb_trace_record(tdb, rec1);
1166         tdb_trace_record(tdb, rec2);
1167         tdb_trace_write(tdb, " =");
1168         tdb_trace_record(tdb, ret);
1169         tdb_trace_end(tdb);
1170 }
1171 #endif