tdb: Use tdb_storev in tdb_append
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 /* Returns 0 on fail.  On success, return offset of record, and fills
83    in rec */
84 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
85                         struct tdb_record *r)
86 {
87         tdb_off_t rec_ptr;
88
89         /* read in the hash top */
90         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
91                 return 0;
92
93         /* keep looking until we find the right record */
94         while (rec_ptr) {
95                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
96                         return 0;
97
98                 if (!TDB_DEAD(r) && hash==r->full_hash
99                     && key.dsize==r->key_len
100                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
101                                       r->key_len, tdb_key_compare,
102                                       NULL) == 0) {
103                         return rec_ptr;
104                 }
105                 /* detect tight infinite loop */
106                 if (rec_ptr == r->next) {
107                         tdb->ecode = TDB_ERR_CORRUPT;
108                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
109                         return 0;
110                 }
111                 rec_ptr = r->next;
112         }
113         tdb->ecode = TDB_ERR_NOEXIST;
114         return 0;
115 }
116
117 /* As tdb_find, but if you succeed, keep the lock */
118 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
119                            struct tdb_record *rec)
120 {
121         uint32_t rec_ptr;
122
123         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
124                 return 0;
125         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
126                 tdb_unlock(tdb, BUCKET(hash), locktype);
127         return rec_ptr;
128 }
129
130 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
131
132 struct tdb_update_hash_state {
133         const TDB_DATA *dbufs;
134         int num_dbufs;
135         tdb_len_t dbufs_len;
136 };
137
138 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
139 {
140         struct tdb_update_hash_state *state = private_data;
141         unsigned char *dptr = data.dptr;
142         int i;
143
144         if (state->dbufs_len != data.dsize) {
145                 return -1;
146         }
147
148         for (i=0; i<state->num_dbufs; i++) {
149                 TDB_DATA dbuf = state->dbufs[i];
150                 int ret;
151                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
152                 if (ret != 0) {
153                         return -1;
154                 }
155                 dptr += dbuf.dsize;
156         }
157
158         return 0;
159 }
160
161 /* update an entry in place - this only works if the new data size
162    is <= the old data size and the key exists.
163    on failure return -1.
164 */
165 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
166                            uint32_t hash,
167                            const TDB_DATA *dbufs, int num_dbufs,
168                            tdb_len_t dbufs_len)
169 {
170         struct tdb_record rec;
171         tdb_off_t rec_ptr, ofs;
172         int i;
173
174         /* find entry */
175         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
176                 return -1;
177
178         /* it could be an exact duplicate of what is there - this is
179          * surprisingly common (eg. with a ldb re-index). */
180         if (rec.data_len == dbufs_len) {
181                 struct tdb_update_hash_state state = {
182                         .dbufs = dbufs, .num_dbufs = num_dbufs,
183                         .dbufs_len = dbufs_len
184                 };
185                 int ret;
186
187                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
188                 if (ret == 0) {
189                         return 0;
190                 }
191         }
192
193         /* must be long enough key, data and tailer */
194         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
195                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
196                 return -1;
197         }
198
199         ofs = rec_ptr + sizeof(rec) + rec.key_len;
200
201         for (i=0; i<num_dbufs; i++) {
202                 TDB_DATA dbuf = dbufs[i];
203                 int ret;
204
205                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
206                 if (ret == -1) {
207                         return -1;
208                 }
209                 ofs += dbuf.dsize;
210         }
211
212         if (dbufs_len != rec.data_len) {
213                 /* update size */
214                 rec.data_len = dbufs_len;
215                 return tdb_rec_write(tdb, rec_ptr, &rec);
216         }
217
218         return 0;
219 }
220
221 /* find an entry in the database given a key */
222 /* If an entry doesn't exist tdb_err will be set to
223  * TDB_ERR_NOEXIST. If a key has no data attached
224  * then the TDB_DATA will have zero length but
225  * a non-zero pointer
226  */
227 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
228 {
229         tdb_off_t rec_ptr;
230         struct tdb_record rec;
231         TDB_DATA ret;
232         uint32_t hash;
233
234         /* find which hash bucket it is in */
235         hash = tdb->hash_fn(&key);
236         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
237                 return tdb_null;
238
239         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
240                                   rec.data_len);
241         ret.dsize = rec.data_len;
242         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
243         return ret;
244 }
245
246 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
247 {
248         TDB_DATA ret = _tdb_fetch(tdb, key);
249
250         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
251         return ret;
252 }
253
254 /*
255  * Find an entry in the database and hand the record's data to a parsing
256  * function. The parsing function is executed under the chain read lock, so it
257  * should be fast and should not block on other syscalls.
258  *
259  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
260  *
261  * For mmapped tdb's that do not have a transaction open it points the parsing
262  * function directly at the mmap area, it avoids the malloc/memcpy in this
263  * case. If a transaction is open or no mmap is available, it has to do
264  * malloc/read/parse/free.
265  *
266  * This is interesting for all readers of potentially large data structures in
267  * the tdb records, ldb indexes being one example.
268  *
269  * Return -1 if the record was not found.
270  */
271
272 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
273                      int (*parser)(TDB_DATA key, TDB_DATA data,
274                                    void *private_data),
275                      void *private_data)
276 {
277         tdb_off_t rec_ptr;
278         struct tdb_record rec;
279         int ret;
280         uint32_t hash;
281
282         /* find which hash bucket it is in */
283         hash = tdb->hash_fn(&key);
284
285         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
286                 /* record not found */
287                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
288                 tdb->ecode = TDB_ERR_NOEXIST;
289                 return -1;
290         }
291         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
292
293         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
294                              rec.data_len, parser, private_data);
295
296         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
297
298         return ret;
299 }
300
301 /* check if an entry in the database exists
302
303    note that 1 is returned if the key is found and 0 is returned if not found
304    this doesn't match the conventions in the rest of this module, but is
305    compatible with gdbm
306 */
307 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
308 {
309         struct tdb_record rec;
310
311         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
312                 return 0;
313         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
314         return 1;
315 }
316
317 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
318 {
319         uint32_t hash = tdb->hash_fn(&key);
320         int ret;
321
322         ret = tdb_exists_hash(tdb, key, hash);
323         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
324         return ret;
325 }
326
327 /* actually delete an entry in the database given the offset */
328 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
329 {
330         tdb_off_t last_ptr, i;
331         struct tdb_record lastrec;
332
333         if (tdb->read_only || tdb->traverse_read) return -1;
334
335         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
336             tdb_write_lock_record(tdb, rec_ptr) == -1) {
337                 /* Someone traversing here: mark it as dead */
338                 rec->magic = TDB_DEAD_MAGIC;
339                 return tdb_rec_write(tdb, rec_ptr, rec);
340         }
341         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
342                 return -1;
343
344         /* find previous record in hash chain */
345         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
346                 return -1;
347         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
348                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
349                         return -1;
350
351         /* unlink it: next ptr is at start of record. */
352         if (last_ptr == 0)
353                 last_ptr = TDB_HASH_TOP(rec->full_hash);
354         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
355                 return -1;
356
357         /* recover the space */
358         if (tdb_free(tdb, rec_ptr, rec) == -1)
359                 return -1;
360         return 0;
361 }
362
363 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
364 {
365         int res = 0;
366         tdb_off_t rec_ptr;
367         struct tdb_record rec;
368
369         /* read in the hash top */
370         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
371                 return 0;
372
373         while (rec_ptr) {
374                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
375                         return 0;
376
377                 if (rec.magic == TDB_DEAD_MAGIC) {
378                         res += 1;
379                 }
380                 rec_ptr = rec.next;
381         }
382         return res;
383 }
384
385 /*
386  * Purge all DEAD records from a hash chain
387  */
388 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
389 {
390         int res = -1;
391         struct tdb_record rec;
392         tdb_off_t rec_ptr;
393
394         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
395                 /*
396                  * Don't block the freelist if not strictly necessary
397                  */
398                 return -1;
399         }
400
401         /* read in the hash top */
402         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
403                 goto fail;
404
405         while (rec_ptr) {
406                 tdb_off_t next;
407
408                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
409                         goto fail;
410                 }
411
412                 next = rec.next;
413
414                 if (rec.magic == TDB_DEAD_MAGIC
415                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
416                         goto fail;
417                 }
418                 rec_ptr = next;
419         }
420         res = 0;
421  fail:
422         tdb_unlock(tdb, -1, F_WRLCK);
423         return res;
424 }
425
426 /* delete an entry in the database given a key */
427 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
428 {
429         tdb_off_t rec_ptr;
430         struct tdb_record rec;
431         int ret;
432
433         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
434         if (rec_ptr == 0) {
435                 return -1;
436         }
437
438         if (tdb->max_dead_records != 0) {
439
440                 uint32_t magic = TDB_DEAD_MAGIC;
441
442                 /*
443                  * Allow for some dead records per hash chain, mainly for
444                  * tdb's with a very high create/delete rate like locking.tdb.
445                  */
446
447                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
448                         /*
449                          * Don't let the per-chain freelist grow too large,
450                          * delete all existing dead records
451                          */
452                         tdb_purge_dead(tdb, hash);
453                 }
454
455                 /*
456                  * Just mark the record as dead.
457                  */
458                 ret = tdb_ofs_write(
459                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
460                         &magic);
461         }
462         else {
463                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
464         }
465
466         if (ret == 0) {
467                 tdb_increment_seqnum(tdb);
468         }
469
470         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
471                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
472         return ret;
473 }
474
475 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
476 {
477         uint32_t hash = tdb->hash_fn(&key);
478         int ret;
479
480         ret = tdb_delete_hash(tdb, key, hash);
481         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
482         return ret;
483 }
484
485 /*
486  * See if we have a dead record around with enough space
487  */
488 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
489                         struct tdb_record *r, tdb_len_t length,
490                         tdb_off_t *p_last_ptr)
491 {
492         tdb_off_t rec_ptr, last_ptr;
493         tdb_off_t best_rec_ptr = 0;
494         tdb_off_t best_last_ptr = 0;
495         struct tdb_record best = { .rec_len = UINT32_MAX };
496
497         length += sizeof(tdb_off_t); /* tailer */
498
499         last_ptr = TDB_HASH_TOP(hash);
500
501         /* read in the hash top */
502         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
503                 return 0;
504
505         /* keep looking until we find the right record */
506         while (rec_ptr) {
507                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
508                         return 0;
509
510                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
511                     (r->rec_len < best.rec_len)) {
512                         best_rec_ptr = rec_ptr;
513                         best_last_ptr = last_ptr;
514                         best = *r;
515                 }
516                 last_ptr = rec_ptr;
517                 rec_ptr = r->next;
518         }
519
520         if (best.rec_len == UINT32_MAX) {
521                 return 0;
522         }
523
524         *r = best;
525         *p_last_ptr = best_last_ptr;
526         return best_rec_ptr;
527 }
528
529 static int _tdb_storev(struct tdb_context *tdb, TDB_DATA key,
530                        const TDB_DATA *dbufs, int num_dbufs,
531                        int flag, uint32_t hash)
532 {
533         struct tdb_record rec;
534         tdb_off_t rec_ptr, ofs;
535         tdb_len_t rec_len, dbufs_len;
536         int i;
537         int ret = -1;
538
539         dbufs_len = 0;
540
541         for (i=0; i<num_dbufs; i++) {
542                 size_t dsize = dbufs[i].dsize;
543
544                 dbufs_len += dsize;
545                 if (dbufs_len < dsize) {
546                         tdb->ecode = TDB_ERR_OOM;
547                         goto fail;
548                 }
549         }
550
551         rec_len = key.dsize + dbufs_len;
552         if ((rec_len < key.dsize) || (rec_len < dbufs_len)) {
553                 tdb->ecode = TDB_ERR_OOM;
554                 goto fail;
555         }
556
557         /* check for it existing, on insert. */
558         if (flag == TDB_INSERT) {
559                 if (tdb_exists_hash(tdb, key, hash)) {
560                         tdb->ecode = TDB_ERR_EXISTS;
561                         goto fail;
562                 }
563         } else {
564                 /* first try in-place update, on modify or replace. */
565                 if (tdb_update_hash(tdb, key, hash, dbufs, num_dbufs,
566                                     dbufs_len) == 0) {
567                         goto done;
568                 }
569                 if (tdb->ecode == TDB_ERR_NOEXIST &&
570                     flag == TDB_MODIFY) {
571                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
572                          we should fail the store */
573                         goto fail;
574                 }
575         }
576         /* reset the error code potentially set by the tdb_update_hash() */
577         tdb->ecode = TDB_SUCCESS;
578
579         /* delete any existing record - if it doesn't exist we don't
580            care.  Doing this first reduces fragmentation, and avoids
581            coalescing with `allocated' block before it's updated. */
582         if (flag != TDB_INSERT)
583                 tdb_delete_hash(tdb, key, hash);
584
585         /* we have to allocate some space */
586         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
587
588         if (rec_ptr == 0) {
589                 goto fail;
590         }
591
592         /* Read hash top into next ptr */
593         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
594                 goto fail;
595
596         rec.key_len = key.dsize;
597         rec.data_len = dbufs_len;
598         rec.full_hash = hash;
599         rec.magic = TDB_MAGIC;
600
601         ofs = rec_ptr;
602
603         /* write out and point the top of the hash chain at it */
604         ret = tdb_rec_write(tdb, ofs, &rec);
605         if (ret == -1) {
606                 goto fail;
607         }
608         ofs += sizeof(rec);
609
610         ret = tdb->methods->tdb_write(tdb, ofs, key.dptr, key.dsize);
611         if (ret == -1) {
612                 goto fail;
613         }
614         ofs += key.dsize;
615
616         for (i=0; i<num_dbufs; i++) {
617                 ret = tdb->methods->tdb_write(tdb, ofs, dbufs[i].dptr,
618                                               dbufs[i].dsize);
619                 if (ret == -1) {
620                         goto fail;
621                 }
622                 ofs += dbufs[i].dsize;
623         }
624
625         ret = tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr);
626         if (ret == -1) {
627                 /* Need to tdb_unallocate() here */
628                 goto fail;
629         }
630
631  done:
632         ret = 0;
633  fail:
634         if (ret == 0) {
635                 tdb_increment_seqnum(tdb);
636         }
637         return ret;
638 }
639
640 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
641                       TDB_DATA dbuf, int flag, uint32_t hash)
642 {
643         return _tdb_storev(tdb, key, &dbuf, 1, flag, hash);
644 }
645
646 /* store an element in the database, replacing any existing element
647    with the same key
648
649    return 0 on success, -1 on failure
650 */
651 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
652 {
653         uint32_t hash;
654         int ret;
655
656         if (tdb->read_only || tdb->traverse_read) {
657                 tdb->ecode = TDB_ERR_RDONLY;
658                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
659                 return -1;
660         }
661
662         /* find which hash bucket it is in */
663         hash = tdb->hash_fn(&key);
664         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
665                 return -1;
666
667         ret = _tdb_store(tdb, key, dbuf, flag, hash);
668         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
669         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
670         return ret;
671 }
672
673 _PUBLIC_ int tdb_storev(struct tdb_context *tdb, TDB_DATA key,
674                         const TDB_DATA *dbufs, int num_dbufs, int flag)
675 {
676         uint32_t hash;
677         int ret;
678
679         if (tdb->read_only || tdb->traverse_read) {
680                 tdb->ecode = TDB_ERR_RDONLY;
681                 tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
682                                               dbufs, num_dbufs, flag, -1);
683                 return -1;
684         }
685
686         /* find which hash bucket it is in */
687         hash = tdb->hash_fn(&key);
688         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
689                 return -1;
690
691         ret = _tdb_storev(tdb, key, dbufs, num_dbufs, flag, hash);
692         tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
693                                       dbufs, num_dbufs, flag, -1);
694         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
695         return ret;
696 }
697
698 /* Append to an entry. Create if not exist. */
699 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
700 {
701         uint32_t hash;
702         TDB_DATA dbufs[2];
703         int ret = -1;
704
705         /* find which hash bucket it is in */
706         hash = tdb->hash_fn(&key);
707         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
708                 return -1;
709
710         dbufs[0] = _tdb_fetch(tdb, key);
711         dbufs[1] = new_dbuf;
712
713         ret = _tdb_storev(tdb, key, dbufs, 2, 0, hash);
714         tdb_trace_2rec_retrec(tdb, "tdb_append", key, dbufs[0], dbufs[1]);
715
716         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
717         SAFE_FREE(dbufs[0].dptr);
718         return ret;
719 }
720
721
722 /*
723   return the name of the current tdb file
724   useful for external logging functions
725 */
726 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
727 {
728         return tdb->name;
729 }
730
731 /*
732   return the underlying file descriptor being used by tdb, or -1
733   useful for external routines that want to check the device/inode
734   of the fd
735 */
736 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
737 {
738         return tdb->fd;
739 }
740
741 /*
742   return the current logging function
743   useful for external tdb routines that wish to log tdb errors
744 */
745 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
746 {
747         return tdb->log.log_fn;
748 }
749
750
751 /*
752   get the tdb sequence number. Only makes sense if the writers opened
753   with TDB_SEQNUM set. Note that this sequence number will wrap quite
754   quickly, so it should only be used for a 'has something changed'
755   test, not for code that relies on the count of the number of changes
756   made. If you want a counter then use a tdb record.
757
758   The aim of this sequence number is to allow for a very lightweight
759   test of a possible tdb change.
760 */
761 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
762 {
763         tdb_off_t seqnum=0;
764
765         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
766         return seqnum;
767 }
768
769 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
770 {
771         return tdb->hash_size;
772 }
773
774 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
775 {
776         return tdb->map_size;
777 }
778
779 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
780 {
781         return tdb->flags;
782 }
783
784 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
785 {
786         if ((flags & TDB_ALLOW_NESTING) &&
787             (flags & TDB_DISALLOW_NESTING)) {
788                 tdb->ecode = TDB_ERR_NESTING;
789                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
790                         "allow_nesting and disallow_nesting are not allowed together!"));
791                 return;
792         }
793
794         if (flags & TDB_ALLOW_NESTING) {
795                 tdb->flags &= ~TDB_DISALLOW_NESTING;
796         }
797         if (flags & TDB_DISALLOW_NESTING) {
798                 tdb->flags &= ~TDB_ALLOW_NESTING;
799         }
800
801         tdb->flags |= flags;
802 }
803
804 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
805 {
806         if ((flags & TDB_ALLOW_NESTING) &&
807             (flags & TDB_DISALLOW_NESTING)) {
808                 tdb->ecode = TDB_ERR_NESTING;
809                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
810                         "allow_nesting and disallow_nesting are not allowed together!"));
811                 return;
812         }
813
814         if ((flags & TDB_NOLOCK) &&
815             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
816             (tdb->mutexes == NULL)) {
817                 tdb->ecode = TDB_ERR_LOCK;
818                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
819                          "Can not remove NOLOCK flag on mutexed databases"));
820                 return;
821         }
822
823         if (flags & TDB_ALLOW_NESTING) {
824                 tdb->flags |= TDB_DISALLOW_NESTING;
825         }
826         if (flags & TDB_DISALLOW_NESTING) {
827                 tdb->flags |= TDB_ALLOW_NESTING;
828         }
829
830         tdb->flags &= ~flags;
831 }
832
833
834 /*
835   enable sequence number handling on an open tdb
836 */
837 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
838 {
839         tdb->flags |= TDB_SEQNUM;
840 }
841
842
843 /*
844   add a region of the file to the freelist. Length is the size of the region in bytes,
845   which includes the free list header that needs to be added
846  */
847 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
848 {
849         struct tdb_record rec;
850         if (length <= sizeof(rec)) {
851                 /* the region is not worth adding */
852                 return 0;
853         }
854         if (length + offset > tdb->map_size) {
855                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
856                 return -1;
857         }
858         memset(&rec,'\0',sizeof(rec));
859         rec.rec_len = length - sizeof(rec);
860         if (tdb_free(tdb, offset, &rec) == -1) {
861                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
862                 return -1;
863         }
864         return 0;
865 }
866
867 /*
868   wipe the entire database, deleting all records. This can be done
869   very fast by using a allrecord lock. The entire data portion of the
870   file becomes a single entry in the freelist.
871
872   This code carefully steps around the recovery area, leaving it alone
873  */
874 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
875 {
876         uint32_t i;
877         tdb_off_t offset = 0;
878         ssize_t data_len;
879         tdb_off_t recovery_head;
880         tdb_len_t recovery_size = 0;
881
882         if (tdb_lockall(tdb) != 0) {
883                 return -1;
884         }
885
886         tdb_trace(tdb, "tdb_wipe_all");
887
888         /* see if the tdb has a recovery area, and remember its size
889            if so. We don't want to lose this as otherwise each
890            tdb_wipe_all() in a transaction will increase the size of
891            the tdb by the size of the recovery area */
892         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
893                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
894                 goto failed;
895         }
896
897         if (recovery_head != 0) {
898                 struct tdb_record rec;
899                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
900                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
901                         return -1;
902                 }
903                 recovery_size = rec.rec_len + sizeof(rec);
904         }
905
906         /* wipe the hashes */
907         for (i=0;i<tdb->hash_size;i++) {
908                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
909                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
910                         goto failed;
911                 }
912         }
913
914         /* wipe the freelist */
915         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
916                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
917                 goto failed;
918         }
919
920         /* add all the rest of the file to the freelist, possibly leaving a gap
921            for the recovery area */
922         if (recovery_size == 0) {
923                 /* the simple case - the whole file can be used as a freelist */
924                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
925                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
926                         goto failed;
927                 }
928         } else {
929                 /* we need to add two freelist entries - one on either
930                    side of the recovery area
931
932                    Note that we cannot shift the recovery area during
933                    this operation. Only the transaction.c code may
934                    move the recovery area or we risk subtle data
935                    corruption
936                 */
937                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
938                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
939                         goto failed;
940                 }
941                 /* and the 2nd free list entry after the recovery area - if any */
942                 data_len = tdb->map_size - (recovery_head+recovery_size);
943                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
944                         goto failed;
945                 }
946         }
947
948         tdb_increment_seqnum_nonblock(tdb);
949
950         if (tdb_unlockall(tdb) != 0) {
951                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
952                 goto failed;
953         }
954
955         return 0;
956
957 failed:
958         tdb_unlockall(tdb);
959         return -1;
960 }
961
962 struct traverse_state {
963         bool error;
964         struct tdb_context *dest_db;
965 };
966
967 /*
968   traverse function for repacking
969  */
970 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
971 {
972         struct traverse_state *state = (struct traverse_state *)private_data;
973         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
974                 state->error = true;
975                 return -1;
976         }
977         return 0;
978 }
979
980 /*
981   repack a tdb
982  */
983 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
984 {
985         struct tdb_context *tmp_db;
986         struct traverse_state state;
987
988         tdb_trace(tdb, "tdb_repack");
989
990         if (tdb_transaction_start(tdb) != 0) {
991                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
992                 return -1;
993         }
994
995         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
996         if (tmp_db == NULL) {
997                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
998                 tdb_transaction_cancel(tdb);
999                 return -1;
1000         }
1001
1002         state.error = false;
1003         state.dest_db = tmp_db;
1004
1005         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
1006                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
1007                 tdb_transaction_cancel(tdb);
1008                 tdb_close(tmp_db);
1009                 return -1;
1010         }
1011
1012         if (state.error) {
1013                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
1014                 tdb_transaction_cancel(tdb);
1015                 tdb_close(tmp_db);
1016                 return -1;
1017         }
1018
1019         if (tdb_wipe_all(tdb) != 0) {
1020                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
1021                 tdb_transaction_cancel(tdb);
1022                 tdb_close(tmp_db);
1023                 return -1;
1024         }
1025
1026         state.error = false;
1027         state.dest_db = tdb;
1028
1029         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
1030                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
1031                 tdb_transaction_cancel(tdb);
1032                 tdb_close(tmp_db);
1033                 return -1;
1034         }
1035
1036         if (state.error) {
1037                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
1038                 tdb_transaction_cancel(tdb);
1039                 tdb_close(tmp_db);
1040                 return -1;
1041         }
1042
1043         tdb_close(tmp_db);
1044
1045         if (tdb_transaction_commit(tdb) != 0) {
1046                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1047                 return -1;
1048         }
1049
1050         return 0;
1051 }
1052
1053 /* Even on files, we can get partial writes due to signals. */
1054 bool tdb_write_all(int fd, const void *buf, size_t count)
1055 {
1056         while (count) {
1057                 ssize_t ret;
1058                 ret = write(fd, buf, count);
1059                 if (ret < 0)
1060                         return false;
1061                 buf = (const char *)buf + ret;
1062                 count -= ret;
1063         }
1064         return true;
1065 }
1066
1067 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1068 {
1069         tdb_off_t ret = a + b;
1070
1071         if ((ret < a) || (ret < b)) {
1072                 return false;
1073         }
1074         *pret = ret;
1075         return true;
1076 }
1077
1078 #ifdef TDB_TRACE
1079 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1080 {
1081         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1082                 close(tdb->tracefd);
1083                 tdb->tracefd = -1;
1084         }
1085 }
1086
1087 static void tdb_trace_start(struct tdb_context *tdb)
1088 {
1089         tdb_off_t seqnum=0;
1090         char msg[sizeof(tdb_off_t) * 4 + 1];
1091
1092         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1093         snprintf(msg, sizeof(msg), "%u ", seqnum);
1094         tdb_trace_write(tdb, msg);
1095 }
1096
1097 static void tdb_trace_end(struct tdb_context *tdb)
1098 {
1099         tdb_trace_write(tdb, "\n");
1100 }
1101
1102 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1103 {
1104         char msg[sizeof(ret) * 4 + 4];
1105         snprintf(msg, sizeof(msg), " = %i\n", ret);
1106         tdb_trace_write(tdb, msg);
1107 }
1108
1109 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1110 {
1111         char msg[20 + rec.dsize*2], *p;
1112         unsigned int i;
1113
1114         /* We differentiate zero-length records from non-existent ones. */
1115         if (rec.dptr == NULL) {
1116                 tdb_trace_write(tdb, " NULL");
1117                 return;
1118         }
1119
1120         /* snprintf here is purely cargo-cult programming. */
1121         p = msg;
1122         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1123         for (i = 0; i < rec.dsize; i++)
1124                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1125
1126         tdb_trace_write(tdb, msg);
1127 }
1128
1129 void tdb_trace(struct tdb_context *tdb, const char *op)
1130 {
1131         tdb_trace_start(tdb);
1132         tdb_trace_write(tdb, op);
1133         tdb_trace_end(tdb);
1134 }
1135
1136 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1137 {
1138         char msg[sizeof(tdb_off_t) * 4 + 1];
1139
1140         snprintf(msg, sizeof(msg), "%u ", seqnum);
1141         tdb_trace_write(tdb, msg);
1142         tdb_trace_write(tdb, op);
1143         tdb_trace_end(tdb);
1144 }
1145
1146 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1147                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1148 {
1149         char msg[128];
1150
1151         snprintf(msg, sizeof(msg),
1152                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1153         tdb_trace_start(tdb);
1154         tdb_trace_write(tdb, msg);
1155         tdb_trace_end(tdb);
1156 }
1157
1158 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1159 {
1160         tdb_trace_start(tdb);
1161         tdb_trace_write(tdb, op);
1162         tdb_trace_end_ret(tdb, ret);
1163 }
1164
1165 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1166 {
1167         tdb_trace_start(tdb);
1168         tdb_trace_write(tdb, op);
1169         tdb_trace_write(tdb, " =");
1170         tdb_trace_record(tdb, ret);
1171         tdb_trace_end(tdb);
1172 }
1173
1174 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1175                     TDB_DATA rec)
1176 {
1177         tdb_trace_start(tdb);
1178         tdb_trace_write(tdb, op);
1179         tdb_trace_record(tdb, rec);
1180         tdb_trace_end(tdb);
1181 }
1182
1183 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1184                         TDB_DATA rec, int ret)
1185 {
1186         tdb_trace_start(tdb);
1187         tdb_trace_write(tdb, op);
1188         tdb_trace_record(tdb, rec);
1189         tdb_trace_end_ret(tdb, ret);
1190 }
1191
1192 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1193                            TDB_DATA rec, TDB_DATA ret)
1194 {
1195         tdb_trace_start(tdb);
1196         tdb_trace_write(tdb, op);
1197         tdb_trace_record(tdb, rec);
1198         tdb_trace_write(tdb, " =");
1199         tdb_trace_record(tdb, ret);
1200         tdb_trace_end(tdb);
1201 }
1202
1203 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1204                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1205                              int ret)
1206 {
1207         char msg[1 + sizeof(ret) * 4];
1208
1209         snprintf(msg, sizeof(msg), " %#x", flag);
1210         tdb_trace_start(tdb);
1211         tdb_trace_write(tdb, op);
1212         tdb_trace_record(tdb, rec1);
1213         tdb_trace_record(tdb, rec2);
1214         tdb_trace_write(tdb, msg);
1215         tdb_trace_end_ret(tdb, ret);
1216 }
1217
1218 void tdb_trace_1plusn_rec_flag_ret(struct tdb_context *tdb, const char *op,
1219                                    TDB_DATA rec,
1220                                    const TDB_DATA *recs, int num_recs,
1221                                    unsigned flag, int ret)
1222 {
1223         char msg[1 + sizeof(ret) * 4];
1224         int i;
1225
1226         snprintf(msg, sizeof(msg), " %#x", flag);
1227         tdb_trace_start(tdb);
1228         tdb_trace_write(tdb, op);
1229         tdb_trace_record(tdb, rec);
1230         for (i=0; i<num_recs; i++) {
1231                 tdb_trace_record(tdb, recs[i]);
1232         }
1233         tdb_trace_write(tdb, msg);
1234         tdb_trace_end_ret(tdb, ret);
1235 }
1236
1237 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1238                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1239 {
1240         tdb_trace_start(tdb);
1241         tdb_trace_write(tdb, op);
1242         tdb_trace_record(tdb, rec1);
1243         tdb_trace_record(tdb, rec2);
1244         tdb_trace_write(tdb, " =");
1245         tdb_trace_record(tdb, ret);
1246         tdb_trace_end(tdb);
1247 }
1248 #endif