Fixed a couple of nasty bugs only easily seen with no mmap. Firstly,
[kai/samba.git] / source3 / tdb / tdb.c
1 /* 
2    Unix SMB/Netbios implementation.
3    Version 3.0
4    Samba database functions
5    Copyright (C) Andrew Tridgell              1999-2000
6    Copyright (C) Luke Kenneth Casson Leighton      2000
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24 #ifdef STANDALONE
25 #if HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <sys/mman.h>
37 #include <sys/stat.h>
38 #include "tdb.h"
39 #include "spinlock.h"
40 #else
41 #include "includes.h"
42 #endif
43
44 #define TDB_MAGIC_FOOD "TDB file\n"
45 #define TDB_VERSION (0x26011967 + 6)
46 #define TDB_MAGIC (0x26011999U)
47 #define TDB_FREE_MAGIC (~TDB_MAGIC)
48 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
49 #define TDB_ALIGNMENT 4
50 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
51 #define DEFAULT_HASH_SIZE 131
52 #define TDB_PAGE_SIZE 0x2000
53 #define FREELIST_TOP (sizeof(struct tdb_header))
54 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
55 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
56 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
57 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
58 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
59 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
60
61 /* lock offsets */
62 #define GLOBAL_LOCK 0
63 #define ACTIVE_LOCK 4
64
65 #ifndef MAP_FILE
66 #define MAP_FILE 0
67 #endif
68
69 #ifndef MAP_FAILED
70 #define MAP_FAILED ((void *)-1)
71 #endif
72
73 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
74 TDB_DATA tdb_null;
75
76 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
77 static TDB_CONTEXT *tdbs = NULL;
78
79 static void tdb_munmap(TDB_CONTEXT *tdb)
80 {
81         if (tdb->flags & TDB_INTERNAL)
82                 return;
83
84 #ifdef HAVE_MMAP
85         if (tdb->map_ptr)
86                 munmap(tdb->map_ptr, tdb->map_size);
87 #endif
88         tdb->map_ptr = NULL;
89 }
90
91 static void tdb_mmap(TDB_CONTEXT *tdb)
92 {
93         if (tdb->flags & TDB_INTERNAL)
94                 return;
95
96 #ifdef HAVE_MMAP
97         if (!(tdb->flags & TDB_NOMMAP)) {
98                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
99                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
100                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
101
102                 /*
103                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
104                  */
105
106                 if (tdb->map_ptr == MAP_FAILED) {
107                         tdb->map_ptr = NULL;
108                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
109                                  tdb->map_size, strerror(errno)));
110                 }
111         } else {
112                 tdb->map_ptr = NULL;
113         }
114 #else
115         tdb->map_ptr = NULL;
116 #endif
117 }
118
119 /* Endian conversion: we only ever deal with 4 byte quantities */
120 static void *convert(void *buf, u32 size)
121 {
122         u32 i, *p = buf;
123         for (i = 0; i < size / 4; i++)
124                 p[i] = TDB_BYTEREV(p[i]);
125         return buf;
126 }
127 #define DOCONV() (tdb->flags & TDB_CONVERT)
128 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
129
130 /* the body of the database is made of one list_struct for the free space
131    plus a separate data list for each hash value */
132 struct list_struct {
133         tdb_off next; /* offset of the next record in the list */
134         tdb_len rec_len; /* total byte length of record */
135         tdb_len key_len; /* byte length of key */
136         tdb_len data_len; /* byte length of data */
137         u32 full_hash; /* the full 32 bit hash of the key */
138         u32 magic;   /* try to catch errors */
139         /* the following union is implied:
140                 union {
141                         char record[rec_len];
142                         struct {
143                                 char key[key_len];
144                                 char data[data_len];
145                         }
146                         u32 totalsize; (tailer)
147                 }
148         */
149 };
150
151 /* a byte range locking function - return 0 on success
152    this functions locks/unlocks 1 byte at the specified offset */
153 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
154                       int rw_type, int lck_type, int probe)
155 {
156         struct flock fl;
157
158         if (tdb->flags & TDB_NOLOCK)
159                 return 0;
160         if (tdb->read_only)
161                 return -1;
162
163         fl.l_type = rw_type;
164         fl.l_whence = SEEK_SET;
165         fl.l_start = offset;
166         fl.l_len = 1;
167         fl.l_pid = 0;
168
169         if (fcntl(tdb->fd,lck_type,&fl)) {
170                 if (!probe) {
171                         TDB_LOG((tdb, 5,"tdb_brlock failed at offset %d rw_type=%d lck_type=%d\n", 
172                                  offset, rw_type, lck_type));
173                 }
174                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
175         }
176         return 0;
177 }
178
179 /* lock a list in the database. list -1 is the alloc list */
180 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
181 {
182         if (list < -1 || list >= (int)tdb->header.hash_size) {
183                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
184                            list, ltype));
185                 return -1;
186         }
187         if (tdb->flags & TDB_NOLOCK)
188                 return 0;
189
190         /* Since fcntl locks don't nest, we do a lock for the first one,
191            and simply bump the count for future ones */
192         if (tdb->locked[list+1].count == 0) {
193                 if (tdb->header.rwlocks) {
194                         if (tdb_spinlock(tdb, list, ltype)) {
195                                 TDB_LOG((tdb, 0, "tdb_lock spinlock on list ltype=%d\n", 
196                                            list, ltype));
197                                 return -1;
198                         }
199                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
200                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
201                                            list, ltype, strerror(errno)));
202                         return -1;
203                 }
204                 tdb->locked[list+1].ltype = ltype;
205         }
206         tdb->locked[list+1].count++;
207         return 0;
208 }
209
210 /* unlock the database: returns void because it's too late for errors. */
211 static void tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
212 {
213         if (tdb->flags & TDB_NOLOCK)
214                 return;
215
216         /* Sanity checks */
217         if (list < -1 || list >= (int)tdb->header.hash_size)
218                 return;
219         if (tdb->locked[list+1].count==0)
220                 return;
221
222         if (tdb->locked[list+1].count == 1) {
223                 /* Down to last nested lock: unlock underneath */
224                 if (tdb->header.rwlocks)
225                         tdb_spinunlock(tdb, list, ltype);
226                 else
227                         tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
228         }
229         tdb->locked[list+1].count--;
230 }
231
232 /* This is based on the hash agorithm from gdbm */
233 static u32 tdb_hash(TDB_DATA *key)
234 {
235         u32 value;      /* Used to compute the hash value.  */
236         u32   i;        /* Used to cycle through random values. */
237
238         /* Set the initial value from the key size. */
239         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
240                 value = (value + (key->dptr[i] << (i*5 % 24)));
241
242         return (1103515243 * value + 12345);  
243 }
244
245 /* check for an out of bounds access - if it is out of bounds then
246    see if the database has been expanded by someone else and expand
247    if necessary 
248    note that "len" is the minimum length needed for the db
249 */
250 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
251 {
252         struct stat st;
253         if (len <= tdb->map_size)
254                 return 0;
255         if (tdb->flags & TDB_INTERNAL)
256                 return 0;
257
258         if (fstat(tdb->fd, &st) == -1)
259                 return TDB_ERRCODE(TDB_ERR_IO, -1);
260
261         if (st.st_size < (size_t)len) {
262                 if (!probe) {
263                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
264                                  (int)len, (int)st.st_size));
265                 }
266                 return TDB_ERRCODE(TDB_ERR_IO, -1);
267         }
268
269         /* Unmap, update size, remap */
270         tdb_munmap(tdb);
271         tdb->map_size = st.st_size;
272         tdb_mmap(tdb);
273         return 0;
274 }
275
276 /* write a lump of data at a specified offset */
277 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
278 {
279         if (tdb_oob(tdb, off + len, 0) != 0)
280                 return -1;
281
282         if (tdb->map_ptr)
283                 memcpy(off + (char *)tdb->map_ptr, buf, len);
284         else if (lseek(tdb->fd, off, SEEK_SET) != off
285                  || write(tdb->fd, buf, len) != (ssize_t)len) {
286                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
287                            off, len, strerror(errno)));
288                 return TDB_ERRCODE(TDB_ERR_IO, -1);
289         }
290         return 0;
291 }
292
293 /* read a lump of data at a specified offset, maybe convert */
294 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
295 {
296         if (tdb_oob(tdb, off + len, 0) != 0)
297                 return -1;
298
299         if (tdb->map_ptr)
300                 memcpy(buf, off + (char *)tdb->map_ptr, len);
301         else if (lseek(tdb->fd, off, SEEK_SET) != off
302                  || read(tdb->fd, buf, len) != (ssize_t)len) {
303                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
304                            off, len, strerror(errno)));
305                 return TDB_ERRCODE(TDB_ERR_IO, -1);
306         }
307         if (cv)
308                 convert(buf, len);
309         return 0;
310 }
311
312 /* read a lump of data, allocating the space for it */
313 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
314 {
315         char *buf;
316
317         if (!(buf = malloc(len))) {
318                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
319                            len, strerror(errno)));
320                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
321         }
322         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
323                 free(buf);
324                 return NULL;
325         }
326         return buf;
327 }
328
329 /* read/write a tdb_off */
330 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
331 {
332         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
333 }
334 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
335 {
336         tdb_off off = *d;
337         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
338 }
339
340 /* read/write a record */
341 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
342 {
343         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
344                 return -1;
345         if (TDB_BAD_MAGIC(rec)) {
346                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
347                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
348         }
349         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
350 }
351 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
352 {
353         struct list_struct r = *rec;
354         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
355 }
356
357 /* read a freelist record and check for simple errors */
358 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
359 {
360         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
361                 return -1;
362         if (rec->magic != TDB_FREE_MAGIC) {
363                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
364                            rec->magic, off));
365                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
366         }
367         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
368                 return -1;
369         return 0;
370 }
371
372 /* update a record tailer (must hold allocation lock) */
373 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
374                          const struct list_struct *rec)
375 {
376         tdb_off totalsize;
377
378         /* Offset of tailer from record header */
379         totalsize = sizeof(*rec) + rec->rec_len;
380         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
381                          &totalsize);
382 }
383
384 #ifdef TDB_DEBUG
385 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
386 {
387         struct list_struct rec;
388         tdb_off tailer_ofs, tailer;
389
390         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
391                 printf("ERROR: failed to read record at %u\n", offset);
392                 return 0;
393         }
394
395         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
396                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
397
398         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
399         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
400                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
401                 return rec.next;
402         }
403
404         if (tailer != rec.rec_len + sizeof(rec)) {
405                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n", tailer, rec.rec_len + sizeof(rec));
406         }
407         return rec.next;
408 }
409
410 static void tdb_dump_chain(TDB_CONTEXT *tdb, int i)
411 {
412         tdb_off rec_ptr, top;
413
414         top = TDB_HASH_TOP(i);
415
416         tdb_lock(tdb, i, F_WRLCK);
417
418         if (ofs_read(tdb, top, &rec_ptr) == -1) {
419                 tdb_unlock(tdb, i, F_WRLCK);
420                 return;
421         }
422
423         if (rec_ptr)
424                 printf("hash=%d\n", i);
425
426         while (rec_ptr) {
427                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
428         }
429         tdb_unlock(tdb, i, F_WRLCK);
430 }
431
432 void tdb_dump_all(TDB_CONTEXT *tdb)
433 {
434         tdb_off off;
435         int i;
436         for (i=0;i<tdb->header.hash_size;i++) {
437                 tdb_dump_chain(tdb, i);
438         }
439         printf("freelist:\n");
440         tdb_dump_chain(tdb, -1);
441 }
442
443 void tdb_printfreelist(TDB_CONTEXT *tdb)
444 {
445         long total_free = 0;
446         tdb_off offset, rec_ptr, last_ptr;
447         struct list_struct rec, lastrec, newrec;
448
449         tdb_lock(tdb, -1, F_WRLCK);
450
451         last_ptr = 0;
452         offset = FREELIST_TOP;
453
454         /* read in the freelist top */
455         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
456                 return;
457         }
458
459         printf("freelist top=[0x%08x]\n", rec_ptr );
460         while (rec_ptr) {
461                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
462                         return;
463                 }
464
465                 if (rec.magic != TDB_FREE_MAGIC) {
466                         printf("bad magic 0x%08x in free list\n", rec.magic);
467                         return;
468                 }
469
470                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
471                 total_free += rec.rec_len;
472
473                 /* move to the next record */
474                 rec_ptr = rec.next;
475         }
476         printf("total rec_len = [0x%08x (%d)]\n", total_free, total_free );
477
478         tdb_unlock(tdb, -1, F_WRLCK);
479 }
480 #endif
481
482 /* Remove an element from the freelist.  Must have alloc lock. */
483 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
484 {
485         tdb_off last_ptr, i;
486
487         /* read in the freelist top */
488         last_ptr = FREELIST_TOP;
489         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
490                 if (i == off) {
491                         /* We've found it! */
492                         return ofs_write(tdb, last_ptr, &next);
493                 }
494                 /* Follow chain (next offset is at start of record) */
495                 last_ptr = i;
496         }
497         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
498         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
499 }
500
501 /* Add an element into the freelist. Merge adjacent records if
502    neccessary. */
503 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
504 {
505         tdb_off right, left;
506
507         /* Allocation and tailer lock */
508         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
509                 return -1;
510
511         /* set an initial tailer, so if we fail we don't leave a bogus record */
512         update_tailer(tdb, offset, rec);
513
514         /* Look right first (I'm an Australian, dammit) */
515         right = offset + sizeof(*rec) + rec->rec_len;
516         if (right + sizeof(*rec) <= tdb->map_size) {
517                 struct list_struct r;
518
519                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
520                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
521                         goto left;
522                 }
523
524                 /* If it's free, expand to include it. */
525                 if (r.magic == TDB_FREE_MAGIC) {
526                         if (remove_from_freelist(tdb, right, r.next) == -1) {
527                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
528                                 goto left;
529                         }
530                         rec->rec_len += sizeof(r) + r.rec_len;
531                 }
532         }
533
534 left:
535         /* Look left */
536         left = offset - sizeof(tdb_off);
537         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
538                 struct list_struct l;
539                 tdb_off leftsize;
540
541                 /* Read in tailer and jump back to header */
542                 if (ofs_read(tdb, left, &leftsize) == -1) {
543                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
544                         goto update;
545                 }
546                 left = offset - leftsize;
547
548                 /* Now read in record */
549                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
550                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
551                         goto update;
552                 }
553
554                 /* If it's free, expand to include it. */
555                 if (l.magic == TDB_FREE_MAGIC) {
556                         if (remove_from_freelist(tdb, left, l.next) == -1) {
557                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
558                                 goto update;
559                         } else {
560                                 offset = left;
561                                 rec->rec_len += leftsize;
562                         }
563                 }
564         }
565
566 update:
567         if (update_tailer(tdb, offset, rec) == -1) {
568                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
569                 goto fail;
570         }
571
572         /* Now, prepend to free list */
573         rec->magic = TDB_FREE_MAGIC;
574
575         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
576             rec_write(tdb, offset, rec) == -1 ||
577             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
578                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
579                 goto fail;
580         }
581
582         /* And we're done. */
583         tdb_unlock(tdb, -1, F_WRLCK);
584         return 0;
585
586  fail:
587         tdb_unlock(tdb, -1, F_WRLCK);
588         return -1;
589 }
590
591
592 /* expand a file.  we prefer to use ftruncate, as that is what posix
593   says to use for mmap expansion */
594 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
595 {
596         char buf[1024];
597 #if HAVE_FTRUNCATE_EXTEND
598         if (ftruncate(tdb->fd, size+addition) != 0) {
599                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
600                            size+addition, strerror(errno)));
601                 return -1;
602         }
603 #else
604         char b = 0;
605         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
606             write(tdb->fd, &b, 1) != 1) {
607                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
608                            size+addition, strerror(errno)));
609                 return -1;
610         }
611 #endif
612         /* now fill the file with something. This ensures that the file isn't sparse, which would be
613            very bad if we ran out of disk. This must be done with write, not via mmap */
614         memset(buf, 0x42, sizeof(buf));
615         if (lseek(tdb->fd, size, SEEK_SET) != size)
616                 return -1;
617         while (addition) {
618                 int n = addition>sizeof(buf)?sizeof(buf):addition;
619                 int ret = write(tdb->fd, buf, n);
620                 if (ret != n) {
621                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
622                                    n, strerror(errno)));
623                         return -1;
624                 }
625                 addition -= n;
626         }
627         return 0;
628 }
629
630
631 /* expand the database at least size bytes by expanding the underlying
632    file and doing the mmap again if necessary */
633 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
634 {
635         struct list_struct rec;
636         tdb_off offset;
637
638         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
639                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
640                 return 0;
641         }
642
643         /* must know about any previous expansions by another process */
644         tdb_oob(tdb, tdb->map_size + 1, 1);
645
646         /* always make room for at least 10 more records, and round
647            the database up to a multiple of TDB_PAGE_SIZE */
648         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
649
650         if (!(tdb->flags & TDB_INTERNAL))
651                 tdb_munmap(tdb);
652
653         /*
654          * We must ensure the file is unmapped before doing this
655          * to ensure consistency with systems like OpenBSD where
656          * writes and mmaps are not consistent.
657          */
658
659         /* expand the file itself */
660         if (!(tdb->flags & TDB_INTERNAL)) {
661                 if (expand_file(tdb, tdb->map_size, size) != 0)
662                         goto fail;
663         }
664
665         tdb->map_size += size;
666
667         if (tdb->flags & TDB_INTERNAL)
668                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
669         else {
670                 /*
671                  * We must ensure the file is remapped before adding the space
672                  * to ensure consistency with systems like OpenBSD where
673                  * writes and mmaps are not consistent.
674                  */
675
676                 /* We're ok if the mmap fails as we'll fallback to read/write */
677                 tdb_mmap(tdb);
678         }
679
680         /* form a new freelist record */
681         memset(&rec,'\0',sizeof(rec));
682         rec.rec_len = size - sizeof(rec);
683
684         /* link it into the free list */
685         offset = tdb->map_size - size;
686         if (tdb_free(tdb, offset, &rec) == -1)
687                 goto fail;
688
689         tdb_unlock(tdb, -1, F_WRLCK);
690         return 0;
691  fail:
692         tdb_unlock(tdb, -1, F_WRLCK);
693         return -1;
694 }
695
696 /* allocate some space from the free list. The offset returned points
697    to a unconnected list_struct within the database with room for at
698    least length bytes of total data
699
700    0 is returned if the space could not be allocated
701  */
702 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
703                             struct list_struct *rec)
704 {
705         tdb_off rec_ptr, last_ptr, newrec_ptr;
706         struct list_struct newrec;
707
708         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
709                 return 0;
710
711         /* Extra bytes required for tailer */
712         length += sizeof(tdb_off);
713
714  again:
715         last_ptr = FREELIST_TOP;
716
717         /* read in the freelist top */
718         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
719                 goto fail;
720
721         /* keep looking until we find a freelist record big enough */
722         while (rec_ptr) {
723                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
724                         goto fail;
725
726                 if (rec->rec_len >= length) {
727                         /* found it - now possibly split it up  */
728                         if (rec->rec_len > length + MIN_REC_SIZE) {
729                                 /* Length of left piece */
730                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
731
732                                 /* Right piece to go on free list */
733                                 newrec.rec_len = rec->rec_len
734                                         - (sizeof(*rec) + length);
735                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
736
737                                 /* And left record is shortened */
738                                 rec->rec_len = length;
739                         } else
740                                 newrec_ptr = 0;
741
742                         /* Remove allocated record from the free list */
743                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
744                                 goto fail;
745
746                         /* Update header: do this before we drop alloc
747                            lock, otherwise tdb_free() might try to
748                            merge with us, thinking we're free.
749                            (Thanks Jeremy Allison). */
750                         rec->magic = TDB_MAGIC;
751                         if (rec_write(tdb, rec_ptr, rec) == -1)
752                                 goto fail;
753
754                         /* Did we create new block? */
755                         if (newrec_ptr) {
756                                 /* Update allocated record tailer (we
757                                    shortened it). */
758                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
759                                         goto fail;
760
761                                 /* Free new record */
762                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
763                                         goto fail;
764                         }
765
766                         /* all done - return the new record offset */
767                         tdb_unlock(tdb, -1, F_WRLCK);
768                         return rec_ptr;
769                 }
770                 /* move to the next record */
771                 last_ptr = rec_ptr;
772                 rec_ptr = rec->next;
773         }
774         /* we didn't find enough space. See if we can expand the
775            database and if we can then try again */
776         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
777                 goto again;
778  fail:
779         tdb_unlock(tdb, -1, F_WRLCK);
780         return 0;
781 }
782
783 /* initialise a new database with a specified hash size */
784 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
785 {
786         struct tdb_header *newdb;
787         int size, ret = -1;
788
789         /* We make it up in memory, then write it out if not internal */
790         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
791         if (!(newdb = calloc(size, 1)))
792                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
793
794         /* Fill in the header */
795         newdb->version = TDB_VERSION;
796         newdb->hash_size = hash_size;
797 #ifdef USE_SPINLOCKS
798         newdb->rwlocks = size;
799 #endif
800         if (tdb->flags & TDB_INTERNAL) {
801                 tdb->map_size = size;
802                 tdb->map_ptr = (char *)newdb;
803                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
804                 /* Convert the `ondisk' version if asked. */
805                 CONVERT(*newdb);
806                 return 0;
807         }
808         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
809                 goto fail;
810
811         if (ftruncate(tdb->fd, 0) == -1)
812                 goto fail;
813
814         /* This creates an endian-converted header, as if read from disk */
815         CONVERT(*newdb);
816         memcpy(&tdb->header, newdb, sizeof(tdb->header));
817         /* Don't endian-convert the magic food! */
818         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
819         if (write(tdb->fd, newdb, size) != size)
820                 ret = -1;
821         else
822                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
823
824   fail:
825         free(newdb);
826         return ret;
827 }
828
829 /* Returns 0 on fail.  On success, return offset of record, and fills
830    in rec */
831 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
832                         struct list_struct *r)
833 {
834         tdb_off rec_ptr;
835         
836         /* read in the hash top */
837         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
838                 return 0;
839
840         /* keep looking until we find the right record */
841         while (rec_ptr) {
842                 if (rec_read(tdb, rec_ptr, r) == -1)
843                         return 0;
844
845                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
846                         char *k;
847                         /* a very likely hit - read the key */
848                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
849                                            r->key_len);
850                         if (!k)
851                                 return 0;
852
853                         if (memcmp(key.dptr, k, key.dsize) == 0) {
854                                 free(k);
855                                 return rec_ptr;
856                         }
857                         free(k);
858                 }
859                 rec_ptr = r->next;
860         }
861         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
862 }
863
864 /* If they do lockkeys, check that this hash is one they locked */
865 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
866 {
867         u32 i;
868         if (!tdb->lockedkeys)
869                 return 1;
870         for (i = 0; i < tdb->lockedkeys[0]; i++)
871                 if (tdb->lockedkeys[i+1] == hash)
872                         return 1;
873         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
874 }
875
876 /* As tdb_find, but if you succeed, keep the lock */
877 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
878                              struct list_struct *rec)
879 {
880         u32 hash, rec_ptr;
881
882         hash = tdb_hash(&key);
883         if (!tdb_keylocked(tdb, hash))
884                 return 0;
885         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
886                 return 0;
887         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
888                 tdb_unlock(tdb, BUCKET(hash), locktype);
889         return rec_ptr;
890 }
891
892 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
893 {
894         return tdb->ecode;
895 }
896
897 static struct tdb_errname {
898         enum TDB_ERROR ecode; const char *estring;
899 } emap[] = { {TDB_SUCCESS, "Success"},
900              {TDB_ERR_CORRUPT, "Corrupt database"},
901              {TDB_ERR_IO, "IO Error"},
902              {TDB_ERR_LOCK, "Locking error"},
903              {TDB_ERR_OOM, "Out of memory"},
904              {TDB_ERR_EXISTS, "Record exists"},
905              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
906              {TDB_ERR_NOEXIST, "Record does not exist"} };
907
908 /* Error string for the last tdb error */
909 const char *tdb_errorstr(TDB_CONTEXT *tdb)
910 {
911         u32 i;
912         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
913                 if (tdb->ecode == emap[i].ecode)
914                         return emap[i].estring;
915         return "Invalid error code";
916 }
917
918 /* update an entry in place - this only works if the new data size
919    is <= the old data size and the key exists.
920    on failure return -1
921 */
922 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
923 {
924         struct list_struct rec;
925         tdb_off rec_ptr;
926         int ret = -1;
927
928         /* find entry */
929         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
930                 return -1;
931
932         /* must be long enough key, data and tailer */
933         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
934                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
935                 goto out;
936         }
937
938         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
939                       dbuf.dptr, dbuf.dsize) == -1)
940                 goto out;
941
942         if (dbuf.dsize != rec.data_len) {
943                 /* update size */
944                 rec.data_len = dbuf.dsize;
945                 ret = rec_write(tdb, rec_ptr, &rec);
946         } else
947                 ret = 0;
948  out:
949         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
950         return ret;
951 }
952
953 /* find an entry in the database given a key */
954 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
955 {
956         tdb_off rec_ptr;
957         struct list_struct rec;
958         TDB_DATA ret;
959
960         /* find which hash bucket it is in */
961         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
962                 return tdb_null;
963
964         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
965                                   rec.data_len);
966         ret.dsize = rec.data_len;
967         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
968         return ret;
969 }
970
971 /* check if an entry in the database exists 
972
973    note that 1 is returned if the key is found and 0 is returned if not found
974    this doesn't match the conventions in the rest of this module, but is
975    compatible with gdbm
976 */
977 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
978 {
979         struct list_struct rec;
980         
981         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
982                 return 0;
983         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
984         return 1;
985 }
986
987 /* record lock stops delete underneath */
988 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
989 {
990         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
991 }
992 /* write locks override our own fcntl readlocks, so check it here */
993 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
994 {
995         struct tdb_traverse_lock *i;
996         for (i = &tdb->travlocks; i; i = i->next)
997                 if (i->off == off)
998                         return -1;
999         return tdb_brlock(tdb, off, F_WRLCK, F_SETLKW, 1);
1000 }
1001 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1002 {
1003         return tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0);
1004 }
1005 /* fcntl locks don't stack: avoid unlocking someone else's */
1006 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1007 {
1008         struct tdb_traverse_lock *i;
1009         u32 count = 0;
1010
1011         if (off == 0)
1012                 return 0;
1013         for (i = &tdb->travlocks; i; i = i->next)
1014                 if (i->off == off)
1015                         count++;
1016         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1017 }
1018
1019 /* actually delete an entry in the database given the offset */
1020 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1021 {
1022         tdb_off last_ptr, i;
1023         struct list_struct lastrec;
1024
1025         if (tdb->read_only) return -1;
1026
1027         if (write_lock_record(tdb, rec_ptr) == -1) {
1028                 /* Someone traversing here: mark it as dead */
1029                 rec->magic = TDB_DEAD_MAGIC;
1030                 return rec_write(tdb, rec_ptr, rec);
1031         }
1032         write_unlock_record(tdb, rec_ptr);
1033
1034         /* find previous record in hash chain */
1035         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1036                 return -1;
1037         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1038                 if (rec_read(tdb, i, &lastrec) == -1)
1039                         return -1;
1040
1041         /* unlink it: next ptr is at start of record. */
1042         if (last_ptr == 0)
1043                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1044         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1045                 return -1;
1046
1047         /* recover the space */
1048         if (tdb_free(tdb, rec_ptr, rec) == -1)
1049                 return -1;
1050         return 0;
1051 }
1052
1053 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1054 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1055                          struct list_struct *rec)
1056 {
1057         int want_next = (tlock->off != 0);
1058
1059         /* No traversal allows if you've called tdb_lockkeys() */
1060         if (tdb->lockedkeys)
1061                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1062
1063         /* Lock each chain from the start one. */
1064         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1065                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1066                         return -1;
1067
1068                 /* No previous record?  Start at top of chain. */
1069                 if (!tlock->off) {
1070                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1071                                      &tlock->off) == -1)
1072                                 goto fail;
1073                 } else {
1074                         /* Otherwise unlock the previous record. */
1075                         unlock_record(tdb, tlock->off);
1076                 }
1077
1078                 if (want_next) {
1079                         /* We have offset of old record: grab next */
1080                         if (rec_read(tdb, tlock->off, rec) == -1)
1081                                 goto fail;
1082                         tlock->off = rec->next;
1083                 }
1084
1085                 /* Iterate through chain */
1086                 while( tlock->off) {
1087                         tdb_off current;
1088                         if (rec_read(tdb, tlock->off, rec) == -1)
1089                                 goto fail;
1090                         if (!TDB_DEAD(rec)) {
1091                                 /* Woohoo: we found one! */
1092                                 lock_record(tdb, tlock->off);
1093                                 return tlock->off;
1094                         }
1095                         /* Try to clean dead ones from old traverses */
1096                         current = tlock->off;
1097                         tlock->off = rec->next;
1098                         do_delete(tdb, current, rec);
1099                 }
1100                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1101                 want_next = 0;
1102         }
1103         /* We finished iteration without finding anything */
1104         return TDB_ERRCODE(TDB_SUCCESS, 0);
1105
1106  fail:
1107         tlock->off = 0;
1108         tdb_unlock(tdb, tlock->hash, F_WRLCK);
1109         return -1;
1110 }
1111
1112 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1113    return -1 on error or the record count traversed
1114    if fn is NULL then it is not called
1115    a non-zero return value from fn() indicates that the traversal should stop
1116   */
1117 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1118 {
1119         TDB_DATA key, dbuf;
1120         struct list_struct rec;
1121         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1122         int ret, count = 0;
1123
1124         /* This was in the initializaton, above, but the IRIX compiler
1125          * did not like it.  crh
1126          */
1127         tl.next = tdb->travlocks.next;
1128
1129         /* fcntl locks don't stack: beware traverse inside traverse */
1130         tdb->travlocks.next = &tl;
1131
1132         /* tdb_next_lock places locks on the record returned, and its chain */
1133         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1134                 count++;
1135                 /* now read the full record */
1136                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1137                                           rec.key_len + rec.data_len);
1138                 if (!key.dptr) {
1139                         tdb_unlock(tdb, tl.hash, F_WRLCK);
1140                         unlock_record(tdb, tl.off);
1141                         tdb->travlocks.next = tl.next;
1142                         return -1;
1143                 }
1144                 key.dsize = rec.key_len;
1145                 dbuf.dptr = key.dptr + rec.key_len;
1146                 dbuf.dsize = rec.data_len;
1147
1148                 /* Drop chain lock, call out */
1149                 tdb_unlock(tdb, tl.hash, F_WRLCK);
1150                 if (fn && fn(tdb, key, dbuf, state)) {
1151                         /* They want us to terminate traversal */
1152                         unlock_record(tdb, tl.off);
1153                         tdb->travlocks.next = tl.next;
1154                         free(key.dptr);
1155                         return count;
1156                 }
1157                 free(key.dptr);
1158         }
1159         tdb->travlocks.next = tl.next;
1160         if (ret < 0)
1161                 return -1;
1162         else
1163                 return count;
1164 }
1165
1166 /* find the first entry in the database and return its key */
1167 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1168 {
1169         TDB_DATA key;
1170         struct list_struct rec;
1171
1172         /* release any old lock */
1173         unlock_record(tdb, tdb->travlocks.off);
1174         tdb->travlocks.off = tdb->travlocks.hash = 0;
1175
1176         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1177                 return tdb_null;
1178         /* now read the key */
1179         key.dsize = rec.key_len;
1180         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1181         tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK);
1182         return key;
1183 }
1184
1185 /* find the next entry in the database, returning its key */
1186 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1187 {
1188         u32 oldhash;
1189         TDB_DATA key = tdb_null;
1190         struct list_struct rec;
1191         char *k = NULL;
1192
1193         /* Is locked key the old key?  If so, traverse will be reliable. */
1194         if (tdb->travlocks.off) {
1195                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1196                         return tdb_null;
1197                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1198                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1199                                             rec.key_len))
1200                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1201                         /* No, it wasn't: unlock it and start from scratch */
1202                         unlock_record(tdb, tdb->travlocks.off);
1203                         tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
1204                         tdb->travlocks.off = 0;
1205                 }
1206
1207                 if (k)
1208                         free(k);
1209         }
1210
1211         if (!tdb->travlocks.off) {
1212                 /* No previous element: do normal find, and lock record */
1213                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1214                 if (!tdb->travlocks.off)
1215                         return tdb_null;
1216                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1217                 lock_record(tdb, tdb->travlocks.off);
1218         }
1219         oldhash = tdb->travlocks.hash;
1220
1221         /* Grab next record: locks chain and returned record,
1222            unlocks old record */
1223         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1224                 key.dsize = rec.key_len;
1225                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1226                                           key.dsize);
1227                 /* Unlock the chain of this new record */
1228                 tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
1229         }
1230         /* Unlock the chain of old record */
1231         tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK);
1232         return key;
1233 }
1234
1235 /* delete an entry in the database given a key */
1236 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1237 {
1238         tdb_off rec_ptr;
1239         struct list_struct rec;
1240         int ret;
1241
1242         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1243                 return -1;
1244         ret = do_delete(tdb, rec_ptr, &rec);
1245         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1246         return ret;
1247 }
1248
1249 /* store an element in the database, replacing any existing element
1250    with the same key 
1251
1252    return 0 on success, -1 on failure
1253 */
1254 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1255 {
1256         struct list_struct rec;
1257         u32 hash;
1258         tdb_off rec_ptr;
1259         char *p = NULL;
1260         int ret = 0;
1261
1262         /* find which hash bucket it is in */
1263         hash = tdb_hash(&key);
1264         if (!tdb_keylocked(tdb, hash))
1265                 return -1;
1266         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1267                 return -1;
1268
1269         /* check for it existing, on insert. */
1270         if (flag == TDB_INSERT) {
1271                 if (tdb_exists(tdb, key)) {
1272                         tdb->ecode = TDB_ERR_EXISTS;
1273                         goto fail;
1274                 }
1275         } else {
1276                 /* first try in-place update, on modify or replace. */
1277                 if (tdb_update(tdb, key, dbuf) == 0)
1278                         goto out;
1279                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1280                         goto fail;
1281         }
1282         /* reset the error code potentially set by the tdb_update() */
1283         tdb->ecode = TDB_SUCCESS;
1284
1285         /* delete any existing record - if it doesn't exist we don't
1286            care.  Doing this first reduces fragmentation, and avoids
1287            coalescing with `allocated' block before it's updated. */
1288         if (flag != TDB_INSERT)
1289                 tdb_delete(tdb, key);
1290
1291         /* Copy key+value *before* allocating free space in case malloc
1292            fails and we are left with a dead spot in the tdb. */
1293
1294         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1295                 tdb->ecode = TDB_ERR_OOM;
1296                 goto fail;
1297         }
1298
1299         memcpy(p, key.dptr, key.dsize);
1300         memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1301
1302         /* now we're into insert / modify / replace of a record which
1303          * we know could not be optimised by an in-place store (for
1304          * various reasons).  */
1305         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1306                 goto fail;
1307
1308         /* Read hash top into next ptr */
1309         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1310                 goto fail;
1311
1312         rec.key_len = key.dsize;
1313         rec.data_len = dbuf.dsize;
1314         rec.full_hash = hash;
1315         rec.magic = TDB_MAGIC;
1316
1317         /* write out and point the top of the hash chain at it */
1318         if (rec_write(tdb, rec_ptr, &rec) == -1
1319             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1320             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1321         fail:
1322                 /* Need to tdb_unallocate() here */
1323                 ret = -1;
1324         }
1325  out:
1326         if (p)
1327                 free(p); 
1328         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1329         return ret;
1330 }
1331
1332 /* open the database, creating it if necessary 
1333
1334    The open_flags and mode are passed straight to the open call on the
1335    database file. A flags value of O_WRONLY is invalid. The hash size
1336    is advisory, use zero for a default value.
1337
1338    return is NULL on error */
1339 TDB_CONTEXT *tdb_open(char *name, int hash_size, int tdb_flags,
1340                       int open_flags, mode_t mode)
1341 {
1342         TDB_CONTEXT tdb, *ret, *i;
1343         struct stat st;
1344         int rev = 0, locked;
1345
1346         memset(&tdb, 0, sizeof(tdb));
1347         tdb.fd = -1;
1348         tdb.name = NULL;
1349         tdb.map_ptr = NULL;
1350         tdb.lockedkeys = NULL;
1351         tdb.flags = tdb_flags;
1352
1353         if ((open_flags & O_ACCMODE) == O_WRONLY)
1354                 goto fail;
1355         if (hash_size == 0)
1356                 hash_size = DEFAULT_HASH_SIZE;
1357         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1358                 tdb.read_only = 1;
1359                 /* read only databases don't do locking */
1360                 tdb.flags |= TDB_NOLOCK;
1361         }
1362
1363         /* internal databases don't mmap or lock, and start off cleared */
1364         if (tdb.flags & TDB_INTERNAL) {
1365                 tdb.flags |= (TDB_NOLOCK | TDB_NOMMAP);
1366                 tdb.flags &= ~TDB_CLEAR_IF_FIRST;
1367                 tdb_new_database(&tdb, hash_size);
1368                 goto internal;
1369         }
1370
1371         if ((tdb.fd = open(name, open_flags, mode)) == -1)
1372                 goto fail;
1373
1374         /* ensure there is only one process initialising at once */
1375         if (tdb_brlock(&tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1)
1376                 goto fail;
1377
1378         /* we need to zero database if we are the only one with it open */
1379         if ((locked = (tdb_brlock(&tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1380             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1381                 open_flags |= O_CREAT;
1382                 if (ftruncate(tdb.fd, 0) == -1)
1383                         goto fail;
1384         }
1385
1386         if (read(tdb.fd, &tdb.header, sizeof(tdb.header)) != sizeof(tdb.header)
1387             || strcmp(tdb.header.magic_food, TDB_MAGIC_FOOD) != 0
1388             || (tdb.header.version != TDB_VERSION
1389                 && !(rev = (tdb.header.version==TDB_BYTEREV(TDB_VERSION))))) {
1390                 /* its not a valid database - possibly initialise it */
1391                 if (!(open_flags & O_CREAT) || tdb_new_database(&tdb, hash_size) == -1)
1392                         goto fail;
1393                 rev = (tdb.flags & TDB_CONVERT);
1394         }
1395         if (!rev)
1396                 tdb.flags &= ~TDB_CONVERT;
1397         else {
1398                 tdb.flags |= TDB_CONVERT;
1399                 convert(&tdb.header, sizeof(tdb.header));
1400         }
1401         if (fstat(tdb.fd, &st) == -1)
1402                 goto fail;
1403
1404         /* Is it already in the open list?  If so, fail. */
1405         for (i = tdbs; i; i = i->next) {
1406                 if (i->device == st.st_dev && i->inode == st.st_ino) {
1407                         errno = EBUSY;
1408                         close(tdb.fd);
1409                         return NULL;
1410                 }
1411         }
1412
1413         /* map the database and fill in the return structure */
1414         tdb.name = (char *)strdup(name);
1415         if (!tdb.name)
1416                 goto fail;
1417         tdb.map_size = st.st_size;
1418         tdb.device = st.st_dev;
1419         tdb.inode = st.st_ino;
1420         tdb.locked = calloc(tdb.header.hash_size+1, sizeof(tdb.locked[0]));
1421         if (!tdb.locked)
1422                 goto fail;
1423         tdb_mmap(&tdb);
1424         if (locked) {
1425                 tdb_clear_spinlocks(&tdb);
1426                 if (tdb_brlock(&tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1)
1427                         goto fail;
1428         }
1429         /* leave this lock in place to indicate it's in use */
1430         if (tdb_brlock(&tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1431                 goto fail;
1432
1433  internal:
1434         if (!(ret = malloc(sizeof(tdb))))
1435                 goto fail;
1436         *ret = tdb;
1437         if (tdb_brlock(&tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1438                 goto fail;
1439         ret->next = tdbs;
1440         tdbs = ret;
1441         return ret;
1442
1443  fail:
1444         if (tdb.map_ptr) {
1445                 if (tdb.flags & TDB_INTERNAL)
1446                         free(tdb.map_ptr);
1447                 else
1448                         tdb_munmap(&tdb);
1449         }
1450         if (tdb.name)
1451                 free(tdb.name);
1452         if (tdb.fd != -1)
1453                 close(tdb.fd);
1454         if (tdb.locked)
1455                 free(tdb.locked);
1456         return NULL;
1457 }
1458
1459 /* close a database */
1460 int tdb_close(TDB_CONTEXT *tdb)
1461 {
1462         TDB_CONTEXT **i;
1463         int ret = 0;
1464
1465         if (tdb->map_ptr) {
1466                 if (tdb->flags & TDB_INTERNAL)
1467                         free(tdb->map_ptr);
1468                 else
1469                         tdb_munmap(tdb);
1470         }
1471         if (tdb->name)
1472                 free(tdb->name);
1473         if (tdb->fd != -1)
1474                 ret = close(tdb->fd);
1475         if (tdb->locked)
1476                 free(tdb->locked);
1477         if (tdb->lockedkeys)
1478                 free(tdb->lockedkeys);
1479
1480         /* Remove from contexts list */
1481         for (i = &tdbs; *i; i = &(*i)->next) {
1482                 if (*i == tdb) {
1483                         *i = tdb->next;
1484                         break;
1485                 }
1486         }
1487
1488         memset(tdb, 0, sizeof(*tdb));
1489         free(tdb);
1490
1491         return ret;
1492 }
1493
1494 /* lock/unlock entire database */
1495 int tdb_lockall(TDB_CONTEXT *tdb)
1496 {
1497         u32 i;
1498
1499         /* There are no locks on read-only dbs */
1500         if (tdb->read_only)
1501                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1502         if (tdb->lockedkeys)
1503                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1504         for (i = 0; i < tdb->header.hash_size; i++) 
1505                 if (tdb_lock(tdb, i, F_WRLCK))
1506                         break;
1507
1508         /* If error, release locks we have... */
1509         if (i < tdb->header.hash_size) {
1510                 u32 j;
1511
1512                 for ( j = 0; j < i; j++)
1513                         tdb_unlock(tdb, j, F_WRLCK);
1514                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1515         }
1516
1517         return 0;
1518 }
1519 void tdb_unlockall(TDB_CONTEXT *tdb)
1520 {
1521         u32 i;
1522         for (i=0; i < tdb->header.hash_size; i++)
1523                 tdb_unlock(tdb, i, F_WRLCK);
1524 }
1525
1526 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1527 {
1528         u32 i, j, hash;
1529
1530         /* Can't lock more keys if already locked */
1531         if (tdb->lockedkeys)
1532                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1533         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1534                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1535         /* First number in array is # keys */
1536         tdb->lockedkeys[0] = number;
1537
1538         /* Insertion sort by bucket */
1539         for (i = 0; i < number; i++) {
1540                 hash = tdb_hash(&keys[i]);
1541                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1542                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1543                 tdb->lockedkeys[j+1] = hash;
1544         }
1545         /* Finally, lock in order */
1546         for (i = 0; i < number; i++)
1547                 if (tdb_lock(tdb, i, F_WRLCK))
1548                         break;
1549
1550         /* If error, release locks we have... */
1551         if (i < number) {
1552                 for ( j = 0; j < i; j++)
1553                         tdb_unlock(tdb, j, F_WRLCK);
1554                 free(tdb->lockedkeys);
1555                 tdb->lockedkeys = NULL;
1556                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1557         }
1558         return 0;
1559 }
1560
1561 /* Unlock the keys previously locked by tdb_lockkeys() */
1562 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1563 {
1564         u32 i;
1565         for (i = 0; i < tdb->lockedkeys[0]; i++)
1566                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1567         free(tdb->lockedkeys);
1568         tdb->lockedkeys = NULL;
1569 }
1570
1571 /* lock/unlock one hash chain. This is meant to be used to reduce
1572    contention - it cannot guarantee how many records will be locked */
1573 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1574 {
1575         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1576 }
1577 void tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1578 {
1579         tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1580 }
1581
1582
1583 /* register a loging function */
1584 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1585 {
1586         tdb->log_fn = fn;
1587 }