tdb: Add tdb_storev
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 /* Returns 0 on fail.  On success, return offset of record, and fills
83    in rec */
84 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
85                         struct tdb_record *r)
86 {
87         tdb_off_t rec_ptr;
88
89         /* read in the hash top */
90         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
91                 return 0;
92
93         /* keep looking until we find the right record */
94         while (rec_ptr) {
95                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
96                         return 0;
97
98                 if (!TDB_DEAD(r) && hash==r->full_hash
99                     && key.dsize==r->key_len
100                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
101                                       r->key_len, tdb_key_compare,
102                                       NULL) == 0) {
103                         return rec_ptr;
104                 }
105                 /* detect tight infinite loop */
106                 if (rec_ptr == r->next) {
107                         tdb->ecode = TDB_ERR_CORRUPT;
108                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
109                         return 0;
110                 }
111                 rec_ptr = r->next;
112         }
113         tdb->ecode = TDB_ERR_NOEXIST;
114         return 0;
115 }
116
117 /* As tdb_find, but if you succeed, keep the lock */
118 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
119                            struct tdb_record *rec)
120 {
121         uint32_t rec_ptr;
122
123         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
124                 return 0;
125         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
126                 tdb_unlock(tdb, BUCKET(hash), locktype);
127         return rec_ptr;
128 }
129
130 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
131
132 struct tdb_update_hash_state {
133         const TDB_DATA *dbufs;
134         int num_dbufs;
135         tdb_len_t dbufs_len;
136 };
137
138 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
139 {
140         struct tdb_update_hash_state *state = private_data;
141         unsigned char *dptr = data.dptr;
142         int i;
143
144         if (state->dbufs_len != data.dsize) {
145                 return -1;
146         }
147
148         for (i=0; i<state->num_dbufs; i++) {
149                 TDB_DATA dbuf = state->dbufs[i];
150                 int ret;
151                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
152                 if (ret != 0) {
153                         return -1;
154                 }
155                 dptr += dbuf.dsize;
156         }
157
158         return 0;
159 }
160
161 /* update an entry in place - this only works if the new data size
162    is <= the old data size and the key exists.
163    on failure return -1.
164 */
165 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
166                            uint32_t hash,
167                            const TDB_DATA *dbufs, int num_dbufs,
168                            tdb_len_t dbufs_len)
169 {
170         struct tdb_record rec;
171         tdb_off_t rec_ptr, ofs;
172         int i;
173
174         /* find entry */
175         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
176                 return -1;
177
178         /* it could be an exact duplicate of what is there - this is
179          * surprisingly common (eg. with a ldb re-index). */
180         if (rec.data_len == dbufs_len) {
181                 struct tdb_update_hash_state state = {
182                         .dbufs = dbufs, .num_dbufs = num_dbufs,
183                         .dbufs_len = dbufs_len
184                 };
185                 int ret;
186
187                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
188                 if (ret == 0) {
189                         return 0;
190                 }
191         }
192
193         /* must be long enough key, data and tailer */
194         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
195                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
196                 return -1;
197         }
198
199         ofs = rec_ptr + sizeof(rec) + rec.key_len;
200
201         for (i=0; i<num_dbufs; i++) {
202                 TDB_DATA dbuf = dbufs[i];
203                 int ret;
204
205                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
206                 if (ret == -1) {
207                         return -1;
208                 }
209                 ofs += dbuf.dsize;
210         }
211
212         if (dbufs_len != rec.data_len) {
213                 /* update size */
214                 rec.data_len = dbufs_len;
215                 return tdb_rec_write(tdb, rec_ptr, &rec);
216         }
217
218         return 0;
219 }
220
221 /* find an entry in the database given a key */
222 /* If an entry doesn't exist tdb_err will be set to
223  * TDB_ERR_NOEXIST. If a key has no data attached
224  * then the TDB_DATA will have zero length but
225  * a non-zero pointer
226  */
227 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
228 {
229         tdb_off_t rec_ptr;
230         struct tdb_record rec;
231         TDB_DATA ret;
232         uint32_t hash;
233
234         /* find which hash bucket it is in */
235         hash = tdb->hash_fn(&key);
236         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
237                 return tdb_null;
238
239         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
240                                   rec.data_len);
241         ret.dsize = rec.data_len;
242         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
243         return ret;
244 }
245
246 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
247 {
248         TDB_DATA ret = _tdb_fetch(tdb, key);
249
250         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
251         return ret;
252 }
253
254 /*
255  * Find an entry in the database and hand the record's data to a parsing
256  * function. The parsing function is executed under the chain read lock, so it
257  * should be fast and should not block on other syscalls.
258  *
259  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
260  *
261  * For mmapped tdb's that do not have a transaction open it points the parsing
262  * function directly at the mmap area, it avoids the malloc/memcpy in this
263  * case. If a transaction is open or no mmap is available, it has to do
264  * malloc/read/parse/free.
265  *
266  * This is interesting for all readers of potentially large data structures in
267  * the tdb records, ldb indexes being one example.
268  *
269  * Return -1 if the record was not found.
270  */
271
272 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
273                      int (*parser)(TDB_DATA key, TDB_DATA data,
274                                    void *private_data),
275                      void *private_data)
276 {
277         tdb_off_t rec_ptr;
278         struct tdb_record rec;
279         int ret;
280         uint32_t hash;
281
282         /* find which hash bucket it is in */
283         hash = tdb->hash_fn(&key);
284
285         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
286                 /* record not found */
287                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
288                 tdb->ecode = TDB_ERR_NOEXIST;
289                 return -1;
290         }
291         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
292
293         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
294                              rec.data_len, parser, private_data);
295
296         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
297
298         return ret;
299 }
300
301 /* check if an entry in the database exists
302
303    note that 1 is returned if the key is found and 0 is returned if not found
304    this doesn't match the conventions in the rest of this module, but is
305    compatible with gdbm
306 */
307 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
308 {
309         struct tdb_record rec;
310
311         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
312                 return 0;
313         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
314         return 1;
315 }
316
317 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
318 {
319         uint32_t hash = tdb->hash_fn(&key);
320         int ret;
321
322         ret = tdb_exists_hash(tdb, key, hash);
323         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
324         return ret;
325 }
326
327 /* actually delete an entry in the database given the offset */
328 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
329 {
330         tdb_off_t last_ptr, i;
331         struct tdb_record lastrec;
332
333         if (tdb->read_only || tdb->traverse_read) return -1;
334
335         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
336             tdb_write_lock_record(tdb, rec_ptr) == -1) {
337                 /* Someone traversing here: mark it as dead */
338                 rec->magic = TDB_DEAD_MAGIC;
339                 return tdb_rec_write(tdb, rec_ptr, rec);
340         }
341         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
342                 return -1;
343
344         /* find previous record in hash chain */
345         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
346                 return -1;
347         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
348                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
349                         return -1;
350
351         /* unlink it: next ptr is at start of record. */
352         if (last_ptr == 0)
353                 last_ptr = TDB_HASH_TOP(rec->full_hash);
354         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
355                 return -1;
356
357         /* recover the space */
358         if (tdb_free(tdb, rec_ptr, rec) == -1)
359                 return -1;
360         return 0;
361 }
362
363 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
364 {
365         int res = 0;
366         tdb_off_t rec_ptr;
367         struct tdb_record rec;
368
369         /* read in the hash top */
370         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
371                 return 0;
372
373         while (rec_ptr) {
374                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
375                         return 0;
376
377                 if (rec.magic == TDB_DEAD_MAGIC) {
378                         res += 1;
379                 }
380                 rec_ptr = rec.next;
381         }
382         return res;
383 }
384
385 /*
386  * Purge all DEAD records from a hash chain
387  */
388 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
389 {
390         int res = -1;
391         struct tdb_record rec;
392         tdb_off_t rec_ptr;
393
394         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
395                 /*
396                  * Don't block the freelist if not strictly necessary
397                  */
398                 return -1;
399         }
400
401         /* read in the hash top */
402         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
403                 goto fail;
404
405         while (rec_ptr) {
406                 tdb_off_t next;
407
408                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
409                         goto fail;
410                 }
411
412                 next = rec.next;
413
414                 if (rec.magic == TDB_DEAD_MAGIC
415                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
416                         goto fail;
417                 }
418                 rec_ptr = next;
419         }
420         res = 0;
421  fail:
422         tdb_unlock(tdb, -1, F_WRLCK);
423         return res;
424 }
425
426 /* delete an entry in the database given a key */
427 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
428 {
429         tdb_off_t rec_ptr;
430         struct tdb_record rec;
431         int ret;
432
433         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
434         if (rec_ptr == 0) {
435                 return -1;
436         }
437
438         if (tdb->max_dead_records != 0) {
439
440                 uint32_t magic = TDB_DEAD_MAGIC;
441
442                 /*
443                  * Allow for some dead records per hash chain, mainly for
444                  * tdb's with a very high create/delete rate like locking.tdb.
445                  */
446
447                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
448                         /*
449                          * Don't let the per-chain freelist grow too large,
450                          * delete all existing dead records
451                          */
452                         tdb_purge_dead(tdb, hash);
453                 }
454
455                 /*
456                  * Just mark the record as dead.
457                  */
458                 ret = tdb_ofs_write(
459                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
460                         &magic);
461         }
462         else {
463                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
464         }
465
466         if (ret == 0) {
467                 tdb_increment_seqnum(tdb);
468         }
469
470         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
471                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
472         return ret;
473 }
474
475 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
476 {
477         uint32_t hash = tdb->hash_fn(&key);
478         int ret;
479
480         ret = tdb_delete_hash(tdb, key, hash);
481         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
482         return ret;
483 }
484
485 /*
486  * See if we have a dead record around with enough space
487  */
488 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
489                         struct tdb_record *r, tdb_len_t length,
490                         tdb_off_t *p_last_ptr)
491 {
492         tdb_off_t rec_ptr, last_ptr;
493         tdb_off_t best_rec_ptr = 0;
494         tdb_off_t best_last_ptr = 0;
495         struct tdb_record best = { .rec_len = UINT32_MAX };
496
497         length += sizeof(tdb_off_t); /* tailer */
498
499         last_ptr = TDB_HASH_TOP(hash);
500
501         /* read in the hash top */
502         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
503                 return 0;
504
505         /* keep looking until we find the right record */
506         while (rec_ptr) {
507                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
508                         return 0;
509
510                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
511                     (r->rec_len < best.rec_len)) {
512                         best_rec_ptr = rec_ptr;
513                         best_last_ptr = last_ptr;
514                         best = *r;
515                 }
516                 last_ptr = rec_ptr;
517                 rec_ptr = r->next;
518         }
519
520         if (best.rec_len == UINT32_MAX) {
521                 return 0;
522         }
523
524         *r = best;
525         *p_last_ptr = best_last_ptr;
526         return best_rec_ptr;
527 }
528
529 static int _tdb_storev(struct tdb_context *tdb, TDB_DATA key,
530                        const TDB_DATA *dbufs, int num_dbufs,
531                        int flag, uint32_t hash)
532 {
533         struct tdb_record rec;
534         tdb_off_t rec_ptr, ofs;
535         tdb_len_t rec_len, dbufs_len;
536         int i;
537         int ret = -1;
538
539         dbufs_len = 0;
540
541         for (i=0; i<num_dbufs; i++) {
542                 size_t dsize = dbufs[i].dsize;
543
544                 dbufs_len += dsize;
545                 if (dbufs_len < dsize) {
546                         tdb->ecode = TDB_ERR_OOM;
547                         goto fail;
548                 }
549         }
550
551         rec_len = key.dsize + dbufs_len;
552         if ((rec_len < key.dsize) || (rec_len < dbufs_len)) {
553                 tdb->ecode = TDB_ERR_OOM;
554                 goto fail;
555         }
556
557         /* check for it existing, on insert. */
558         if (flag == TDB_INSERT) {
559                 if (tdb_exists_hash(tdb, key, hash)) {
560                         tdb->ecode = TDB_ERR_EXISTS;
561                         goto fail;
562                 }
563         } else {
564                 /* first try in-place update, on modify or replace. */
565                 if (tdb_update_hash(tdb, key, hash, dbufs, num_dbufs,
566                                     dbufs_len) == 0) {
567                         goto done;
568                 }
569                 if (tdb->ecode == TDB_ERR_NOEXIST &&
570                     flag == TDB_MODIFY) {
571                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
572                          we should fail the store */
573                         goto fail;
574                 }
575         }
576         /* reset the error code potentially set by the tdb_update_hash() */
577         tdb->ecode = TDB_SUCCESS;
578
579         /* delete any existing record - if it doesn't exist we don't
580            care.  Doing this first reduces fragmentation, and avoids
581            coalescing with `allocated' block before it's updated. */
582         if (flag != TDB_INSERT)
583                 tdb_delete_hash(tdb, key, hash);
584
585         /* we have to allocate some space */
586         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
587
588         if (rec_ptr == 0) {
589                 goto fail;
590         }
591
592         /* Read hash top into next ptr */
593         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
594                 goto fail;
595
596         rec.key_len = key.dsize;
597         rec.data_len = dbufs_len;
598         rec.full_hash = hash;
599         rec.magic = TDB_MAGIC;
600
601         ofs = rec_ptr;
602
603         /* write out and point the top of the hash chain at it */
604         ret = tdb_rec_write(tdb, ofs, &rec);
605         if (ret == -1) {
606                 goto fail;
607         }
608         ofs += sizeof(rec);
609
610         ret = tdb->methods->tdb_write(tdb, ofs, key.dptr, key.dsize);
611         if (ret == -1) {
612                 goto fail;
613         }
614         ofs += key.dsize;
615
616         for (i=0; i<num_dbufs; i++) {
617                 ret = tdb->methods->tdb_write(tdb, ofs, dbufs[i].dptr,
618                                               dbufs[i].dsize);
619                 if (ret == -1) {
620                         goto fail;
621                 }
622                 ofs += dbufs[i].dsize;
623         }
624
625         ret = tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr);
626         if (ret == -1) {
627                 /* Need to tdb_unallocate() here */
628                 goto fail;
629         }
630
631  done:
632         ret = 0;
633  fail:
634         if (ret == 0) {
635                 tdb_increment_seqnum(tdb);
636         }
637         return ret;
638 }
639
640 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
641                       TDB_DATA dbuf, int flag, uint32_t hash)
642 {
643         return _tdb_storev(tdb, key, &dbuf, 1, flag, hash);
644 }
645
646 /* store an element in the database, replacing any existing element
647    with the same key
648
649    return 0 on success, -1 on failure
650 */
651 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
652 {
653         uint32_t hash;
654         int ret;
655
656         if (tdb->read_only || tdb->traverse_read) {
657                 tdb->ecode = TDB_ERR_RDONLY;
658                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
659                 return -1;
660         }
661
662         /* find which hash bucket it is in */
663         hash = tdb->hash_fn(&key);
664         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
665                 return -1;
666
667         ret = _tdb_store(tdb, key, dbuf, flag, hash);
668         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
669         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
670         return ret;
671 }
672
673 _PUBLIC_ int tdb_storev(struct tdb_context *tdb, TDB_DATA key,
674                         const TDB_DATA *dbufs, int num_dbufs, int flag)
675 {
676         uint32_t hash;
677         int ret;
678
679         if (tdb->read_only || tdb->traverse_read) {
680                 tdb->ecode = TDB_ERR_RDONLY;
681                 tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
682                                               dbufs, num_dbufs, flag, -1);
683                 return -1;
684         }
685
686         /* find which hash bucket it is in */
687         hash = tdb->hash_fn(&key);
688         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
689                 return -1;
690
691         ret = _tdb_storev(tdb, key, dbufs, num_dbufs, flag, hash);
692         tdb_trace_1plusn_rec_flag_ret(tdb, "tdb_storev", key,
693                                       dbufs, num_dbufs, flag, -1);
694         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
695         return ret;
696 }
697
698 /* Append to an entry. Create if not exist. */
699 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
700 {
701         uint32_t hash;
702         TDB_DATA dbuf;
703         int ret = -1;
704
705         /* find which hash bucket it is in */
706         hash = tdb->hash_fn(&key);
707         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
708                 return -1;
709
710         dbuf = _tdb_fetch(tdb, key);
711
712         if (dbuf.dptr == NULL) {
713                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
714         } else {
715                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
716                 unsigned char *new_dptr;
717
718                 /* realloc '0' is special: don't do that. */
719                 if (new_len == 0)
720                         new_len = 1;
721                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
722                 if (new_dptr == NULL) {
723                         free(dbuf.dptr);
724                 }
725                 dbuf.dptr = new_dptr;
726         }
727
728         if (dbuf.dptr == NULL) {
729                 tdb->ecode = TDB_ERR_OOM;
730                 goto failed;
731         }
732
733         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
734         dbuf.dsize += new_dbuf.dsize;
735
736         ret = _tdb_store(tdb, key, dbuf, 0, hash);
737         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
738
739 failed:
740         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
741         SAFE_FREE(dbuf.dptr);
742         return ret;
743 }
744
745
746 /*
747   return the name of the current tdb file
748   useful for external logging functions
749 */
750 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
751 {
752         return tdb->name;
753 }
754
755 /*
756   return the underlying file descriptor being used by tdb, or -1
757   useful for external routines that want to check the device/inode
758   of the fd
759 */
760 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
761 {
762         return tdb->fd;
763 }
764
765 /*
766   return the current logging function
767   useful for external tdb routines that wish to log tdb errors
768 */
769 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
770 {
771         return tdb->log.log_fn;
772 }
773
774
775 /*
776   get the tdb sequence number. Only makes sense if the writers opened
777   with TDB_SEQNUM set. Note that this sequence number will wrap quite
778   quickly, so it should only be used for a 'has something changed'
779   test, not for code that relies on the count of the number of changes
780   made. If you want a counter then use a tdb record.
781
782   The aim of this sequence number is to allow for a very lightweight
783   test of a possible tdb change.
784 */
785 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
786 {
787         tdb_off_t seqnum=0;
788
789         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
790         return seqnum;
791 }
792
793 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
794 {
795         return tdb->hash_size;
796 }
797
798 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
799 {
800         return tdb->map_size;
801 }
802
803 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
804 {
805         return tdb->flags;
806 }
807
808 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
809 {
810         if ((flags & TDB_ALLOW_NESTING) &&
811             (flags & TDB_DISALLOW_NESTING)) {
812                 tdb->ecode = TDB_ERR_NESTING;
813                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
814                         "allow_nesting and disallow_nesting are not allowed together!"));
815                 return;
816         }
817
818         if (flags & TDB_ALLOW_NESTING) {
819                 tdb->flags &= ~TDB_DISALLOW_NESTING;
820         }
821         if (flags & TDB_DISALLOW_NESTING) {
822                 tdb->flags &= ~TDB_ALLOW_NESTING;
823         }
824
825         tdb->flags |= flags;
826 }
827
828 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
829 {
830         if ((flags & TDB_ALLOW_NESTING) &&
831             (flags & TDB_DISALLOW_NESTING)) {
832                 tdb->ecode = TDB_ERR_NESTING;
833                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
834                         "allow_nesting and disallow_nesting are not allowed together!"));
835                 return;
836         }
837
838         if ((flags & TDB_NOLOCK) &&
839             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
840             (tdb->mutexes == NULL)) {
841                 tdb->ecode = TDB_ERR_LOCK;
842                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
843                          "Can not remove NOLOCK flag on mutexed databases"));
844                 return;
845         }
846
847         if (flags & TDB_ALLOW_NESTING) {
848                 tdb->flags |= TDB_DISALLOW_NESTING;
849         }
850         if (flags & TDB_DISALLOW_NESTING) {
851                 tdb->flags |= TDB_ALLOW_NESTING;
852         }
853
854         tdb->flags &= ~flags;
855 }
856
857
858 /*
859   enable sequence number handling on an open tdb
860 */
861 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
862 {
863         tdb->flags |= TDB_SEQNUM;
864 }
865
866
867 /*
868   add a region of the file to the freelist. Length is the size of the region in bytes,
869   which includes the free list header that needs to be added
870  */
871 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
872 {
873         struct tdb_record rec;
874         if (length <= sizeof(rec)) {
875                 /* the region is not worth adding */
876                 return 0;
877         }
878         if (length + offset > tdb->map_size) {
879                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
880                 return -1;
881         }
882         memset(&rec,'\0',sizeof(rec));
883         rec.rec_len = length - sizeof(rec);
884         if (tdb_free(tdb, offset, &rec) == -1) {
885                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
886                 return -1;
887         }
888         return 0;
889 }
890
891 /*
892   wipe the entire database, deleting all records. This can be done
893   very fast by using a allrecord lock. The entire data portion of the
894   file becomes a single entry in the freelist.
895
896   This code carefully steps around the recovery area, leaving it alone
897  */
898 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
899 {
900         uint32_t i;
901         tdb_off_t offset = 0;
902         ssize_t data_len;
903         tdb_off_t recovery_head;
904         tdb_len_t recovery_size = 0;
905
906         if (tdb_lockall(tdb) != 0) {
907                 return -1;
908         }
909
910         tdb_trace(tdb, "tdb_wipe_all");
911
912         /* see if the tdb has a recovery area, and remember its size
913            if so. We don't want to lose this as otherwise each
914            tdb_wipe_all() in a transaction will increase the size of
915            the tdb by the size of the recovery area */
916         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
917                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
918                 goto failed;
919         }
920
921         if (recovery_head != 0) {
922                 struct tdb_record rec;
923                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
924                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
925                         return -1;
926                 }
927                 recovery_size = rec.rec_len + sizeof(rec);
928         }
929
930         /* wipe the hashes */
931         for (i=0;i<tdb->hash_size;i++) {
932                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
933                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
934                         goto failed;
935                 }
936         }
937
938         /* wipe the freelist */
939         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
940                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
941                 goto failed;
942         }
943
944         /* add all the rest of the file to the freelist, possibly leaving a gap
945            for the recovery area */
946         if (recovery_size == 0) {
947                 /* the simple case - the whole file can be used as a freelist */
948                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
949                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
950                         goto failed;
951                 }
952         } else {
953                 /* we need to add two freelist entries - one on either
954                    side of the recovery area
955
956                    Note that we cannot shift the recovery area during
957                    this operation. Only the transaction.c code may
958                    move the recovery area or we risk subtle data
959                    corruption
960                 */
961                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
962                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
963                         goto failed;
964                 }
965                 /* and the 2nd free list entry after the recovery area - if any */
966                 data_len = tdb->map_size - (recovery_head+recovery_size);
967                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
968                         goto failed;
969                 }
970         }
971
972         tdb_increment_seqnum_nonblock(tdb);
973
974         if (tdb_unlockall(tdb) != 0) {
975                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
976                 goto failed;
977         }
978
979         return 0;
980
981 failed:
982         tdb_unlockall(tdb);
983         return -1;
984 }
985
986 struct traverse_state {
987         bool error;
988         struct tdb_context *dest_db;
989 };
990
991 /*
992   traverse function for repacking
993  */
994 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
995 {
996         struct traverse_state *state = (struct traverse_state *)private_data;
997         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
998                 state->error = true;
999                 return -1;
1000         }
1001         return 0;
1002 }
1003
1004 /*
1005   repack a tdb
1006  */
1007 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
1008 {
1009         struct tdb_context *tmp_db;
1010         struct traverse_state state;
1011
1012         tdb_trace(tdb, "tdb_repack");
1013
1014         if (tdb_transaction_start(tdb) != 0) {
1015                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
1016                 return -1;
1017         }
1018
1019         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
1020         if (tmp_db == NULL) {
1021                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
1022                 tdb_transaction_cancel(tdb);
1023                 return -1;
1024         }
1025
1026         state.error = false;
1027         state.dest_db = tmp_db;
1028
1029         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
1030                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
1031                 tdb_transaction_cancel(tdb);
1032                 tdb_close(tmp_db);
1033                 return -1;
1034         }
1035
1036         if (state.error) {
1037                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
1038                 tdb_transaction_cancel(tdb);
1039                 tdb_close(tmp_db);
1040                 return -1;
1041         }
1042
1043         if (tdb_wipe_all(tdb) != 0) {
1044                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
1045                 tdb_transaction_cancel(tdb);
1046                 tdb_close(tmp_db);
1047                 return -1;
1048         }
1049
1050         state.error = false;
1051         state.dest_db = tdb;
1052
1053         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
1054                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
1055                 tdb_transaction_cancel(tdb);
1056                 tdb_close(tmp_db);
1057                 return -1;
1058         }
1059
1060         if (state.error) {
1061                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
1062                 tdb_transaction_cancel(tdb);
1063                 tdb_close(tmp_db);
1064                 return -1;
1065         }
1066
1067         tdb_close(tmp_db);
1068
1069         if (tdb_transaction_commit(tdb) != 0) {
1070                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1071                 return -1;
1072         }
1073
1074         return 0;
1075 }
1076
1077 /* Even on files, we can get partial writes due to signals. */
1078 bool tdb_write_all(int fd, const void *buf, size_t count)
1079 {
1080         while (count) {
1081                 ssize_t ret;
1082                 ret = write(fd, buf, count);
1083                 if (ret < 0)
1084                         return false;
1085                 buf = (const char *)buf + ret;
1086                 count -= ret;
1087         }
1088         return true;
1089 }
1090
1091 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1092 {
1093         tdb_off_t ret = a + b;
1094
1095         if ((ret < a) || (ret < b)) {
1096                 return false;
1097         }
1098         *pret = ret;
1099         return true;
1100 }
1101
1102 #ifdef TDB_TRACE
1103 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1104 {
1105         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1106                 close(tdb->tracefd);
1107                 tdb->tracefd = -1;
1108         }
1109 }
1110
1111 static void tdb_trace_start(struct tdb_context *tdb)
1112 {
1113         tdb_off_t seqnum=0;
1114         char msg[sizeof(tdb_off_t) * 4 + 1];
1115
1116         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1117         snprintf(msg, sizeof(msg), "%u ", seqnum);
1118         tdb_trace_write(tdb, msg);
1119 }
1120
1121 static void tdb_trace_end(struct tdb_context *tdb)
1122 {
1123         tdb_trace_write(tdb, "\n");
1124 }
1125
1126 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1127 {
1128         char msg[sizeof(ret) * 4 + 4];
1129         snprintf(msg, sizeof(msg), " = %i\n", ret);
1130         tdb_trace_write(tdb, msg);
1131 }
1132
1133 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1134 {
1135         char msg[20 + rec.dsize*2], *p;
1136         unsigned int i;
1137
1138         /* We differentiate zero-length records from non-existent ones. */
1139         if (rec.dptr == NULL) {
1140                 tdb_trace_write(tdb, " NULL");
1141                 return;
1142         }
1143
1144         /* snprintf here is purely cargo-cult programming. */
1145         p = msg;
1146         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1147         for (i = 0; i < rec.dsize; i++)
1148                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1149
1150         tdb_trace_write(tdb, msg);
1151 }
1152
1153 void tdb_trace(struct tdb_context *tdb, const char *op)
1154 {
1155         tdb_trace_start(tdb);
1156         tdb_trace_write(tdb, op);
1157         tdb_trace_end(tdb);
1158 }
1159
1160 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1161 {
1162         char msg[sizeof(tdb_off_t) * 4 + 1];
1163
1164         snprintf(msg, sizeof(msg), "%u ", seqnum);
1165         tdb_trace_write(tdb, msg);
1166         tdb_trace_write(tdb, op);
1167         tdb_trace_end(tdb);
1168 }
1169
1170 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1171                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1172 {
1173         char msg[128];
1174
1175         snprintf(msg, sizeof(msg),
1176                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1177         tdb_trace_start(tdb);
1178         tdb_trace_write(tdb, msg);
1179         tdb_trace_end(tdb);
1180 }
1181
1182 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1183 {
1184         tdb_trace_start(tdb);
1185         tdb_trace_write(tdb, op);
1186         tdb_trace_end_ret(tdb, ret);
1187 }
1188
1189 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1190 {
1191         tdb_trace_start(tdb);
1192         tdb_trace_write(tdb, op);
1193         tdb_trace_write(tdb, " =");
1194         tdb_trace_record(tdb, ret);
1195         tdb_trace_end(tdb);
1196 }
1197
1198 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1199                     TDB_DATA rec)
1200 {
1201         tdb_trace_start(tdb);
1202         tdb_trace_write(tdb, op);
1203         tdb_trace_record(tdb, rec);
1204         tdb_trace_end(tdb);
1205 }
1206
1207 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1208                         TDB_DATA rec, int ret)
1209 {
1210         tdb_trace_start(tdb);
1211         tdb_trace_write(tdb, op);
1212         tdb_trace_record(tdb, rec);
1213         tdb_trace_end_ret(tdb, ret);
1214 }
1215
1216 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1217                            TDB_DATA rec, TDB_DATA ret)
1218 {
1219         tdb_trace_start(tdb);
1220         tdb_trace_write(tdb, op);
1221         tdb_trace_record(tdb, rec);
1222         tdb_trace_write(tdb, " =");
1223         tdb_trace_record(tdb, ret);
1224         tdb_trace_end(tdb);
1225 }
1226
1227 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1228                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1229                              int ret)
1230 {
1231         char msg[1 + sizeof(ret) * 4];
1232
1233         snprintf(msg, sizeof(msg), " %#x", flag);
1234         tdb_trace_start(tdb);
1235         tdb_trace_write(tdb, op);
1236         tdb_trace_record(tdb, rec1);
1237         tdb_trace_record(tdb, rec2);
1238         tdb_trace_write(tdb, msg);
1239         tdb_trace_end_ret(tdb, ret);
1240 }
1241
1242 void tdb_trace_1plusn_rec_flag_ret(struct tdb_context *tdb, const char *op,
1243                                    TDB_DATA rec,
1244                                    const TDB_DATA *recs, int num_recs,
1245                                    unsigned flag, int ret)
1246 {
1247         char msg[1 + sizeof(ret) * 4];
1248         int i;
1249
1250         snprintf(msg, sizeof(msg), " %#x", flag);
1251         tdb_trace_start(tdb);
1252         tdb_trace_write(tdb, op);
1253         tdb_trace_record(tdb, rec);
1254         for (i=0; i<num_recs; i++) {
1255                 tdb_trace_record(tdb, recs[i]);
1256         }
1257         tdb_trace_write(tdb, msg);
1258         tdb_trace_end_ret(tdb, ret);
1259 }
1260
1261 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1262                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1263 {
1264         tdb_trace_start(tdb);
1265         tdb_trace_write(tdb, op);
1266         tdb_trace_record(tdb, rec1);
1267         tdb_trace_record(tdb, rec2);
1268         tdb_trace_write(tdb, " =");
1269         tdb_trace_record(tdb, ret);
1270         tdb_trace_end(tdb);
1271 }
1272 #endif