tdb: Make tdb_find circular-safe
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 void tdb_chainwalk_init(struct tdb_chainwalk_ctx *ctx, tdb_off_t ptr)
83 {
84         *ctx = (struct tdb_chainwalk_ctx) { .slow_ptr = ptr };
85 }
86
87 bool tdb_chainwalk_check(struct tdb_context *tdb,
88                          struct tdb_chainwalk_ctx *ctx,
89                          tdb_off_t next_ptr)
90 {
91         int ret;
92
93         if (ctx->slow_chase) {
94                 ret = tdb_ofs_read(tdb, ctx->slow_ptr, &ctx->slow_ptr);
95                 if (ret == -1) {
96                         return false;
97                 }
98         }
99         ctx->slow_chase = !ctx->slow_chase;
100
101         if (next_ptr == ctx->slow_ptr) {
102                 tdb->ecode = TDB_ERR_CORRUPT;
103                 TDB_LOG((tdb, TDB_DEBUG_ERROR,
104                          "tdb_chainwalk_check: circular chain\n"));
105                 return false;
106         }
107
108         return true;
109 }
110
111 /* Returns 0 on fail.  On success, return offset of record, and fills
112    in rec */
113 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
114                         struct tdb_record *r)
115 {
116         tdb_off_t rec_ptr;
117         struct tdb_chainwalk_ctx chainwalk;
118
119         /* read in the hash top */
120         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
121                 return 0;
122
123         tdb_chainwalk_init(&chainwalk, rec_ptr);
124
125         /* keep looking until we find the right record */
126         while (rec_ptr) {
127                 bool ok;
128
129                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
130                         return 0;
131
132                 if (!TDB_DEAD(r) && hash==r->full_hash
133                     && key.dsize==r->key_len
134                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
135                                       r->key_len, tdb_key_compare,
136                                       NULL) == 0) {
137                         return rec_ptr;
138                 }
139                 rec_ptr = r->next;
140
141                 ok = tdb_chainwalk_check(tdb, &chainwalk, rec_ptr);
142                 if (!ok) {
143                         return 0;
144                 }
145         }
146         tdb->ecode = TDB_ERR_NOEXIST;
147         return 0;
148 }
149
150 /* As tdb_find, but if you succeed, keep the lock */
151 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
152                            struct tdb_record *rec)
153 {
154         uint32_t rec_ptr;
155
156         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
157                 return 0;
158         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
159                 tdb_unlock(tdb, BUCKET(hash), locktype);
160         return rec_ptr;
161 }
162
163 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
164
165 struct tdb_update_hash_state {
166         const TDB_DATA *dbufs;
167         int num_dbufs;
168         tdb_len_t dbufs_len;
169 };
170
171 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
172 {
173         struct tdb_update_hash_state *state = private_data;
174         unsigned char *dptr = data.dptr;
175         int i;
176
177         if (state->dbufs_len != data.dsize) {
178                 return -1;
179         }
180
181         for (i=0; i<state->num_dbufs; i++) {
182                 TDB_DATA dbuf = state->dbufs[i];
183                 int ret;
184                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
185                 if (ret != 0) {
186                         return -1;
187                 }
188                 dptr += dbuf.dsize;
189         }
190
191         return 0;
192 }
193
194 /* update an entry in place - this only works if the new data size
195    is <= the old data size and the key exists.
196    on failure return -1.
197 */
198 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
199                            uint32_t hash,
200                            const TDB_DATA *dbufs, int num_dbufs,
201                            tdb_len_t dbufs_len)
202 {
203         struct tdb_record rec;
204         tdb_off_t rec_ptr, ofs;
205         int i;
206
207         /* find entry */
208         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
209                 return -1;
210
211         /* it could be an exact duplicate of what is there - this is
212          * surprisingly common (eg. with a ldb re-index). */
213         if (rec.data_len == dbufs_len) {
214                 struct tdb_update_hash_state state = {
215                         .dbufs = dbufs, .num_dbufs = num_dbufs,
216                         .dbufs_len = dbufs_len
217                 };
218                 int ret;
219
220                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
221                 if (ret == 0) {
222                         return 0;
223                 }
224         }
225
226         /* must be long enough key, data and tailer */
227         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
228                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
229                 return -1;
230         }
231
232         ofs = rec_ptr + sizeof(rec) + rec.key_len;
233
234         for (i=0; i<num_dbufs; i++) {
235                 TDB_DATA dbuf = dbufs[i];
236                 int ret;
237
238                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
239                 if (ret == -1) {
240                         return -1;
241                 }
242                 ofs += dbuf.dsize;
243         }
244
245         if (dbufs_len != rec.data_len) {
246                 /* update size */
247                 rec.data_len = dbufs_len;
248                 return tdb_rec_write(tdb, rec_ptr, &rec);
249         }
250
251         return 0;
252 }
253
254 /* find an entry in the database given a key */
255 /* If an entry doesn't exist tdb_err will be set to
256  * TDB_ERR_NOEXIST. If a key has no data attached
257  * then the TDB_DATA will have zero length but
258  * a non-zero pointer
259  */
260 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
261 {
262         tdb_off_t rec_ptr;
263         struct tdb_record rec;
264         TDB_DATA ret;
265         uint32_t hash;
266
267         /* find which hash bucket it is in */
268         hash = tdb->hash_fn(&key);
269         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
270                 return tdb_null;
271
272         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
273                                   rec.data_len);
274         ret.dsize = rec.data_len;
275         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
276         return ret;
277 }
278
279 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
280 {
281         TDB_DATA ret = _tdb_fetch(tdb, key);
282
283         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
284         return ret;
285 }
286
287 /*
288  * Find an entry in the database and hand the record's data to a parsing
289  * function. The parsing function is executed under the chain read lock, so it
290  * should be fast and should not block on other syscalls.
291  *
292  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
293  *
294  * For mmapped tdb's that do not have a transaction open it points the parsing
295  * function directly at the mmap area, it avoids the malloc/memcpy in this
296  * case. If a transaction is open or no mmap is available, it has to do
297  * malloc/read/parse/free.
298  *
299  * This is interesting for all readers of potentially large data structures in
300  * the tdb records, ldb indexes being one example.
301  *
302  * Return -1 if the record was not found.
303  */
304
305 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
306                      int (*parser)(TDB_DATA key, TDB_DATA data,
307                                    void *private_data),
308                      void *private_data)
309 {
310         tdb_off_t rec_ptr;
311         struct tdb_record rec;
312         int ret;
313         uint32_t hash;
314
315         /* find which hash bucket it is in */
316         hash = tdb->hash_fn(&key);
317
318         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
319                 /* record not found */
320                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
321                 tdb->ecode = TDB_ERR_NOEXIST;
322                 return -1;
323         }
324         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
325
326         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
327                              rec.data_len, parser, private_data);
328
329         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
330
331         return ret;
332 }
333
334 /* check if an entry in the database exists
335
336    note that 1 is returned if the key is found and 0 is returned if not found
337    this doesn't match the conventions in the rest of this module, but is
338    compatible with gdbm
339 */
340 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
341 {
342         struct tdb_record rec;
343
344         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
345                 return 0;
346         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
347         return 1;
348 }
349
350 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
351 {
352         uint32_t hash = tdb->hash_fn(&key);
353         int ret;
354
355         ret = tdb_exists_hash(tdb, key, hash);
356         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
357         return ret;
358 }
359
360 /* actually delete an entry in the database given the offset */
361 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
362 {
363         tdb_off_t last_ptr, i;
364         struct tdb_record lastrec;
365
366         if (tdb->read_only || tdb->traverse_read) return -1;
367
368         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
369             tdb_write_lock_record(tdb, rec_ptr) == -1) {
370                 /* Someone traversing here: mark it as dead */
371                 rec->magic = TDB_DEAD_MAGIC;
372                 return tdb_rec_write(tdb, rec_ptr, rec);
373         }
374         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
375                 return -1;
376
377         /* find previous record in hash chain */
378         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
379                 return -1;
380         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
381                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
382                         return -1;
383
384         /* unlink it: next ptr is at start of record. */
385         if (last_ptr == 0)
386                 last_ptr = TDB_HASH_TOP(rec->full_hash);
387         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
388                 return -1;
389
390         /* recover the space */
391         if (tdb_free(tdb, rec_ptr, rec) == -1)
392                 return -1;
393         return 0;
394 }
395
396 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
397 {
398         int res = 0;
399         tdb_off_t rec_ptr;
400         struct tdb_record rec;
401
402         /* read in the hash top */
403         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
404                 return 0;
405
406         while (rec_ptr) {
407                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
408                         return 0;
409
410                 if (rec.magic == TDB_DEAD_MAGIC) {
411                         res += 1;
412                 }
413                 rec_ptr = rec.next;
414         }
415         return res;
416 }
417
418 /*
419  * Purge all DEAD records from a hash chain
420  */
421 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
422 {
423         int res = -1;
424         struct tdb_record rec;
425         tdb_off_t rec_ptr;
426
427         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
428                 /*
429                  * Don't block the freelist if not strictly necessary
430                  */
431                 return -1;
432         }
433
434         /* read in the hash top */
435         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
436                 goto fail;
437
438         while (rec_ptr) {
439                 tdb_off_t next;
440
441                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
442                         goto fail;
443                 }
444
445                 next = rec.next;
446
447                 if (rec.magic == TDB_DEAD_MAGIC
448                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
449                         goto fail;
450                 }
451                 rec_ptr = next;
452         }
453         res = 0;
454  fail:
455         tdb_unlock(tdb, -1, F_WRLCK);
456         return res;
457 }
458
459 /* delete an entry in the database given a key */
460 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
461 {
462         tdb_off_t rec_ptr;
463         struct tdb_record rec;
464         int ret;
465
466         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
467         if (rec_ptr == 0) {
468                 return -1;
469         }
470
471         if (tdb->max_dead_records != 0) {
472
473                 uint32_t magic = TDB_DEAD_MAGIC;
474
475                 /*
476                  * Allow for some dead records per hash chain, mainly for
477                  * tdb's with a very high create/delete rate like locking.tdb.
478                  */
479
480                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
481                         /*
482                          * Don't let the per-chain freelist grow too large,
483                          * delete all existing dead records
484                          */
485                         tdb_purge_dead(tdb, hash);
486                 }
487
488                 /*
489                  * Just mark the record as dead.
490                  */
491                 ret = tdb_ofs_write(
492                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
493                         &magic);
494         }
495         else {
496                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
497         }
498
499         if (ret == 0) {
500                 tdb_increment_seqnum(tdb);
501         }
502
503         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
504                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
505         return ret;
506 }
507
508 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
509 {
510         uint32_t hash = tdb->hash_fn(&key);
511         int ret;
512
513         ret = tdb_delete_hash(tdb, key, hash);
514         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
515         return ret;
516 }
517
518 /*
519  * See if we have a dead record around with enough space
520  */
521 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
522                         struct tdb_record *r, tdb_len_t length,
523                         tdb_off_t *p_last_ptr)
524 {
525         tdb_off_t rec_ptr, last_ptr;
526         tdb_off_t best_rec_ptr = 0;
527         tdb_off_t best_last_ptr = 0;
528         struct tdb_record best = { .rec_len = UINT32_MAX };
529
530         length += sizeof(tdb_off_t); /* tailer */
531
532         last_ptr = TDB_HASH_TOP(hash);
533
534         /* read in the hash top */
535         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
536                 return 0;
537
538         /* keep looking until we find the right record */
539         while (rec_ptr) {
540                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
541                         return 0;
542
543                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
544                     (r->rec_len < best.rec_len)) {
545                         best_rec_ptr = rec_ptr;
546                         best_last_ptr = last_ptr;
547                         best = *r;
548                 }
549                 last_ptr = rec_ptr;
550                 rec_ptr = r->next;
551         }
552
553         if (best.rec_len == UINT32_MAX) {
554                 return 0;
555         }
556
557         *r = best;
558         *p_last_ptr = best_last_ptr;
559         return best_rec_ptr;
560 }
561
562 static int _tdb_storev(struct tdb_context *tdb, TDB_DATA key,
563                        const TDB_DATA *dbufs, int num_dbufs,
564                        int flag, uint32_t hash)
565 {
566         struct tdb_record rec;
567         tdb_off_t rec_ptr, ofs;
568         tdb_len_t rec_len, dbufs_len;
569         int i;
570         int ret = -1;
571
572         dbufs_len = 0;
573
574         for (i=0; i<num_dbufs; i++) {
575                 size_t dsize = dbufs[i].dsize;
576
577                 if ((dsize != 0) && (dbufs[i].dptr == NULL)) {
578                         tdb->ecode = TDB_ERR_EINVAL;
579                         goto fail;
580                 }
581
582                 dbufs_len += dsize;
583                 if (dbufs_len < dsize) {
584                         tdb->ecode = TDB_ERR_OOM;
585                         goto fail;
586                 }
587         }
588
589         rec_len = key.dsize + dbufs_len;
590         if ((rec_len < key.dsize) || (rec_len < dbufs_len)) {
591                 tdb->ecode = TDB_ERR_OOM;
592                 goto fail;
593         }
594
595         /* check for it existing, on insert. */
596         if (flag == TDB_INSERT) {
597                 if (tdb_exists_hash(tdb, key, hash)) {
598                         tdb->ecode = TDB_ERR_EXISTS;
599                         goto fail;
600                 }
601         } else {
602                 /* first try in-place update, on modify or replace. */
603                 if (tdb_update_hash(tdb, key, hash, dbufs, num_dbufs,
604                                     dbufs_len) == 0) {
605                         goto done;
606                 }
607                 if (tdb->ecode == TDB_ERR_NOEXIST &&
608                     flag == TDB_MODIFY) {
609                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
610                          we should fail the store */
611                         goto fail;
612                 }
613         }
614         /* reset the error code potentially set by the tdb_update_hash() */
615         tdb->ecode = TDB_SUCCESS;
616
617         /* delete any existing record - if it doesn't exist we don't
618            care.  Doing this first reduces fragmentation, and avoids
619            coalescing with `allocated' block before it's updated. */
620         if (flag != TDB_INSERT)
621                 tdb_delete_hash(tdb, key, hash);
622
623         /* we have to allocate some space */
624         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
625
626         if (rec_ptr == 0) {
627                 goto fail;
628         }
629
630         /* Read hash top into next ptr */
631         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
632                 goto fail;
633
634         rec.key_len = key.dsize;
635         rec.data_len = dbufs_len;
636         rec.full_hash = hash;
637         rec.magic = TDB_MAGIC;
638
639         ofs = rec_ptr;
640
641         /* write out and point the top of the hash chain at it */
642         ret = tdb_rec_write(tdb, ofs, &rec);
643         if (ret == -1) {
644                 goto fail;
645         }
646         ofs += sizeof(rec);
647
648         ret = tdb->methods->tdb_write(tdb, ofs, key.dptr, key.dsize);
649         if (ret == -1) {
650                 goto fail;
651         }
652         ofs += key.dsize;
653
654         for (i=0; i<num_dbufs; i++) {
655                 if (dbufs[i].dsize == 0) {
656                         continue;
657                 }
658
659                 ret = tdb->methods->tdb_write(tdb, ofs, dbufs[i].dptr,
660                                               dbufs[i].dsize);
661                 if (ret == -1) {
662                         goto fail;
663                 }
664                 ofs += dbufs[i].dsize;
665         }
666
667         ret = tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr);
668         if (ret == -1) {
669                 /* Need to tdb_unallocate() here */
670                 goto fail;
671         }
672
673  done:
674         ret = 0;
675  fail:
676         if (ret == 0) {
677                 tdb_increment_seqnum(tdb);
678         }
679         return ret;
680 }
681
682 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
683                       TDB_DATA dbuf, int flag, uint32_t hash)
684 {
685         return _tdb_storev(tdb, key, &dbuf, 1, flag, hash);
686 }
687
688 /* store an element in the database, replacing any existing element
689    with the same key
690
691    return 0 on success, -1 on failure
692 */
693 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
694 {
695         uint32_t hash;
696         int ret;
697
698         if (tdb->read_only || tdb->traverse_read) {
699                 tdb->ecode = TDB_ERR_RDONLY;
700                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
701                 return -1;
702         }
703
704         /* find which hash bucket it is in */
705         hash = tdb->hash_fn(&key);
706         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
707                 return -1;
708
709         ret = _tdb_store(tdb, key, dbuf, flag, hash);
710         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
711         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
712         return ret;
713 }
714
715 _PUBLIC_ int tdb_storev(struct tdb_context *tdb, TDB_DATA key,
716                         const TDB_DATA *dbufs, int num_dbufs, int flag)
717 {
718         uint32_t hash;
719         int ret;
720
721         if (tdb->read_only || tdb->traverse_read) {
722                 tdb->ecode = TDB_ERR_RDONLY;
723                 tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
724                                               dbufs, num_dbufs, flag, -1);
725                 return -1;
726         }
727
728         /* find which hash bucket it is in */
729         hash = tdb->hash_fn(&key);
730         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
731                 return -1;
732
733         ret = _tdb_storev(tdb, key, dbufs, num_dbufs, flag, hash);
734         tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
735                                       dbufs, num_dbufs, flag, -1);
736         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
737         return ret;
738 }
739
740 /* Append to an entry. Create if not exist. */
741 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
742 {
743         uint32_t hash;
744         TDB_DATA dbufs[2];
745         int ret = -1;
746
747         /* find which hash bucket it is in */
748         hash = tdb->hash_fn(&key);
749         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
750                 return -1;
751
752         dbufs[0] = _tdb_fetch(tdb, key);
753         dbufs[1] = new_dbuf;
754
755         ret = _tdb_storev(tdb, key, dbufs, 2, 0, hash);
756         tdb_trace_2rec_retrec(tdb, "tdb_append", key, dbufs[0], dbufs[1]);
757
758         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
759         SAFE_FREE(dbufs[0].dptr);
760         return ret;
761 }
762
763
764 /*
765   return the name of the current tdb file
766   useful for external logging functions
767 */
768 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
769 {
770         return tdb->name;
771 }
772
773 /*
774   return the underlying file descriptor being used by tdb, or -1
775   useful for external routines that want to check the device/inode
776   of the fd
777 */
778 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
779 {
780         return tdb->fd;
781 }
782
783 /*
784   return the current logging function
785   useful for external tdb routines that wish to log tdb errors
786 */
787 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
788 {
789         return tdb->log.log_fn;
790 }
791
792
793 /*
794   get the tdb sequence number. Only makes sense if the writers opened
795   with TDB_SEQNUM set. Note that this sequence number will wrap quite
796   quickly, so it should only be used for a 'has something changed'
797   test, not for code that relies on the count of the number of changes
798   made. If you want a counter then use a tdb record.
799
800   The aim of this sequence number is to allow for a very lightweight
801   test of a possible tdb change.
802 */
803 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
804 {
805         tdb_off_t seqnum=0;
806
807         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
808         return seqnum;
809 }
810
811 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
812 {
813         return tdb->hash_size;
814 }
815
816 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
817 {
818         return tdb->map_size;
819 }
820
821 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
822 {
823         return tdb->flags;
824 }
825
826 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
827 {
828         if ((flags & TDB_ALLOW_NESTING) &&
829             (flags & TDB_DISALLOW_NESTING)) {
830                 tdb->ecode = TDB_ERR_NESTING;
831                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
832                         "allow_nesting and disallow_nesting are not allowed together!"));
833                 return;
834         }
835
836         if (flags & TDB_ALLOW_NESTING) {
837                 tdb->flags &= ~TDB_DISALLOW_NESTING;
838         }
839         if (flags & TDB_DISALLOW_NESTING) {
840                 tdb->flags &= ~TDB_ALLOW_NESTING;
841         }
842
843         tdb->flags |= flags;
844 }
845
846 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
847 {
848         if ((flags & TDB_ALLOW_NESTING) &&
849             (flags & TDB_DISALLOW_NESTING)) {
850                 tdb->ecode = TDB_ERR_NESTING;
851                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
852                         "allow_nesting and disallow_nesting are not allowed together!"));
853                 return;
854         }
855
856         if ((flags & TDB_NOLOCK) &&
857             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
858             (tdb->mutexes == NULL)) {
859                 tdb->ecode = TDB_ERR_LOCK;
860                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
861                          "Can not remove NOLOCK flag on mutexed databases"));
862                 return;
863         }
864
865         if (flags & TDB_ALLOW_NESTING) {
866                 tdb->flags |= TDB_DISALLOW_NESTING;
867         }
868         if (flags & TDB_DISALLOW_NESTING) {
869                 tdb->flags |= TDB_ALLOW_NESTING;
870         }
871
872         tdb->flags &= ~flags;
873 }
874
875
876 /*
877   enable sequence number handling on an open tdb
878 */
879 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
880 {
881         tdb->flags |= TDB_SEQNUM;
882 }
883
884
885 /*
886   add a region of the file to the freelist. Length is the size of the region in bytes,
887   which includes the free list header that needs to be added
888  */
889 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
890 {
891         struct tdb_record rec;
892         if (length <= sizeof(rec)) {
893                 /* the region is not worth adding */
894                 return 0;
895         }
896         if (length + offset > tdb->map_size) {
897                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
898                 return -1;
899         }
900         memset(&rec,'\0',sizeof(rec));
901         rec.rec_len = length - sizeof(rec);
902         if (tdb_free(tdb, offset, &rec) == -1) {
903                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
904                 return -1;
905         }
906         return 0;
907 }
908
909 /*
910   wipe the entire database, deleting all records. This can be done
911   very fast by using a allrecord lock. The entire data portion of the
912   file becomes a single entry in the freelist.
913
914   This code carefully steps around the recovery area, leaving it alone
915  */
916 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
917 {
918         uint32_t i;
919         tdb_off_t offset = 0;
920         ssize_t data_len;
921         tdb_off_t recovery_head;
922         tdb_len_t recovery_size = 0;
923
924         if (tdb_lockall(tdb) != 0) {
925                 return -1;
926         }
927
928         tdb_trace(tdb, "tdb_wipe_all");
929
930         /* see if the tdb has a recovery area, and remember its size
931            if so. We don't want to lose this as otherwise each
932            tdb_wipe_all() in a transaction will increase the size of
933            the tdb by the size of the recovery area */
934         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
935                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
936                 goto failed;
937         }
938
939         if (recovery_head != 0) {
940                 struct tdb_record rec;
941                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
942                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
943                         return -1;
944                 }
945                 recovery_size = rec.rec_len + sizeof(rec);
946         }
947
948         /* wipe the hashes */
949         for (i=0;i<tdb->hash_size;i++) {
950                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
951                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
952                         goto failed;
953                 }
954         }
955
956         /* wipe the freelist */
957         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
958                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
959                 goto failed;
960         }
961
962         /* add all the rest of the file to the freelist, possibly leaving a gap
963            for the recovery area */
964         if (recovery_size == 0) {
965                 /* the simple case - the whole file can be used as a freelist */
966                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
967                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
968                         goto failed;
969                 }
970         } else {
971                 /* we need to add two freelist entries - one on either
972                    side of the recovery area
973
974                    Note that we cannot shift the recovery area during
975                    this operation. Only the transaction.c code may
976                    move the recovery area or we risk subtle data
977                    corruption
978                 */
979                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
980                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
981                         goto failed;
982                 }
983                 /* and the 2nd free list entry after the recovery area - if any */
984                 data_len = tdb->map_size - (recovery_head+recovery_size);
985                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
986                         goto failed;
987                 }
988         }
989
990         tdb_increment_seqnum_nonblock(tdb);
991
992         if (tdb_unlockall(tdb) != 0) {
993                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
994                 goto failed;
995         }
996
997         return 0;
998
999 failed:
1000         tdb_unlockall(tdb);
1001         return -1;
1002 }
1003
1004 struct traverse_state {
1005         bool error;
1006         struct tdb_context *dest_db;
1007 };
1008
1009 /*
1010   traverse function for repacking
1011  */
1012 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
1013 {
1014         struct traverse_state *state = (struct traverse_state *)private_data;
1015         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
1016                 state->error = true;
1017                 return -1;
1018         }
1019         return 0;
1020 }
1021
1022 /*
1023   repack a tdb
1024  */
1025 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
1026 {
1027         struct tdb_context *tmp_db;
1028         struct traverse_state state;
1029
1030         tdb_trace(tdb, "tdb_repack");
1031
1032         if (tdb_transaction_start(tdb) != 0) {
1033                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
1034                 return -1;
1035         }
1036
1037         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
1038         if (tmp_db == NULL) {
1039                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
1040                 tdb_transaction_cancel(tdb);
1041                 return -1;
1042         }
1043
1044         state.error = false;
1045         state.dest_db = tmp_db;
1046
1047         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
1048                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
1049                 tdb_transaction_cancel(tdb);
1050                 tdb_close(tmp_db);
1051                 return -1;
1052         }
1053
1054         if (state.error) {
1055                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
1056                 tdb_transaction_cancel(tdb);
1057                 tdb_close(tmp_db);
1058                 return -1;
1059         }
1060
1061         if (tdb_wipe_all(tdb) != 0) {
1062                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
1063                 tdb_transaction_cancel(tdb);
1064                 tdb_close(tmp_db);
1065                 return -1;
1066         }
1067
1068         state.error = false;
1069         state.dest_db = tdb;
1070
1071         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
1072                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
1073                 tdb_transaction_cancel(tdb);
1074                 tdb_close(tmp_db);
1075                 return -1;
1076         }
1077
1078         if (state.error) {
1079                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
1080                 tdb_transaction_cancel(tdb);
1081                 tdb_close(tmp_db);
1082                 return -1;
1083         }
1084
1085         tdb_close(tmp_db);
1086
1087         if (tdb_transaction_commit(tdb) != 0) {
1088                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1089                 return -1;
1090         }
1091
1092         return 0;
1093 }
1094
1095 /* Even on files, we can get partial writes due to signals. */
1096 bool tdb_write_all(int fd, const void *buf, size_t count)
1097 {
1098         while (count) {
1099                 ssize_t ret;
1100                 ret = write(fd, buf, count);
1101                 if (ret < 0)
1102                         return false;
1103                 buf = (const char *)buf + ret;
1104                 count -= ret;
1105         }
1106         return true;
1107 }
1108
1109 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1110 {
1111         tdb_off_t ret = a + b;
1112
1113         if ((ret < a) || (ret < b)) {
1114                 return false;
1115         }
1116         *pret = ret;
1117         return true;
1118 }
1119
1120 #ifdef TDB_TRACE
1121 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1122 {
1123         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1124                 close(tdb->tracefd);
1125                 tdb->tracefd = -1;
1126         }
1127 }
1128
1129 static void tdb_trace_start(struct tdb_context *tdb)
1130 {
1131         tdb_off_t seqnum=0;
1132         char msg[sizeof(tdb_off_t) * 4 + 1];
1133
1134         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1135         snprintf(msg, sizeof(msg), "%u ", seqnum);
1136         tdb_trace_write(tdb, msg);
1137 }
1138
1139 static void tdb_trace_end(struct tdb_context *tdb)
1140 {
1141         tdb_trace_write(tdb, "\n");
1142 }
1143
1144 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1145 {
1146         char msg[sizeof(ret) * 4 + 4];
1147         snprintf(msg, sizeof(msg), " = %i\n", ret);
1148         tdb_trace_write(tdb, msg);
1149 }
1150
1151 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1152 {
1153         char msg[20 + rec.dsize*2], *p;
1154         unsigned int i;
1155
1156         /* We differentiate zero-length records from non-existent ones. */
1157         if (rec.dptr == NULL) {
1158                 tdb_trace_write(tdb, " NULL");
1159                 return;
1160         }
1161
1162         /* snprintf here is purely cargo-cult programming. */
1163         p = msg;
1164         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1165         for (i = 0; i < rec.dsize; i++)
1166                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1167
1168         tdb_trace_write(tdb, msg);
1169 }
1170
1171 void tdb_trace(struct tdb_context *tdb, const char *op)
1172 {
1173         tdb_trace_start(tdb);
1174         tdb_trace_write(tdb, op);
1175         tdb_trace_end(tdb);
1176 }
1177
1178 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1179 {
1180         char msg[sizeof(tdb_off_t) * 4 + 1];
1181
1182         snprintf(msg, sizeof(msg), "%u ", seqnum);
1183         tdb_trace_write(tdb, msg);
1184         tdb_trace_write(tdb, op);
1185         tdb_trace_end(tdb);
1186 }
1187
1188 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1189                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1190 {
1191         char msg[128];
1192
1193         snprintf(msg, sizeof(msg),
1194                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1195         tdb_trace_start(tdb);
1196         tdb_trace_write(tdb, msg);
1197         tdb_trace_end(tdb);
1198 }
1199
1200 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1201 {
1202         tdb_trace_start(tdb);
1203         tdb_trace_write(tdb, op);
1204         tdb_trace_end_ret(tdb, ret);
1205 }
1206
1207 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1208 {
1209         tdb_trace_start(tdb);
1210         tdb_trace_write(tdb, op);
1211         tdb_trace_write(tdb, " =");
1212         tdb_trace_record(tdb, ret);
1213         tdb_trace_end(tdb);
1214 }
1215
1216 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1217                     TDB_DATA rec)
1218 {
1219         tdb_trace_start(tdb);
1220         tdb_trace_write(tdb, op);
1221         tdb_trace_record(tdb, rec);
1222         tdb_trace_end(tdb);
1223 }
1224
1225 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1226                         TDB_DATA rec, int ret)
1227 {
1228         tdb_trace_start(tdb);
1229         tdb_trace_write(tdb, op);
1230         tdb_trace_record(tdb, rec);
1231         tdb_trace_end_ret(tdb, ret);
1232 }
1233
1234 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1235                            TDB_DATA rec, TDB_DATA ret)
1236 {
1237         tdb_trace_start(tdb);
1238         tdb_trace_write(tdb, op);
1239         tdb_trace_record(tdb, rec);
1240         tdb_trace_write(tdb, " =");
1241         tdb_trace_record(tdb, ret);
1242         tdb_trace_end(tdb);
1243 }
1244
1245 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1246                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1247                              int ret)
1248 {
1249         char msg[1 + sizeof(ret) * 4];
1250
1251         snprintf(msg, sizeof(msg), " %#x", flag);
1252         tdb_trace_start(tdb);
1253         tdb_trace_write(tdb, op);
1254         tdb_trace_record(tdb, rec1);
1255         tdb_trace_record(tdb, rec2);
1256         tdb_trace_write(tdb, msg);
1257         tdb_trace_end_ret(tdb, ret);
1258 }
1259
1260 void tdb_trace_1plusn_rec_flag_ret(struct tdb_context *tdb, const char *op,
1261                                    TDB_DATA rec,
1262                                    const TDB_DATA *recs, int num_recs,
1263                                    unsigned flag, int ret)
1264 {
1265         char msg[1 + sizeof(ret) * 4];
1266         int i;
1267
1268         snprintf(msg, sizeof(msg), " %#x", flag);
1269         tdb_trace_start(tdb);
1270         tdb_trace_write(tdb, op);
1271         tdb_trace_record(tdb, rec);
1272         for (i=0; i<num_recs; i++) {
1273                 tdb_trace_record(tdb, recs[i]);
1274         }
1275         tdb_trace_write(tdb, msg);
1276         tdb_trace_end_ret(tdb, ret);
1277 }
1278
1279 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1280                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1281 {
1282         tdb_trace_start(tdb);
1283         tdb_trace_write(tdb, op);
1284         tdb_trace_record(tdb, rec1);
1285         tdb_trace_record(tdb, rec2);
1286         tdb_trace_write(tdb, " =");
1287         tdb_trace_record(tdb, ret);
1288         tdb_trace_end(tdb);
1289 }
1290 #endif