tdb: Vectorize tdb_update_hash
[amitay/samba.git] / lib / tdb / common / tdb.c
1  /*
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 _PUBLIC_ TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 _PUBLIC_ void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb->transaction != NULL) {
63                 tdb_increment_seqnum_nonblock(tdb);
64                 return;
65         }
66
67         if (tdb_nest_lock(tdb, TDB_SEQNUM_OFS, F_WRLCK,
68                           TDB_LOCK_WAIT|TDB_LOCK_PROBE) != 0) {
69                 return;
70         }
71
72         tdb_increment_seqnum_nonblock(tdb);
73
74         tdb_nest_unlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, false);
75 }
76
77 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
78 {
79         return memcmp(data.dptr, key.dptr, data.dsize);
80 }
81
82 /* Returns 0 on fail.  On success, return offset of record, and fills
83    in rec */
84 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, uint32_t hash,
85                         struct tdb_record *r)
86 {
87         tdb_off_t rec_ptr;
88
89         /* read in the hash top */
90         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
91                 return 0;
92
93         /* keep looking until we find the right record */
94         while (rec_ptr) {
95                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
96                         return 0;
97
98                 if (!TDB_DEAD(r) && hash==r->full_hash
99                     && key.dsize==r->key_len
100                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
101                                       r->key_len, tdb_key_compare,
102                                       NULL) == 0) {
103                         return rec_ptr;
104                 }
105                 /* detect tight infinite loop */
106                 if (rec_ptr == r->next) {
107                         tdb->ecode = TDB_ERR_CORRUPT;
108                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_find: loop detected.\n"));
109                         return 0;
110                 }
111                 rec_ptr = r->next;
112         }
113         tdb->ecode = TDB_ERR_NOEXIST;
114         return 0;
115 }
116
117 /* As tdb_find, but if you succeed, keep the lock */
118 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash, int locktype,
119                            struct tdb_record *rec)
120 {
121         uint32_t rec_ptr;
122
123         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
124                 return 0;
125         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
126                 tdb_unlock(tdb, BUCKET(hash), locktype);
127         return rec_ptr;
128 }
129
130 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key);
131
132 struct tdb_update_hash_state {
133         const TDB_DATA *dbufs;
134         int num_dbufs;
135         tdb_len_t dbufs_len;
136 };
137
138 static int tdb_update_hash_cmp(TDB_DATA key, TDB_DATA data, void *private_data)
139 {
140         struct tdb_update_hash_state *state = private_data;
141         unsigned char *dptr = data.dptr;
142         int i;
143
144         if (state->dbufs_len != data.dsize) {
145                 return -1;
146         }
147
148         for (i=0; i<state->num_dbufs; i++) {
149                 TDB_DATA dbuf = state->dbufs[i];
150                 int ret;
151                 ret = memcmp(dptr, dbuf.dptr, dbuf.dsize);
152                 if (ret != 0) {
153                         return -1;
154                 }
155                 dptr += dbuf.dsize;
156         }
157
158         return 0;
159 }
160
161 /* update an entry in place - this only works if the new data size
162    is <= the old data size and the key exists.
163    on failure return -1.
164 */
165 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key,
166                            uint32_t hash,
167                            const TDB_DATA *dbufs, int num_dbufs,
168                            tdb_len_t dbufs_len)
169 {
170         struct tdb_record rec;
171         tdb_off_t rec_ptr, ofs;
172         int i;
173
174         /* find entry */
175         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
176                 return -1;
177
178         /* it could be an exact duplicate of what is there - this is
179          * surprisingly common (eg. with a ldb re-index). */
180         if (rec.data_len == dbufs_len) {
181                 struct tdb_update_hash_state state = {
182                         .dbufs = dbufs, .num_dbufs = num_dbufs,
183                         .dbufs_len = dbufs_len
184                 };
185                 int ret;
186
187                 ret = tdb_parse_record(tdb, key, tdb_update_hash_cmp, &state);
188                 if (ret == 0) {
189                         return 0;
190                 }
191         }
192
193         /* must be long enough key, data and tailer */
194         if (rec.rec_len < key.dsize + dbufs_len + sizeof(tdb_off_t)) {
195                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
196                 return -1;
197         }
198
199         ofs = rec_ptr + sizeof(rec) + rec.key_len;
200
201         for (i=0; i<num_dbufs; i++) {
202                 TDB_DATA dbuf = dbufs[i];
203                 int ret;
204
205                 ret = tdb->methods->tdb_write(tdb, ofs, dbuf.dptr, dbuf.dsize);
206                 if (ret == -1) {
207                         return -1;
208                 }
209                 ofs += dbuf.dsize;
210         }
211
212         if (dbufs_len != rec.data_len) {
213                 /* update size */
214                 rec.data_len = dbufs_len;
215                 return tdb_rec_write(tdb, rec_ptr, &rec);
216         }
217
218         return 0;
219 }
220
221 /* find an entry in the database given a key */
222 /* If an entry doesn't exist tdb_err will be set to
223  * TDB_ERR_NOEXIST. If a key has no data attached
224  * then the TDB_DATA will have zero length but
225  * a non-zero pointer
226  */
227 static TDB_DATA _tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
228 {
229         tdb_off_t rec_ptr;
230         struct tdb_record rec;
231         TDB_DATA ret;
232         uint32_t hash;
233
234         /* find which hash bucket it is in */
235         hash = tdb->hash_fn(&key);
236         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
237                 return tdb_null;
238
239         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
240                                   rec.data_len);
241         ret.dsize = rec.data_len;
242         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
243         return ret;
244 }
245
246 _PUBLIC_ TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
247 {
248         TDB_DATA ret = _tdb_fetch(tdb, key);
249
250         tdb_trace_1rec_retrec(tdb, "tdb_fetch", key, ret);
251         return ret;
252 }
253
254 /*
255  * Find an entry in the database and hand the record's data to a parsing
256  * function. The parsing function is executed under the chain read lock, so it
257  * should be fast and should not block on other syscalls.
258  *
259  * DON'T CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
260  *
261  * For mmapped tdb's that do not have a transaction open it points the parsing
262  * function directly at the mmap area, it avoids the malloc/memcpy in this
263  * case. If a transaction is open or no mmap is available, it has to do
264  * malloc/read/parse/free.
265  *
266  * This is interesting for all readers of potentially large data structures in
267  * the tdb records, ldb indexes being one example.
268  *
269  * Return -1 if the record was not found.
270  */
271
272 _PUBLIC_ int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
273                      int (*parser)(TDB_DATA key, TDB_DATA data,
274                                    void *private_data),
275                      void *private_data)
276 {
277         tdb_off_t rec_ptr;
278         struct tdb_record rec;
279         int ret;
280         uint32_t hash;
281
282         /* find which hash bucket it is in */
283         hash = tdb->hash_fn(&key);
284
285         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
286                 /* record not found */
287                 tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, -1);
288                 tdb->ecode = TDB_ERR_NOEXIST;
289                 return -1;
290         }
291         tdb_trace_1rec_ret(tdb, "tdb_parse_record", key, 0);
292
293         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
294                              rec.data_len, parser, private_data);
295
296         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
297
298         return ret;
299 }
300
301 /* check if an entry in the database exists
302
303    note that 1 is returned if the key is found and 0 is returned if not found
304    this doesn't match the conventions in the rest of this module, but is
305    compatible with gdbm
306 */
307 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
308 {
309         struct tdb_record rec;
310
311         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
312                 return 0;
313         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
314         return 1;
315 }
316
317 _PUBLIC_ int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
318 {
319         uint32_t hash = tdb->hash_fn(&key);
320         int ret;
321
322         ret = tdb_exists_hash(tdb, key, hash);
323         tdb_trace_1rec_ret(tdb, "tdb_exists", key, ret);
324         return ret;
325 }
326
327 /* actually delete an entry in the database given the offset */
328 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct tdb_record *rec)
329 {
330         tdb_off_t last_ptr, i;
331         struct tdb_record lastrec;
332
333         if (tdb->read_only || tdb->traverse_read) return -1;
334
335         if (((tdb->traverse_write != 0) && (!TDB_DEAD(rec))) ||
336             tdb_write_lock_record(tdb, rec_ptr) == -1) {
337                 /* Someone traversing here: mark it as dead */
338                 rec->magic = TDB_DEAD_MAGIC;
339                 return tdb_rec_write(tdb, rec_ptr, rec);
340         }
341         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
342                 return -1;
343
344         /* find previous record in hash chain */
345         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
346                 return -1;
347         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
348                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
349                         return -1;
350
351         /* unlink it: next ptr is at start of record. */
352         if (last_ptr == 0)
353                 last_ptr = TDB_HASH_TOP(rec->full_hash);
354         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
355                 return -1;
356
357         /* recover the space */
358         if (tdb_free(tdb, rec_ptr, rec) == -1)
359                 return -1;
360         return 0;
361 }
362
363 static int tdb_count_dead(struct tdb_context *tdb, uint32_t hash)
364 {
365         int res = 0;
366         tdb_off_t rec_ptr;
367         struct tdb_record rec;
368
369         /* read in the hash top */
370         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
371                 return 0;
372
373         while (rec_ptr) {
374                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
375                         return 0;
376
377                 if (rec.magic == TDB_DEAD_MAGIC) {
378                         res += 1;
379                 }
380                 rec_ptr = rec.next;
381         }
382         return res;
383 }
384
385 /*
386  * Purge all DEAD records from a hash chain
387  */
388 int tdb_purge_dead(struct tdb_context *tdb, uint32_t hash)
389 {
390         int res = -1;
391         struct tdb_record rec;
392         tdb_off_t rec_ptr;
393
394         if (tdb_lock_nonblock(tdb, -1, F_WRLCK) == -1) {
395                 /*
396                  * Don't block the freelist if not strictly necessary
397                  */
398                 return -1;
399         }
400
401         /* read in the hash top */
402         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
403                 goto fail;
404
405         while (rec_ptr) {
406                 tdb_off_t next;
407
408                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
409                         goto fail;
410                 }
411
412                 next = rec.next;
413
414                 if (rec.magic == TDB_DEAD_MAGIC
415                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
416                         goto fail;
417                 }
418                 rec_ptr = next;
419         }
420         res = 0;
421  fail:
422         tdb_unlock(tdb, -1, F_WRLCK);
423         return res;
424 }
425
426 /* delete an entry in the database given a key */
427 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, uint32_t hash)
428 {
429         tdb_off_t rec_ptr;
430         struct tdb_record rec;
431         int ret;
432
433         rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec);
434         if (rec_ptr == 0) {
435                 return -1;
436         }
437
438         if (tdb->max_dead_records != 0) {
439
440                 uint32_t magic = TDB_DEAD_MAGIC;
441
442                 /*
443                  * Allow for some dead records per hash chain, mainly for
444                  * tdb's with a very high create/delete rate like locking.tdb.
445                  */
446
447                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
448                         /*
449                          * Don't let the per-chain freelist grow too large,
450                          * delete all existing dead records
451                          */
452                         tdb_purge_dead(tdb, hash);
453                 }
454
455                 /*
456                  * Just mark the record as dead.
457                  */
458                 ret = tdb_ofs_write(
459                         tdb, rec_ptr + offsetof(struct tdb_record, magic),
460                         &magic);
461         }
462         else {
463                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
464         }
465
466         if (ret == 0) {
467                 tdb_increment_seqnum(tdb);
468         }
469
470         if (tdb_unlock(tdb, BUCKET(hash), F_WRLCK) != 0)
471                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
472         return ret;
473 }
474
475 _PUBLIC_ int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
476 {
477         uint32_t hash = tdb->hash_fn(&key);
478         int ret;
479
480         ret = tdb_delete_hash(tdb, key, hash);
481         tdb_trace_1rec_ret(tdb, "tdb_delete", key, ret);
482         return ret;
483 }
484
485 /*
486  * See if we have a dead record around with enough space
487  */
488 tdb_off_t tdb_find_dead(struct tdb_context *tdb, uint32_t hash,
489                         struct tdb_record *r, tdb_len_t length,
490                         tdb_off_t *p_last_ptr)
491 {
492         tdb_off_t rec_ptr, last_ptr;
493         tdb_off_t best_rec_ptr = 0;
494         tdb_off_t best_last_ptr = 0;
495         struct tdb_record best = { .rec_len = UINT32_MAX };
496
497         length += sizeof(tdb_off_t); /* tailer */
498
499         last_ptr = TDB_HASH_TOP(hash);
500
501         /* read in the hash top */
502         if (tdb_ofs_read(tdb, last_ptr, &rec_ptr) == -1)
503                 return 0;
504
505         /* keep looking until we find the right record */
506         while (rec_ptr) {
507                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
508                         return 0;
509
510                 if (TDB_DEAD(r) && (r->rec_len >= length) &&
511                     (r->rec_len < best.rec_len)) {
512                         best_rec_ptr = rec_ptr;
513                         best_last_ptr = last_ptr;
514                         best = *r;
515                 }
516                 last_ptr = rec_ptr;
517                 rec_ptr = r->next;
518         }
519
520         if (best.rec_len == UINT32_MAX) {
521                 return 0;
522         }
523
524         *r = best;
525         *p_last_ptr = best_last_ptr;
526         return best_rec_ptr;
527 }
528
529 static int _tdb_store(struct tdb_context *tdb, TDB_DATA key,
530                        TDB_DATA dbuf, int flag, uint32_t hash)
531 {
532         struct tdb_record rec;
533         tdb_off_t rec_ptr;
534         tdb_len_t rec_len;
535         int ret = -1;
536
537         rec_len = key.dsize + dbuf.dsize;
538         if ((rec_len < key.dsize) || (rec_len < dbuf.dsize)) {
539                 tdb->ecode = TDB_ERR_OOM;
540                 goto fail;
541         }
542
543         /* check for it existing, on insert. */
544         if (flag == TDB_INSERT) {
545                 if (tdb_exists_hash(tdb, key, hash)) {
546                         tdb->ecode = TDB_ERR_EXISTS;
547                         goto fail;
548                 }
549         } else {
550                 /* first try in-place update, on modify or replace. */
551                 if (tdb_update_hash(tdb, key, hash, &dbuf, 1,
552                                     dbuf.dsize) == 0) {
553                         goto done;
554                 }
555                 if (tdb->ecode == TDB_ERR_NOEXIST &&
556                     flag == TDB_MODIFY) {
557                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
558                          we should fail the store */
559                         goto fail;
560                 }
561         }
562         /* reset the error code potentially set by the tdb_update_hash() */
563         tdb->ecode = TDB_SUCCESS;
564
565         /* delete any existing record - if it doesn't exist we don't
566            care.  Doing this first reduces fragmentation, and avoids
567            coalescing with `allocated' block before it's updated. */
568         if (flag != TDB_INSERT)
569                 tdb_delete_hash(tdb, key, hash);
570
571         /* we have to allocate some space */
572         rec_ptr = tdb_allocate(tdb, hash, rec_len, &rec);
573
574         if (rec_ptr == 0) {
575                 goto fail;
576         }
577
578         /* Read hash top into next ptr */
579         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
580                 goto fail;
581
582         rec.key_len = key.dsize;
583         rec.data_len = dbuf.dsize;
584         rec.full_hash = hash;
585         rec.magic = TDB_MAGIC;
586
587         /* write out and point the top of the hash chain at it */
588         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
589             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec),
590                                        key.dptr, key.dsize) == -1
591             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec)+key.dsize,
592                                        dbuf.dptr, dbuf.dsize) == -1
593             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
594                 /* Need to tdb_unallocate() here */
595                 goto fail;
596         }
597
598  done:
599         ret = 0;
600  fail:
601         if (ret == 0) {
602                 tdb_increment_seqnum(tdb);
603         }
604         return ret;
605 }
606
607 /* store an element in the database, replacing any existing element
608    with the same key
609
610    return 0 on success, -1 on failure
611 */
612 _PUBLIC_ int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
613 {
614         uint32_t hash;
615         int ret;
616
617         if (tdb->read_only || tdb->traverse_read) {
618                 tdb->ecode = TDB_ERR_RDONLY;
619                 tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, -1);
620                 return -1;
621         }
622
623         /* find which hash bucket it is in */
624         hash = tdb->hash_fn(&key);
625         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
626                 return -1;
627
628         ret = _tdb_store(tdb, key, dbuf, flag, hash);
629         tdb_trace_2rec_flag_ret(tdb, "tdb_store", key, dbuf, flag, ret);
630         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
631         return ret;
632 }
633
634 /* Append to an entry. Create if not exist. */
635 _PUBLIC_ int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
636 {
637         uint32_t hash;
638         TDB_DATA dbuf;
639         int ret = -1;
640
641         /* find which hash bucket it is in */
642         hash = tdb->hash_fn(&key);
643         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
644                 return -1;
645
646         dbuf = _tdb_fetch(tdb, key);
647
648         if (dbuf.dptr == NULL) {
649                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
650         } else {
651                 unsigned int new_len = dbuf.dsize + new_dbuf.dsize;
652                 unsigned char *new_dptr;
653
654                 /* realloc '0' is special: don't do that. */
655                 if (new_len == 0)
656                         new_len = 1;
657                 new_dptr = (unsigned char *)realloc(dbuf.dptr, new_len);
658                 if (new_dptr == NULL) {
659                         free(dbuf.dptr);
660                 }
661                 dbuf.dptr = new_dptr;
662         }
663
664         if (dbuf.dptr == NULL) {
665                 tdb->ecode = TDB_ERR_OOM;
666                 goto failed;
667         }
668
669         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
670         dbuf.dsize += new_dbuf.dsize;
671
672         ret = _tdb_store(tdb, key, dbuf, 0, hash);
673         tdb_trace_2rec_retrec(tdb, "tdb_append", key, new_dbuf, dbuf);
674
675 failed:
676         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
677         SAFE_FREE(dbuf.dptr);
678         return ret;
679 }
680
681
682 /*
683   return the name of the current tdb file
684   useful for external logging functions
685 */
686 _PUBLIC_ const char *tdb_name(struct tdb_context *tdb)
687 {
688         return tdb->name;
689 }
690
691 /*
692   return the underlying file descriptor being used by tdb, or -1
693   useful for external routines that want to check the device/inode
694   of the fd
695 */
696 _PUBLIC_ int tdb_fd(struct tdb_context *tdb)
697 {
698         return tdb->fd;
699 }
700
701 /*
702   return the current logging function
703   useful for external tdb routines that wish to log tdb errors
704 */
705 _PUBLIC_ tdb_log_func tdb_log_fn(struct tdb_context *tdb)
706 {
707         return tdb->log.log_fn;
708 }
709
710
711 /*
712   get the tdb sequence number. Only makes sense if the writers opened
713   with TDB_SEQNUM set. Note that this sequence number will wrap quite
714   quickly, so it should only be used for a 'has something changed'
715   test, not for code that relies on the count of the number of changes
716   made. If you want a counter then use a tdb record.
717
718   The aim of this sequence number is to allow for a very lightweight
719   test of a possible tdb change.
720 */
721 _PUBLIC_ int tdb_get_seqnum(struct tdb_context *tdb)
722 {
723         tdb_off_t seqnum=0;
724
725         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
726         return seqnum;
727 }
728
729 _PUBLIC_ int tdb_hash_size(struct tdb_context *tdb)
730 {
731         return tdb->hash_size;
732 }
733
734 _PUBLIC_ size_t tdb_map_size(struct tdb_context *tdb)
735 {
736         return tdb->map_size;
737 }
738
739 _PUBLIC_ int tdb_get_flags(struct tdb_context *tdb)
740 {
741         return tdb->flags;
742 }
743
744 _PUBLIC_ void tdb_add_flags(struct tdb_context *tdb, unsigned flags)
745 {
746         if ((flags & TDB_ALLOW_NESTING) &&
747             (flags & TDB_DISALLOW_NESTING)) {
748                 tdb->ecode = TDB_ERR_NESTING;
749                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_add_flags: "
750                         "allow_nesting and disallow_nesting are not allowed together!"));
751                 return;
752         }
753
754         if (flags & TDB_ALLOW_NESTING) {
755                 tdb->flags &= ~TDB_DISALLOW_NESTING;
756         }
757         if (flags & TDB_DISALLOW_NESTING) {
758                 tdb->flags &= ~TDB_ALLOW_NESTING;
759         }
760
761         tdb->flags |= flags;
762 }
763
764 _PUBLIC_ void tdb_remove_flags(struct tdb_context *tdb, unsigned flags)
765 {
766         if ((flags & TDB_ALLOW_NESTING) &&
767             (flags & TDB_DISALLOW_NESTING)) {
768                 tdb->ecode = TDB_ERR_NESTING;
769                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
770                         "allow_nesting and disallow_nesting are not allowed together!"));
771                 return;
772         }
773
774         if ((flags & TDB_NOLOCK) &&
775             (tdb->feature_flags & TDB_FEATURE_FLAG_MUTEX) &&
776             (tdb->mutexes == NULL)) {
777                 tdb->ecode = TDB_ERR_LOCK;
778                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_remove_flags: "
779                          "Can not remove NOLOCK flag on mutexed databases"));
780                 return;
781         }
782
783         if (flags & TDB_ALLOW_NESTING) {
784                 tdb->flags |= TDB_DISALLOW_NESTING;
785         }
786         if (flags & TDB_DISALLOW_NESTING) {
787                 tdb->flags |= TDB_ALLOW_NESTING;
788         }
789
790         tdb->flags &= ~flags;
791 }
792
793
794 /*
795   enable sequence number handling on an open tdb
796 */
797 _PUBLIC_ void tdb_enable_seqnum(struct tdb_context *tdb)
798 {
799         tdb->flags |= TDB_SEQNUM;
800 }
801
802
803 /*
804   add a region of the file to the freelist. Length is the size of the region in bytes,
805   which includes the free list header that needs to be added
806  */
807 static int tdb_free_region(struct tdb_context *tdb, tdb_off_t offset, ssize_t length)
808 {
809         struct tdb_record rec;
810         if (length <= sizeof(rec)) {
811                 /* the region is not worth adding */
812                 return 0;
813         }
814         if (length + offset > tdb->map_size) {
815                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: adding region beyond end of file\n"));
816                 return -1;
817         }
818         memset(&rec,'\0',sizeof(rec));
819         rec.rec_len = length - sizeof(rec);
820         if (tdb_free(tdb, offset, &rec) == -1) {
821                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_free_region: failed to add free record\n"));
822                 return -1;
823         }
824         return 0;
825 }
826
827 /*
828   wipe the entire database, deleting all records. This can be done
829   very fast by using a allrecord lock. The entire data portion of the
830   file becomes a single entry in the freelist.
831
832   This code carefully steps around the recovery area, leaving it alone
833  */
834 _PUBLIC_ int tdb_wipe_all(struct tdb_context *tdb)
835 {
836         uint32_t i;
837         tdb_off_t offset = 0;
838         ssize_t data_len;
839         tdb_off_t recovery_head;
840         tdb_len_t recovery_size = 0;
841
842         if (tdb_lockall(tdb) != 0) {
843                 return -1;
844         }
845
846         tdb_trace(tdb, "tdb_wipe_all");
847
848         /* see if the tdb has a recovery area, and remember its size
849            if so. We don't want to lose this as otherwise each
850            tdb_wipe_all() in a transaction will increase the size of
851            the tdb by the size of the recovery area */
852         if (tdb_ofs_read(tdb, TDB_RECOVERY_HEAD, &recovery_head) == -1) {
853                 TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery head\n"));
854                 goto failed;
855         }
856
857         if (recovery_head != 0) {
858                 struct tdb_record rec;
859                 if (tdb->methods->tdb_read(tdb, recovery_head, &rec, sizeof(rec), DOCONV()) == -1) {
860                         TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_wipe_all: failed to read recovery record\n"));
861                         return -1;
862                 }
863                 recovery_size = rec.rec_len + sizeof(rec);
864         }
865
866         /* wipe the hashes */
867         for (i=0;i<tdb->hash_size;i++) {
868                 if (tdb_ofs_write(tdb, TDB_HASH_TOP(i), &offset) == -1) {
869                         TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write hash %d\n", i));
870                         goto failed;
871                 }
872         }
873
874         /* wipe the freelist */
875         if (tdb_ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
876                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to write freelist\n"));
877                 goto failed;
878         }
879
880         /* add all the rest of the file to the freelist, possibly leaving a gap
881            for the recovery area */
882         if (recovery_size == 0) {
883                 /* the simple case - the whole file can be used as a freelist */
884                 data_len = (tdb->map_size - TDB_DATA_START(tdb->hash_size));
885                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
886                         goto failed;
887                 }
888         } else {
889                 /* we need to add two freelist entries - one on either
890                    side of the recovery area
891
892                    Note that we cannot shift the recovery area during
893                    this operation. Only the transaction.c code may
894                    move the recovery area or we risk subtle data
895                    corruption
896                 */
897                 data_len = (recovery_head - TDB_DATA_START(tdb->hash_size));
898                 if (tdb_free_region(tdb, TDB_DATA_START(tdb->hash_size), data_len) != 0) {
899                         goto failed;
900                 }
901                 /* and the 2nd free list entry after the recovery area - if any */
902                 data_len = tdb->map_size - (recovery_head+recovery_size);
903                 if (tdb_free_region(tdb, recovery_head+recovery_size, data_len) != 0) {
904                         goto failed;
905                 }
906         }
907
908         tdb_increment_seqnum_nonblock(tdb);
909
910         if (tdb_unlockall(tdb) != 0) {
911                 TDB_LOG((tdb, TDB_DEBUG_FATAL,"tdb_wipe_all: failed to unlock\n"));
912                 goto failed;
913         }
914
915         return 0;
916
917 failed:
918         tdb_unlockall(tdb);
919         return -1;
920 }
921
922 struct traverse_state {
923         bool error;
924         struct tdb_context *dest_db;
925 };
926
927 /*
928   traverse function for repacking
929  */
930 static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data)
931 {
932         struct traverse_state *state = (struct traverse_state *)private_data;
933         if (tdb_store(state->dest_db, key, data, TDB_INSERT) != 0) {
934                 state->error = true;
935                 return -1;
936         }
937         return 0;
938 }
939
940 /*
941   repack a tdb
942  */
943 _PUBLIC_ int tdb_repack(struct tdb_context *tdb)
944 {
945         struct tdb_context *tmp_db;
946         struct traverse_state state;
947
948         tdb_trace(tdb, "tdb_repack");
949
950         if (tdb_transaction_start(tdb) != 0) {
951                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to start transaction\n"));
952                 return -1;
953         }
954
955         tmp_db = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
956         if (tmp_db == NULL) {
957                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to create tmp_db\n"));
958                 tdb_transaction_cancel(tdb);
959                 return -1;
960         }
961
962         state.error = false;
963         state.dest_db = tmp_db;
964
965         if (tdb_traverse_read(tdb, repack_traverse, &state) == -1) {
966                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying out\n"));
967                 tdb_transaction_cancel(tdb);
968                 tdb_close(tmp_db);
969                 return -1;
970         }
971
972         if (state.error) {
973                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during traversal\n"));
974                 tdb_transaction_cancel(tdb);
975                 tdb_close(tmp_db);
976                 return -1;
977         }
978
979         if (tdb_wipe_all(tdb) != 0) {
980                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to wipe database\n"));
981                 tdb_transaction_cancel(tdb);
982                 tdb_close(tmp_db);
983                 return -1;
984         }
985
986         state.error = false;
987         state.dest_db = tdb;
988
989         if (tdb_traverse_read(tmp_db, repack_traverse, &state) == -1) {
990                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to traverse copying back\n"));
991                 tdb_transaction_cancel(tdb);
992                 tdb_close(tmp_db);
993                 return -1;
994         }
995
996         if (state.error) {
997                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Error during second traversal\n"));
998                 tdb_transaction_cancel(tdb);
999                 tdb_close(tmp_db);
1000                 return -1;
1001         }
1002
1003         tdb_close(tmp_db);
1004
1005         if (tdb_transaction_commit(tdb) != 0) {
1006                 TDB_LOG((tdb, TDB_DEBUG_FATAL, __location__ " Failed to commit\n"));
1007                 return -1;
1008         }
1009
1010         return 0;
1011 }
1012
1013 /* Even on files, we can get partial writes due to signals. */
1014 bool tdb_write_all(int fd, const void *buf, size_t count)
1015 {
1016         while (count) {
1017                 ssize_t ret;
1018                 ret = write(fd, buf, count);
1019                 if (ret < 0)
1020                         return false;
1021                 buf = (const char *)buf + ret;
1022                 count -= ret;
1023         }
1024         return true;
1025 }
1026
1027 bool tdb_add_off_t(tdb_off_t a, tdb_off_t b, tdb_off_t *pret)
1028 {
1029         tdb_off_t ret = a + b;
1030
1031         if ((ret < a) || (ret < b)) {
1032                 return false;
1033         }
1034         *pret = ret;
1035         return true;
1036 }
1037
1038 #ifdef TDB_TRACE
1039 static void tdb_trace_write(struct tdb_context *tdb, const char *str)
1040 {
1041         if (!tdb_write_all(tdb->tracefd, str, strlen(str))) {
1042                 close(tdb->tracefd);
1043                 tdb->tracefd = -1;
1044         }
1045 }
1046
1047 static void tdb_trace_start(struct tdb_context *tdb)
1048 {
1049         tdb_off_t seqnum=0;
1050         char msg[sizeof(tdb_off_t) * 4 + 1];
1051
1052         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
1053         snprintf(msg, sizeof(msg), "%u ", seqnum);
1054         tdb_trace_write(tdb, msg);
1055 }
1056
1057 static void tdb_trace_end(struct tdb_context *tdb)
1058 {
1059         tdb_trace_write(tdb, "\n");
1060 }
1061
1062 static void tdb_trace_end_ret(struct tdb_context *tdb, int ret)
1063 {
1064         char msg[sizeof(ret) * 4 + 4];
1065         snprintf(msg, sizeof(msg), " = %i\n", ret);
1066         tdb_trace_write(tdb, msg);
1067 }
1068
1069 static void tdb_trace_record(struct tdb_context *tdb, TDB_DATA rec)
1070 {
1071         char msg[20 + rec.dsize*2], *p;
1072         unsigned int i;
1073
1074         /* We differentiate zero-length records from non-existent ones. */
1075         if (rec.dptr == NULL) {
1076                 tdb_trace_write(tdb, " NULL");
1077                 return;
1078         }
1079
1080         /* snprintf here is purely cargo-cult programming. */
1081         p = msg;
1082         p += snprintf(p, sizeof(msg), " %zu:", rec.dsize);
1083         for (i = 0; i < rec.dsize; i++)
1084                 p += snprintf(p, 2, "%02x", rec.dptr[i]);
1085
1086         tdb_trace_write(tdb, msg);
1087 }
1088
1089 void tdb_trace(struct tdb_context *tdb, const char *op)
1090 {
1091         tdb_trace_start(tdb);
1092         tdb_trace_write(tdb, op);
1093         tdb_trace_end(tdb);
1094 }
1095
1096 void tdb_trace_seqnum(struct tdb_context *tdb, uint32_t seqnum, const char *op)
1097 {
1098         char msg[sizeof(tdb_off_t) * 4 + 1];
1099
1100         snprintf(msg, sizeof(msg), "%u ", seqnum);
1101         tdb_trace_write(tdb, msg);
1102         tdb_trace_write(tdb, op);
1103         tdb_trace_end(tdb);
1104 }
1105
1106 void tdb_trace_open(struct tdb_context *tdb, const char *op,
1107                     unsigned hash_size, unsigned tdb_flags, unsigned open_flags)
1108 {
1109         char msg[128];
1110
1111         snprintf(msg, sizeof(msg),
1112                  "%s %u 0x%x 0x%x", op, hash_size, tdb_flags, open_flags);
1113         tdb_trace_start(tdb);
1114         tdb_trace_write(tdb, msg);
1115         tdb_trace_end(tdb);
1116 }
1117
1118 void tdb_trace_ret(struct tdb_context *tdb, const char *op, int ret)
1119 {
1120         tdb_trace_start(tdb);
1121         tdb_trace_write(tdb, op);
1122         tdb_trace_end_ret(tdb, ret);
1123 }
1124
1125 void tdb_trace_retrec(struct tdb_context *tdb, const char *op, TDB_DATA ret)
1126 {
1127         tdb_trace_start(tdb);
1128         tdb_trace_write(tdb, op);
1129         tdb_trace_write(tdb, " =");
1130         tdb_trace_record(tdb, ret);
1131         tdb_trace_end(tdb);
1132 }
1133
1134 void tdb_trace_1rec(struct tdb_context *tdb, const char *op,
1135                     TDB_DATA rec)
1136 {
1137         tdb_trace_start(tdb);
1138         tdb_trace_write(tdb, op);
1139         tdb_trace_record(tdb, rec);
1140         tdb_trace_end(tdb);
1141 }
1142
1143 void tdb_trace_1rec_ret(struct tdb_context *tdb, const char *op,
1144                         TDB_DATA rec, int ret)
1145 {
1146         tdb_trace_start(tdb);
1147         tdb_trace_write(tdb, op);
1148         tdb_trace_record(tdb, rec);
1149         tdb_trace_end_ret(tdb, ret);
1150 }
1151
1152 void tdb_trace_1rec_retrec(struct tdb_context *tdb, const char *op,
1153                            TDB_DATA rec, TDB_DATA ret)
1154 {
1155         tdb_trace_start(tdb);
1156         tdb_trace_write(tdb, op);
1157         tdb_trace_record(tdb, rec);
1158         tdb_trace_write(tdb, " =");
1159         tdb_trace_record(tdb, ret);
1160         tdb_trace_end(tdb);
1161 }
1162
1163 void tdb_trace_2rec_flag_ret(struct tdb_context *tdb, const char *op,
1164                              TDB_DATA rec1, TDB_DATA rec2, unsigned flag,
1165                              int ret)
1166 {
1167         char msg[1 + sizeof(ret) * 4];
1168
1169         snprintf(msg, sizeof(msg), " %#x", flag);
1170         tdb_trace_start(tdb);
1171         tdb_trace_write(tdb, op);
1172         tdb_trace_record(tdb, rec1);
1173         tdb_trace_record(tdb, rec2);
1174         tdb_trace_write(tdb, msg);
1175         tdb_trace_end_ret(tdb, ret);
1176 }
1177
1178 void tdb_trace_2rec_retrec(struct tdb_context *tdb, const char *op,
1179                            TDB_DATA rec1, TDB_DATA rec2, TDB_DATA ret)
1180 {
1181         tdb_trace_start(tdb);
1182         tdb_trace_write(tdb, op);
1183         tdb_trace_record(tdb, rec1);
1184         tdb_trace_record(tdb, rec2);
1185         tdb_trace_write(tdb, " =");
1186         tdb_trace_record(tdb, ret);
1187         tdb_trace_end(tdb);
1188 }
1189 #endif