2a6dca16a8e0ff1a5c5ebfed7cc0623116a1a460
[kai/samba.git] / source3 / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23 #ifdef STANDALONE
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <string.h>
33 #include <fcntl.h>
34 #include <errno.h>
35 #include <sys/mman.h>
36 #include <sys/stat.h>
37 #include <signal.h>
38 #include "tdb.h"
39 #include "spinlock.h"
40 #else
41 #include "includes.h"
42 #endif
43
44 #define TDB_MAGIC_FOOD "TDB file\n"
45 #define TDB_VERSION (0x26011967 + 6)
46 #define TDB_MAGIC (0x26011999U)
47 #define TDB_FREE_MAGIC (~TDB_MAGIC)
48 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
49 #define TDB_ALIGNMENT 4
50 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
51 #define DEFAULT_HASH_SIZE 131
52 #define TDB_PAGE_SIZE 0x2000
53 #define FREELIST_TOP (sizeof(struct tdb_header))
54 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
55 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
56 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
57 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
58 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
59
60 /* NB assumes there is a local variable called "tdb" that is the
61  * current context, also takes doubly-parenthesized print-style
62  * argument. */
63 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
64
65 /* lock offsets */
66 #define GLOBAL_LOCK 0
67 #define ACTIVE_LOCK 4
68
69 #ifndef MAP_FILE
70 #define MAP_FILE 0
71 #endif
72
73 #ifndef MAP_FAILED
74 #define MAP_FAILED ((void *)-1)
75 #endif
76
77 /* free memory if the pointer is valid and zero the pointer */
78 #ifndef SAFE_FREE
79 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
80 #endif
81
82 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
83 TDB_DATA tdb_null;
84
85 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
86 static TDB_CONTEXT *tdbs = NULL;
87
88 static int tdb_munmap(TDB_CONTEXT *tdb)
89 {
90         if (tdb->flags & TDB_INTERNAL)
91                 return 0;
92
93 #ifdef HAVE_MMAP
94         if (tdb->map_ptr) {
95                 int ret = munmap(tdb->map_ptr, tdb->map_size);
96                 if (ret != 0)
97                         return ret;
98         }
99 #endif
100         tdb->map_ptr = NULL;
101         return 0;
102 }
103
104 static void tdb_mmap(TDB_CONTEXT *tdb)
105 {
106         if (tdb->flags & TDB_INTERNAL)
107                 return;
108
109 #ifdef HAVE_MMAP
110         if (!(tdb->flags & TDB_NOMMAP)) {
111                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
112                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
113                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
114
115                 /*
116                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
117                  */
118
119                 if (tdb->map_ptr == MAP_FAILED) {
120                         tdb->map_ptr = NULL;
121                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
122                                  tdb->map_size, strerror(errno)));
123                 }
124         } else {
125                 tdb->map_ptr = NULL;
126         }
127 #else
128         tdb->map_ptr = NULL;
129 #endif
130 }
131
132 /* Endian conversion: we only ever deal with 4 byte quantities */
133 static void *convert(void *buf, u32 size)
134 {
135         u32 i, *p = buf;
136         for (i = 0; i < size / 4; i++)
137                 p[i] = TDB_BYTEREV(p[i]);
138         return buf;
139 }
140 #define DOCONV() (tdb->flags & TDB_CONVERT)
141 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
142
143 /* the body of the database is made of one list_struct for the free space
144    plus a separate data list for each hash value */
145 struct list_struct {
146         tdb_off next; /* offset of the next record in the list */
147         tdb_len rec_len; /* total byte length of record */
148         tdb_len key_len; /* byte length of key */
149         tdb_len data_len; /* byte length of data */
150         u32 full_hash; /* the full 32 bit hash of the key */
151         u32 magic;   /* try to catch errors */
152         /* the following union is implied:
153                 union {
154                         char record[rec_len];
155                         struct {
156                                 char key[key_len];
157                                 char data[data_len];
158                         }
159                         u32 totalsize; (tailer)
160                 }
161         */
162 };
163
164 /***************************************************************
165  Allow a caller to set a "alarm" flag that tdb can check to abort
166  a blocking lock on SIGALRM.
167 ***************************************************************/
168
169 static sig_atomic_t *palarm_fired;
170
171 void tdb_set_lock_alarm(sig_atomic_t *palarm)
172 {
173         palarm_fired = palarm;
174 }
175
176 /* a byte range locking function - return 0 on success
177    this functions locks/unlocks 1 byte at the specified offset.
178
179    On error, errno is also set so that errors are passed back properly
180    through tdb_open(). */
181 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
182                       int rw_type, int lck_type, int probe)
183 {
184         struct flock fl;
185         int ret;
186
187         if (tdb->flags & TDB_NOLOCK)
188                 return 0;
189         if (tdb->read_only) {
190                 errno = EACCES;
191                 return -1;
192         }
193
194         fl.l_type = rw_type;
195         fl.l_whence = SEEK_SET;
196         fl.l_start = offset;
197         fl.l_len = 1;
198         fl.l_pid = 0;
199
200         do {
201                 ret = fcntl(tdb->fd,lck_type,&fl);
202                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
203                         break;
204         } while (ret == -1 && errno == EINTR);
205
206         if (ret == -1) {
207                 if (!probe && lck_type != F_SETLK) {
208                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
209                                  tdb->fd, offset, rw_type, lck_type));
210                 }
211                 /* Was it an alarm timeout ? */
212                 if (errno == EINTR && palarm_fired && *palarm_fired)
213                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
214                 /* Otherwise - generic lock error. */
215                 /* errno set by fcntl */
216                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
217         }
218         return 0;
219 }
220
221 /* lock a list in the database. list -1 is the alloc list */
222 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
223 {
224         if (list < -1 || list >= (int)tdb->header.hash_size) {
225                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
226                            list, ltype));
227                 return -1;
228         }
229         if (tdb->flags & TDB_NOLOCK)
230                 return 0;
231
232         /* Since fcntl locks don't nest, we do a lock for the first one,
233            and simply bump the count for future ones */
234         if (tdb->locked[list+1].count == 0) {
235                 if (!tdb->read_only && tdb->header.rwlocks) {
236                         if (tdb_spinlock(tdb, list, ltype)) {
237                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
238                                            list, ltype));
239                                 return -1;
240                         }
241                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
242                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
243                                            list, ltype, strerror(errno)));
244                         return -1;
245                 }
246                 tdb->locked[list+1].ltype = ltype;
247         }
248         tdb->locked[list+1].count++;
249         return 0;
250 }
251
252 /* unlock the database: returns void because it's too late for errors. */
253         /* changed to return int it may be interesting to know there
254            has been an error  --simo */
255 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
256 {
257         int ret = -1;
258
259         if (tdb->flags & TDB_NOLOCK)
260                 return 0;
261
262         /* Sanity checks */
263         if (list < -1 || list >= (int)tdb->header.hash_size) {
264                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
265                 return ret;
266         }
267
268         if (tdb->locked[list+1].count==0) {
269                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
270                 return ret;
271         }
272
273         if (tdb->locked[list+1].count == 1) {
274                 /* Down to last nested lock: unlock underneath */
275                 if (!tdb->read_only && tdb->header.rwlocks) {
276                         ret = tdb_spinunlock(tdb, list, ltype);
277                 } else {
278                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
279                 }
280         } else {
281                 ret = 0;
282         }
283         tdb->locked[list+1].count--;
284
285         if (ret)
286                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
287         return ret;
288 }
289
290 /* This is based on the hash algorithm from gdbm */
291 static u32 tdb_hash(TDB_DATA *key)
292 {
293         u32 value;      /* Used to compute the hash value.  */
294         u32   i;        /* Used to cycle through random values. */
295
296         /* Set the initial value from the key size. */
297         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
298                 value = (value + (key->dptr[i] << (i*5 % 24)));
299
300         return (1103515243 * value + 12345);  
301 }
302
303 /* check for an out of bounds access - if it is out of bounds then
304    see if the database has been expanded by someone else and expand
305    if necessary 
306    note that "len" is the minimum length needed for the db
307 */
308 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
309 {
310         struct stat st;
311         if (len <= tdb->map_size)
312                 return 0;
313         if (tdb->flags & TDB_INTERNAL) {
314                 if (!probe) {
315                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
316                                  (int)len, (int)tdb->map_size));
317                 }
318                 return TDB_ERRCODE(TDB_ERR_IO, -1);
319         }
320
321         if (fstat(tdb->fd, &st) == -1)
322                 return TDB_ERRCODE(TDB_ERR_IO, -1);
323
324         if (st.st_size < (size_t)len) {
325                 if (!probe) {
326                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
327                                  (int)len, (int)st.st_size));
328                 }
329                 return TDB_ERRCODE(TDB_ERR_IO, -1);
330         }
331
332         /* Unmap, update size, remap */
333         if (tdb_munmap(tdb) == -1)
334                 return TDB_ERRCODE(TDB_ERR_IO, -1);
335         tdb->map_size = st.st_size;
336         tdb_mmap(tdb);
337         return 0;
338 }
339
340 /* write a lump of data at a specified offset */
341 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
342 {
343         if (tdb_oob(tdb, off + len, 0) != 0)
344                 return -1;
345
346         if (tdb->map_ptr)
347                 memcpy(off + (char *)tdb->map_ptr, buf, len);
348 #ifdef HAVE_PWRITE
349         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
350 #else
351         else if (lseek(tdb->fd, off, SEEK_SET) != off
352                  || write(tdb->fd, buf, len) != (ssize_t)len) {
353 #endif
354                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
355                            off, len, strerror(errno)));
356                 return TDB_ERRCODE(TDB_ERR_IO, -1);
357         }
358         return 0;
359 }
360
361 /* read a lump of data at a specified offset, maybe convert */
362 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
363 {
364         if (tdb_oob(tdb, off + len, 0) != 0)
365                 return -1;
366
367         if (tdb->map_ptr)
368                 memcpy(buf, off + (char *)tdb->map_ptr, len);
369 #ifdef HAVE_PREAD
370         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
371 #else
372         else if (lseek(tdb->fd, off, SEEK_SET) != off
373                  || read(tdb->fd, buf, len) != (ssize_t)len) {
374 #endif
375                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
376                            off, len, strerror(errno)));
377                 return TDB_ERRCODE(TDB_ERR_IO, -1);
378         }
379         if (cv)
380                 convert(buf, len);
381         return 0;
382 }
383
384 /* read a lump of data, allocating the space for it */
385 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
386 {
387         char *buf;
388
389         if (!(buf = malloc(len))) {
390                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
391                            len, strerror(errno)));
392                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
393         }
394         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
395                 SAFE_FREE(buf);
396                 return NULL;
397         }
398         return buf;
399 }
400
401 /* read/write a tdb_off */
402 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
403 {
404         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
405 }
406 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
407 {
408         tdb_off off = *d;
409         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
410 }
411
412 /* read/write a record */
413 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
414 {
415         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
416                 return -1;
417         if (TDB_BAD_MAGIC(rec)) {
418                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
419                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
420         }
421         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
422 }
423 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
424 {
425         struct list_struct r = *rec;
426         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
427 }
428
429 /* read a freelist record and check for simple errors */
430 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
431 {
432         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
433                 return -1;
434
435         if (rec->magic == TDB_MAGIC) {
436                 /* this happens when a app is showdown while deleting a record - we should
437                    not completely fail when this happens */
438                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
439                          rec->magic, off));
440                 rec->magic = TDB_FREE_MAGIC;
441                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
442                         return -1;
443         }
444
445         if (rec->magic != TDB_FREE_MAGIC) {
446                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
447                            rec->magic, off));
448                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
449         }
450         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
451                 return -1;
452         return 0;
453 }
454
455 /* update a record tailer (must hold allocation lock) */
456 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
457                          const struct list_struct *rec)
458 {
459         tdb_off totalsize;
460
461         /* Offset of tailer from record header */
462         totalsize = sizeof(*rec) + rec->rec_len;
463         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
464                          &totalsize);
465 }
466
467 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
468 {
469         struct list_struct rec;
470         tdb_off tailer_ofs, tailer;
471
472         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
473                 printf("ERROR: failed to read record at %u\n", offset);
474                 return 0;
475         }
476
477         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
478                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
479
480         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
481         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
482                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
483                 return rec.next;
484         }
485
486         if (tailer != rec.rec_len + sizeof(rec)) {
487                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
488                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
489         }
490         return rec.next;
491 }
492
493 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
494 {
495         tdb_off rec_ptr, top;
496
497         top = TDB_HASH_TOP(i);
498
499         if (tdb_lock(tdb, i, F_WRLCK) != 0)
500                 return -1;
501
502         if (ofs_read(tdb, top, &rec_ptr) == -1)
503                 return tdb_unlock(tdb, i, F_WRLCK);
504
505         if (rec_ptr)
506                 printf("hash=%d\n", i);
507
508         while (rec_ptr) {
509                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
510         }
511
512         return tdb_unlock(tdb, i, F_WRLCK);
513 }
514
515 void tdb_dump_all(TDB_CONTEXT *tdb)
516 {
517         int i;
518         for (i=0;i<tdb->header.hash_size;i++) {
519                 tdb_dump_chain(tdb, i);
520         }
521         printf("freelist:\n");
522         tdb_dump_chain(tdb, -1);
523 }
524
525 int tdb_printfreelist(TDB_CONTEXT *tdb)
526 {
527         int ret;
528         long total_free = 0;
529         tdb_off offset, rec_ptr;
530         struct list_struct rec;
531
532         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
533                 return ret;
534
535         offset = FREELIST_TOP;
536
537         /* read in the freelist top */
538         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
539                 tdb_unlock(tdb, -1, F_WRLCK);
540                 return 0;
541         }
542
543         printf("freelist top=[0x%08x]\n", rec_ptr );
544         while (rec_ptr) {
545                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
546                         tdb_unlock(tdb, -1, F_WRLCK);
547                         return -1;
548                 }
549
550                 if (rec.magic != TDB_FREE_MAGIC) {
551                         printf("bad magic 0x%08x in free list\n", rec.magic);
552                         tdb_unlock(tdb, -1, F_WRLCK);
553                         return -1;
554                 }
555
556                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
557                 total_free += rec.rec_len;
558
559                 /* move to the next record */
560                 rec_ptr = rec.next;
561         }
562         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
563                (int)total_free);
564
565         return tdb_unlock(tdb, -1, F_WRLCK);
566 }
567
568 /* Remove an element from the freelist.  Must have alloc lock. */
569 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
570 {
571         tdb_off last_ptr, i;
572
573         /* read in the freelist top */
574         last_ptr = FREELIST_TOP;
575         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
576                 if (i == off) {
577                         /* We've found it! */
578                         return ofs_write(tdb, last_ptr, &next);
579                 }
580                 /* Follow chain (next offset is at start of record) */
581                 last_ptr = i;
582         }
583         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
584         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
585 }
586
587 /* Add an element into the freelist. Merge adjacent records if
588    neccessary. */
589 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
590 {
591         tdb_off right, left;
592
593         /* Allocation and tailer lock */
594         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
595                 return -1;
596
597         /* set an initial tailer, so if we fail we don't leave a bogus record */
598         if (update_tailer(tdb, offset, rec) != 0) {
599                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
600                 goto fail;
601         }
602
603         /* Look right first (I'm an Australian, dammit) */
604         right = offset + sizeof(*rec) + rec->rec_len;
605         if (right + sizeof(*rec) <= tdb->map_size) {
606                 struct list_struct r;
607
608                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
609                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
610                         goto left;
611                 }
612
613                 /* If it's free, expand to include it. */
614                 if (r.magic == TDB_FREE_MAGIC) {
615                         if (remove_from_freelist(tdb, right, r.next) == -1) {
616                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
617                                 goto left;
618                         }
619                         rec->rec_len += sizeof(r) + r.rec_len;
620                 }
621         }
622
623 left:
624         /* Look left */
625         left = offset - sizeof(tdb_off);
626         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
627                 struct list_struct l;
628                 tdb_off leftsize;
629
630                 /* Read in tailer and jump back to header */
631                 if (ofs_read(tdb, left, &leftsize) == -1) {
632                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
633                         goto update;
634                 }
635                 left = offset - leftsize;
636
637                 /* Now read in record */
638                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
639                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
640                         goto update;
641                 }
642
643                 /* If it's free, expand to include it. */
644                 if (l.magic == TDB_FREE_MAGIC) {
645                         if (remove_from_freelist(tdb, left, l.next) == -1) {
646                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
647                                 goto update;
648                         } else {
649                                 offset = left;
650                                 rec->rec_len += leftsize;
651                         }
652                 }
653         }
654
655 update:
656         if (update_tailer(tdb, offset, rec) == -1) {
657                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
658                 goto fail;
659         }
660
661         /* Now, prepend to free list */
662         rec->magic = TDB_FREE_MAGIC;
663
664         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
665             rec_write(tdb, offset, rec) == -1 ||
666             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
667                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
668                 goto fail;
669         }
670
671         /* And we're done. */
672         tdb_unlock(tdb, -1, F_WRLCK);
673         return 0;
674
675  fail:
676         tdb_unlock(tdb, -1, F_WRLCK);
677         return -1;
678 }
679
680
681 /* expand a file.  we prefer to use ftruncate, as that is what posix
682   says to use for mmap expansion */
683 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
684 {
685         char buf[1024];
686 #if HAVE_FTRUNCATE_EXTEND
687         if (ftruncate(tdb->fd, size+addition) != 0) {
688                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
689                            size+addition, strerror(errno)));
690                 return -1;
691         }
692 #else
693         char b = 0;
694
695 #ifdef HAVE_PWRITE
696         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
697 #else
698         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
699             write(tdb->fd, &b, 1) != 1) {
700 #endif
701                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
702                            size+addition, strerror(errno)));
703                 return -1;
704         }
705 #endif
706
707         /* now fill the file with something. This ensures that the file isn't sparse, which would be
708            very bad if we ran out of disk. This must be done with write, not via mmap */
709         memset(buf, 0x42, sizeof(buf));
710         while (addition) {
711                 int n = addition>sizeof(buf)?sizeof(buf):addition;
712 #ifdef HAVE_PWRITE
713                 int ret = pwrite(tdb->fd, buf, n, size);
714 #else
715                 int ret;
716                 if (lseek(tdb->fd, size, SEEK_SET) != size)
717                         return -1;
718                 ret = write(tdb->fd, buf, n);
719 #endif
720                 if (ret != n) {
721                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
722                                    n, strerror(errno)));
723                         return -1;
724                 }
725                 addition -= n;
726                 size += n;
727         }
728         return 0;
729 }
730
731
732 /* expand the database at least size bytes by expanding the underlying
733    file and doing the mmap again if necessary */
734 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
735 {
736         struct list_struct rec;
737         tdb_off offset;
738
739         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
740                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
741                 return -1;
742         }
743
744         /* must know about any previous expansions by another process */
745         tdb_oob(tdb, tdb->map_size + 1, 1);
746
747         /* always make room for at least 10 more records, and round
748            the database up to a multiple of TDB_PAGE_SIZE */
749         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
750
751         if (!(tdb->flags & TDB_INTERNAL))
752                 tdb_munmap(tdb);
753
754         /*
755          * We must ensure the file is unmapped before doing this
756          * to ensure consistency with systems like OpenBSD where
757          * writes and mmaps are not consistent.
758          */
759
760         /* expand the file itself */
761         if (!(tdb->flags & TDB_INTERNAL)) {
762                 if (expand_file(tdb, tdb->map_size, size) != 0)
763                         goto fail;
764         }
765
766         tdb->map_size += size;
767
768         if (tdb->flags & TDB_INTERNAL)
769                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
770         else {
771                 /*
772                  * We must ensure the file is remapped before adding the space
773                  * to ensure consistency with systems like OpenBSD where
774                  * writes and mmaps are not consistent.
775                  */
776
777                 /* We're ok if the mmap fails as we'll fallback to read/write */
778                 tdb_mmap(tdb);
779         }
780
781         /* form a new freelist record */
782         memset(&rec,'\0',sizeof(rec));
783         rec.rec_len = size - sizeof(rec);
784
785         /* link it into the free list */
786         offset = tdb->map_size - size;
787         if (tdb_free(tdb, offset, &rec) == -1)
788                 goto fail;
789
790         tdb_unlock(tdb, -1, F_WRLCK);
791         return 0;
792  fail:
793         tdb_unlock(tdb, -1, F_WRLCK);
794         return -1;
795 }
796
797 /* allocate some space from the free list. The offset returned points
798    to a unconnected list_struct within the database with room for at
799    least length bytes of total data
800
801    0 is returned if the space could not be allocated
802  */
803 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
804                             struct list_struct *rec)
805 {
806         tdb_off rec_ptr, last_ptr, newrec_ptr;
807         struct list_struct newrec;
808
809         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
810                 return 0;
811
812         /* Extra bytes required for tailer */
813         length += sizeof(tdb_off);
814
815  again:
816         last_ptr = FREELIST_TOP;
817
818         /* read in the freelist top */
819         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
820                 goto fail;
821
822         /* keep looking until we find a freelist record big enough */
823         while (rec_ptr) {
824                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
825                         goto fail;
826
827                 if (rec->rec_len >= length) {
828                         /* found it - now possibly split it up  */
829                         if (rec->rec_len > length + MIN_REC_SIZE) {
830                                 /* Length of left piece */
831                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
832
833                                 /* Right piece to go on free list */
834                                 newrec.rec_len = rec->rec_len
835                                         - (sizeof(*rec) + length);
836                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
837
838                                 /* And left record is shortened */
839                                 rec->rec_len = length;
840                         } else
841                                 newrec_ptr = 0;
842
843                         /* Remove allocated record from the free list */
844                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
845                                 goto fail;
846
847                         /* Update header: do this before we drop alloc
848                            lock, otherwise tdb_free() might try to
849                            merge with us, thinking we're free.
850                            (Thanks Jeremy Allison). */
851                         rec->magic = TDB_MAGIC;
852                         if (rec_write(tdb, rec_ptr, rec) == -1)
853                                 goto fail;
854
855                         /* Did we create new block? */
856                         if (newrec_ptr) {
857                                 /* Update allocated record tailer (we
858                                    shortened it). */
859                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
860                                         goto fail;
861
862                                 /* Free new record */
863                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
864                                         goto fail;
865                         }
866
867                         /* all done - return the new record offset */
868                         tdb_unlock(tdb, -1, F_WRLCK);
869                         return rec_ptr;
870                 }
871                 /* move to the next record */
872                 last_ptr = rec_ptr;
873                 rec_ptr = rec->next;
874         }
875         /* we didn't find enough space. See if we can expand the
876            database and if we can then try again */
877         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
878                 goto again;
879  fail:
880         tdb_unlock(tdb, -1, F_WRLCK);
881         return 0;
882 }
883
884 /* initialise a new database with a specified hash size */
885 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
886 {
887         struct tdb_header *newdb;
888         int size, ret = -1;
889
890         /* We make it up in memory, then write it out if not internal */
891         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
892         if (!(newdb = calloc(size, 1)))
893                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
894
895         /* Fill in the header */
896         newdb->version = TDB_VERSION;
897         newdb->hash_size = hash_size;
898 #ifdef USE_SPINLOCKS
899         newdb->rwlocks = size;
900 #endif
901         if (tdb->flags & TDB_INTERNAL) {
902                 tdb->map_size = size;
903                 tdb->map_ptr = (char *)newdb;
904                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
905                 /* Convert the `ondisk' version if asked. */
906                 CONVERT(*newdb);
907                 return 0;
908         }
909         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
910                 goto fail;
911
912         if (ftruncate(tdb->fd, 0) == -1)
913                 goto fail;
914
915         /* This creates an endian-converted header, as if read from disk */
916         CONVERT(*newdb);
917         memcpy(&tdb->header, newdb, sizeof(tdb->header));
918         /* Don't endian-convert the magic food! */
919         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
920         if (write(tdb->fd, newdb, size) != size)
921                 ret = -1;
922         else
923                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
924
925   fail:
926         SAFE_FREE(newdb);
927         return ret;
928 }
929
930 /* Returns 0 on fail.  On success, return offset of record, and fills
931    in rec */
932 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
933                         struct list_struct *r)
934 {
935         tdb_off rec_ptr;
936         
937         /* read in the hash top */
938         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
939                 return 0;
940
941         /* keep looking until we find the right record */
942         while (rec_ptr) {
943                 if (rec_read(tdb, rec_ptr, r) == -1)
944                         return 0;
945
946                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
947                         char *k;
948                         /* a very likely hit - read the key */
949                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
950                                            r->key_len);
951                         if (!k)
952                                 return 0;
953
954                         if (memcmp(key.dptr, k, key.dsize) == 0) {
955                                 SAFE_FREE(k);
956                                 return rec_ptr;
957                         }
958                         SAFE_FREE(k);
959                 }
960                 rec_ptr = r->next;
961         }
962         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
963 }
964
965 /* If they do lockkeys, check that this hash is one they locked */
966 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
967 {
968         u32 i;
969         if (!tdb->lockedkeys)
970                 return 1;
971         for (i = 0; i < tdb->lockedkeys[0]; i++)
972                 if (tdb->lockedkeys[i+1] == hash)
973                         return 1;
974         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
975 }
976
977 /* As tdb_find, but if you succeed, keep the lock */
978 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
979                              struct list_struct *rec)
980 {
981         u32 hash, rec_ptr;
982
983         hash = tdb_hash(&key);
984         if (!tdb_keylocked(tdb, hash))
985                 return 0;
986         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
987                 return 0;
988         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
989                 tdb_unlock(tdb, BUCKET(hash), locktype);
990         return rec_ptr;
991 }
992
993 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
994 {
995         return tdb->ecode;
996 }
997
998 static struct tdb_errname {
999         enum TDB_ERROR ecode; const char *estring;
1000 } emap[] = { {TDB_SUCCESS, "Success"},
1001              {TDB_ERR_CORRUPT, "Corrupt database"},
1002              {TDB_ERR_IO, "IO Error"},
1003              {TDB_ERR_LOCK, "Locking error"},
1004              {TDB_ERR_OOM, "Out of memory"},
1005              {TDB_ERR_EXISTS, "Record exists"},
1006              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1007              {TDB_ERR_NOEXIST, "Record does not exist"} };
1008
1009 /* Error string for the last tdb error */
1010 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1011 {
1012         u32 i;
1013         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1014                 if (tdb->ecode == emap[i].ecode)
1015                         return emap[i].estring;
1016         return "Invalid error code";
1017 }
1018
1019 /* update an entry in place - this only works if the new data size
1020    is <= the old data size and the key exists.
1021    on failure return -1
1022 */
1023 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1024 {
1025         struct list_struct rec;
1026         tdb_off rec_ptr;
1027         int ret = -1;
1028
1029         /* find entry */
1030         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1031                 return -1;
1032
1033         /* must be long enough key, data and tailer */
1034         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1035                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1036                 goto out;
1037         }
1038
1039         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1040                       dbuf.dptr, dbuf.dsize) == -1)
1041                 goto out;
1042
1043         if (dbuf.dsize != rec.data_len) {
1044                 /* update size */
1045                 rec.data_len = dbuf.dsize;
1046                 ret = rec_write(tdb, rec_ptr, &rec);
1047         } else
1048                 ret = 0;
1049  out:
1050         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1051         return ret;
1052 }
1053
1054 /* find an entry in the database given a key */
1055 /* If an entry doesn't exist tdb_err will be set to
1056  * TDB_ERR_NOEXIST. If a key has no data attached
1057  * tdb_err will not be set. Both will return a
1058  * zero pptr and zero dsize.
1059  */
1060
1061 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1062 {
1063         tdb_off rec_ptr;
1064         struct list_struct rec;
1065         TDB_DATA ret;
1066
1067         /* find which hash bucket it is in */
1068         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1069                 return tdb_null;
1070
1071         if (rec.data_len)
1072                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1073                                           rec.data_len);
1074         else
1075                 ret.dptr = NULL;
1076         ret.dsize = rec.data_len;
1077         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1078         return ret;
1079 }
1080
1081 /* check if an entry in the database exists 
1082
1083    note that 1 is returned if the key is found and 0 is returned if not found
1084    this doesn't match the conventions in the rest of this module, but is
1085    compatible with gdbm
1086 */
1087 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1088 {
1089         struct list_struct rec;
1090         
1091         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1092                 return 0;
1093         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1094         return 1;
1095 }
1096
1097 /* record lock stops delete underneath */
1098 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1099 {
1100         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1101 }
1102 /*
1103   Write locks override our own fcntl readlocks, so check it here.
1104   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1105   an error to fail to get the lock here.
1106 */
1107  
1108 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1109 {
1110         struct tdb_traverse_lock *i;
1111         for (i = &tdb->travlocks; i; i = i->next)
1112                 if (i->off == off)
1113                         return -1;
1114         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1115 }
1116
1117 /*
1118   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1119   an error to fail to get the lock here.
1120 */
1121
1122 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1123 {
1124         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1125 }
1126 /* fcntl locks don't stack: avoid unlocking someone else's */
1127 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1128 {
1129         struct tdb_traverse_lock *i;
1130         u32 count = 0;
1131
1132         if (off == 0)
1133                 return 0;
1134         for (i = &tdb->travlocks; i; i = i->next)
1135                 if (i->off == off)
1136                         count++;
1137         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1138 }
1139
1140 /* actually delete an entry in the database given the offset */
1141 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1142 {
1143         tdb_off last_ptr, i;
1144         struct list_struct lastrec;
1145
1146         if (tdb->read_only) return -1;
1147
1148         if (write_lock_record(tdb, rec_ptr) == -1) {
1149                 /* Someone traversing here: mark it as dead */
1150                 rec->magic = TDB_DEAD_MAGIC;
1151                 return rec_write(tdb, rec_ptr, rec);
1152         }
1153         if (write_unlock_record(tdb, rec_ptr) != 0)
1154                 return -1;
1155
1156         /* find previous record in hash chain */
1157         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1158                 return -1;
1159         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1160                 if (rec_read(tdb, i, &lastrec) == -1)
1161                         return -1;
1162
1163         /* unlink it: next ptr is at start of record. */
1164         if (last_ptr == 0)
1165                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1166         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1167                 return -1;
1168
1169         /* recover the space */
1170         if (tdb_free(tdb, rec_ptr, rec) == -1)
1171                 return -1;
1172         return 0;
1173 }
1174
1175 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1176 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1177                          struct list_struct *rec)
1178 {
1179         int want_next = (tlock->off != 0);
1180
1181         /* No traversal allows if you've called tdb_lockkeys() */
1182         if (tdb->lockedkeys)
1183                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1184
1185         /* Lock each chain from the start one. */
1186         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1187                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1188                         return -1;
1189
1190                 /* No previous record?  Start at top of chain. */
1191                 if (!tlock->off) {
1192                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1193                                      &tlock->off) == -1)
1194                                 goto fail;
1195                 } else {
1196                         /* Otherwise unlock the previous record. */
1197                         if (unlock_record(tdb, tlock->off) != 0)
1198                                 goto fail;
1199                 }
1200
1201                 if (want_next) {
1202                         /* We have offset of old record: grab next */
1203                         if (rec_read(tdb, tlock->off, rec) == -1)
1204                                 goto fail;
1205                         tlock->off = rec->next;
1206                 }
1207
1208                 /* Iterate through chain */
1209                 while( tlock->off) {
1210                         tdb_off current;
1211                         if (rec_read(tdb, tlock->off, rec) == -1)
1212                                 goto fail;
1213                         if (!TDB_DEAD(rec)) {
1214                                 /* Woohoo: we found one! */
1215                                 if (lock_record(tdb, tlock->off) != 0)
1216                                         goto fail;
1217                                 return tlock->off;
1218                         }
1219                         /* Try to clean dead ones from old traverses */
1220                         current = tlock->off;
1221                         tlock->off = rec->next;
1222                         if (do_delete(tdb, current, rec) != 0)
1223                                 goto fail;
1224                 }
1225                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1226                 want_next = 0;
1227         }
1228         /* We finished iteration without finding anything */
1229         return TDB_ERRCODE(TDB_SUCCESS, 0);
1230
1231  fail:
1232         tlock->off = 0;
1233         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1234                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1235         return -1;
1236 }
1237
1238 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1239    return -1 on error or the record count traversed
1240    if fn is NULL then it is not called
1241    a non-zero return value from fn() indicates that the traversal should stop
1242   */
1243 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1244 {
1245         TDB_DATA key, dbuf;
1246         struct list_struct rec;
1247         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1248         int ret, count = 0;
1249
1250         /* This was in the initializaton, above, but the IRIX compiler
1251          * did not like it.  crh
1252          */
1253         tl.next = tdb->travlocks.next;
1254
1255         /* fcntl locks don't stack: beware traverse inside traverse */
1256         tdb->travlocks.next = &tl;
1257
1258         /* tdb_next_lock places locks on the record returned, and its chain */
1259         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1260                 count++;
1261                 /* now read the full record */
1262                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1263                                           rec.key_len + rec.data_len);
1264                 if (!key.dptr) {
1265                         ret = -1;
1266                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1267                                 goto out;
1268                         if (unlock_record(tdb, tl.off) != 0)
1269                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1270                         goto out;
1271                 }
1272                 key.dsize = rec.key_len;
1273                 dbuf.dptr = key.dptr + rec.key_len;
1274                 dbuf.dsize = rec.data_len;
1275
1276                 /* Drop chain lock, call out */
1277                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1278                         ret = -1;
1279                         goto out;
1280                 }
1281                 if (fn && fn(tdb, key, dbuf, state)) {
1282                         /* They want us to terminate traversal */
1283                         ret = count;
1284                         if (unlock_record(tdb, tl.off) != 0) {
1285                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1286                                 ret = -1;
1287                         }
1288                         tdb->travlocks.next = tl.next;
1289                         SAFE_FREE(key.dptr);
1290                         return count;
1291                 }
1292                 SAFE_FREE(key.dptr);
1293         }
1294 out:
1295         tdb->travlocks.next = tl.next;
1296         if (ret < 0)
1297                 return -1;
1298         else
1299                 return count;
1300 }
1301
1302 /* find the first entry in the database and return its key */
1303 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1304 {
1305         TDB_DATA key;
1306         struct list_struct rec;
1307
1308         /* release any old lock */
1309         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1310                 return tdb_null;
1311         tdb->travlocks.off = tdb->travlocks.hash = 0;
1312
1313         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1314                 return tdb_null;
1315         /* now read the key */
1316         key.dsize = rec.key_len;
1317         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1318         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1319                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1320         return key;
1321 }
1322
1323 /* find the next entry in the database, returning its key */
1324 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1325 {
1326         u32 oldhash;
1327         TDB_DATA key = tdb_null;
1328         struct list_struct rec;
1329         char *k = NULL;
1330
1331         /* Is locked key the old key?  If so, traverse will be reliable. */
1332         if (tdb->travlocks.off) {
1333                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1334                         return tdb_null;
1335                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1336                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1337                                             rec.key_len))
1338                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1339                         /* No, it wasn't: unlock it and start from scratch */
1340                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1341                                 return tdb_null;
1342                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1343                                 return tdb_null;
1344                         tdb->travlocks.off = 0;
1345                 }
1346
1347                 SAFE_FREE(k);
1348         }
1349
1350         if (!tdb->travlocks.off) {
1351                 /* No previous element: do normal find, and lock record */
1352                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1353                 if (!tdb->travlocks.off)
1354                         return tdb_null;
1355                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1356                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1357                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1358                         return tdb_null;
1359                 }
1360         }
1361         oldhash = tdb->travlocks.hash;
1362
1363         /* Grab next record: locks chain and returned record,
1364            unlocks old record */
1365         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1366                 key.dsize = rec.key_len;
1367                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1368                                           key.dsize);
1369                 /* Unlock the chain of this new record */
1370                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1371                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1372         }
1373         /* Unlock the chain of old record */
1374         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1375                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1376         return key;
1377 }
1378
1379 /* delete an entry in the database given a key */
1380 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1381 {
1382         tdb_off rec_ptr;
1383         struct list_struct rec;
1384         int ret;
1385
1386         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1387                 return -1;
1388         ret = do_delete(tdb, rec_ptr, &rec);
1389         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1390                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1391         return ret;
1392 }
1393
1394 /* store an element in the database, replacing any existing element
1395    with the same key 
1396
1397    return 0 on success, -1 on failure
1398 */
1399 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1400 {
1401         struct list_struct rec;
1402         u32 hash;
1403         tdb_off rec_ptr;
1404         char *p = NULL;
1405         int ret = 0;
1406
1407         /* find which hash bucket it is in */
1408         hash = tdb_hash(&key);
1409         if (!tdb_keylocked(tdb, hash))
1410                 return -1;
1411         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1412                 return -1;
1413
1414         /* check for it existing, on insert. */
1415         if (flag == TDB_INSERT) {
1416                 if (tdb_exists(tdb, key)) {
1417                         tdb->ecode = TDB_ERR_EXISTS;
1418                         goto fail;
1419                 }
1420         } else {
1421                 /* first try in-place update, on modify or replace. */
1422                 if (tdb_update(tdb, key, dbuf) == 0)
1423                         goto out;
1424                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1425                         goto fail;
1426         }
1427         /* reset the error code potentially set by the tdb_update() */
1428         tdb->ecode = TDB_SUCCESS;
1429
1430         /* delete any existing record - if it doesn't exist we don't
1431            care.  Doing this first reduces fragmentation, and avoids
1432            coalescing with `allocated' block before it's updated. */
1433         if (flag != TDB_INSERT)
1434                 tdb_delete(tdb, key);
1435
1436         /* Copy key+value *before* allocating free space in case malloc
1437            fails and we are left with a dead spot in the tdb. */
1438
1439         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1440                 tdb->ecode = TDB_ERR_OOM;
1441                 goto fail;
1442         }
1443
1444         memcpy(p, key.dptr, key.dsize);
1445         if (dbuf.dsize)
1446                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1447
1448         /* now we're into insert / modify / replace of a record which
1449          * we know could not be optimised by an in-place store (for
1450          * various reasons).  */
1451         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1452                 goto fail;
1453
1454         /* Read hash top into next ptr */
1455         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1456                 goto fail;
1457
1458         rec.key_len = key.dsize;
1459         rec.data_len = dbuf.dsize;
1460         rec.full_hash = hash;
1461         rec.magic = TDB_MAGIC;
1462
1463         /* write out and point the top of the hash chain at it */
1464         if (rec_write(tdb, rec_ptr, &rec) == -1
1465             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1466             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1467                 /* Need to tdb_unallocate() here */
1468                 goto fail;
1469         }
1470  out:
1471         SAFE_FREE(p); 
1472         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1473         return ret;
1474 fail:
1475         ret = -1;
1476         goto out;
1477 }
1478
1479 static int tdb_already_open(dev_t device,
1480                             ino_t ino)
1481 {
1482         TDB_CONTEXT *i;
1483         
1484         for (i = tdbs; i; i = i->next) {
1485                 if (i->device == device && i->inode == ino) {
1486                         return 1;
1487                 }
1488         }
1489
1490         return 0;
1491 }
1492
1493 /* open the database, creating it if necessary 
1494
1495    The open_flags and mode are passed straight to the open call on the
1496    database file. A flags value of O_WRONLY is invalid. The hash size
1497    is advisory, use zero for a default value.
1498
1499    Return is NULL on error, in which case errno is also set.  Don't 
1500    try to call tdb_error or tdb_errname, just do strerror(errno).
1501
1502    @param name may be NULL for internal databases. */
1503 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1504                       int open_flags, mode_t mode)
1505 {
1506         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1507 }
1508
1509
1510 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1511                          int open_flags, mode_t mode,
1512                          tdb_log_func log_fn)
1513 {
1514         TDB_CONTEXT *tdb;
1515         struct stat st;
1516         int rev = 0, locked;
1517         unsigned char *vp;
1518         u32 vertest;
1519
1520         if (!(tdb = calloc(1, sizeof *tdb))) {
1521                 /* Can't log this */
1522                 errno = ENOMEM;
1523                 goto fail;
1524         }
1525         tdb->fd = -1;
1526         tdb->name = NULL;
1527         tdb->map_ptr = NULL;
1528         tdb->lockedkeys = NULL;
1529         tdb->flags = tdb_flags;
1530         tdb->open_flags = open_flags;
1531         tdb->log_fn = log_fn;
1532         
1533         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1534                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1535                          name));
1536                 errno = EINVAL;
1537                 goto fail;
1538         }
1539         
1540         if (hash_size == 0)
1541                 hash_size = DEFAULT_HASH_SIZE;
1542         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1543                 tdb->read_only = 1;
1544                 /* read only databases don't do locking or clear if first */
1545                 tdb->flags |= TDB_NOLOCK;
1546                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1547         }
1548
1549         /* internal databases don't mmap or lock, and start off cleared */
1550         if (tdb->flags & TDB_INTERNAL) {
1551                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1552                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1553                 if (tdb_new_database(tdb, hash_size) != 0) {
1554                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1555                         goto fail;
1556                 }
1557                 goto internal;
1558         }
1559
1560         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1561                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1562                          name, strerror(errno)));
1563                 goto fail;      /* errno set by open(2) */
1564         }
1565
1566         /* ensure there is only one process initialising at once */
1567         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1568                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1569                          name, strerror(errno)));
1570                 goto fail;      /* errno set by tdb_brlock */
1571         }
1572
1573         /* we need to zero database if we are the only one with it open */
1574         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1575             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1576                 open_flags |= O_CREAT;
1577                 if (ftruncate(tdb->fd, 0) == -1) {
1578                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1579                                  "failed to truncate %s: %s\n",
1580                                  name, strerror(errno)));
1581                         goto fail; /* errno set by ftruncate */
1582                 }
1583         }
1584
1585         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1586             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1587             || (tdb->header.version != TDB_VERSION
1588                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1589                 /* its not a valid database - possibly initialise it */
1590                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1591                         errno = EIO; /* ie bad format or something */
1592                         goto fail;
1593                 }
1594                 rev = (tdb->flags & TDB_CONVERT);
1595         }
1596         vp = (unsigned char *)&tdb->header.version;
1597         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1598                   (((u32)vp[2]) << 8) | (u32)vp[3];
1599         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1600         if (!rev)
1601                 tdb->flags &= ~TDB_CONVERT;
1602         else {
1603                 tdb->flags |= TDB_CONVERT;
1604                 convert(&tdb->header, sizeof(tdb->header));
1605         }
1606         if (fstat(tdb->fd, &st) == -1)
1607                 goto fail;
1608
1609         /* Is it already in the open list?  If so, fail. */
1610         if (tdb_already_open(st.st_dev, st.st_ino)) {
1611                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1612                          "%s (%d,%d) is already open in this process\n",
1613                          name, st.st_dev, st.st_ino));
1614                 errno = EBUSY;
1615                 goto fail;
1616         }
1617
1618         if (!(tdb->name = (char *)strdup(name))) {
1619                 errno = ENOMEM;
1620                 goto fail;
1621         }
1622
1623         tdb->map_size = st.st_size;
1624         tdb->device = st.st_dev;
1625         tdb->inode = st.st_ino;
1626         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1627         if (!tdb->locked) {
1628                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1629                          "failed to allocate lock structure for %s\n",
1630                          name));
1631                 errno = ENOMEM;
1632                 goto fail;
1633         }
1634         tdb_mmap(tdb);
1635         if (locked) {
1636                 if (!tdb->read_only)
1637                         if (tdb_clear_spinlocks(tdb) != 0) {
1638                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1639                                 "failed to clear spinlock\n"));
1640                                 goto fail;
1641                         }
1642                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1643                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1644                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1645                                  name, strerror(errno)));
1646                         goto fail;
1647                 }
1648         }
1649         /* leave this lock in place to indicate it's in use */
1650         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1651                 goto fail;
1652
1653  internal:
1654         /* Internal (memory-only) databases skip all the code above to
1655          * do with disk files, and resume here by releasing their
1656          * global lock and hooking into the active list. */
1657         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1658                 goto fail;
1659         tdb->next = tdbs;
1660         tdbs = tdb;
1661         return tdb;
1662
1663  fail:
1664         { int save_errno = errno;
1665
1666         if (!tdb)
1667                 return NULL;
1668         
1669         if (tdb->map_ptr) {
1670                 if (tdb->flags & TDB_INTERNAL)
1671                         SAFE_FREE(tdb->map_ptr);
1672                 else
1673                         tdb_munmap(tdb);
1674         }
1675         SAFE_FREE(tdb->name);
1676         if (tdb->fd != -1)
1677                 if (close(tdb->fd) != 0)
1678                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1679         SAFE_FREE(tdb->locked);
1680         SAFE_FREE(tdb);
1681         errno = save_errno;
1682         return NULL;
1683         }
1684 }
1685
1686 /* close a database */
1687 int tdb_close(TDB_CONTEXT *tdb)
1688 {
1689         TDB_CONTEXT **i;
1690         int ret = 0;
1691
1692         if (tdb->map_ptr) {
1693                 if (tdb->flags & TDB_INTERNAL)
1694                         SAFE_FREE(tdb->map_ptr);
1695                 else
1696                         tdb_munmap(tdb);
1697         }
1698         SAFE_FREE(tdb->name);
1699         if (tdb->fd != -1)
1700                 ret = close(tdb->fd);
1701         SAFE_FREE(tdb->locked);
1702         SAFE_FREE(tdb->lockedkeys);
1703
1704         /* Remove from contexts list */
1705         for (i = &tdbs; *i; i = &(*i)->next) {
1706                 if (*i == tdb) {
1707                         *i = tdb->next;
1708                         break;
1709                 }
1710         }
1711
1712         memset(tdb, 0, sizeof(*tdb));
1713         SAFE_FREE(tdb);
1714
1715         return ret;
1716 }
1717
1718 /* lock/unlock entire database */
1719 int tdb_lockall(TDB_CONTEXT *tdb)
1720 {
1721         u32 i;
1722
1723         /* There are no locks on read-only dbs */
1724         if (tdb->read_only)
1725                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1726         if (tdb->lockedkeys)
1727                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1728         for (i = 0; i < tdb->header.hash_size; i++) 
1729                 if (tdb_lock(tdb, i, F_WRLCK))
1730                         break;
1731
1732         /* If error, release locks we have... */
1733         if (i < tdb->header.hash_size) {
1734                 u32 j;
1735
1736                 for ( j = 0; j < i; j++)
1737                         tdb_unlock(tdb, j, F_WRLCK);
1738                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1739         }
1740
1741         return 0;
1742 }
1743 void tdb_unlockall(TDB_CONTEXT *tdb)
1744 {
1745         u32 i;
1746         for (i=0; i < tdb->header.hash_size; i++)
1747                 tdb_unlock(tdb, i, F_WRLCK);
1748 }
1749
1750 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1751 {
1752         u32 i, j, hash;
1753
1754         /* Can't lock more keys if already locked */
1755         if (tdb->lockedkeys)
1756                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1757         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1758                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1759         /* First number in array is # keys */
1760         tdb->lockedkeys[0] = number;
1761
1762         /* Insertion sort by bucket */
1763         for (i = 0; i < number; i++) {
1764                 hash = tdb_hash(&keys[i]);
1765                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1766                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1767                 tdb->lockedkeys[j+1] = hash;
1768         }
1769         /* Finally, lock in order */
1770         for (i = 0; i < number; i++)
1771                 if (tdb_lock(tdb, i, F_WRLCK))
1772                         break;
1773
1774         /* If error, release locks we have... */
1775         if (i < number) {
1776                 for ( j = 0; j < i; j++)
1777                         tdb_unlock(tdb, j, F_WRLCK);
1778                 SAFE_FREE(tdb->lockedkeys);
1779                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1780         }
1781         return 0;
1782 }
1783
1784 /* Unlock the keys previously locked by tdb_lockkeys() */
1785 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1786 {
1787         u32 i;
1788         for (i = 0; i < tdb->lockedkeys[0]; i++)
1789                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1790         SAFE_FREE(tdb->lockedkeys);
1791 }
1792
1793 /* lock/unlock one hash chain. This is meant to be used to reduce
1794    contention - it cannot guarantee how many records will be locked */
1795 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1796 {
1797         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1798 }
1799
1800 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1801 {
1802         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1803 }
1804
1805
1806 /* register a loging function */
1807 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1808 {
1809         tdb->log_fn = fn;
1810 }
1811
1812
1813 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1814    seek pointer from our parent and to re-establish locks */
1815 int tdb_reopen(TDB_CONTEXT *tdb)
1816 {
1817         struct stat st;
1818
1819         if (tdb_munmap(tdb) != 0) {
1820                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
1821                 goto fail;
1822         }
1823         if (close(tdb->fd) != 0)
1824                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
1825         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
1826         if (tdb->fd == -1) {
1827                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
1828                 goto fail;
1829         }
1830         if (fstat(tdb->fd, &st) != 0) {
1831                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
1832                 goto fail;
1833         }
1834         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
1835                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
1836                 goto fail;
1837         }
1838         tdb_mmap(tdb);
1839         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
1840                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
1841                 goto fail;
1842         }
1843
1844         return 0;
1845
1846 fail:
1847         tdb_close(tdb);
1848         return -1;
1849 }
1850
1851 /* reopen all tdb's */
1852 int tdb_reopen_all(void)
1853 {
1854         TDB_CONTEXT *tdb;
1855
1856         for (tdb=tdbs; tdb; tdb = tdb->next) {
1857                 if (tdb_reopen(tdb) != 0) return -1;
1858         }
1859
1860         return 0;
1861 }