Added Shirish's client side caching policy change.
[tprouty/samba.git] / source / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23 #ifdef STANDALONE
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <string.h>
33 #include <fcntl.h>
34 #include <errno.h>
35 #include <sys/mman.h>
36 #include <sys/stat.h>
37 #include "tdb.h"
38 #include "spinlock.h"
39 #else
40 #include "includes.h"
41 #endif
42
43 #define TDB_MAGIC_FOOD "TDB file\n"
44 #define TDB_VERSION (0x26011967 + 6)
45 #define TDB_MAGIC (0x26011999U)
46 #define TDB_FREE_MAGIC (~TDB_MAGIC)
47 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
48 #define TDB_ALIGNMENT 4
49 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
50 #define DEFAULT_HASH_SIZE 131
51 #define TDB_PAGE_SIZE 0x2000
52 #define FREELIST_TOP (sizeof(struct tdb_header))
53 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
54 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
55 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
56 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
57 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
58
59 /* NB assumes there is a local variable called "tdb" that is the
60  * current context, also takes doubly-parenthesized print-style
61  * argument. */
62 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
63
64 /* lock offsets */
65 #define GLOBAL_LOCK 0
66 #define ACTIVE_LOCK 4
67
68 #ifndef MAP_FILE
69 #define MAP_FILE 0
70 #endif
71
72 #ifndef MAP_FAILED
73 #define MAP_FAILED ((void *)-1)
74 #endif
75
76 /* free memory if the pointer is valid and zero the pointer */
77 #ifndef SAFE_FREE
78 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
79 #endif
80
81 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
82 TDB_DATA tdb_null;
83
84 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
85 static TDB_CONTEXT *tdbs = NULL;
86
87 static int tdb_munmap(TDB_CONTEXT *tdb)
88 {
89         if (tdb->flags & TDB_INTERNAL)
90                 return 0;
91
92 #ifdef HAVE_MMAP
93         if (tdb->map_ptr) {
94                 int ret = munmap(tdb->map_ptr, tdb->map_size);
95                 if (ret != 0)
96                         return ret;
97         }
98 #endif
99         tdb->map_ptr = NULL;
100         return 0;
101 }
102
103 static void tdb_mmap(TDB_CONTEXT *tdb)
104 {
105         if (tdb->flags & TDB_INTERNAL)
106                 return;
107
108 #ifdef HAVE_MMAP
109         if (!(tdb->flags & TDB_NOMMAP)) {
110                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
111                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
112                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
113
114                 /*
115                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
116                  */
117
118                 if (tdb->map_ptr == MAP_FAILED) {
119                         tdb->map_ptr = NULL;
120                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
121                                  tdb->map_size, strerror(errno)));
122                 }
123         } else {
124                 tdb->map_ptr = NULL;
125         }
126 #else
127         tdb->map_ptr = NULL;
128 #endif
129 }
130
131 /* Endian conversion: we only ever deal with 4 byte quantities */
132 static void *convert(void *buf, u32 size)
133 {
134         u32 i, *p = buf;
135         for (i = 0; i < size / 4; i++)
136                 p[i] = TDB_BYTEREV(p[i]);
137         return buf;
138 }
139 #define DOCONV() (tdb->flags & TDB_CONVERT)
140 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
141
142 /* the body of the database is made of one list_struct for the free space
143    plus a separate data list for each hash value */
144 struct list_struct {
145         tdb_off next; /* offset of the next record in the list */
146         tdb_len rec_len; /* total byte length of record */
147         tdb_len key_len; /* byte length of key */
148         tdb_len data_len; /* byte length of data */
149         u32 full_hash; /* the full 32 bit hash of the key */
150         u32 magic;   /* try to catch errors */
151         /* the following union is implied:
152                 union {
153                         char record[rec_len];
154                         struct {
155                                 char key[key_len];
156                                 char data[data_len];
157                         }
158                         u32 totalsize; (tailer)
159                 }
160         */
161 };
162
163 /* a byte range locking function - return 0 on success
164    this functions locks/unlocks 1 byte at the specified offset.
165
166    On error, errno is also set so that errors are passed back properly
167    through tdb_open(). */
168 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
169                       int rw_type, int lck_type, int probe)
170 {
171         struct flock fl;
172
173         if (tdb->flags & TDB_NOLOCK)
174                 return 0;
175         if (tdb->read_only) {
176                 errno = EACCES;
177                 return -1;
178         }
179
180         fl.l_type = rw_type;
181         fl.l_whence = SEEK_SET;
182         fl.l_start = offset;
183         fl.l_len = 1;
184         fl.l_pid = 0;
185
186         if (fcntl(tdb->fd,lck_type,&fl) == -1) {
187                 if (!probe) {
188                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
189                                  tdb->fd, offset, rw_type, lck_type));
190                 }
191                 /* errno set by fcntl */
192                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
193         }
194         return 0;
195 }
196
197 /* lock a list in the database. list -1 is the alloc list */
198 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
199 {
200         if (list < -1 || list >= (int)tdb->header.hash_size) {
201                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
202                            list, ltype));
203                 return -1;
204         }
205         if (tdb->flags & TDB_NOLOCK)
206                 return 0;
207
208         /* Since fcntl locks don't nest, we do a lock for the first one,
209            and simply bump the count for future ones */
210         if (tdb->locked[list+1].count == 0) {
211                 if (!tdb->read_only && tdb->header.rwlocks) {
212                         if (tdb_spinlock(tdb, list, ltype)) {
213                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
214                                            list, ltype));
215                                 return -1;
216                         }
217                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
218                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
219                                            list, ltype, strerror(errno)));
220                         return -1;
221                 }
222                 tdb->locked[list+1].ltype = ltype;
223         }
224         tdb->locked[list+1].count++;
225         return 0;
226 }
227
228 /* unlock the database: returns void because it's too late for errors. */
229         /* changed to return int it may be interesting to know there
230            has been an error  --simo */
231 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
232 {
233         int ret = -1;
234
235         if (tdb->flags & TDB_NOLOCK)
236                 return 0;
237
238         /* Sanity checks */
239         if (list < -1 || list >= (int)tdb->header.hash_size) {
240                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
241                 return ret;
242         }
243
244         if (tdb->locked[list+1].count==0) {
245                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
246                 return ret;
247         }
248
249         if (tdb->locked[list+1].count == 1) {
250                 /* Down to last nested lock: unlock underneath */
251                 if (!tdb->read_only && tdb->header.rwlocks) {
252                         ret = tdb_spinunlock(tdb, list, ltype);
253                 } else {
254                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
255                 }
256         } else {
257                 ret = 0;
258         }
259         tdb->locked[list+1].count--;
260
261         if (ret)
262                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
263         return ret;
264 }
265
266 /* This is based on the hash algorithm from gdbm */
267 static u32 tdb_hash(TDB_DATA *key)
268 {
269         u32 value;      /* Used to compute the hash value.  */
270         u32   i;        /* Used to cycle through random values. */
271
272         /* Set the initial value from the key size. */
273         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
274                 value = (value + (key->dptr[i] << (i*5 % 24)));
275
276         return (1103515243 * value + 12345);  
277 }
278
279 /* check for an out of bounds access - if it is out of bounds then
280    see if the database has been expanded by someone else and expand
281    if necessary 
282    note that "len" is the minimum length needed for the db
283 */
284 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
285 {
286         struct stat st;
287         if (len <= tdb->map_size)
288                 return 0;
289         if (tdb->flags & TDB_INTERNAL) {
290                 if (!probe) {
291                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
292                                  (int)len, (int)tdb->map_size));
293                 }
294                 return TDB_ERRCODE(TDB_ERR_IO, -1);
295         }
296
297         if (fstat(tdb->fd, &st) == -1)
298                 return TDB_ERRCODE(TDB_ERR_IO, -1);
299
300         if (st.st_size < (size_t)len) {
301                 if (!probe) {
302                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
303                                  (int)len, (int)st.st_size));
304                 }
305                 return TDB_ERRCODE(TDB_ERR_IO, -1);
306         }
307
308         /* Unmap, update size, remap */
309         if (tdb_munmap(tdb) == -1)
310                 return TDB_ERRCODE(TDB_ERR_IO, -1);
311         tdb->map_size = st.st_size;
312         tdb_mmap(tdb);
313         return 0;
314 }
315
316 /* write a lump of data at a specified offset */
317 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
318 {
319         if (tdb_oob(tdb, off + len, 0) != 0)
320                 return -1;
321
322         if (tdb->map_ptr)
323                 memcpy(off + (char *)tdb->map_ptr, buf, len);
324 #ifdef HAVE_PWRITE
325         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
326 #else
327         else if (lseek(tdb->fd, off, SEEK_SET) != off
328                  || write(tdb->fd, buf, len) != (ssize_t)len) {
329 #endif
330                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
331                            off, len, strerror(errno)));
332                 return TDB_ERRCODE(TDB_ERR_IO, -1);
333         }
334         return 0;
335 }
336
337 /* read a lump of data at a specified offset, maybe convert */
338 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
339 {
340         if (tdb_oob(tdb, off + len, 0) != 0)
341                 return -1;
342
343         if (tdb->map_ptr)
344                 memcpy(buf, off + (char *)tdb->map_ptr, len);
345 #ifdef HAVE_PREAD
346         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
347 #else
348         else if (lseek(tdb->fd, off, SEEK_SET) != off
349                  || read(tdb->fd, buf, len) != (ssize_t)len) {
350 #endif
351                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
352                            off, len, strerror(errno)));
353                 return TDB_ERRCODE(TDB_ERR_IO, -1);
354         }
355         if (cv)
356                 convert(buf, len);
357         return 0;
358 }
359
360 /* read a lump of data, allocating the space for it */
361 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
362 {
363         char *buf;
364
365         if (!(buf = malloc(len))) {
366                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
367                            len, strerror(errno)));
368                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
369         }
370         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
371                 SAFE_FREE(buf);
372                 return NULL;
373         }
374         return buf;
375 }
376
377 /* read/write a tdb_off */
378 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
379 {
380         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
381 }
382 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
383 {
384         tdb_off off = *d;
385         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
386 }
387
388 /* read/write a record */
389 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
390 {
391         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
392                 return -1;
393         if (TDB_BAD_MAGIC(rec)) {
394                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
395                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
396         }
397         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
398 }
399 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
400 {
401         struct list_struct r = *rec;
402         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
403 }
404
405 /* read a freelist record and check for simple errors */
406 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
407 {
408         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
409                 return -1;
410         if (rec->magic != TDB_FREE_MAGIC) {
411                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
412                            rec->magic, off));
413                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
414         }
415         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
416                 return -1;
417         return 0;
418 }
419
420 /* update a record tailer (must hold allocation lock) */
421 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
422                          const struct list_struct *rec)
423 {
424         tdb_off totalsize;
425
426         /* Offset of tailer from record header */
427         totalsize = sizeof(*rec) + rec->rec_len;
428         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
429                          &totalsize);
430 }
431
432 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
433 {
434         struct list_struct rec;
435         tdb_off tailer_ofs, tailer;
436
437         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
438                 printf("ERROR: failed to read record at %u\n", offset);
439                 return 0;
440         }
441
442         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
443                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
444
445         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
446         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
447                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
448                 return rec.next;
449         }
450
451         if (tailer != rec.rec_len + sizeof(rec)) {
452                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
453                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
454         }
455         return rec.next;
456 }
457
458 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
459 {
460         tdb_off rec_ptr, top;
461
462         top = TDB_HASH_TOP(i);
463
464         if (tdb_lock(tdb, i, F_WRLCK) != 0)
465                 return -1;
466
467         if (ofs_read(tdb, top, &rec_ptr) == -1)
468                 return tdb_unlock(tdb, i, F_WRLCK);
469
470         if (rec_ptr)
471                 printf("hash=%d\n", i);
472
473         while (rec_ptr) {
474                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
475         }
476
477         return tdb_unlock(tdb, i, F_WRLCK);
478 }
479
480 void tdb_dump_all(TDB_CONTEXT *tdb)
481 {
482         int i;
483         for (i=0;i<tdb->header.hash_size;i++) {
484                 tdb_dump_chain(tdb, i);
485         }
486         printf("freelist:\n");
487         tdb_dump_chain(tdb, -1);
488 }
489
490 int tdb_printfreelist(TDB_CONTEXT *tdb)
491 {
492         int ret;
493         long total_free = 0;
494         tdb_off offset, rec_ptr;
495         struct list_struct rec;
496
497         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
498                 return ret;
499
500         offset = FREELIST_TOP;
501
502         /* read in the freelist top */
503         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
504                 return 0;
505         }
506
507         printf("freelist top=[0x%08x]\n", rec_ptr );
508         while (rec_ptr) {
509                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
510                         return -1;
511                 }
512
513                 if (rec.magic != TDB_FREE_MAGIC) {
514                         printf("bad magic 0x%08x in free list\n", rec.magic);
515                         return -1;
516                 }
517
518                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
519                 total_free += rec.rec_len;
520
521                 /* move to the next record */
522                 rec_ptr = rec.next;
523         }
524         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
525                (int)total_free);
526
527         return tdb_unlock(tdb, -1, F_WRLCK);
528 }
529
530 /* Remove an element from the freelist.  Must have alloc lock. */
531 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
532 {
533         tdb_off last_ptr, i;
534
535         /* read in the freelist top */
536         last_ptr = FREELIST_TOP;
537         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
538                 if (i == off) {
539                         /* We've found it! */
540                         return ofs_write(tdb, last_ptr, &next);
541                 }
542                 /* Follow chain (next offset is at start of record) */
543                 last_ptr = i;
544         }
545         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
546         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
547 }
548
549 /* Add an element into the freelist. Merge adjacent records if
550    neccessary. */
551 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
552 {
553         tdb_off right, left;
554
555         /* Allocation and tailer lock */
556         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
557                 return -1;
558
559         /* set an initial tailer, so if we fail we don't leave a bogus record */
560         if (update_tailer(tdb, offset, rec) != 0) {
561                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
562                 goto fail;
563         }
564
565         /* Look right first (I'm an Australian, dammit) */
566         right = offset + sizeof(*rec) + rec->rec_len;
567         if (right + sizeof(*rec) <= tdb->map_size) {
568                 struct list_struct r;
569
570                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
571                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
572                         goto left;
573                 }
574
575                 /* If it's free, expand to include it. */
576                 if (r.magic == TDB_FREE_MAGIC) {
577                         if (remove_from_freelist(tdb, right, r.next) == -1) {
578                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
579                                 goto left;
580                         }
581                         rec->rec_len += sizeof(r) + r.rec_len;
582                 }
583         }
584
585 left:
586         /* Look left */
587         left = offset - sizeof(tdb_off);
588         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
589                 struct list_struct l;
590                 tdb_off leftsize;
591
592                 /* Read in tailer and jump back to header */
593                 if (ofs_read(tdb, left, &leftsize) == -1) {
594                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
595                         goto update;
596                 }
597                 left = offset - leftsize;
598
599                 /* Now read in record */
600                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
601                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
602                         goto update;
603                 }
604
605                 /* If it's free, expand to include it. */
606                 if (l.magic == TDB_FREE_MAGIC) {
607                         if (remove_from_freelist(tdb, left, l.next) == -1) {
608                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
609                                 goto update;
610                         } else {
611                                 offset = left;
612                                 rec->rec_len += leftsize;
613                         }
614                 }
615         }
616
617 update:
618         if (update_tailer(tdb, offset, rec) == -1) {
619                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
620                 goto fail;
621         }
622
623         /* Now, prepend to free list */
624         rec->magic = TDB_FREE_MAGIC;
625
626         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
627             rec_write(tdb, offset, rec) == -1 ||
628             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
629                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
630                 goto fail;
631         }
632
633         /* And we're done. */
634         tdb_unlock(tdb, -1, F_WRLCK);
635         return 0;
636
637  fail:
638         tdb_unlock(tdb, -1, F_WRLCK);
639         return -1;
640 }
641
642
643 /* expand a file.  we prefer to use ftruncate, as that is what posix
644   says to use for mmap expansion */
645 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
646 {
647         char buf[1024];
648 #if HAVE_FTRUNCATE_EXTEND
649         if (ftruncate(tdb->fd, size+addition) != 0) {
650                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
651                            size+addition, strerror(errno)));
652                 return -1;
653         }
654 #else
655         char b = 0;
656
657 #ifdef HAVE_PWRITE
658         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
659 #else
660         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
661             write(tdb->fd, &b, 1) != 1) {
662 #endif
663                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
664                            size+addition, strerror(errno)));
665                 return -1;
666         }
667 #endif
668
669         /* now fill the file with something. This ensures that the file isn't sparse, which would be
670            very bad if we ran out of disk. This must be done with write, not via mmap */
671         memset(buf, 0x42, sizeof(buf));
672         while (addition) {
673                 int n = addition>sizeof(buf)?sizeof(buf):addition;
674 #ifdef HAVE_PWRITE
675                 int ret = pwrite(tdb->fd, buf, n, size);
676 #else
677                 int ret;
678                 if (lseek(tdb->fd, size, SEEK_SET) != size)
679                         return -1;
680                 ret = write(tdb->fd, buf, n);
681 #endif
682                 if (ret != n) {
683                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
684                                    n, strerror(errno)));
685                         return -1;
686                 }
687                 addition -= n;
688                 size += n;
689         }
690         return 0;
691 }
692
693
694 /* expand the database at least size bytes by expanding the underlying
695    file and doing the mmap again if necessary */
696 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
697 {
698         struct list_struct rec;
699         tdb_off offset;
700
701         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
702                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
703                 return -1;
704         }
705
706         /* must know about any previous expansions by another process */
707         tdb_oob(tdb, tdb->map_size + 1, 1);
708
709         /* always make room for at least 10 more records, and round
710            the database up to a multiple of TDB_PAGE_SIZE */
711         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
712
713         if (!(tdb->flags & TDB_INTERNAL))
714                 tdb_munmap(tdb);
715
716         /*
717          * We must ensure the file is unmapped before doing this
718          * to ensure consistency with systems like OpenBSD where
719          * writes and mmaps are not consistent.
720          */
721
722         /* expand the file itself */
723         if (!(tdb->flags & TDB_INTERNAL)) {
724                 if (expand_file(tdb, tdb->map_size, size) != 0)
725                         goto fail;
726         }
727
728         tdb->map_size += size;
729
730         if (tdb->flags & TDB_INTERNAL)
731                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
732         else {
733                 /*
734                  * We must ensure the file is remapped before adding the space
735                  * to ensure consistency with systems like OpenBSD where
736                  * writes and mmaps are not consistent.
737                  */
738
739                 /* We're ok if the mmap fails as we'll fallback to read/write */
740                 tdb_mmap(tdb);
741         }
742
743         /* form a new freelist record */
744         memset(&rec,'\0',sizeof(rec));
745         rec.rec_len = size - sizeof(rec);
746
747         /* link it into the free list */
748         offset = tdb->map_size - size;
749         if (tdb_free(tdb, offset, &rec) == -1)
750                 goto fail;
751
752         tdb_unlock(tdb, -1, F_WRLCK);
753         return 0;
754  fail:
755         tdb_unlock(tdb, -1, F_WRLCK);
756         return -1;
757 }
758
759 /* allocate some space from the free list. The offset returned points
760    to a unconnected list_struct within the database with room for at
761    least length bytes of total data
762
763    0 is returned if the space could not be allocated
764  */
765 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
766                             struct list_struct *rec)
767 {
768         tdb_off rec_ptr, last_ptr, newrec_ptr;
769         struct list_struct newrec;
770
771         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
772                 return 0;
773
774         /* Extra bytes required for tailer */
775         length += sizeof(tdb_off);
776
777  again:
778         last_ptr = FREELIST_TOP;
779
780         /* read in the freelist top */
781         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
782                 goto fail;
783
784         /* keep looking until we find a freelist record big enough */
785         while (rec_ptr) {
786                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
787                         goto fail;
788
789                 if (rec->rec_len >= length) {
790                         /* found it - now possibly split it up  */
791                         if (rec->rec_len > length + MIN_REC_SIZE) {
792                                 /* Length of left piece */
793                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
794
795                                 /* Right piece to go on free list */
796                                 newrec.rec_len = rec->rec_len
797                                         - (sizeof(*rec) + length);
798                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
799
800                                 /* And left record is shortened */
801                                 rec->rec_len = length;
802                         } else
803                                 newrec_ptr = 0;
804
805                         /* Remove allocated record from the free list */
806                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
807                                 goto fail;
808
809                         /* Update header: do this before we drop alloc
810                            lock, otherwise tdb_free() might try to
811                            merge with us, thinking we're free.
812                            (Thanks Jeremy Allison). */
813                         rec->magic = TDB_MAGIC;
814                         if (rec_write(tdb, rec_ptr, rec) == -1)
815                                 goto fail;
816
817                         /* Did we create new block? */
818                         if (newrec_ptr) {
819                                 /* Update allocated record tailer (we
820                                    shortened it). */
821                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
822                                         goto fail;
823
824                                 /* Free new record */
825                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
826                                         goto fail;
827                         }
828
829                         /* all done - return the new record offset */
830                         tdb_unlock(tdb, -1, F_WRLCK);
831                         return rec_ptr;
832                 }
833                 /* move to the next record */
834                 last_ptr = rec_ptr;
835                 rec_ptr = rec->next;
836         }
837         /* we didn't find enough space. See if we can expand the
838            database and if we can then try again */
839         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
840                 goto again;
841  fail:
842         tdb_unlock(tdb, -1, F_WRLCK);
843         return 0;
844 }
845
846 /* initialise a new database with a specified hash size */
847 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
848 {
849         struct tdb_header *newdb;
850         int size, ret = -1;
851
852         /* We make it up in memory, then write it out if not internal */
853         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
854         if (!(newdb = calloc(size, 1)))
855                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
856
857         /* Fill in the header */
858         newdb->version = TDB_VERSION;
859         newdb->hash_size = hash_size;
860 #ifdef USE_SPINLOCKS
861         newdb->rwlocks = size;
862 #endif
863         if (tdb->flags & TDB_INTERNAL) {
864                 tdb->map_size = size;
865                 tdb->map_ptr = (char *)newdb;
866                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
867                 /* Convert the `ondisk' version if asked. */
868                 CONVERT(*newdb);
869                 return 0;
870         }
871         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
872                 goto fail;
873
874         if (ftruncate(tdb->fd, 0) == -1)
875                 goto fail;
876
877         /* This creates an endian-converted header, as if read from disk */
878         CONVERT(*newdb);
879         memcpy(&tdb->header, newdb, sizeof(tdb->header));
880         /* Don't endian-convert the magic food! */
881         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
882         if (write(tdb->fd, newdb, size) != size)
883                 ret = -1;
884         else
885                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
886
887   fail:
888         SAFE_FREE(newdb);
889         return ret;
890 }
891
892 /* Returns 0 on fail.  On success, return offset of record, and fills
893    in rec */
894 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
895                         struct list_struct *r)
896 {
897         tdb_off rec_ptr;
898         
899         /* read in the hash top */
900         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
901                 return 0;
902
903         /* keep looking until we find the right record */
904         while (rec_ptr) {
905                 if (rec_read(tdb, rec_ptr, r) == -1)
906                         return 0;
907
908                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
909                         char *k;
910                         /* a very likely hit - read the key */
911                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
912                                            r->key_len);
913                         if (!k)
914                                 return 0;
915
916                         if (memcmp(key.dptr, k, key.dsize) == 0) {
917                                 SAFE_FREE(k);
918                                 return rec_ptr;
919                         }
920                         SAFE_FREE(k);
921                 }
922                 rec_ptr = r->next;
923         }
924         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
925 }
926
927 /* If they do lockkeys, check that this hash is one they locked */
928 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
929 {
930         u32 i;
931         if (!tdb->lockedkeys)
932                 return 1;
933         for (i = 0; i < tdb->lockedkeys[0]; i++)
934                 if (tdb->lockedkeys[i+1] == hash)
935                         return 1;
936         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
937 }
938
939 /* As tdb_find, but if you succeed, keep the lock */
940 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
941                              struct list_struct *rec)
942 {
943         u32 hash, rec_ptr;
944
945         hash = tdb_hash(&key);
946         if (!tdb_keylocked(tdb, hash))
947                 return 0;
948         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
949                 return 0;
950         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
951                 tdb_unlock(tdb, BUCKET(hash), locktype);
952         return rec_ptr;
953 }
954
955 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
956 {
957         return tdb->ecode;
958 }
959
960 static struct tdb_errname {
961         enum TDB_ERROR ecode; const char *estring;
962 } emap[] = { {TDB_SUCCESS, "Success"},
963              {TDB_ERR_CORRUPT, "Corrupt database"},
964              {TDB_ERR_IO, "IO Error"},
965              {TDB_ERR_LOCK, "Locking error"},
966              {TDB_ERR_OOM, "Out of memory"},
967              {TDB_ERR_EXISTS, "Record exists"},
968              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
969              {TDB_ERR_NOEXIST, "Record does not exist"} };
970
971 /* Error string for the last tdb error */
972 const char *tdb_errorstr(TDB_CONTEXT *tdb)
973 {
974         u32 i;
975         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
976                 if (tdb->ecode == emap[i].ecode)
977                         return emap[i].estring;
978         return "Invalid error code";
979 }
980
981 /* update an entry in place - this only works if the new data size
982    is <= the old data size and the key exists.
983    on failure return -1
984 */
985 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
986 {
987         struct list_struct rec;
988         tdb_off rec_ptr;
989         int ret = -1;
990
991         /* find entry */
992         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
993                 return -1;
994
995         /* must be long enough key, data and tailer */
996         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
997                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
998                 goto out;
999         }
1000
1001         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1002                       dbuf.dptr, dbuf.dsize) == -1)
1003                 goto out;
1004
1005         if (dbuf.dsize != rec.data_len) {
1006                 /* update size */
1007                 rec.data_len = dbuf.dsize;
1008                 ret = rec_write(tdb, rec_ptr, &rec);
1009         } else
1010                 ret = 0;
1011  out:
1012         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1013         return ret;
1014 }
1015
1016 /* find an entry in the database given a key */
1017 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1018 {
1019         tdb_off rec_ptr;
1020         struct list_struct rec;
1021         TDB_DATA ret;
1022
1023         /* find which hash bucket it is in */
1024         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1025                 return tdb_null;
1026
1027         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1028                                   rec.data_len);
1029         ret.dsize = rec.data_len;
1030         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1031         return ret;
1032 }
1033
1034 /* check if an entry in the database exists 
1035
1036    note that 1 is returned if the key is found and 0 is returned if not found
1037    this doesn't match the conventions in the rest of this module, but is
1038    compatible with gdbm
1039 */
1040 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1041 {
1042         struct list_struct rec;
1043         
1044         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1045                 return 0;
1046         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1047         return 1;
1048 }
1049
1050 /* record lock stops delete underneath */
1051 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1052 {
1053         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1054 }
1055 /*
1056   Write locks override our own fcntl readlocks, so check it here.
1057   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1058   an error to fail to get the lock here.
1059 */
1060  
1061 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1062 {
1063         struct tdb_traverse_lock *i;
1064         for (i = &tdb->travlocks; i; i = i->next)
1065                 if (i->off == off)
1066                         return -1;
1067         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1068 }
1069
1070 /*
1071   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1072   an error to fail to get the lock here.
1073 */
1074
1075 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1076 {
1077         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1078 }
1079 /* fcntl locks don't stack: avoid unlocking someone else's */
1080 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1081 {
1082         struct tdb_traverse_lock *i;
1083         u32 count = 0;
1084
1085         if (off == 0)
1086                 return 0;
1087         for (i = &tdb->travlocks; i; i = i->next)
1088                 if (i->off == off)
1089                         count++;
1090         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1091 }
1092
1093 /* actually delete an entry in the database given the offset */
1094 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1095 {
1096         tdb_off last_ptr, i;
1097         struct list_struct lastrec;
1098
1099         if (tdb->read_only) return -1;
1100
1101         if (write_lock_record(tdb, rec_ptr) == -1) {
1102                 /* Someone traversing here: mark it as dead */
1103                 rec->magic = TDB_DEAD_MAGIC;
1104                 return rec_write(tdb, rec_ptr, rec);
1105         }
1106         if (write_unlock_record(tdb, rec_ptr) != 0)
1107                 return -1;
1108
1109         /* find previous record in hash chain */
1110         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1111                 return -1;
1112         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1113                 if (rec_read(tdb, i, &lastrec) == -1)
1114                         return -1;
1115
1116         /* unlink it: next ptr is at start of record. */
1117         if (last_ptr == 0)
1118                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1119         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1120                 return -1;
1121
1122         /* recover the space */
1123         if (tdb_free(tdb, rec_ptr, rec) == -1)
1124                 return -1;
1125         return 0;
1126 }
1127
1128 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1129 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1130                          struct list_struct *rec)
1131 {
1132         int want_next = (tlock->off != 0);
1133
1134         /* No traversal allows if you've called tdb_lockkeys() */
1135         if (tdb->lockedkeys)
1136                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1137
1138         /* Lock each chain from the start one. */
1139         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1140                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1141                         return -1;
1142
1143                 /* No previous record?  Start at top of chain. */
1144                 if (!tlock->off) {
1145                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1146                                      &tlock->off) == -1)
1147                                 goto fail;
1148                 } else {
1149                         /* Otherwise unlock the previous record. */
1150                         if (unlock_record(tdb, tlock->off) != 0)
1151                                 goto fail;
1152                 }
1153
1154                 if (want_next) {
1155                         /* We have offset of old record: grab next */
1156                         if (rec_read(tdb, tlock->off, rec) == -1)
1157                                 goto fail;
1158                         tlock->off = rec->next;
1159                 }
1160
1161                 /* Iterate through chain */
1162                 while( tlock->off) {
1163                         tdb_off current;
1164                         if (rec_read(tdb, tlock->off, rec) == -1)
1165                                 goto fail;
1166                         if (!TDB_DEAD(rec)) {
1167                                 /* Woohoo: we found one! */
1168                                 if (lock_record(tdb, tlock->off) != 0)
1169                                         goto fail;
1170                                 return tlock->off;
1171                         }
1172                         /* Try to clean dead ones from old traverses */
1173                         current = tlock->off;
1174                         tlock->off = rec->next;
1175                         if (do_delete(tdb, current, rec) != 0)
1176                                 goto fail;
1177                 }
1178                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1179                 want_next = 0;
1180         }
1181         /* We finished iteration without finding anything */
1182         return TDB_ERRCODE(TDB_SUCCESS, 0);
1183
1184  fail:
1185         tlock->off = 0;
1186         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1187                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1188         return -1;
1189 }
1190
1191 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1192    return -1 on error or the record count traversed
1193    if fn is NULL then it is not called
1194    a non-zero return value from fn() indicates that the traversal should stop
1195   */
1196 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1197 {
1198         TDB_DATA key, dbuf;
1199         struct list_struct rec;
1200         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1201         int ret, count = 0;
1202
1203         /* This was in the initializaton, above, but the IRIX compiler
1204          * did not like it.  crh
1205          */
1206         tl.next = tdb->travlocks.next;
1207
1208         /* fcntl locks don't stack: beware traverse inside traverse */
1209         tdb->travlocks.next = &tl;
1210
1211         /* tdb_next_lock places locks on the record returned, and its chain */
1212         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1213                 count++;
1214                 /* now read the full record */
1215                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1216                                           rec.key_len + rec.data_len);
1217                 if (!key.dptr) {
1218                         ret = -1;
1219                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1220                                 goto out;
1221                         if (unlock_record(tdb, tl.off) != 0)
1222                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1223                         goto out;
1224                 }
1225                 key.dsize = rec.key_len;
1226                 dbuf.dptr = key.dptr + rec.key_len;
1227                 dbuf.dsize = rec.data_len;
1228
1229                 /* Drop chain lock, call out */
1230                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1231                         ret = -1;
1232                         goto out;
1233                 }
1234                 if (fn && fn(tdb, key, dbuf, state)) {
1235                         /* They want us to terminate traversal */
1236                         ret = count;
1237                         if (unlock_record(tdb, tl.off) != 0) {
1238                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1239                                 ret = -1;
1240                         }
1241                         tdb->travlocks.next = tl.next;
1242                         SAFE_FREE(key.dptr);
1243                         return count;
1244                 }
1245                 SAFE_FREE(key.dptr);
1246         }
1247 out:
1248         tdb->travlocks.next = tl.next;
1249         if (ret < 0)
1250                 return -1;
1251         else
1252                 return count;
1253 }
1254
1255 /* find the first entry in the database and return its key */
1256 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1257 {
1258         TDB_DATA key;
1259         struct list_struct rec;
1260
1261         /* release any old lock */
1262         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1263                 return tdb_null;
1264         tdb->travlocks.off = tdb->travlocks.hash = 0;
1265
1266         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1267                 return tdb_null;
1268         /* now read the key */
1269         key.dsize = rec.key_len;
1270         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1271         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1272                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1273         return key;
1274 }
1275
1276 /* find the next entry in the database, returning its key */
1277 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1278 {
1279         u32 oldhash;
1280         TDB_DATA key = tdb_null;
1281         struct list_struct rec;
1282         char *k = NULL;
1283
1284         /* Is locked key the old key?  If so, traverse will be reliable. */
1285         if (tdb->travlocks.off) {
1286                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1287                         return tdb_null;
1288                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1289                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1290                                             rec.key_len))
1291                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1292                         /* No, it wasn't: unlock it and start from scratch */
1293                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1294                                 return tdb_null;
1295                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1296                                 return tdb_null;
1297                         tdb->travlocks.off = 0;
1298                 }
1299
1300                 SAFE_FREE(k);
1301         }
1302
1303         if (!tdb->travlocks.off) {
1304                 /* No previous element: do normal find, and lock record */
1305                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1306                 if (!tdb->travlocks.off)
1307                         return tdb_null;
1308                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1309                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1310                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1311                         return tdb_null;
1312                 }
1313         }
1314         oldhash = tdb->travlocks.hash;
1315
1316         /* Grab next record: locks chain and returned record,
1317            unlocks old record */
1318         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1319                 key.dsize = rec.key_len;
1320                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1321                                           key.dsize);
1322                 /* Unlock the chain of this new record */
1323                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1324                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1325         }
1326         /* Unlock the chain of old record */
1327         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1328                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1329         return key;
1330 }
1331
1332 /* delete an entry in the database given a key */
1333 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1334 {
1335         tdb_off rec_ptr;
1336         struct list_struct rec;
1337         int ret;
1338
1339         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1340                 return -1;
1341         ret = do_delete(tdb, rec_ptr, &rec);
1342         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1343                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1344         return ret;
1345 }
1346
1347 /* store an element in the database, replacing any existing element
1348    with the same key 
1349
1350    return 0 on success, -1 on failure
1351 */
1352 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1353 {
1354         struct list_struct rec;
1355         u32 hash;
1356         tdb_off rec_ptr;
1357         char *p = NULL;
1358         int ret = 0;
1359
1360         /* find which hash bucket it is in */
1361         hash = tdb_hash(&key);
1362         if (!tdb_keylocked(tdb, hash))
1363                 return -1;
1364         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1365                 return -1;
1366
1367         /* check for it existing, on insert. */
1368         if (flag == TDB_INSERT) {
1369                 if (tdb_exists(tdb, key)) {
1370                         tdb->ecode = TDB_ERR_EXISTS;
1371                         goto fail;
1372                 }
1373         } else {
1374                 /* first try in-place update, on modify or replace. */
1375                 if (tdb_update(tdb, key, dbuf) == 0)
1376                         goto out;
1377                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1378                         goto fail;
1379         }
1380         /* reset the error code potentially set by the tdb_update() */
1381         tdb->ecode = TDB_SUCCESS;
1382
1383         /* delete any existing record - if it doesn't exist we don't
1384            care.  Doing this first reduces fragmentation, and avoids
1385            coalescing with `allocated' block before it's updated. */
1386         if (flag != TDB_INSERT)
1387                 tdb_delete(tdb, key);
1388
1389         /* Copy key+value *before* allocating free space in case malloc
1390            fails and we are left with a dead spot in the tdb. */
1391
1392         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1393                 tdb->ecode = TDB_ERR_OOM;
1394                 goto fail;
1395         }
1396
1397         memcpy(p, key.dptr, key.dsize);
1398         memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1399
1400         /* now we're into insert / modify / replace of a record which
1401          * we know could not be optimised by an in-place store (for
1402          * various reasons).  */
1403         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1404                 goto fail;
1405
1406         /* Read hash top into next ptr */
1407         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1408                 goto fail;
1409
1410         rec.key_len = key.dsize;
1411         rec.data_len = dbuf.dsize;
1412         rec.full_hash = hash;
1413         rec.magic = TDB_MAGIC;
1414
1415         /* write out and point the top of the hash chain at it */
1416         if (rec_write(tdb, rec_ptr, &rec) == -1
1417             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1418             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1419                 /* Need to tdb_unallocate() here */
1420                 goto fail;
1421         }
1422  out:
1423         SAFE_FREE(p); 
1424         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1425         return ret;
1426 fail:
1427         ret = -1;
1428         goto out;
1429 }
1430
1431 static int tdb_already_open(dev_t device,
1432                             ino_t ino)
1433 {
1434         TDB_CONTEXT *i;
1435         
1436         for (i = tdbs; i; i = i->next) {
1437                 if (i->device == device && i->inode == ino) {
1438                         return 1;
1439                 }
1440         }
1441
1442         return 0;
1443 }
1444
1445 /* open the database, creating it if necessary 
1446
1447    The open_flags and mode are passed straight to the open call on the
1448    database file. A flags value of O_WRONLY is invalid. The hash size
1449    is advisory, use zero for a default value.
1450
1451    Return is NULL on error, in which case errno is also set.  Don't 
1452    try to call tdb_error or tdb_errname, just do strerror(errno).
1453
1454    @param name may be NULL for internal databases. */
1455 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1456                       int open_flags, mode_t mode)
1457 {
1458         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1459 }
1460
1461
1462 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1463                          int open_flags, mode_t mode,
1464                          tdb_log_func log_fn)
1465 {
1466         TDB_CONTEXT *tdb;
1467         struct stat st;
1468         int rev = 0, locked;
1469
1470         if (!(tdb = calloc(1, sizeof *tdb))) {
1471                 /* Can't log this */
1472                 errno = ENOMEM;
1473                 goto fail;
1474         }
1475         tdb->fd = -1;
1476         tdb->name = NULL;
1477         tdb->map_ptr = NULL;
1478         tdb->lockedkeys = NULL;
1479         tdb->flags = tdb_flags;
1480         tdb->open_flags = open_flags;
1481         tdb->log_fn = log_fn;
1482         
1483         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1484                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1485                          name));
1486                 errno = EINVAL;
1487                 goto fail;
1488         }
1489         
1490         if (hash_size == 0)
1491                 hash_size = DEFAULT_HASH_SIZE;
1492         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1493                 tdb->read_only = 1;
1494                 /* read only databases don't do locking or clear if first */
1495                 tdb->flags |= TDB_NOLOCK;
1496                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1497         }
1498
1499         /* internal databases don't mmap or lock, and start off cleared */
1500         if (tdb->flags & TDB_INTERNAL) {
1501                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1502                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1503                 if (tdb_new_database(tdb, hash_size) != 0) {
1504                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1505                         goto fail;
1506                 }
1507                 goto internal;
1508         }
1509
1510         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1511                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1512                          name, strerror(errno)));
1513                 goto fail;      /* errno set by open(2) */
1514         }
1515
1516         /* ensure there is only one process initialising at once */
1517         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1518                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1519                          name, strerror(errno)));
1520                 goto fail;      /* errno set by tdb_brlock */
1521         }
1522
1523         /* we need to zero database if we are the only one with it open */
1524         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1525             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1526                 open_flags |= O_CREAT;
1527                 if (ftruncate(tdb->fd, 0) == -1) {
1528                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1529                                  "failed to truncate %s: %s\n",
1530                                  name, strerror(errno)));
1531                         goto fail; /* errno set by ftruncate */
1532                 }
1533         }
1534
1535         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1536             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1537             || (tdb->header.version != TDB_VERSION
1538                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1539                 /* its not a valid database - possibly initialise it */
1540                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1541                         errno = EIO; /* ie bad format or something */
1542                         goto fail;
1543                 }
1544                 rev = (tdb->flags & TDB_CONVERT);
1545         }
1546         if (!rev)
1547                 tdb->flags &= ~TDB_CONVERT;
1548         else {
1549                 tdb->flags |= TDB_CONVERT;
1550                 convert(&tdb->header, sizeof(tdb->header));
1551         }
1552         if (fstat(tdb->fd, &st) == -1)
1553                 goto fail;
1554
1555         /* Is it already in the open list?  If so, fail. */
1556         if (tdb_already_open(st.st_dev, st.st_ino)) {
1557                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1558                          "%s (%d,%d) is already open in this process\n",
1559                          name, st.st_dev, st.st_ino));
1560                 errno = EBUSY;
1561                 goto fail;
1562         }
1563
1564         if (!(tdb->name = (char *)strdup(name))) {
1565                 errno = ENOMEM;
1566                 goto fail;
1567         }
1568
1569         tdb->map_size = st.st_size;
1570         tdb->device = st.st_dev;
1571         tdb->inode = st.st_ino;
1572         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1573         if (!tdb->locked) {
1574                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1575                          "failed to allocate lock structure for %s\n",
1576                          name));
1577                 errno = ENOMEM;
1578                 goto fail;
1579         }
1580         tdb_mmap(tdb);
1581         if (locked) {
1582                 if (!tdb->read_only)
1583                         if (tdb_clear_spinlocks(tdb) != 0) {
1584                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1585                                 "failed to clear spinlock\n"));
1586                                 goto fail;
1587                         }
1588                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1589                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1590                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1591                                  name, strerror(errno)));
1592                         goto fail;
1593                 }
1594         }
1595         /* leave this lock in place to indicate it's in use */
1596         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1597                 goto fail;
1598
1599  internal:
1600         /* Internal (memory-only) databases skip all the code above to
1601          * do with disk files, and resume here by releasing their
1602          * global lock and hooking into the active list. */
1603         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1604                 goto fail;
1605         tdb->next = tdbs;
1606         tdbs = tdb;
1607         return tdb;
1608
1609  fail:
1610         { int save_errno = errno;
1611
1612         if (!tdb)
1613                 return NULL;
1614         
1615         if (tdb->map_ptr) {
1616                 if (tdb->flags & TDB_INTERNAL)
1617                         SAFE_FREE(tdb->map_ptr);
1618                 else
1619                         tdb_munmap(tdb);
1620         }
1621         SAFE_FREE(tdb->name);
1622         if (tdb->fd != -1)
1623                 if (close(tdb->fd) != 0)
1624                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1625         SAFE_FREE(tdb->locked);
1626         SAFE_FREE(tdb);
1627         errno = save_errno;
1628         return NULL;
1629         }
1630 }
1631
1632 /* close a database */
1633 int tdb_close(TDB_CONTEXT *tdb)
1634 {
1635         TDB_CONTEXT **i;
1636         int ret = 0;
1637
1638         if (tdb->map_ptr) {
1639                 if (tdb->flags & TDB_INTERNAL)
1640                         SAFE_FREE(tdb->map_ptr);
1641                 else
1642                         tdb_munmap(tdb);
1643         }
1644         SAFE_FREE(tdb->name);
1645         if (tdb->fd != -1)
1646                 ret = close(tdb->fd);
1647         SAFE_FREE(tdb->locked);
1648         SAFE_FREE(tdb->lockedkeys);
1649
1650         /* Remove from contexts list */
1651         for (i = &tdbs; *i; i = &(*i)->next) {
1652                 if (*i == tdb) {
1653                         *i = tdb->next;
1654                         break;
1655                 }
1656         }
1657
1658         memset(tdb, 0, sizeof(*tdb));
1659         SAFE_FREE(tdb);
1660
1661         return ret;
1662 }
1663
1664 /* lock/unlock entire database */
1665 int tdb_lockall(TDB_CONTEXT *tdb)
1666 {
1667         u32 i;
1668
1669         /* There are no locks on read-only dbs */
1670         if (tdb->read_only)
1671                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1672         if (tdb->lockedkeys)
1673                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1674         for (i = 0; i < tdb->header.hash_size; i++) 
1675                 if (tdb_lock(tdb, i, F_WRLCK))
1676                         break;
1677
1678         /* If error, release locks we have... */
1679         if (i < tdb->header.hash_size) {
1680                 u32 j;
1681
1682                 for ( j = 0; j < i; j++)
1683                         tdb_unlock(tdb, j, F_WRLCK);
1684                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1685         }
1686
1687         return 0;
1688 }
1689 void tdb_unlockall(TDB_CONTEXT *tdb)
1690 {
1691         u32 i;
1692         for (i=0; i < tdb->header.hash_size; i++)
1693                 tdb_unlock(tdb, i, F_WRLCK);
1694 }
1695
1696 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1697 {
1698         u32 i, j, hash;
1699
1700         /* Can't lock more keys if already locked */
1701         if (tdb->lockedkeys)
1702                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1703         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1704                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1705         /* First number in array is # keys */
1706         tdb->lockedkeys[0] = number;
1707
1708         /* Insertion sort by bucket */
1709         for (i = 0; i < number; i++) {
1710                 hash = tdb_hash(&keys[i]);
1711                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1712                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1713                 tdb->lockedkeys[j+1] = hash;
1714         }
1715         /* Finally, lock in order */
1716         for (i = 0; i < number; i++)
1717                 if (tdb_lock(tdb, i, F_WRLCK))
1718                         break;
1719
1720         /* If error, release locks we have... */
1721         if (i < number) {
1722                 for ( j = 0; j < i; j++)
1723                         tdb_unlock(tdb, j, F_WRLCK);
1724                 SAFE_FREE(tdb->lockedkeys);
1725                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1726         }
1727         return 0;
1728 }
1729
1730 /* Unlock the keys previously locked by tdb_lockkeys() */
1731 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1732 {
1733         u32 i;
1734         for (i = 0; i < tdb->lockedkeys[0]; i++)
1735                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1736         SAFE_FREE(tdb->lockedkeys);
1737 }
1738
1739 /* lock/unlock one hash chain. This is meant to be used to reduce
1740    contention - it cannot guarantee how many records will be locked */
1741 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1742 {
1743         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1744 }
1745
1746 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1747 {
1748         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1749 }
1750
1751
1752 /* register a loging function */
1753 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1754 {
1755         tdb->log_fn = fn;
1756 }
1757
1758
1759 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1760    seek pointer from our parent and to re-establish locks */
1761 int tdb_reopen(TDB_CONTEXT *tdb)
1762 {
1763         struct stat st;
1764
1765         if (tdb_munmap(tdb) != 0) {
1766                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
1767                 goto fail;
1768         }
1769         if (close(tdb->fd) != 0)
1770                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
1771         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
1772         if (tdb->fd == -1) {
1773                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
1774                 goto fail;
1775         }
1776         if (fstat(tdb->fd, &st) != 0) {
1777                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
1778                 goto fail;
1779         }
1780         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
1781                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
1782                 goto fail;
1783         }
1784         tdb_mmap(tdb);
1785         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
1786                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
1787                 goto fail;
1788         }
1789
1790         return 0;
1791
1792 fail:
1793         tdb_close(tdb);
1794         return -1;
1795 }
1796
1797 /* reopen all tdb's */
1798 int tdb_reopen_all(void)
1799 {
1800         TDB_CONTEXT *tdb;
1801
1802         for (tdb=tdbs; tdb; tdb = tdb->next) {
1803                 if (tdb_reopen(tdb) != 0) return -1;
1804         }
1805
1806         return 0;
1807 }