tdb: Vectorize _tdb_store
[samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 /* Returns 0 on fail.  On success, return offset of record, and fills
83    in rec */
84 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
85                         struct tdb_record *r)
86 {
87         tdb_off_t rec_ptr;
88
89         /* read in the hash top */
90         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
91                 return 0;
92
93         /* keep looking until we find the right record */
94         while (rec_ptr) {
95                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
96                         return 0;
97
98                 if (!TDB_DEAD(r) && hash==r->full_hash
99                     && key.dsize==r->key_len
100                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
101                                       r->key_len, tdb_key_compare,
102                                       NULL) == 0) {
103                         return rec_ptr;
104                 }
105                 /* detect tight infinite loop */
106                 if (rec_ptr == r->next) {
107                         tdb->ecode = TDB_ERR_CORRUPT;
108                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
109                         return 0;
110                 }
111                 rec_ptr = r->next;
112         }
113         tdb->ecode = TDB_ERR_NOEXIST;
114         return 0;
115 }
116
117 /* As tdb_find, but if you succeed, keep the lock */
118 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
119                            struct tdb_record *rec)
120 {
121         uint32_t rec_ptr;
122
123         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
124                 return 0;
125         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
126                 tdb_unlock(tdb, BUCKET(hash), locktype);
127         return rec_ptr;
128 }
129
130 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
131
132 struct tdb_update_hash_state {
133         const TDB_DATA *dbufs;
134         int num_dbufs;
135         tdb_len_t dbufs_len;
136 };
137
138 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
139 {
140         struct tdb_update_hash_state *state = private_data;
141         unsigned char *dptr = data.dptr;
142         int i;
143
144         if (state->dbufs_len != data.dsize) {
145                 return -1;
146         }
147
148         for (i=0; i<state->num_dbufs; i++) {
149                 TDB_DATA dbuf = state->dbufs[i];
150                 int ret;
151                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
152                 if (ret != 0) {
153                         return -1;
154                 }
155                 dptr += dbuf.dsize;
156         }
157
158         return 0;
159 }
160
161 /* update an entry in place - this only works if the new data size
162    is <= the old data size and the key exists.
163    on failure return -1.
164 */
165 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
166                            uint32_t hash,
167                            const TDB_DATA *dbufs, int num_dbufs,
168                            tdb_len_t dbufs_len)
169 {
170         struct tdb_record rec;
171         tdb_off_t rec_ptr, ofs;
172         int i;
173
174         /* find entry */
175         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
176                 return -1;
177
178         /* it could be an exact duplicate of what is there - this is
179          * surprisingly common (eg. with a ldb re-index). */
180         if (rec.data_len == dbufs_len) {
181                 struct tdb_update_hash_state state = {
182                         .dbufs = dbufs, .num_dbufs = num_dbufs,
183                         .dbufs_len = dbufs_len
184                 };
185                 int ret;
186
187                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
188                 if (ret == 0) {
189                         return 0;
190                 }
191         }
192
193         /* must be long enough key, data and tailer */
194         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
195                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
196                 return -1;
197         }
198
199         ofs = rec_ptr + sizeof(rec) + rec.key_len;
200
201         for (i=0; i<num_dbufs; i++) {
202                 TDB_DATA dbuf = dbufs[i];
203                 int ret;
204
205                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
206                 if (ret == -1) {
207                         return -1;
208                 }
209                 ofs += dbuf.dsize;
210         }
211
212         if (dbufs_len != rec.data_len) {
213                 /* update size */
214                 rec.data_len = dbufs_len;
215                 return tdb_rec_write(tdb, rec_ptr, &rec);
216         }
217
218         return 0;
219 }
220
221 /* find an entry in the database given a key */
222 /* If an entry doesn't exist tdb_err will be set to
223  * TDB_ERR_NOEXIST. If a key has no data attached
224  * then the TDB_DATA will have zero length but
225  * a non-zero pointer
226  */
227 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
228 {
229         tdb_off_t rec_ptr;
230         struct tdb_record rec;
231         TDB_DATA ret;
232         uint32_t hash;
233
234         /* find which hash bucket it is in */
235         hash = tdb->hash_fn(&key);
236         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
237                 return tdb_null;
238
239         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
240                                   rec.data_len);
241         ret.dsize = rec.data_len;
242         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
243         return ret;
244 }
245
246 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
247 {
248         TDB_DATA ret = _tdb_fetch(tdb, key);
249
250         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
251         return ret;
252 }
253
254 /*
255  * Find an entry in the database and hand the record's data to a parsing
256  * function. The parsing function is executed under the chain read lock, so it
257  * should be fast and should not block on other syscalls.
258  *
259  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
260  *
261  * For mmapped tdb's that do not have a transaction open it points the parsing
262  * function directly at the mmap area, it avoids the malloc/memcpy in this
263  * case. If a transaction is open or no mmap is available, it has to do
264  * malloc/read/parse/free.
265  *
266  * This is interesting for all readers of potentially large data structures in
267  * the tdb records, ldb indexes being one example.
268  *
269  * Return -1 if the record was not found.
270  */
271
272 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
273                      int (*parser)(TDB_DATA key, TDB_DATA data,
274                                    void *private_data),
275                      void *private_data)
276 {
277         tdb_off_t rec_ptr;
278         struct tdb_record rec;
279         int ret;
280         uint32_t hash;
281
282         /* find which hash bucket it is in */
283         hash = tdb->hash_fn(&key);
284
285         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
286                 /* record not found */
287                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
288                 tdb->ecode = TDB_ERR_NOEXIST;
289                 return -1;
290         }
291         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
292
293         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
294                              rec.data_len, parser, private_data);
295
296         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
297
298         return ret;
299 }
300
301 /* check if an entry in the database exists
302
303    note that 1 is returned if the key is found and 0 is returned if not found
304    this doesn't match the conventions in the rest of this module, but is
305    compatible with gdbm
306 */
307 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
308 {
309         struct tdb_record rec;
310
311         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
312                 return 0;
313         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
314         return 1;
315 }
316
317 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
318 {
319         uint32_t hash = tdb->hash_fn(&key);
320         int ret;
321
322         ret = tdb_exists_hash(tdb, key, hash);
323         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
324         return ret;
325 }
326
327 /* actually delete an entry in the database given the offset */
328 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
329 {
330         tdb_off_t last_ptr, i;
331         struct tdb_record lastrec;
332
333         if (tdb->read_only || tdb->traverse_read) return -1;
334
335         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
336             tdb_write_lock_record(tdb, rec_ptr) == -1) {
337                 /* Someone traversing here: mark it as dead */
338                 rec->magic = TDB_DEAD_MAGIC;
339                 return tdb_rec_write(tdb, rec_ptr, rec);
340         }
341         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
342                 return -1;
343
344         /* find previous record in hash chain */
345         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
346                 return -1;
347         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
348                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
349                         return -1;
350
351         /* unlink it: next ptr is at start of record. */
352         if (last_ptr == 0)
353                 last_ptr = TDB_HASH_TOP(rec->full_hash);
354         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
355                 return -1;
356
357         /* recover the space */
358         if (tdb_free(tdb, rec_ptr, rec) == -1)
359                 return -1;
360         return 0;
361 }
362
363 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
364 {
365         int res = 0;
366         tdb_off_t rec_ptr;
367         struct tdb_record rec;
368
369         /* read in the hash top */
370         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
371                 return 0;
372
373         while (rec_ptr) {
374                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
375                         return 0;
376
377                 if (rec.magic == TDB_DEAD_MAGIC) {
378                         res += 1;
379                 }
380                 rec_ptr = rec.next;
381         }
382         return res;
383 }
384
385 /*
386  * Purge all DEAD records from a hash chain
387  */
388 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
389 {
390         int res = -1;
391         struct tdb_record rec;
392         tdb_off_t rec_ptr;
393
394         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
395                 /*
396                  * Don't block the freelist if not strictly necessary
397                  */
398                 return -1;
399         }
400
401         /* read in the hash top */
402         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
403                 goto fail;
404
405         while (rec_ptr) {
406                 tdb_off_t next;
407
408                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
409                         goto fail;
410                 }
411
412                 next = rec.next;
413
414                 if (rec.magic == TDB_DEAD_MAGIC
415                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
416                         goto fail;
417                 }
418                 rec_ptr = next;
419         }
420         res = 0;
421  fail:
422         tdb_unlock(tdb, -1, F_WRLCK);
423         return res;
424 }
425
426 /* delete an entry in the database given a key */
427 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
428 {
429         tdb_off_t rec_ptr;
430         struct tdb_record rec;
431         int ret;
432
433         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
434         if (rec_ptr == 0) {
435                 return -1;
436         }
437
438         if (tdb->max_dead_records != 0) {
439
440                 uint32_t magic = TDB_DEAD_MAGIC;
441
442                 /*
443                  * Allow for some dead records per hash chain, mainly for
444                  * tdb's with a very high create/delete rate like locking.tdb.
445                  */
446
447                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
448                         /*
449                          * Don't let the per-chain freelist grow too large,
450                          * delete all existing dead records
451                          */
452                         tdb_purge_dead(tdb, hash);
453                 }
454
455                 /*
456                  * Just mark the record as dead.
457                  */
458                 ret = tdb_ofs_write(
459                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
460                         &magic);
461         }
462         else {
463                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
464         }
465
466         if (ret == 0) {
467                 tdb_increment_seqnum(tdb);
468         }
469
470         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
471                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
472         return ret;
473 }
474
475 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
476 {
477         uint32_t hash = tdb->hash_fn(&key);
478         int ret;
479
480         ret = tdb_delete_hash(tdb, key, hash);
481         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
482         return ret;
483 }
484
485 /*
486  * See if we have a dead record around with enough space
487  */
488 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
489                         struct tdb_record *r, tdb_len_t length,
490                         tdb_off_t *p_last_ptr)
491 {
492         tdb_off_t rec_ptr, last_ptr;
493         tdb_off_t best_rec_ptr = 0;
494         tdb_off_t best_last_ptr = 0;
495         struct tdb_record best = { .rec_len = UINT32_MAX };
496
497         length += sizeof(tdb_off_t); /* tailer */
498
499         last_ptr = TDB_HASH_TOP(hash);
500
501         /* read in the hash top */
502         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
503                 return 0;
504
505         /* keep looking until we find the right record */
506         while (rec_ptr) {
507                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
508                         return 0;
509
510                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
511                     (r->rec_len < best.rec_len)) {
512                         best_rec_ptr = rec_ptr;
513                         best_last_ptr = last_ptr;
514                         best = *r;
515                 }
516                 last_ptr = rec_ptr;
517                 rec_ptr = r->next;
518         }
519
520         if (best.rec_len == UINT32_MAX) {
521                 return 0;
522         }
523
524         *r = best;
525         *p_last_ptr = best_last_ptr;
526         return best_rec_ptr;
527 }
528
529 static int _tdb_storev(struct tdb_context *tdb, TDB_DATA key,
530                        const TDB_DATA *dbufs, int num_dbufs,
531                        int flag, uint32_t hash)
532 {
533         struct tdb_record rec;
534         tdb_off_t rec_ptr, ofs;
535         tdb_len_t rec_len, dbufs_len;
536         int i;
537         int ret = -1;
538
539         dbufs_len = 0;
540
541         for (i=0; i<num_dbufs; i++) {
542                 size_t dsize = dbufs[i].dsize;
543
544                 dbufs_len += dsize;
545                 if (dbufs_len < dsize) {
546                         tdb->ecode = TDB_ERR_OOM;
547                         goto fail;
548                 }
549         }
550
551         rec_len = key.dsize + dbufs_len;
552         if ((rec_len < key.dsize) || (rec_len < dbufs_len)) {
553                 tdb->ecode = TDB_ERR_OOM;
554                 goto fail;
555         }
556
557         /* check for it existing, on insert. */
558         if (flag == TDB_INSERT) {
559                 if (tdb_exists_hash(tdb, key, hash)) {
560                         tdb->ecode = TDB_ERR_EXISTS;
561                         goto fail;
562                 }
563         } else {
564                 /* first try in-place update, on modify or replace. */
565                 if (tdb_update_hash(tdb, key, hash, dbufs, num_dbufs,
566                                     dbufs_len) == 0) {
567                         goto done;
568                 }
569                 if (tdb->ecode == TDB_ERR_NOEXIST &&
570                     flag == TDB_MODIFY) {
571                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
572                          we should fail the store */
573                         goto fail;
574                 }
575         }
576         /* reset the error code potentially set by the tdb_update_hash() */
577         tdb->ecode = TDB_SUCCESS;
578
579         /* delete any existing record - if it doesn't exist we don't
580            care.  Doing this first reduces fragmentation, and avoids
581            coalescing with `allocated' block before it's updated. */
582         if (flag != TDB_INSERT)
583                 tdb_delete_hash(tdb, key, hash);
584
585         /* we have to allocate some space */
586         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
587
588         if (rec_ptr == 0) {
589                 goto fail;
590         }
591
592         /* Read hash top into next ptr */
593         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
594                 goto fail;
595
596         rec.key_len = key.dsize;
597         rec.data_len = dbufs_len;
598         rec.full_hash = hash;
599         rec.magic = TDB_MAGIC;
600
601         ofs = rec_ptr;
602
603         /* write out and point the top of the hash chain at it */
604         ret = tdb_rec_write(tdb, ofs, &rec);
605         if (ret == -1) {
606                 goto fail;
607         }
608         ofs += sizeof(rec);
609
610         ret = tdb->methods->tdb_write(tdb, ofs, key.dptr, key.dsize);
611         if (ret == -1) {
612                 goto fail;
613         }
614         ofs += key.dsize;
615
616         for (i=0; i<num_dbufs; i++) {
617                 ret = tdb->methods->tdb_write(tdb, ofs, dbufs[i].dptr,
618                                               dbufs[i].dsize);
619                 if (ret == -1) {
620                         goto fail;
621                 }
622                 ofs += dbufs[i].dsize;
623         }
624
625         ret = tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr);
626         if (ret == -1) {
627                 /* Need to tdb_unallocate() here */
628                 goto fail;
629         }
630
631  done:
632         ret = 0;
633  fail:
634         if (ret == 0) {
635                 tdb_increment_seqnum(tdb);
636         }
637         return ret;
638 }
639
640 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
641                       TDB_DATA dbuf, int flag, uint32_t hash)
642 {
643         return _tdb_storev(tdb, key, &dbuf, 1, flag, hash);
644 }
645
646 /* store an element in the database, replacing any existing element
647    with the same key
648
649    return 0 on success, -1 on failure
650 */
651 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
652 {
653         uint32_t hash;
654         int ret;
655
656         if (tdb->read_only || tdb->traverse_read) {
657                 tdb->ecode = TDB_ERR_RDONLY;
658                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
659                 return -1;
660         }
661
662         /* find which hash bucket it is in */
663         hash = tdb->hash_fn(&key);
664         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
665                 return -1;
666
667         ret = _tdb_store(tdb, key, dbuf, flag, hash);
668         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
669         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
670         return ret;
671 }
672
673 /* Append to an entry. Create if not exist. */
674 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
675 {
676         uint32_t hash;
677         TDB_DATA dbuf;
678         int ret = -1;
679
680         /* find which hash bucket it is in */
681         hash = tdb->hash_fn(&key);
682         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
683                 return -1;
684
685         dbuf = _tdb_fetch(tdb, key);
686
687         if (dbuf.dptr == NULL) {
688                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
689         } else {
690                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
691                 unsigned char *new_dptr;
692
693                 /* realloc '0' is special: don't do that. */
694                 if (new_len == 0)
695                         new_len = 1;
696                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
697                 if (new_dptr == NULL) {
698                         free(dbuf.dptr);
699                 }
700                 dbuf.dptr = new_dptr;
701         }
702
703         if (dbuf.dptr == NULL) {
704                 tdb->ecode = TDB_ERR_OOM;
705                 goto failed;
706         }
707
708         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
709         dbuf.dsize += new_dbuf.dsize;
710
711         ret = _tdb_store(tdb, key, dbuf, 0, hash);
712         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
713
714 failed:
715         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
716         SAFE_FREE(dbuf.dptr);
717         return ret;
718 }
719
720
721 /*
722   return the name of the current tdb file
723   useful for external logging functions
724 */
725 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
726 {
727         return tdb->name;
728 }
729
730 /*
731   return the underlying file descriptor being used by tdb, or -1
732   useful for external routines that want to check the device/inode
733   of the fd
734 */
735 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
736 {
737         return tdb->fd;
738 }
739
740 /*
741   return the current logging function
742   useful for external tdb routines that wish to log tdb errors
743 */
744 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
745 {
746         return tdb->log.log_fn;
747 }
748
749
750 /*
751   get the tdb sequence number. Only makes sense if the writers opened
752   with TDB_SEQNUM set. Note that this sequence number will wrap quite
753   quickly, so it should only be used for a 'has something changed'
754   test, not for code that relies on the count of the number of changes
755   made. If you want a counter then use a tdb record.
756
757   The aim of this sequence number is to allow for a very lightweight
758   test of a possible tdb change.
759 */
760 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
761 {
762         tdb_off_t seqnum=0;
763
764         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
765         return seqnum;
766 }
767
768 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
769 {
770         return tdb->hash_size;
771 }
772
773 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
774 {
775         return tdb->map_size;
776 }
777
778 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
779 {
780         return tdb->flags;
781 }
782
783 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
784 {
785         if ((flags & TDB_ALLOW_NESTING) &&
786             (flags & TDB_DISALLOW_NESTING)) {
787                 tdb->ecode = TDB_ERR_NESTING;
788                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
789                         "allow_nesting and disallow_nesting are not allowed together!"));
790                 return;
791         }
792
793         if (flags & TDB_ALLOW_NESTING) {
794                 tdb->flags &= ~TDB_DISALLOW_NESTING;
795         }
796         if (flags & TDB_DISALLOW_NESTING) {
797                 tdb->flags &= ~TDB_ALLOW_NESTING;
798         }
799
800         tdb->flags |= flags;
801 }
802
803 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
804 {
805         if ((flags & TDB_ALLOW_NESTING) &&
806             (flags & TDB_DISALLOW_NESTING)) {
807                 tdb->ecode = TDB_ERR_NESTING;
808                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
809                         "allow_nesting and disallow_nesting are not allowed together!"));
810                 return;
811         }
812
813         if ((flags & TDB_NOLOCK) &&
814             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
815             (tdb->mutexes == NULL)) {
816                 tdb->ecode = TDB_ERR_LOCK;
817                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
818                          "Can not remove NOLOCK flag on mutexed databases"));
819                 return;
820         }
821
822         if (flags & TDB_ALLOW_NESTING) {
823                 tdb->flags |= TDB_DISALLOW_NESTING;
824         }
825         if (flags & TDB_DISALLOW_NESTING) {
826                 tdb->flags |= TDB_ALLOW_NESTING;
827         }
828
829         tdb->flags &= ~flags;
830 }
831
832
833 /*
834   enable sequence number handling on an open tdb
835 */
836 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
837 {
838         tdb->flags |= TDB_SEQNUM;
839 }
840
841
842 /*
843   add a region of the file to the freelist. Length is the size of the region in bytes,
844   which includes the free list header that needs to be added
845  */
846 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
847 {
848         struct tdb_record rec;
849         if (length <= sizeof(rec)) {
850                 /* the region is not worth adding */
851                 return 0;
852         }
853         if (length + offset > tdb->map_size) {
854                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
855                 return -1;
856         }
857         memset(&rec,'\0',sizeof(rec));
858         rec.rec_len = length - sizeof(rec);
859         if (tdb_free(tdb, offset, &rec) == -1) {
860                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
861                 return -1;
862         }
863         return 0;
864 }
865
866 /*
867   wipe the entire database, deleting all records. This can be done
868   very fast by using a allrecord lock. The entire data portion of the
869   file becomes a single entry in the freelist.
870
871   This code carefully steps around the recovery area, leaving it alone
872  */
873 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
874 {
875         uint32_t i;
876         tdb_off_t offset = 0;
877         ssize_t data_len;
878         tdb_off_t recovery_head;
879         tdb_len_t recovery_size = 0;
880
881         if (tdb_lockall(tdb) != 0) {
882                 return -1;
883         }
884
885         tdb_trace(tdb, "tdb_wipe_all");
886
887         /* see if the tdb has a recovery area, and remember its size
888            if so. We don't want to lose this as otherwise each
889            tdb_wipe_all() in a transaction will increase the size of
890            the tdb by the size of the recovery area */
891         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
892                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
893                 goto failed;
894         }
895
896         if (recovery_head != 0) {
897                 struct tdb_record rec;
898                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
899                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
900                         return -1;
901                 }
902                 recovery_size = rec.rec_len + sizeof(rec);
903         }
904
905         /* wipe the hashes */
906         for (i=0;i<tdb->hash_size;i++) {
907                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
908                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
909                         goto failed;
910                 }
911         }
912
913         /* wipe the freelist */
914         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
915                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
916                 goto failed;
917         }
918
919         /* add all the rest of the file to the freelist, possibly leaving a gap
920            for the recovery area */
921         if (recovery_size == 0) {
922                 /* the simple case - the whole file can be used as a freelist */
923                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
924                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
925                         goto failed;
926                 }
927         } else {
928                 /* we need to add two freelist entries - one on either
929                    side of the recovery area
930
931                    Note that we cannot shift the recovery area during
932                    this operation. Only the transaction.c code may
933                    move the recovery area or we risk subtle data
934                    corruption
935                 */
936                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
937                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
938                         goto failed;
939                 }
940                 /* and the 2nd free list entry after the recovery area - if any */
941                 data_len = tdb->map_size - (recovery_head+recovery_size);
942                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
943                         goto failed;
944                 }
945         }
946
947         tdb_increment_seqnum_nonblock(tdb);
948
949         if (tdb_unlockall(tdb) != 0) {
950                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
951                 goto failed;
952         }
953
954         return 0;
955
956 failed:
957         tdb_unlockall(tdb);
958         return -1;
959 }
960
961 struct traverse_state {
962         bool error;
963         struct tdb_context *dest_db;
964 };
965
966 /*
967   traverse function for repacking
968  */
969 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
970 {
971         struct traverse_state *state = (struct traverse_state *)private_data;
972         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
973                 state->error = true;
974                 return -1;
975         }
976         return 0;
977 }
978
979 /*
980   repack a tdb
981  */
982 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
983 {
984         struct tdb_context *tmp_db;
985         struct traverse_state state;
986
987         tdb_trace(tdb, "tdb_repack");
988
989         if (tdb_transaction_start(tdb) != 0) {
990                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
991                 return -1;
992         }
993
994         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
995         if (tmp_db == NULL) {
996                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
997                 tdb_transaction_cancel(tdb);
998                 return -1;
999         }
1000
1001         state.error = false;
1002         state.dest_db = tmp_db;
1003
1004         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
1005                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
1006                 tdb_transaction_cancel(tdb);
1007                 tdb_close(tmp_db);
1008                 return -1;
1009         }
1010
1011         if (state.error) {
1012                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
1013                 tdb_transaction_cancel(tdb);
1014                 tdb_close(tmp_db);
1015                 return -1;
1016         }
1017
1018         if (tdb_wipe_all(tdb) != 0) {
1019                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
1020                 tdb_transaction_cancel(tdb);
1021                 tdb_close(tmp_db);
1022                 return -1;
1023         }
1024
1025         state.error = false;
1026         state.dest_db = tdb;
1027
1028         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
1029                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
1030                 tdb_transaction_cancel(tdb);
1031                 tdb_close(tmp_db);
1032                 return -1;
1033         }
1034
1035         if (state.error) {
1036                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
1037                 tdb_transaction_cancel(tdb);
1038                 tdb_close(tmp_db);
1039                 return -1;
1040         }
1041
1042         tdb_close(tmp_db);
1043
1044         if (tdb_transaction_commit(tdb) != 0) {
1045                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1046                 return -1;
1047         }
1048
1049         return 0;
1050 }
1051
1052 /* Even on files, we can get partial writes due to signals. */
1053 bool tdb_write_all(int fd, const void *buf, size_t count)
1054 {
1055         while (count) {
1056                 ssize_t ret;
1057                 ret = write(fd, buf, count);
1058                 if (ret < 0)
1059                         return false;
1060                 buf = (const char *)buf + ret;
1061                 count -= ret;
1062         }
1063         return true;
1064 }
1065
1066 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1067 {
1068         tdb_off_t ret = a + b;
1069
1070         if ((ret < a) || (ret < b)) {
1071                 return false;
1072         }
1073         *pret = ret;
1074         return true;
1075 }
1076
1077 #ifdef TDB_TRACE
1078 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1079 {
1080         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1081                 close(tdb->tracefd);
1082                 tdb->tracefd = -1;
1083         }
1084 }
1085
1086 static void tdb_trace_start(struct tdb_context *tdb)
1087 {
1088         tdb_off_t seqnum=0;
1089         char msg[sizeof(tdb_off_t) * 4 + 1];
1090
1091         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1092         snprintf(msg, sizeof(msg), "%u ", seqnum);
1093         tdb_trace_write(tdb, msg);
1094 }
1095
1096 static void tdb_trace_end(struct tdb_context *tdb)
1097 {
1098         tdb_trace_write(tdb, "\n");
1099 }
1100
1101 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1102 {
1103         char msg[sizeof(ret) * 4 + 4];
1104         snprintf(msg, sizeof(msg), " = %i\n", ret);
1105         tdb_trace_write(tdb, msg);
1106 }
1107
1108 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1109 {
1110         char msg[20 + rec.dsize*2], *p;
1111         unsigned int i;
1112
1113         /* We differentiate zero-length records from non-existent ones. */
1114         if (rec.dptr == NULL) {
1115                 tdb_trace_write(tdb, " NULL");
1116                 return;
1117         }
1118
1119         /* snprintf here is purely cargo-cult programming. */
1120         p = msg;
1121         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1122         for (i = 0; i < rec.dsize; i++)
1123                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1124
1125         tdb_trace_write(tdb, msg);
1126 }
1127
1128 void tdb_trace(struct tdb_context *tdb, const char *op)
1129 {
1130         tdb_trace_start(tdb);
1131         tdb_trace_write(tdb, op);
1132         tdb_trace_end(tdb);
1133 }
1134
1135 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1136 {
1137         char msg[sizeof(tdb_off_t) * 4 + 1];
1138
1139         snprintf(msg, sizeof(msg), "%u ", seqnum);
1140         tdb_trace_write(tdb, msg);
1141         tdb_trace_write(tdb, op);
1142         tdb_trace_end(tdb);
1143 }
1144
1145 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1146                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1147 {
1148         char msg[128];
1149
1150         snprintf(msg, sizeof(msg),
1151                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1152         tdb_trace_start(tdb);
1153         tdb_trace_write(tdb, msg);
1154         tdb_trace_end(tdb);
1155 }
1156
1157 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1158 {
1159         tdb_trace_start(tdb);
1160         tdb_trace_write(tdb, op);
1161         tdb_trace_end_ret(tdb, ret);
1162 }
1163
1164 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1165 {
1166         tdb_trace_start(tdb);
1167         tdb_trace_write(tdb, op);
1168         tdb_trace_write(tdb, " =");
1169         tdb_trace_record(tdb, ret);
1170         tdb_trace_end(tdb);
1171 }
1172
1173 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1174                     TDB_DATA rec)
1175 {
1176         tdb_trace_start(tdb);
1177         tdb_trace_write(tdb, op);
1178         tdb_trace_record(tdb, rec);
1179         tdb_trace_end(tdb);
1180 }
1181
1182 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1183                         TDB_DATA rec, int ret)
1184 {
1185         tdb_trace_start(tdb);
1186         tdb_trace_write(tdb, op);
1187         tdb_trace_record(tdb, rec);
1188         tdb_trace_end_ret(tdb, ret);
1189 }
1190
1191 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1192                            TDB_DATA rec, TDB_DATA ret)
1193 {
1194         tdb_trace_start(tdb);
1195         tdb_trace_write(tdb, op);
1196         tdb_trace_record(tdb, rec);
1197         tdb_trace_write(tdb, " =");
1198         tdb_trace_record(tdb, ret);
1199         tdb_trace_end(tdb);
1200 }
1201
1202 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1203                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1204                              int ret)
1205 {
1206         char msg[1 + sizeof(ret) * 4];
1207
1208         snprintf(msg, sizeof(msg), " %#x", flag);
1209         tdb_trace_start(tdb);
1210         tdb_trace_write(tdb, op);
1211         tdb_trace_record(tdb, rec1);
1212         tdb_trace_record(tdb, rec2);
1213         tdb_trace_write(tdb, msg);
1214         tdb_trace_end_ret(tdb, ret);
1215 }
1216
1217 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1218                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1219 {
1220         tdb_trace_start(tdb);
1221         tdb_trace_write(tdb, op);
1222         tdb_trace_record(tdb, rec1);
1223         tdb_trace_record(tdb, rec2);
1224         tdb_trace_write(tdb, " =");
1225         tdb_trace_record(tdb, ret);
1226         tdb_trace_end(tdb);
1227 }
1228 #endif