don't set page size by default
[tridge/junkcode.git] / tsums / tdb.c
1 /* 
2    Unix SMB/Netbios implementation.
3    Version 3.0
4    Samba database functions
5    Copyright (C) Andrew Tridgell              1999-2000
6    Copyright (C) Luke Kenneth Casson Leighton      2000
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24 #ifdef STANDALONE
25 #if HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <sys/mman.h>
37 #include <sys/stat.h>
38 #include "tdb.h"
39 #include "spinlock.h"
40 #else
41 #include "includes.h"
42 #endif
43
44 #define TDB_MAGIC_FOOD "TDB file\n"
45 #define TDB_VERSION (0x26011967 + 6)
46 #define TDB_MAGIC (0x26011999U)
47 #define TDB_FREE_MAGIC (~TDB_MAGIC)
48 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
49 #define TDB_ALIGNMENT 4
50 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
51 #define DEFAULT_HASH_SIZE 131
52 #define TDB_PAGE_SIZE 0x2000
53 #define FREELIST_TOP (sizeof(struct tdb_header))
54 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
55 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
56 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
57 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
58 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
59 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
60
61 /* lock offsets */
62 #define GLOBAL_LOCK 0
63 #define ACTIVE_LOCK 4
64
65 #ifndef MAP_FILE
66 #define MAP_FILE 0
67 #endif
68
69 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
70 TDB_DATA tdb_null;
71
72 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
73 static TDB_CONTEXT *tdbs = NULL;
74
75 static void tdb_munmap(TDB_CONTEXT *tdb)
76 {
77 #ifdef HAVE_MMAP
78         if (tdb->map_ptr) munmap(tdb->map_ptr, tdb->map_size);
79 #endif
80         tdb->map_ptr = NULL;
81 }
82
83 static void tdb_mmap(TDB_CONTEXT *tdb)
84 {
85 #ifdef HAVE_MMAP
86         if (!(tdb->flags & TDB_NOMMAP)) {
87                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
88                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
89                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
90                 if (!tdb->map_ptr) {
91                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
92                                  tdb->map_size, strerror(errno)));
93                 }
94         }
95 #else
96         tdb->map_ptr = NULL;
97 #endif
98 }
99
100 /* Endian conversion: we only ever deal with 4 byte quantities */
101 static void *convert(void *buf, u32 size)
102 {
103         u32 i, *p = buf;
104         for (i = 0; i < size / 4; i++) p[i] = TDB_BYTEREV(p[i]);
105         return buf;
106 }
107 #define DOCONV() (tdb->flags & TDB_CONVERT)
108 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
109
110 /* the body of the database is made of one list_struct for the free space
111    plus a separate data list for each hash value */
112 struct list_struct {
113         tdb_off next; /* offset of the next record in the list */
114         tdb_len rec_len; /* total byte length of record */
115         tdb_len key_len; /* byte length of key */
116         tdb_len data_len; /* byte length of data */
117         u32 full_hash; /* the full 32 bit hash of the key */
118         u32 magic;   /* try to catch errors */
119         /* the following union is implied:
120            union {
121               char record[rec_len];
122               struct {
123                 char key[key_len];
124                 char data[data_len];
125               }
126               u32 totalsize; (tailer)
127            } */
128 };
129
130 /* a byte range locking function - return 0 on success
131    this functions locks/unlocks 1 byte at the specified offset */
132 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
133                       int rw_type, int lck_type, int probe)
134 {
135         struct flock fl;
136
137         if (tdb->flags & TDB_NOLOCK) return 0;
138         if (tdb->read_only) return -1;
139
140         fl.l_type = rw_type;
141         fl.l_whence = SEEK_SET;
142         fl.l_start = offset;
143         fl.l_len = 1;
144         fl.l_pid = 0;
145
146         if (fcntl(tdb->fd,lck_type,&fl)) {
147                 if (!probe) {
148                         TDB_LOG((tdb, 5,"tdb_brlock failed at offset %d rw_type=%d lck_type=%d\n", 
149                                  offset, rw_type, lck_type));
150                 }
151                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
152         }
153         return 0;
154 }
155
156 /* lock a list in the database. list -1 is the alloc list */
157 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
158 {
159         if (list < -1 || list >= (int)tdb->header.hash_size) {
160                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
161                            list, ltype));
162                 return -1;
163         }
164         if (tdb->flags & TDB_NOLOCK) return 0;
165
166         /* Since fcntl locks don't nest, we do a lock for the first one,
167            and simply bump the count for future ones */
168         if (tdb->locked[list+1].count == 0) {
169                 if (tdb->header.rwlocks) {
170                         if (tdb_spinlock(tdb, list, ltype)) {
171                                 TDB_LOG((tdb, 0, "tdb_lock spinlock on list ltype=%d\n", 
172                                            list, ltype));
173                                 return -1;
174                         }
175                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
176                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
177                                            list, ltype, strerror(errno)));
178                         return -1;
179                 }
180                 tdb->locked[list+1].ltype = ltype;
181         }
182         tdb->locked[list+1].count++;
183         return 0;
184 }
185
186 /* unlock the database: returns void because it's too late for errors. */
187 static void tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
188 {
189         if (tdb->flags & TDB_NOLOCK) return;
190
191         /* Sanity checks */
192         if (list < -1 || list >= (int)tdb->header.hash_size) return;
193         if (tdb->locked[list+1].count==0) return;
194
195         if (tdb->locked[list+1].count == 1) {
196                 /* Down to last nested lock: unlock underneath */
197                 if (tdb->header.rwlocks) tdb_spinunlock(tdb, list, ltype);
198                 else tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
199         }
200         tdb->locked[list+1].count--;
201 }
202
203 /* This is based on the hash agorithm from gdbm */
204 static u32 tdb_hash(TDB_DATA *key)
205 {
206         u32 value;      /* Used to compute the hash value.  */
207         u32   i;        /* Used to cycle through random values. */
208
209         /* Set the initial value from the key size. */
210         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
211                 value = (value + (key->dptr[i] << (i*5 % 24)));
212
213         return (1103515243 * value + 12345);  
214 }
215
216 /* check for an out of bounds access - if it is out of bounds then
217    see if the database has been expanded by someone else and expand
218    if necessary 
219    note that "len" is the minimum length needed for the db
220 */
221 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
222 {
223         struct stat st;
224         if (len <= tdb->map_size) return 0;
225         if (tdb->flags & TDB_INTERNAL) return 0;
226
227         fstat(tdb->fd, &st);
228         if (st.st_size < (size_t)len) {
229                 if (!probe) {
230                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
231                                  (int)len, (int)st.st_size));
232                 }
233                 return TDB_ERRCODE(TDB_ERR_IO, -1);
234         }
235
236         /* Unmap, update size, remap */
237         tdb_munmap(tdb);
238         tdb->map_size = st.st_size;
239         tdb_mmap(tdb);
240         return 0;
241 }
242
243 /* write a lump of data at a specified offset */
244 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
245 {
246         if (tdb_oob(tdb, off + len, 0) != 0) return -1;
247
248         if (tdb->map_ptr) memcpy(off + (char *)tdb->map_ptr, buf, len);
249         else if (lseek(tdb->fd, off, SEEK_SET) != off
250                  || write(tdb->fd, buf, len) != (ssize_t)len) {
251                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
252                            off, len, strerror(errno)));
253                 return TDB_ERRCODE(TDB_ERR_IO, -1);
254         }
255         return 0;
256 }
257
258 /* read a lump of data at a specified offset, maybe convert */
259 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
260 {
261         if (tdb_oob(tdb, off + len, 0) != 0) return -1;
262
263         if (tdb->map_ptr) memcpy(buf, off + (char *)tdb->map_ptr, len);
264         else if (lseek(tdb->fd, off, SEEK_SET) != off
265                  || read(tdb->fd, buf, len) != (ssize_t)len) {
266                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
267                            off, len, strerror(errno)));
268                 return TDB_ERRCODE(TDB_ERR_IO, -1);
269         }
270         if (cv) convert(buf, len);
271         return 0;
272 }
273
274 /* read a lump of data, allocating the space for it */
275 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
276 {
277         char *buf;
278
279         if (!(buf = malloc(len))) {
280                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
281                            len, strerror(errno)));
282                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
283         }
284         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
285                 free(buf);
286                 return NULL;
287         }
288         return buf;
289 }
290
291 /* read/write a tdb_off */
292 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
293 {
294         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
295 }
296 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
297 {
298         tdb_off off = *d;
299         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
300 }
301
302 /* read/write a record */
303 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
304 {
305         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1) return -1;
306         if (TDB_BAD_MAGIC(rec)) {
307                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
308                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
309         }
310         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
311 }
312 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
313 {
314         struct list_struct r = *rec;
315         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
316 }
317
318 /* read a freelist record and check for simple errors */
319 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
320 {
321         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1) return -1;
322         if (rec->magic != TDB_FREE_MAGIC) {
323                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
324                            rec->magic, off));
325                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
326         }
327         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0) return -1;
328         return 0;
329 }
330
331 /* update a record tailer (must hold allocation lock) */
332 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
333                          const struct list_struct *rec)
334 {
335         tdb_off totalsize;
336
337         /* Offset of tailer from record header */
338         totalsize = sizeof(*rec) + rec->rec_len;
339         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
340                          &totalsize);
341 }
342
343 #ifdef TDB_DEBUG
344 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
345 {
346         struct list_struct rec;
347         tdb_off tailer_ofs, tailer;
348
349         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
350                 printf("ERROR: failed to read record at %u\n", offset);
351                 return 0;
352         }
353
354         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
355                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
356
357         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
358         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
359                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
360                 return rec.next;
361         }
362
363         if (tailer != rec.rec_len + sizeof(rec)) {
364                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n", tailer, rec.rec_len + sizeof(rec));
365         }
366         return rec.next;
367 }
368
369 static void tdb_dump_chain(TDB_CONTEXT *tdb, int i)
370 {
371         tdb_off rec_ptr, top;
372
373         top = TDB_HASH_TOP(i);
374
375         tdb_lock(tdb, i, F_WRLCK);
376
377         if (ofs_read(tdb, top, &rec_ptr) == -1) {
378                 tdb_unlock(tdb, i, F_WRLCK);
379                 return;
380         }
381
382         if (rec_ptr) printf("hash=%d\n", i);
383
384         while (rec_ptr) {
385                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
386         }
387         tdb_unlock(tdb, i, F_WRLCK);
388 }
389
390 void tdb_dump_all(TDB_CONTEXT *tdb)
391 {
392         tdb_off off;
393         int i;
394         for (i=0;i<tdb->header.hash_size;i++) {
395                 tdb_dump_chain(tdb, i);
396         }
397         printf("freelist:\n");
398         tdb_dump_chain(tdb, -1);
399 }
400
401 void tdb_printfreelist(TDB_CONTEXT *tdb)
402 {
403         long total_free = 0;
404         tdb_off offset, rec_ptr, last_ptr;
405         struct list_struct rec, lastrec, newrec;
406
407         tdb_lock(tdb, -1, F_WRLCK);
408
409         last_ptr = 0;
410         offset = FREELIST_TOP;
411
412         /* read in the freelist top */
413         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
414                 return;
415         }
416
417         printf("freelist top=[0x%08x]\n", rec_ptr );
418         while (rec_ptr) {
419                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
420                         return;
421                 }
422
423                 if (rec.magic != TDB_FREE_MAGIC) {
424                         printf("bad magic 0x%08x in free list\n", rec.magic);
425                         return;
426                 }
427
428                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
429                 total_free += rec.rec_len;
430
431                 /* move to the next record */
432                 rec_ptr = rec.next;
433         }
434         printf("total rec_len = [0x%08x (%d)]\n", total_free, total_free );
435
436         tdb_unlock(tdb, -1, F_WRLCK);
437 }
438 #endif
439
440 /* Remove an element from the freelist.  Must have alloc lock. */
441 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
442 {
443         tdb_off last_ptr, i;
444
445         /* read in the freelist top */
446         last_ptr = FREELIST_TOP;
447         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
448                 if (i == off) {
449                         /* We've found it! */
450                         return ofs_write(tdb, last_ptr, &next);
451                 }
452                 /* Follow chain (next offset is at start of record) */
453                 last_ptr = i;
454         }
455         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
456         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
457 }
458
459 /* Add an element into the freelist. Merge adjacent records if
460    neccessary. */
461 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
462 {
463         tdb_off right, left;
464
465         /* Allocation and tailer lock */
466         if (tdb_lock(tdb, -1, F_WRLCK) != 0) return -1;
467
468         /* set an initial tailer, so if we fail we don't leave a bogus record */
469         update_tailer(tdb, offset, rec);
470
471         /* Look right first (I'm an Australian, dammit) */
472         right = offset + sizeof(*rec) + rec->rec_len;
473         if (right + sizeof(*rec) <= tdb->map_size) {
474                 struct list_struct r;
475
476                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
477                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
478                         goto left;
479                 }
480
481                 /* If it's free, expand to include it. */
482                 if (r.magic == TDB_FREE_MAGIC) {
483                         if (remove_from_freelist(tdb, right, r.next) == -1) {
484                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
485                                 goto left;
486                         }
487                         rec->rec_len += sizeof(r) + r.rec_len;
488                 }
489         }
490
491 left:
492         /* Look left */
493         left = offset - sizeof(tdb_off);
494         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
495                 struct list_struct l;
496                 tdb_off leftsize;
497
498                 /* Read in tailer and jump back to header */
499                 if (ofs_read(tdb, left, &leftsize) == -1) {
500                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
501                         goto update;
502                 }
503                 left = offset - leftsize;
504
505                 /* Now read in record */
506                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
507                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
508                         goto update;
509                 }
510
511                 /* If it's free, expand to include it. */
512                 if (l.magic == TDB_FREE_MAGIC) {
513                         if (remove_from_freelist(tdb, left, l.next) == -1) {
514                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
515                                 goto update;
516                         } else {
517                                 offset = left;
518                                 rec->rec_len += leftsize;
519                         }
520                 }
521         }
522
523 update:
524         if (update_tailer(tdb, offset, rec) == -1) {
525                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
526                 goto fail;
527         }
528
529         /* Now, prepend to free list */
530         rec->magic = TDB_FREE_MAGIC;
531
532         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
533             rec_write(tdb, offset, rec) == -1 ||
534             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
535                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
536                 goto fail;
537         }
538
539         /* And we're done. */
540         tdb_unlock(tdb, -1, F_WRLCK);
541         return 0;
542
543  fail:
544         tdb_unlock(tdb, -1, F_WRLCK);
545         return -1;
546 }
547
548
549 /* expand a file.  we prefer to use ftruncate, as that is what posix
550   says to use for mmap expansion */
551 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
552 {
553         char buf[1024];
554 #if HAVE_FTRUNCATE_EXTEND
555         if (ftruncate(tdb->fd, size+addition) != 0) {
556                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
557                            size+addition, strerror(errno)));
558                 return -1;
559         }
560 #else
561         char b = 0;
562         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
563             write(tdb->fd, &b, 1) != 1) {
564                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
565                            size+addition, strerror(errno)));
566                 return -1;
567         }
568 #endif
569         /* now fill the file with something. This ensures that the file isn't sparse, which would be
570            very bad if we ran out of disk. This must be done with write, not via mmap */
571         memset(buf, 0x42, sizeof(buf));
572         if (lseek(tdb->fd, size, SEEK_SET) != size) return -1;
573         while (addition) {
574                 int n = addition>sizeof(buf)?sizeof(buf):addition;
575                 int ret = write(tdb->fd, buf, n);
576                 if (ret != n) {
577                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
578                                    n, strerror(errno)));
579                         return -1;
580                 }
581                 addition -= n;
582         }
583         return 0;
584 }
585
586
587 /* expand the database at least size bytes by expanding the underlying
588    file and doing the mmap again if necessary */
589 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
590 {
591         struct list_struct rec;
592         tdb_off offset;
593
594         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
595                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
596                 return 0;
597         }
598
599         /* must know about any previous expansions by another process */
600         tdb_oob(tdb, tdb->map_size + 1, 1);
601
602         /* always make room for at least 10 more records, and round
603            the database up to a multiple of TDB_PAGE_SIZE */
604         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
605
606         if (!(tdb->flags & TDB_INTERNAL))
607                 tdb_munmap(tdb);
608
609         /*
610          * We must ensure the file is unmapped before doing this
611          * to ensure consistency with systems like OpenBSD where
612          * writes and mmaps are not consistent.
613          */
614
615         /* expand the file itself */
616         if (!(tdb->flags & TDB_INTERNAL)) {
617                 if (expand_file(tdb, tdb->map_size, size) != 0) goto fail;
618         }
619
620         tdb->map_size += size;
621
622         if (tdb->flags & TDB_INTERNAL)
623                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
624
625         /*
626          * We must ensure the file is remapped before adding the space
627          * to ensure consistency with systems like OpenBSD where
628          * writes and mmaps are not consistent.
629          */
630
631         /* We're ok if the mmap fails as we'll fallback to read/write */
632         tdb_mmap(tdb);
633
634         /* form a new freelist record */
635         memset(&rec,'\0',sizeof(rec));
636         rec.rec_len = size - sizeof(rec);
637
638         /* link it into the free list */
639         offset = tdb->map_size - size;
640         if (tdb_free(tdb, offset, &rec) == -1) goto fail;
641
642         tdb_unlock(tdb, -1, F_WRLCK);
643         return 0;
644  fail:
645         tdb_unlock(tdb, -1, F_WRLCK);
646         return -1;
647 }
648
649 /* allocate some space from the free list. The offset returned points
650    to a unconnected list_struct within the database with room for at
651    least length bytes of total data
652
653    0 is returned if the space could not be allocated
654  */
655 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
656                             struct list_struct *rec)
657 {
658         tdb_off rec_ptr, last_ptr, newrec_ptr;
659         struct list_struct newrec;
660
661         if (tdb_lock(tdb, -1, F_WRLCK) == -1) return 0;
662
663         /* Extra bytes required for tailer */
664         length += sizeof(tdb_off);
665
666  again:
667         last_ptr = FREELIST_TOP;
668
669         /* read in the freelist top */
670         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1) goto fail;
671
672         /* keep looking until we find a freelist record big enough */
673         while (rec_ptr) {
674                 if (rec_free_read(tdb, rec_ptr, rec) == -1) goto fail;
675
676                 if (rec->rec_len >= length) {
677                         /* found it - now possibly split it up  */
678                         if (rec->rec_len > length + MIN_REC_SIZE) {
679                                 /* Length of left piece */
680                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
681
682                                 /* Right piece to go on free list */
683                                 newrec.rec_len = rec->rec_len
684                                         - (sizeof(*rec) + length);
685                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
686
687                                 /* And left record is shortened */
688                                 rec->rec_len = length;
689                         } else
690                                 newrec_ptr = 0;
691
692                         /* Remove allocated record from the free list */
693                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
694                                 goto fail;
695
696                         /* Update header: do this before we drop alloc
697                            lock, otherwise tdb_free() might try to
698                            merge with us, thinking we're free.
699                            (Thanks Jeremy Allison). */
700                         rec->magic = TDB_MAGIC;
701                         if (rec_write(tdb, rec_ptr, rec) == -1)
702                                 goto fail;
703
704                         /* Did we create new block? */
705                         if (newrec_ptr) {
706                                 /* Update allocated record tailer (we
707                                    shortened it). */
708                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
709                                         goto fail;
710
711                                 /* Free new record */
712                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
713                                         goto fail;
714                         }
715
716                         /* all done - return the new record offset */
717                         tdb_unlock(tdb, -1, F_WRLCK);
718                         return rec_ptr;
719                 }
720                 /* move to the next record */
721                 last_ptr = rec_ptr;
722                 rec_ptr = rec->next;
723         }
724         /* we didn't find enough space. See if we can expand the
725            database and if we can then try again */
726         if (tdb_expand(tdb, length + sizeof(*rec)) == 0) goto again;
727  fail:
728         tdb_unlock(tdb, -1, F_WRLCK);
729         return 0;
730 }
731
732 /* initialise a new database with a specified hash size */
733 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
734 {
735         struct tdb_header *newdb;
736         int size, ret;
737
738         /* We make it up in memory, then write it out if not internal */
739         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
740         if (!(newdb = calloc(size, 1))) return TDB_ERRCODE(TDB_ERR_OOM, -1);
741
742         /* Fill in the header */
743         newdb->version = TDB_VERSION;
744         newdb->hash_size = hash_size;
745 #ifdef USE_SPINLOCKS
746         newdb->rwlocks = size;
747 #endif
748         if (tdb->flags & TDB_INTERNAL) {
749                 tdb->map_size = size;
750                 tdb->map_ptr = (char *)newdb;
751                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
752                 /* Convert the `ondisk' version if asked. */
753                 CONVERT(*newdb);
754                 return 0;
755         }
756         lseek(tdb->fd, 0, SEEK_SET);
757         ftruncate(tdb->fd, 0);
758         /* This creates an endian-converted header, as if read from disk */
759         CONVERT(*newdb);
760         memcpy(&tdb->header, newdb, sizeof(tdb->header));
761         /* Don't endian-convert the magic food! */
762         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
763         if (write(tdb->fd, newdb, size) != size) ret = -1;
764         else ret = tdb_create_rwlocks(tdb->fd, hash_size);
765
766         free(newdb);
767         return ret;
768 }
769
770 /* Returns 0 on fail.  On success, return offset of record, and fills
771    in rec */
772 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
773                         struct list_struct *r)
774 {
775         tdb_off rec_ptr;
776         
777         /* read in the hash top */
778         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) return 0;
779
780         /* keep looking until we find the right record */
781         while (rec_ptr) {
782                 if (rec_read(tdb, rec_ptr, r) == -1) return 0;
783
784                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
785                         char *k;
786                         /* a very likely hit - read the key */
787                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
788                                            r->key_len);
789                         if (!k) return 0;
790
791                         if (memcmp(key.dptr, k, key.dsize) == 0) {
792                                 free(k);
793                                 return rec_ptr;
794                         }
795                         free(k);
796                 }
797                 rec_ptr = r->next;
798         }
799         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
800 }
801
802 /* If they do lockkeys, check that this hash is one they locked */
803 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
804 {
805         u32 i;
806         if (!tdb->lockedkeys) return 1;
807         for (i = 0; i < tdb->lockedkeys[0]; i++)
808                 if (tdb->lockedkeys[i+1] == hash) return 1;
809         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
810 }
811
812 /* As tdb_find, but if you succeed, keep the lock */
813 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
814                              struct list_struct *rec)
815 {
816         u32 hash, rec_ptr;
817
818         hash = tdb_hash(&key);
819         if (!tdb_keylocked(tdb, hash)) return 0;
820         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1) return 0;
821         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
822                 tdb_unlock(tdb, BUCKET(hash), locktype);
823         return rec_ptr;
824 }
825
826 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
827 {
828         return tdb->ecode;
829 }
830
831 static struct tdb_errname {
832         enum TDB_ERROR ecode; const char *estring;
833 } emap[] = { {TDB_SUCCESS, "Success"},
834              {TDB_ERR_CORRUPT, "Corrupt database"},
835              {TDB_ERR_IO, "IO Error"},
836              {TDB_ERR_LOCK, "Locking error"},
837              {TDB_ERR_OOM, "Out of memory"},
838              {TDB_ERR_EXISTS, "Record exists"},
839              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
840              {TDB_ERR_NOEXIST, "Record does not exist"} };
841
842 /* Error string for the last tdb error */
843 const char *tdb_errorstr(TDB_CONTEXT *tdb)
844 {
845         u32 i;
846         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
847                 if (tdb->ecode == emap[i].ecode) return emap[i].estring;
848         return "Invalid error code";
849 }
850
851 /* update an entry in place - this only works if the new data size
852    is <= the old data size and the key exists.
853    on failure return -1
854 */
855 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
856 {
857         struct list_struct rec;
858         tdb_off rec_ptr;
859         int ret = -1;
860
861         /* find entry */
862         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec))) return -1;
863
864         /* must be long enough key, data and tailer */
865         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) goto out;
866
867         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
868                       dbuf.dptr, dbuf.dsize) == -1)
869                 goto out;
870
871         if (dbuf.dsize != rec.data_len) {
872                 /* update size */
873                 rec.data_len = dbuf.dsize;
874                 ret = rec_write(tdb, rec_ptr, &rec);
875         }
876         else ret = 0;
877  out:
878         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
879         return ret;
880 }
881
882 /* find an entry in the database given a key */
883 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
884 {
885         tdb_off rec_ptr;
886         struct list_struct rec;
887         TDB_DATA ret;
888
889         /* find which hash bucket it is in */
890         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec))) return tdb_null;
891
892         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
893                                   rec.data_len);
894         ret.dsize = rec.data_len;
895         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
896         return ret;
897 }
898
899 /* check if an entry in the database exists 
900
901    note that 1 is returned if the key is found and 0 is returned if not found
902    this doesn't match the conventions in the rest of this module, but is
903    compatible with gdbm
904 */
905 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
906 {
907         struct list_struct rec;
908         
909         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0) return 0;
910         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
911         return 1;
912 }
913
914 /* record lock stops delete underneath */
915 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
916 {
917         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
918 }
919 /* write locks override our own fcntl readlocks, so check it here */
920 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
921 {
922         struct tdb_traverse_lock *i;
923         for (i = &tdb->travlocks; i; i = i->next) if (i->off == off) return -1;
924         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
925 }
926 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
927 {
928         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
929 }
930 /* fcntl locks don't stack: avoid unlocking someone else's */
931 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
932 {
933         struct tdb_traverse_lock *i;
934         u32 count = 0;
935
936         if (off == 0) return 0;
937         for (i = &tdb->travlocks; i; i = i->next) if (i->off == off) count++;
938         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
939 }
940
941 /* actually delete an entry in the database given the offset */
942 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
943 {
944         tdb_off last_ptr, i;
945         struct list_struct lastrec;
946
947         if (tdb->read_only) return -1;
948
949         if (write_lock_record(tdb, rec_ptr) == -1) {
950                 /* Someone traversing here: mark it as dead */
951                 rec->magic = TDB_DEAD_MAGIC;
952                 return rec_write(tdb, rec_ptr, rec);
953         }
954         write_unlock_record(tdb, rec_ptr);
955
956         /* find previous record in hash chain */
957         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1) return -1;
958         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
959                 if (rec_read(tdb, i, &lastrec) == -1) return -1;
960
961         /* unlink it: next ptr is at start of record. */
962         if (last_ptr == 0) last_ptr = TDB_HASH_TOP(rec->full_hash);
963         if (ofs_write(tdb, last_ptr, &rec->next) == -1) return -1;
964
965         /* recover the space */
966         if (tdb_free(tdb, rec_ptr, rec) == -1) return -1;
967         return 0;
968 }
969
970 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
971 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
972                          struct list_struct *rec)
973 {
974         int want_next = (tlock->off != 0);
975
976         /* No traversal allows if you've called tdb_lockkeys() */
977         if (tdb->lockedkeys) return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
978
979         /* Lock each chain from the start one. */
980         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
981                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1) return -1;
982
983                 /* No previous record?  Start at top of chain. */
984                 if (!tlock->off) {
985                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
986                                      &tlock->off) == -1)
987                                 goto fail;
988                 } else {
989                         /* Otherwise unlock the previous record. */
990                         unlock_record(tdb, tlock->off);
991                 }
992
993                 if (want_next) {
994                         /* We have offset of old record: grab next */
995                         if (rec_read(tdb, tlock->off, rec) == -1) goto fail;
996                         tlock->off = rec->next;
997                 }
998
999                 /* Iterate through chain */
1000                 while( tlock->off) {
1001                         tdb_off current;
1002                         if (rec_read(tdb, tlock->off, rec) == -1) goto fail;
1003                         if (!TDB_DEAD(rec)) {
1004                                 /* Woohoo: we found one! */
1005                                 lock_record(tdb, tlock->off);
1006                                 return tlock->off;
1007                         }
1008                         /* Try to clean dead ones from old traverses */
1009                         current = tlock->off;
1010                         tlock->off = rec->next;
1011                         do_delete(tdb, current, rec);
1012                 }
1013                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1014                 want_next = 0;
1015         }
1016         /* We finished iteration without finding anything */
1017         return TDB_ERRCODE(TDB_SUCCESS, 0);
1018
1019  fail:
1020         tlock->off = 0;
1021         tdb_unlock(tdb, tlock->hash, F_WRLCK);
1022         return -1;
1023 }
1024
1025 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1026    return -1 on error or the record count traversed
1027    if fn is NULL then it is not called
1028    a non-zero return value from fn() indicates that the traversal should stop
1029   */
1030 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1031 {
1032         TDB_DATA key, dbuf;
1033         struct list_struct rec;
1034         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1035         int ret, count = 0;
1036
1037         /* This was in the initializaton, above, but the IRIX compiler
1038          * did not like it.  crh
1039          */
1040         tl.next = tdb->travlocks.next;
1041
1042         /* fcntl locks don't stack: beware traverse inside traverse */
1043         tdb->travlocks.next = &tl;
1044
1045         /* tdb_next_lock places locks on the record returned, and its chain */
1046         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1047                 count++;
1048                 /* now read the full record */
1049                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1050                                           rec.key_len + rec.data_len);
1051                 if (!key.dptr) {
1052                         tdb_unlock(tdb, tl.hash, F_WRLCK);
1053                         unlock_record(tdb, tl.off);
1054                         tdb->travlocks.next = tl.next;
1055                         return -1;
1056                 }
1057                 key.dsize = rec.key_len;
1058                 dbuf.dptr = key.dptr + rec.key_len;
1059                 dbuf.dsize = rec.data_len;
1060
1061                 /* Drop chain lock, call out */
1062                 tdb_unlock(tdb, tl.hash, F_WRLCK);
1063                 if (fn && fn(tdb, key, dbuf, state)) {
1064                         /* They want us to terminate traversal */
1065                         unlock_record(tdb, tl.off);
1066                         tdb->travlocks.next = tl.next;
1067                         free(key.dptr);
1068                         return count;
1069                 }
1070                 free(key.dptr);
1071         }
1072         tdb->travlocks.next = tl.next;
1073         if (ret < 0)
1074                 return -1;
1075         else
1076                 return count;
1077 }
1078
1079 /* find the first entry in the database and return its key */
1080 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1081 {
1082         TDB_DATA key;
1083         struct list_struct rec;
1084
1085         /* release any old lock */
1086         unlock_record(tdb, tdb->travlocks.off);
1087         tdb->travlocks.off = tdb->travlocks.hash = 0;
1088
1089         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0) return tdb_null;
1090         /* now read the key */
1091         key.dsize = rec.key_len;
1092         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1093         tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK);
1094         return key;
1095 }
1096
1097 /* find the next entry in the database, returning its key */
1098 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1099 {
1100         u32 oldhash;
1101         TDB_DATA key = tdb_null;
1102         struct list_struct rec;
1103         char *k = NULL;
1104
1105         /* Is locked key the old key?  If so, traverse will be reliable. */
1106         if (tdb->travlocks.off) {
1107                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK)) return tdb_null;
1108                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1109                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1110                                             rec.key_len))
1111                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1112                         /* No, it wasn't: unlock it and start from scratch */
1113                         unlock_record(tdb, tdb->travlocks.off);
1114                         tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
1115                         tdb->travlocks.off = 0;
1116                 }
1117
1118                 if (k)
1119                         free(k);
1120         }
1121
1122         if (!tdb->travlocks.off) {
1123                 /* No previous element: do normal find, and lock record */
1124                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1125                 if (!tdb->travlocks.off) return tdb_null;
1126                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1127                 lock_record(tdb, tdb->travlocks.off);
1128         }
1129         oldhash = tdb->travlocks.hash;
1130
1131         /* Grab next record: locks chain and returned record,
1132            unlocks old record */
1133         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1134                 key.dsize = rec.key_len;
1135                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1136                                           key.dsize);
1137                 /* Unlock the chain of this new record */
1138                 tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
1139         }
1140         /* Unlock the chain of old record */
1141         tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK);
1142         return key;
1143 }
1144
1145 /* delete an entry in the database given a key */
1146 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1147 {
1148         tdb_off rec_ptr;
1149         struct list_struct rec;
1150         int ret;
1151
1152         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec))) return -1;
1153         ret = do_delete(tdb, rec_ptr, &rec);
1154         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1155         return ret;
1156 }
1157
1158 /* store an element in the database, replacing any existing element
1159    with the same key 
1160
1161    return 0 on success, -1 on failure
1162 */
1163 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1164 {
1165         struct list_struct rec;
1166         u32 hash;
1167         tdb_off rec_ptr;
1168         char *p = NULL;
1169         int ret = 0;
1170
1171         /* find which hash bucket it is in */
1172         hash = tdb_hash(&key);
1173         if (!tdb_keylocked(tdb, hash)) return -1;
1174         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1) return -1;
1175
1176         /* check for it existing, on insert. */
1177         if (flag == TDB_INSERT) {
1178                 if (tdb_exists(tdb, key)) {
1179                         tdb->ecode = TDB_ERR_EXISTS;
1180                         goto fail;
1181                 }
1182         } else {
1183                 /* first try in-place update, on modify or replace. */
1184                 if (tdb_update(tdb, key, dbuf) == 0) goto out;
1185                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1186                         goto fail;
1187         }
1188         /* reset the error code potentially set by the tdb_update() */
1189         tdb->ecode = TDB_SUCCESS;
1190
1191         /* delete any existing record - if it doesn't exist we don't
1192            care.  Doing this first reduces fragmentation, and avoids
1193            coalescing with `allocated' block before it's updated. */
1194         if (flag != TDB_INSERT) tdb_delete(tdb, key);
1195
1196         /* Copy key+value *before* allocating free space in case malloc
1197            fails and we are left with a dead spot in the tdb. */
1198
1199         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1200                 tdb->ecode = TDB_ERR_OOM;
1201                 goto fail;
1202         }
1203
1204         memcpy(p, key.dptr, key.dsize);
1205         memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1206
1207         /* now we're into insert / modify / replace of a record which
1208          * we know could not be optimised by an in-place store (for
1209          * various reasons).  */
1210         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1211                 goto fail;
1212
1213         /* Read hash top into next ptr */
1214         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1) goto fail;
1215
1216         rec.key_len = key.dsize;
1217         rec.data_len = dbuf.dsize;
1218         rec.full_hash = hash;
1219         rec.magic = TDB_MAGIC;
1220
1221         /* write out and point the top of the hash chain at it */
1222         if (rec_write(tdb, rec_ptr, &rec) == -1
1223             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1224             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1225         fail:
1226                 /* Need to tdb_unallocate() here */
1227                 ret = -1;
1228         }
1229  out:
1230         free(p); 
1231         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1232         return ret;
1233 }
1234
1235 /* open the database, creating it if necessary 
1236
1237    The open_flags and mode are passed straight to the open call on the
1238    database file. A flags value of O_WRONLY is invalid. The hash size
1239    is advisory, use zero for a default value.
1240
1241    return is NULL on error */
1242 TDB_CONTEXT *tdb_open(char *name, int hash_size, int tdb_flags,
1243                       int open_flags, mode_t mode)
1244 {
1245         TDB_CONTEXT tdb, *ret, *i;
1246         struct stat st;
1247         int rev = 0, locked;
1248
1249         memset(&tdb, 0, sizeof(tdb));
1250         tdb.fd = -1;
1251         tdb.name = NULL;
1252         tdb.map_ptr = NULL;
1253         tdb.lockedkeys = NULL;
1254         tdb.flags = tdb_flags;
1255
1256         if ((open_flags & O_ACCMODE) == O_WRONLY) goto fail;
1257         if (hash_size == 0) hash_size = DEFAULT_HASH_SIZE;
1258         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1259                 tdb.read_only = 1;
1260                 /* read only databases don't do locking */
1261                 tdb.flags |= TDB_NOLOCK;
1262         }
1263
1264         /* internal databases don't mmap or lock, and start off cleared */
1265         if (tdb.flags & TDB_INTERNAL) {
1266                 tdb.flags |= (TDB_NOLOCK | TDB_NOMMAP);
1267                 tdb.flags &= ~TDB_CLEAR_IF_FIRST;
1268                 tdb_new_database(&tdb, hash_size);
1269                 goto internal;
1270         }
1271
1272         if ((tdb.fd = open(name, open_flags, mode)) == -1) goto fail;
1273
1274         /* ensure there is only one process initialising at once */
1275         tdb_brlock(&tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0);
1276         
1277         /* we need to zero database if we are the only one with it open */
1278         if ((locked = (tdb_brlock(&tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1279             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1280                 open_flags |= O_CREAT;
1281                 ftruncate(tdb.fd, 0);
1282         }
1283
1284         if (read(tdb.fd, &tdb.header, sizeof(tdb.header)) != sizeof(tdb.header)
1285             || strcmp(tdb.header.magic_food, TDB_MAGIC_FOOD) != 0
1286             || (tdb.header.version != TDB_VERSION
1287                 && !(rev = (tdb.header.version==TDB_BYTEREV(TDB_VERSION))))) {
1288                 /* its not a valid database - possibly initialise it */
1289                 if (!(open_flags & O_CREAT)
1290                     || tdb_new_database(&tdb, hash_size) == -1) goto fail;
1291                 rev = (tdb.flags & TDB_CONVERT);
1292         }
1293         if (!rev) tdb.flags &= ~TDB_CONVERT;
1294         else {
1295                 tdb.flags |= TDB_CONVERT;
1296                 convert(&tdb.header, sizeof(tdb.header));
1297         }
1298         fstat(tdb.fd, &st);
1299         /* Is it already in the open list?  If so, fail. */
1300         for (i = tdbs; i; i = i->next) {
1301                 if (i->device == st.st_dev && i->inode == st.st_ino) {
1302                         errno = EBUSY;
1303                         close(tdb.fd);
1304                         return NULL;
1305                 }
1306         }
1307
1308         /* map the database and fill in the return structure */
1309         tdb.name = (char *)strdup(name);
1310         tdb.map_size = st.st_size;
1311         tdb.device = st.st_dev;
1312         tdb.inode = st.st_ino;
1313         tdb.locked = calloc(tdb.header.hash_size+1, sizeof(tdb.locked[0]));
1314         if (!tdb.locked) goto fail;
1315         tdb_mmap(&tdb);
1316         if (locked) {
1317                 tdb_clear_spinlocks(&tdb);
1318                 tdb_brlock(&tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0);
1319         }
1320         /* leave this lock in place to indicate it's in use */
1321         tdb_brlock(&tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0);
1322
1323  internal:
1324         if (!(ret = malloc(sizeof(tdb)))) goto fail;
1325         *ret = tdb;
1326         tdb_brlock(&tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0);
1327         ret->next = tdbs;
1328         tdbs = ret;
1329         return ret;
1330
1331  fail:
1332         if (tdb.name) free(tdb.name);
1333         if (tdb.fd != -1) close(tdb.fd);
1334         tdb_munmap(&tdb);
1335         return NULL;
1336 }
1337
1338 /* close a database */
1339 int tdb_close(TDB_CONTEXT *tdb)
1340 {
1341         TDB_CONTEXT **i;
1342         int ret = 0;
1343
1344         if (tdb->map_ptr) {
1345                 if (tdb->flags & TDB_INTERNAL) free(tdb->map_ptr);
1346                 else tdb_munmap(tdb);
1347         }
1348         if (tdb->name) free(tdb->name);
1349         if (tdb->fd != -1) {
1350                 ret = close(tdb->fd);
1351         }
1352         if (tdb->locked) free(tdb->locked);
1353         if (tdb->lockedkeys) free(tdb->lockedkeys);
1354
1355         /* Remove from contexts list */
1356         for (i = &tdbs; *i; i = &(*i)->next) {
1357                 if (*i == tdb) {
1358                         *i = tdb->next;
1359                         break;
1360                 }
1361         }
1362
1363         memset(tdb, 0, sizeof(*tdb));
1364         free(tdb);
1365
1366         return ret;
1367 }
1368
1369 /* lock/unlock entire database */
1370 int tdb_lockall(TDB_CONTEXT *tdb)
1371 {
1372         u32 i;
1373
1374         /* There are no locks on read-only dbs */
1375         if (tdb->read_only) return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1376         if (tdb->lockedkeys) return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1377         for (i = 0; i < tdb->header.hash_size; i++) 
1378                 if (tdb_lock(tdb, i, F_WRLCK))
1379                         break;
1380
1381         /* If error, release locks we have... */
1382         if (i < tdb->header.hash_size) {
1383                 u32 j;
1384
1385                 for ( j = 0; j < i; j++)
1386                         tdb_unlock(tdb, j, F_WRLCK);
1387                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1388         }
1389
1390         return 0;
1391 }
1392 void tdb_unlockall(TDB_CONTEXT *tdb)
1393 {
1394         u32 i;
1395         for (i=0; i < tdb->header.hash_size; i++) tdb_unlock(tdb, i, F_WRLCK);
1396 }
1397
1398 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1399 {
1400         u32 i, j, hash;
1401
1402         /* Can't lock more keys if already locked */
1403         if (tdb->lockedkeys) return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1404         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1405                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1406         /* First number in array is # keys */
1407         tdb->lockedkeys[0] = number;
1408
1409         /* Insertion sort by bucket */
1410         for (i = 0; i < number; i++) {
1411                 hash = tdb_hash(&keys[i]);
1412                 for (j = 0;
1413                      j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash);
1414                      j++);
1415                 memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1],
1416                         sizeof(u32) * (i-j));
1417                 tdb->lockedkeys[j+1] = hash;
1418         }
1419         /* Finally, lock in order */
1420         for (i = 0; i < number; i++)
1421                 if (tdb_lock(tdb, i, F_WRLCK))
1422                         break;
1423
1424         /* If error, release locks we have... */
1425         if (i < number) {
1426                 for ( j = 0; j < i; j++)
1427                         tdb_unlock(tdb, j, F_WRLCK);
1428                 free(tdb->lockedkeys);
1429                 tdb->lockedkeys = NULL;
1430                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1431         }
1432         return 0;
1433 }
1434
1435 /* Unlock the keys previously locked by tdb_lockkeys() */
1436 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1437 {
1438         u32 i;
1439         for (i = 0; i < tdb->lockedkeys[0]; i++)
1440                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1441         free(tdb->lockedkeys);
1442         tdb->lockedkeys = NULL;
1443 }
1444
1445 /* lock/unlock one hash chain. This is meant to be used to reduce
1446    contention - it cannot guarantee how many records will be locked */
1447 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1448 {
1449         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1450 }
1451 void tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1452 {
1453         tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1454 }
1455
1456
1457 /* register a loging function */
1458 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1459 {
1460         tdb->log_fn = fn;
1461 }