4e433c89e1e36cffcc05b28e5622bb5ae50f6987
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 void tdb_chainwalk_init(struct tdb_chainwalk_ctx *ctx, tdb_off_t ptr)
83 {
84         *ctx = (struct tdb_chainwalk_ctx) { .slow_ptr = ptr };
85 }
86
87 bool tdb_chainwalk_check(struct tdb_context *tdb,
88                          struct tdb_chainwalk_ctx *ctx,
89                          tdb_off_t next_ptr)
90 {
91         int ret;
92
93         if (ctx->slow_chase) {
94                 ret = tdb_ofs_read(tdb, ctx->slow_ptr, &ctx->slow_ptr);
95                 if (ret == -1) {
96                         return false;
97                 }
98         }
99         ctx->slow_chase = !ctx->slow_chase;
100
101         if (next_ptr == ctx->slow_ptr) {
102                 tdb->ecode = TDB_ERR_CORRUPT;
103                 TDB_LOG((tdb, TDB_DEBUG_ERROR,
104                          "tdb_chainwalk_check: circular chain\n"));
105                 return false;
106         }
107
108         return true;
109 }
110
111 /* Returns 0 on fail.  On success, return offset of record, and fills
112    in rec */
113 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
114                         struct tdb_record *r)
115 {
116         tdb_off_t rec_ptr;
117         struct tdb_chainwalk_ctx chainwalk;
118
119         /* read in the hash top */
120         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
121                 return 0;
122
123         tdb_chainwalk_init(&chainwalk, rec_ptr);
124
125         /* keep looking until we find the right record */
126         while (rec_ptr) {
127                 bool ok;
128
129                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
130                         return 0;
131
132                 if (!TDB_DEAD(r) && hash==r->full_hash
133                     && key.dsize==r->key_len
134                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
135                                       r->key_len, tdb_key_compare,
136                                       NULL) == 0) {
137                         return rec_ptr;
138                 }
139                 rec_ptr = r->next;
140
141                 ok = tdb_chainwalk_check(tdb, &chainwalk, rec_ptr);
142                 if (!ok) {
143                         return 0;
144                 }
145         }
146         tdb->ecode = TDB_ERR_NOEXIST;
147         return 0;
148 }
149
150 /* As tdb_find, but if you succeed, keep the lock */
151 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
152                            struct tdb_record *rec)
153 {
154         uint32_t rec_ptr;
155
156         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
157                 return 0;
158         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
159                 tdb_unlock(tdb, BUCKET(hash), locktype);
160         return rec_ptr;
161 }
162
163 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
164
165 struct tdb_update_hash_state {
166         const TDB_DATA *dbufs;
167         int num_dbufs;
168         tdb_len_t dbufs_len;
169 };
170
171 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
172 {
173         struct tdb_update_hash_state *state = private_data;
174         unsigned char *dptr = data.dptr;
175         int i;
176
177         if (state->dbufs_len != data.dsize) {
178                 return -1;
179         }
180
181         for (i=0; i<state->num_dbufs; i++) {
182                 TDB_DATA dbuf = state->dbufs[i];
183                 int ret;
184                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
185                 if (ret != 0) {
186                         return -1;
187                 }
188                 dptr += dbuf.dsize;
189         }
190
191         return 0;
192 }
193
194 /* update an entry in place - this only works if the new data size
195    is <= the old data size and the key exists.
196    on failure return -1.
197 */
198 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
199                            uint32_t hash,
200                            const TDB_DATA *dbufs, int num_dbufs,
201                            tdb_len_t dbufs_len)
202 {
203         struct tdb_record rec;
204         tdb_off_t rec_ptr, ofs;
205         int i;
206
207         /* find entry */
208         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
209                 return -1;
210
211         /* it could be an exact duplicate of what is there - this is
212          * surprisingly common (eg. with a ldb re-index). */
213         if (rec.data_len == dbufs_len) {
214                 struct tdb_update_hash_state state = {
215                         .dbufs = dbufs, .num_dbufs = num_dbufs,
216                         .dbufs_len = dbufs_len
217                 };
218                 int ret;
219
220                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
221                 if (ret == 0) {
222                         return 0;
223                 }
224         }
225
226         /* must be long enough key, data and tailer */
227         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
228                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
229                 return -1;
230         }
231
232         ofs = rec_ptr + sizeof(rec) + rec.key_len;
233
234         for (i=0; i<num_dbufs; i++) {
235                 TDB_DATA dbuf = dbufs[i];
236                 int ret;
237
238                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
239                 if (ret == -1) {
240                         return -1;
241                 }
242                 ofs += dbuf.dsize;
243         }
244
245         if (dbufs_len != rec.data_len) {
246                 /* update size */
247                 rec.data_len = dbufs_len;
248                 return tdb_rec_write(tdb, rec_ptr, &rec);
249         }
250
251         return 0;
252 }
253
254 /* find an entry in the database given a key */
255 /* If an entry doesn't exist tdb_err will be set to
256  * TDB_ERR_NOEXIST. If a key has no data attached
257  * then the TDB_DATA will have zero length but
258  * a non-zero pointer
259  */
260 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
261 {
262         tdb_off_t rec_ptr;
263         struct tdb_record rec;
264         TDB_DATA ret;
265         uint32_t hash;
266
267         /* find which hash bucket it is in */
268         hash = tdb->hash_fn(&key);
269         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
270                 return tdb_null;
271
272         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
273                                   rec.data_len);
274         ret.dsize = rec.data_len;
275         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
276         return ret;
277 }
278
279 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
280 {
281         TDB_DATA ret = _tdb_fetch(tdb, key);
282
283         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
284         return ret;
285 }
286
287 /*
288  * Find an entry in the database and hand the record's data to a parsing
289  * function. The parsing function is executed under the chain read lock, so it
290  * should be fast and should not block on other syscalls.
291  *
292  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
293  *
294  * For mmapped tdb's that do not have a transaction open it points the parsing
295  * function directly at the mmap area, it avoids the malloc/memcpy in this
296  * case. If a transaction is open or no mmap is available, it has to do
297  * malloc/read/parse/free.
298  *
299  * This is interesting for all readers of potentially large data structures in
300  * the tdb records, ldb indexes being one example.
301  *
302  * Return -1 if the record was not found.
303  */
304
305 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
306                      int (*parser)(TDB_DATA key, TDB_DATA data,
307                                    void *private_data),
308                      void *private_data)
309 {
310         tdb_off_t rec_ptr;
311         struct tdb_record rec;
312         int ret;
313         uint32_t hash;
314
315         /* find which hash bucket it is in */
316         hash = tdb->hash_fn(&key);
317
318         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
319                 /* record not found */
320                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
321                 tdb->ecode = TDB_ERR_NOEXIST;
322                 return -1;
323         }
324         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
325
326         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
327                              rec.data_len, parser, private_data);
328
329         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
330
331         return ret;
332 }
333
334 /* check if an entry in the database exists
335
336    note that 1 is returned if the key is found and 0 is returned if not found
337    this doesn't match the conventions in the rest of this module, but is
338    compatible with gdbm
339 */
340 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
341 {
342         struct tdb_record rec;
343
344         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
345                 return 0;
346         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
347         return 1;
348 }
349
350 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
351 {
352         uint32_t hash = tdb->hash_fn(&key);
353         int ret;
354
355         ret = tdb_exists_hash(tdb, key, hash);
356         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
357         return ret;
358 }
359
360 /* actually delete an entry in the database given the offset */
361 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
362 {
363         tdb_off_t last_ptr, i;
364         struct tdb_record lastrec;
365
366         if (tdb->read_only || tdb->traverse_read) return -1;
367
368         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
369             tdb_write_lock_record(tdb, rec_ptr) == -1) {
370                 /* Someone traversing here: mark it as dead */
371                 rec->magic = TDB_DEAD_MAGIC;
372                 return tdb_rec_write(tdb, rec_ptr, rec);
373         }
374         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
375                 return -1;
376
377         /* find previous record in hash chain */
378         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
379                 return -1;
380         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
381                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
382                         return -1;
383
384         /* unlink it: next ptr is at start of record. */
385         if (last_ptr == 0)
386                 last_ptr = TDB_HASH_TOP(rec->full_hash);
387         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
388                 return -1;
389
390         /* recover the space */
391         if (tdb_free(tdb, rec_ptr, rec) == -1)
392                 return -1;
393         return 0;
394 }
395
396 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
397 {
398         int res = 0;
399         tdb_off_t rec_ptr;
400         struct tdb_record rec;
401
402         /* read in the hash top */
403         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
404                 return 0;
405
406         while (rec_ptr) {
407                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
408                         return 0;
409
410                 if (rec.magic == TDB_DEAD_MAGIC) {
411                         res += 1;
412                 }
413                 rec_ptr = rec.next;
414         }
415         return res;
416 }
417
418 /*
419  * Purge all DEAD records from a hash chain
420  */
421 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
422 {
423         int res = -1;
424         struct tdb_record rec;
425         tdb_off_t rec_ptr;
426
427         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
428                 /*
429                  * Don't block the freelist if not strictly necessary
430                  */
431                 return -1;
432         }
433
434         /* read in the hash top */
435         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
436                 goto fail;
437
438         while (rec_ptr) {
439                 tdb_off_t next;
440
441                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
442                         goto fail;
443                 }
444
445                 next = rec.next;
446
447                 if (rec.magic == TDB_DEAD_MAGIC
448                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
449                         goto fail;
450                 }
451                 rec_ptr = next;
452         }
453         res = 0;
454  fail:
455         tdb_unlock(tdb, -1, F_WRLCK);
456         return res;
457 }
458
459 /* delete an entry in the database given a key */
460 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
461 {
462         tdb_off_t rec_ptr;
463         struct tdb_record rec;
464         int ret;
465
466         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
467         if (rec_ptr == 0) {
468                 return -1;
469         }
470
471         if (tdb->max_dead_records != 0) {
472
473                 uint32_t magic = TDB_DEAD_MAGIC;
474
475                 /*
476                  * Allow for some dead records per hash chain, mainly for
477                  * tdb's with a very high create/delete rate like locking.tdb.
478                  */
479
480                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
481                         /*
482                          * Don't let the per-chain freelist grow too large,
483                          * delete all existing dead records
484                          */
485                         tdb_purge_dead(tdb, hash);
486                 }
487
488                 /*
489                  * Just mark the record as dead.
490                  */
491                 ret = tdb_ofs_write(
492                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
493                         &magic);
494         }
495         else {
496                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
497         }
498
499         if (ret == 0) {
500                 tdb_increment_seqnum(tdb);
501         }
502
503         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
504                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
505         return ret;
506 }
507
508 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
509 {
510         uint32_t hash = tdb->hash_fn(&key);
511         int ret;
512
513         ret = tdb_delete_hash(tdb, key, hash);
514         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
515         return ret;
516 }
517
518 /*
519  * See if we have a dead record around with enough space
520  */
521 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
522                         struct tdb_record *r, tdb_len_t length,
523                         tdb_off_t *p_last_ptr)
524 {
525         tdb_off_t rec_ptr, last_ptr;
526         struct tdb_chainwalk_ctx chainwalk;
527         tdb_off_t best_rec_ptr = 0;
528         tdb_off_t best_last_ptr = 0;
529         struct tdb_record best = { .rec_len = UINT32_MAX };
530
531         length += sizeof(tdb_off_t); /* tailer */
532
533         last_ptr = TDB_HASH_TOP(hash);
534
535         /* read in the hash top */
536         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
537                 return 0;
538
539         tdb_chainwalk_init(&chainwalk, rec_ptr);
540
541         /* keep looking until we find the right record */
542         while (rec_ptr) {
543                 bool ok;
544
545                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
546                         return 0;
547
548                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
549                     (r->rec_len < best.rec_len)) {
550                         best_rec_ptr = rec_ptr;
551                         best_last_ptr = last_ptr;
552                         best = *r;
553                 }
554                 last_ptr = rec_ptr;
555                 rec_ptr = r->next;
556
557                 ok = tdb_chainwalk_check(tdb, &chainwalk, rec_ptr);
558                 if (!ok) {
559                         return 0;
560                 }
561         }
562
563         if (best.rec_len == UINT32_MAX) {
564                 return 0;
565         }
566
567         *r = best;
568         *p_last_ptr = best_last_ptr;
569         return best_rec_ptr;
570 }
571
572 static int _tdb_storev(struct tdb_context *tdb, TDB_DATA key,
573                        const TDB_DATA *dbufs, int num_dbufs,
574                        int flag, uint32_t hash)
575 {
576         struct tdb_record rec;
577         tdb_off_t rec_ptr, ofs;
578         tdb_len_t rec_len, dbufs_len;
579         int i;
580         int ret = -1;
581
582         dbufs_len = 0;
583
584         for (i=0; i<num_dbufs; i++) {
585                 size_t dsize = dbufs[i].dsize;
586
587                 if ((dsize != 0) && (dbufs[i].dptr == NULL)) {
588                         tdb->ecode = TDB_ERR_EINVAL;
589                         goto fail;
590                 }
591
592                 dbufs_len += dsize;
593                 if (dbufs_len < dsize) {
594                         tdb->ecode = TDB_ERR_OOM;
595                         goto fail;
596                 }
597         }
598
599         rec_len = key.dsize + dbufs_len;
600         if ((rec_len < key.dsize) || (rec_len < dbufs_len)) {
601                 tdb->ecode = TDB_ERR_OOM;
602                 goto fail;
603         }
604
605         /* check for it existing, on insert. */
606         if (flag == TDB_INSERT) {
607                 if (tdb_exists_hash(tdb, key, hash)) {
608                         tdb->ecode = TDB_ERR_EXISTS;
609                         goto fail;
610                 }
611         } else {
612                 /* first try in-place update, on modify or replace. */
613                 if (tdb_update_hash(tdb, key, hash, dbufs, num_dbufs,
614                                     dbufs_len) == 0) {
615                         goto done;
616                 }
617                 if (tdb->ecode == TDB_ERR_NOEXIST &&
618                     flag == TDB_MODIFY) {
619                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
620                          we should fail the store */
621                         goto fail;
622                 }
623         }
624         /* reset the error code potentially set by the tdb_update_hash() */
625         tdb->ecode = TDB_SUCCESS;
626
627         /* delete any existing record - if it doesn't exist we don't
628            care.  Doing this first reduces fragmentation, and avoids
629            coalescing with `allocated' block before it's updated. */
630         if (flag != TDB_INSERT)
631                 tdb_delete_hash(tdb, key, hash);
632
633         /* we have to allocate some space */
634         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
635
636         if (rec_ptr == 0) {
637                 goto fail;
638         }
639
640         /* Read hash top into next ptr */
641         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
642                 goto fail;
643
644         rec.key_len = key.dsize;
645         rec.data_len = dbufs_len;
646         rec.full_hash = hash;
647         rec.magic = TDB_MAGIC;
648
649         ofs = rec_ptr;
650
651         /* write out and point the top of the hash chain at it */
652         ret = tdb_rec_write(tdb, ofs, &rec);
653         if (ret == -1) {
654                 goto fail;
655         }
656         ofs += sizeof(rec);
657
658         ret = tdb->methods->tdb_write(tdb, ofs, key.dptr, key.dsize);
659         if (ret == -1) {
660                 goto fail;
661         }
662         ofs += key.dsize;
663
664         for (i=0; i<num_dbufs; i++) {
665                 if (dbufs[i].dsize == 0) {
666                         continue;
667                 }
668
669                 ret = tdb->methods->tdb_write(tdb, ofs, dbufs[i].dptr,
670                                               dbufs[i].dsize);
671                 if (ret == -1) {
672                         goto fail;
673                 }
674                 ofs += dbufs[i].dsize;
675         }
676
677         ret = tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr);
678         if (ret == -1) {
679                 /* Need to tdb_unallocate() here */
680                 goto fail;
681         }
682
683  done:
684         ret = 0;
685  fail:
686         if (ret == 0) {
687                 tdb_increment_seqnum(tdb);
688         }
689         return ret;
690 }
691
692 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
693                       TDB_DATA dbuf, int flag, uint32_t hash)
694 {
695         return _tdb_storev(tdb, key, &dbuf, 1, flag, hash);
696 }
697
698 /* store an element in the database, replacing any existing element
699    with the same key
700
701    return 0 on success, -1 on failure
702 */
703 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
704 {
705         uint32_t hash;
706         int ret;
707
708         if (tdb->read_only || tdb->traverse_read) {
709                 tdb->ecode = TDB_ERR_RDONLY;
710                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
711                 return -1;
712         }
713
714         /* find which hash bucket it is in */
715         hash = tdb->hash_fn(&key);
716         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
717                 return -1;
718
719         ret = _tdb_store(tdb, key, dbuf, flag, hash);
720         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
721         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
722         return ret;
723 }
724
725 _PUBLIC_ int tdb_storev(struct tdb_context *tdb, TDB_DATA key,
726                         const TDB_DATA *dbufs, int num_dbufs, int flag)
727 {
728         uint32_t hash;
729         int ret;
730
731         if (tdb->read_only || tdb->traverse_read) {
732                 tdb->ecode = TDB_ERR_RDONLY;
733                 tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
734                                               dbufs, num_dbufs, flag, -1);
735                 return -1;
736         }
737
738         /* find which hash bucket it is in */
739         hash = tdb->hash_fn(&key);
740         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
741                 return -1;
742
743         ret = _tdb_storev(tdb, key, dbufs, num_dbufs, flag, hash);
744         tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
745                                       dbufs, num_dbufs, flag, -1);
746         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
747         return ret;
748 }
749
750 /* Append to an entry. Create if not exist. */
751 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
752 {
753         uint32_t hash;
754         TDB_DATA dbufs[2];
755         int ret = -1;
756
757         /* find which hash bucket it is in */
758         hash = tdb->hash_fn(&key);
759         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
760                 return -1;
761
762         dbufs[0] = _tdb_fetch(tdb, key);
763         dbufs[1] = new_dbuf;
764
765         ret = _tdb_storev(tdb, key, dbufs, 2, 0, hash);
766         tdb_trace_2rec_retrec(tdb, "tdb_append", key, dbufs[0], dbufs[1]);
767
768         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
769         SAFE_FREE(dbufs[0].dptr);
770         return ret;
771 }
772
773
774 /*
775   return the name of the current tdb file
776   useful for external logging functions
777 */
778 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
779 {
780         return tdb->name;
781 }
782
783 /*
784   return the underlying file descriptor being used by tdb, or -1
785   useful for external routines that want to check the device/inode
786   of the fd
787 */
788 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
789 {
790         return tdb->fd;
791 }
792
793 /*
794   return the current logging function
795   useful for external tdb routines that wish to log tdb errors
796 */
797 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
798 {
799         return tdb->log.log_fn;
800 }
801
802
803 /*
804   get the tdb sequence number. Only makes sense if the writers opened
805   with TDB_SEQNUM set. Note that this sequence number will wrap quite
806   quickly, so it should only be used for a 'has something changed'
807   test, not for code that relies on the count of the number of changes
808   made. If you want a counter then use a tdb record.
809
810   The aim of this sequence number is to allow for a very lightweight
811   test of a possible tdb change.
812 */
813 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
814 {
815         tdb_off_t seqnum=0;
816
817         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
818         return seqnum;
819 }
820
821 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
822 {
823         return tdb->hash_size;
824 }
825
826 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
827 {
828         return tdb->map_size;
829 }
830
831 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
832 {
833         return tdb->flags;
834 }
835
836 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
837 {
838         if ((flags & TDB_ALLOW_NESTING) &&
839             (flags & TDB_DISALLOW_NESTING)) {
840                 tdb->ecode = TDB_ERR_NESTING;
841                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
842                         "allow_nesting and disallow_nesting are not allowed together!"));
843                 return;
844         }
845
846         if (flags & TDB_ALLOW_NESTING) {
847                 tdb->flags &= ~TDB_DISALLOW_NESTING;
848         }
849         if (flags & TDB_DISALLOW_NESTING) {
850                 tdb->flags &= ~TDB_ALLOW_NESTING;
851         }
852
853         tdb->flags |= flags;
854 }
855
856 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
857 {
858         if ((flags & TDB_ALLOW_NESTING) &&
859             (flags & TDB_DISALLOW_NESTING)) {
860                 tdb->ecode = TDB_ERR_NESTING;
861                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
862                         "allow_nesting and disallow_nesting are not allowed together!"));
863                 return;
864         }
865
866         if ((flags & TDB_NOLOCK) &&
867             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
868             (tdb->mutexes == NULL)) {
869                 tdb->ecode = TDB_ERR_LOCK;
870                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
871                          "Can not remove NOLOCK flag on mutexed databases"));
872                 return;
873         }
874
875         if (flags & TDB_ALLOW_NESTING) {
876                 tdb->flags |= TDB_DISALLOW_NESTING;
877         }
878         if (flags & TDB_DISALLOW_NESTING) {
879                 tdb->flags |= TDB_ALLOW_NESTING;
880         }
881
882         tdb->flags &= ~flags;
883 }
884
885
886 /*
887   enable sequence number handling on an open tdb
888 */
889 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
890 {
891         tdb->flags |= TDB_SEQNUM;
892 }
893
894
895 /*
896   add a region of the file to the freelist. Length is the size of the region in bytes,
897   which includes the free list header that needs to be added
898  */
899 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
900 {
901         struct tdb_record rec;
902         if (length <= sizeof(rec)) {
903                 /* the region is not worth adding */
904                 return 0;
905         }
906         if (length + offset > tdb->map_size) {
907                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
908                 return -1;
909         }
910         memset(&rec,'\0',sizeof(rec));
911         rec.rec_len = length - sizeof(rec);
912         if (tdb_free(tdb, offset, &rec) == -1) {
913                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
914                 return -1;
915         }
916         return 0;
917 }
918
919 /*
920   wipe the entire database, deleting all records. This can be done
921   very fast by using a allrecord lock. The entire data portion of the
922   file becomes a single entry in the freelist.
923
924   This code carefully steps around the recovery area, leaving it alone
925  */
926 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
927 {
928         uint32_t i;
929         tdb_off_t offset = 0;
930         ssize_t data_len;
931         tdb_off_t recovery_head;
932         tdb_len_t recovery_size = 0;
933
934         if (tdb_lockall(tdb) != 0) {
935                 return -1;
936         }
937
938         tdb_trace(tdb, "tdb_wipe_all");
939
940         /* see if the tdb has a recovery area, and remember its size
941            if so. We don't want to lose this as otherwise each
942            tdb_wipe_all() in a transaction will increase the size of
943            the tdb by the size of the recovery area */
944         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
945                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
946                 goto failed;
947         }
948
949         if (recovery_head != 0) {
950                 struct tdb_record rec;
951                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
952                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
953                         return -1;
954                 }
955                 recovery_size = rec.rec_len + sizeof(rec);
956         }
957
958         /* wipe the hashes */
959         for (i=0;i<tdb->hash_size;i++) {
960                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
961                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
962                         goto failed;
963                 }
964         }
965
966         /* wipe the freelist */
967         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
968                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
969                 goto failed;
970         }
971
972         /* add all the rest of the file to the freelist, possibly leaving a gap
973            for the recovery area */
974         if (recovery_size == 0) {
975                 /* the simple case - the whole file can be used as a freelist */
976                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
977                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
978                         goto failed;
979                 }
980         } else {
981                 /* we need to add two freelist entries - one on either
982                    side of the recovery area
983
984                    Note that we cannot shift the recovery area during
985                    this operation. Only the transaction.c code may
986                    move the recovery area or we risk subtle data
987                    corruption
988                 */
989                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
990                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
991                         goto failed;
992                 }
993                 /* and the 2nd free list entry after the recovery area - if any */
994                 data_len = tdb->map_size - (recovery_head+recovery_size);
995                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
996                         goto failed;
997                 }
998         }
999
1000         tdb_increment_seqnum_nonblock(tdb);
1001
1002         if (tdb_unlockall(tdb) != 0) {
1003                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
1004                 goto failed;
1005         }
1006
1007         return 0;
1008
1009 failed:
1010         tdb_unlockall(tdb);
1011         return -1;
1012 }
1013
1014 struct traverse_state {
1015         bool error;
1016         struct tdb_context *dest_db;
1017 };
1018
1019 /*
1020   traverse function for repacking
1021  */
1022 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
1023 {
1024         struct traverse_state *state = (struct traverse_state *)private_data;
1025         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
1026                 state->error = true;
1027                 return -1;
1028         }
1029         return 0;
1030 }
1031
1032 /*
1033   repack a tdb
1034  */
1035 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
1036 {
1037         struct tdb_context *tmp_db;
1038         struct traverse_state state;
1039
1040         tdb_trace(tdb, "tdb_repack");
1041
1042         if (tdb_transaction_start(tdb) != 0) {
1043                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
1044                 return -1;
1045         }
1046
1047         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
1048         if (tmp_db == NULL) {
1049                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
1050                 tdb_transaction_cancel(tdb);
1051                 return -1;
1052         }
1053
1054         state.error = false;
1055         state.dest_db = tmp_db;
1056
1057         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
1058                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
1059                 tdb_transaction_cancel(tdb);
1060                 tdb_close(tmp_db);
1061                 return -1;
1062         }
1063
1064         if (state.error) {
1065                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
1066                 tdb_transaction_cancel(tdb);
1067                 tdb_close(tmp_db);
1068                 return -1;
1069         }
1070
1071         if (tdb_wipe_all(tdb) != 0) {
1072                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
1073                 tdb_transaction_cancel(tdb);
1074                 tdb_close(tmp_db);
1075                 return -1;
1076         }
1077
1078         state.error = false;
1079         state.dest_db = tdb;
1080
1081         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
1082                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
1083                 tdb_transaction_cancel(tdb);
1084                 tdb_close(tmp_db);
1085                 return -1;
1086         }
1087
1088         if (state.error) {
1089                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
1090                 tdb_transaction_cancel(tdb);
1091                 tdb_close(tmp_db);
1092                 return -1;
1093         }
1094
1095         tdb_close(tmp_db);
1096
1097         if (tdb_transaction_commit(tdb) != 0) {
1098                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1099                 return -1;
1100         }
1101
1102         return 0;
1103 }
1104
1105 /* Even on files, we can get partial writes due to signals. */
1106 bool tdb_write_all(int fd, const void *buf, size_t count)
1107 {
1108         while (count) {
1109                 ssize_t ret;
1110                 ret = write(fd, buf, count);
1111                 if (ret < 0)
1112                         return false;
1113                 buf = (const char *)buf + ret;
1114                 count -= ret;
1115         }
1116         return true;
1117 }
1118
1119 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1120 {
1121         tdb_off_t ret = a + b;
1122
1123         if ((ret < a) || (ret < b)) {
1124                 return false;
1125         }
1126         *pret = ret;
1127         return true;
1128 }
1129
1130 #ifdef TDB_TRACE
1131 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1132 {
1133         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1134                 close(tdb->tracefd);
1135                 tdb->tracefd = -1;
1136         }
1137 }
1138
1139 static void tdb_trace_start(struct tdb_context *tdb)
1140 {
1141         tdb_off_t seqnum=0;
1142         char msg[sizeof(tdb_off_t) * 4 + 1];
1143
1144         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1145         snprintf(msg, sizeof(msg), "%u ", seqnum);
1146         tdb_trace_write(tdb, msg);
1147 }
1148
1149 static void tdb_trace_end(struct tdb_context *tdb)
1150 {
1151         tdb_trace_write(tdb, "\n");
1152 }
1153
1154 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1155 {
1156         char msg[sizeof(ret) * 4 + 4];
1157         snprintf(msg, sizeof(msg), " = %i\n", ret);
1158         tdb_trace_write(tdb, msg);
1159 }
1160
1161 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1162 {
1163         char msg[20 + rec.dsize*2], *p;
1164         unsigned int i;
1165
1166         /* We differentiate zero-length records from non-existent ones. */
1167         if (rec.dptr == NULL) {
1168                 tdb_trace_write(tdb, " NULL");
1169                 return;
1170         }
1171
1172         /* snprintf here is purely cargo-cult programming. */
1173         p = msg;
1174         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1175         for (i = 0; i < rec.dsize; i++)
1176                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1177
1178         tdb_trace_write(tdb, msg);
1179 }
1180
1181 void tdb_trace(struct tdb_context *tdb, const char *op)
1182 {
1183         tdb_trace_start(tdb);
1184         tdb_trace_write(tdb, op);
1185         tdb_trace_end(tdb);
1186 }
1187
1188 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1189 {
1190         char msg[sizeof(tdb_off_t) * 4 + 1];
1191
1192         snprintf(msg, sizeof(msg), "%u ", seqnum);
1193         tdb_trace_write(tdb, msg);
1194         tdb_trace_write(tdb, op);
1195         tdb_trace_end(tdb);
1196 }
1197
1198 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1199                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1200 {
1201         char msg[128];
1202
1203         snprintf(msg, sizeof(msg),
1204                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1205         tdb_trace_start(tdb);
1206         tdb_trace_write(tdb, msg);
1207         tdb_trace_end(tdb);
1208 }
1209
1210 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1211 {
1212         tdb_trace_start(tdb);
1213         tdb_trace_write(tdb, op);
1214         tdb_trace_end_ret(tdb, ret);
1215 }
1216
1217 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1218 {
1219         tdb_trace_start(tdb);
1220         tdb_trace_write(tdb, op);
1221         tdb_trace_write(tdb, " =");
1222         tdb_trace_record(tdb, ret);
1223         tdb_trace_end(tdb);
1224 }
1225
1226 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1227                     TDB_DATA rec)
1228 {
1229         tdb_trace_start(tdb);
1230         tdb_trace_write(tdb, op);
1231         tdb_trace_record(tdb, rec);
1232         tdb_trace_end(tdb);
1233 }
1234
1235 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1236                         TDB_DATA rec, int ret)
1237 {
1238         tdb_trace_start(tdb);
1239         tdb_trace_write(tdb, op);
1240         tdb_trace_record(tdb, rec);
1241         tdb_trace_end_ret(tdb, ret);
1242 }
1243
1244 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1245                            TDB_DATA rec, TDB_DATA ret)
1246 {
1247         tdb_trace_start(tdb);
1248         tdb_trace_write(tdb, op);
1249         tdb_trace_record(tdb, rec);
1250         tdb_trace_write(tdb, " =");
1251         tdb_trace_record(tdb, ret);
1252         tdb_trace_end(tdb);
1253 }
1254
1255 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1256                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1257                              int ret)
1258 {
1259         char msg[1 + sizeof(ret) * 4];
1260
1261         snprintf(msg, sizeof(msg), " %#x", flag);
1262         tdb_trace_start(tdb);
1263         tdb_trace_write(tdb, op);
1264         tdb_trace_record(tdb, rec1);
1265         tdb_trace_record(tdb, rec2);
1266         tdb_trace_write(tdb, msg);
1267         tdb_trace_end_ret(tdb, ret);
1268 }
1269
1270 void tdb_trace_1plusn_rec_flag_ret(struct tdb_context *tdb, const char *op,
1271                                    TDB_DATA rec,
1272                                    const TDB_DATA *recs, int num_recs,
1273                                    unsigned flag, int ret)
1274 {
1275         char msg[1 + sizeof(ret) * 4];
1276         int i;
1277
1278         snprintf(msg, sizeof(msg), " %#x", flag);
1279         tdb_trace_start(tdb);
1280         tdb_trace_write(tdb, op);
1281         tdb_trace_record(tdb, rec);
1282         for (i=0; i<num_recs; i++) {
1283                 tdb_trace_record(tdb, recs[i]);
1284         }
1285         tdb_trace_write(tdb, msg);
1286         tdb_trace_end_ret(tdb, ret);
1287 }
1288
1289 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1290                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1291 {
1292         tdb_trace_start(tdb);
1293         tdb_trace_write(tdb, op);
1294         tdb_trace_record(tdb, rec1);
1295         tdb_trace_record(tdb, rec2);
1296         tdb_trace_write(tdb, " =");
1297         tdb_trace_record(tdb, ret);
1298         tdb_trace_end(tdb);
1299 }
1300 #endif