r5779: Remove signal and timeout gubbage from tdb.
[ira/wip.git] / source / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2004
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 2 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, write to the Free Software
26    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 */
28
29
30 /* NOTE: If you use tdbs under valgrind, and in particular if you run
31  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
32  * think this is because valgrind doesn't understand that the mmap'd
33  * area may be written to by other processes.  Memory can, from the
34  * point of view of the grinded process, spontaneously become
35  * initialized.
36  *
37  * I can think of a few solutions.  [mbp 20030311]
38  *
39  * 1 - Write suppressions for Valgrind so that it doesn't complain
40  * about this.  Probably the most reasonable but people need to
41  * remember to use them.
42  *
43  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
44  *
45  * 3 - Use the special valgrind macros to mark memory as valid at the
46  * right time.  Probably too hard -- the process just doesn't know.
47  */ 
48
49 #ifndef _SAMBA_BUILD_
50 #if HAVE_CONFIG_H
51 #include <config.h>
52 #endif
53
54 #include <stdlib.h>
55 #include <stdio.h>
56 #include <stdint.h>
57 #include <fcntl.h>
58 #include <unistd.h>
59 #include <string.h>
60 #include <fcntl.h>
61 #include <errno.h>
62 #include <sys/mman.h>
63 #include <sys/stat.h>
64 #include "tdb.h"
65 #else
66 #include "includes.h"
67 #include "lib/tdb/include/tdb.h"
68 #include "system/time.h"
69 #include "system/shmem.h"
70 #include "system/filesys.h"
71 #endif
72
73 #define TDB_MAGIC_FOOD "TDB file\n"
74 #define TDB_VERSION (0x26011967 + 6)
75 #define TDB_MAGIC (0x26011999U)
76 #define TDB_FREE_MAGIC (~TDB_MAGIC)
77 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
78 #define TDB_ALIGNMENT 4
79 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
80 #define DEFAULT_HASH_SIZE 131
81 #define TDB_PAGE_SIZE 0x2000
82 #define FREELIST_TOP (sizeof(struct tdb_header))
83 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
84 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
85 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
86 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
87 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
88 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
89
90
91 /* NB assumes there is a local variable called "tdb" that is the
92  * current context, also takes doubly-parenthesized print-style
93  * argument. */
94 #define TDB_LOG(x) tdb->log_fn x
95
96 /* lock offsets */
97 #define GLOBAL_LOCK 0
98 #define ACTIVE_LOCK 4
99
100 #ifndef MAP_FILE
101 #define MAP_FILE 0
102 #endif
103
104 #ifndef MAP_FAILED
105 #define MAP_FAILED ((void *)-1)
106 #endif
107
108 /* free memory if the pointer is valid and zero the pointer */
109 #ifndef SAFE_FREE
110 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
111 #endif
112
113 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
114 TDB_DATA tdb_null;
115
116 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
117 static TDB_CONTEXT *tdbs = NULL;
118
119 static int tdb_munmap(TDB_CONTEXT *tdb)
120 {
121         if (tdb->flags & TDB_INTERNAL)
122                 return 0;
123
124 #ifdef HAVE_MMAP
125         if (tdb->map_ptr) {
126                 int ret = munmap(tdb->map_ptr, tdb->map_size);
127                 if (ret != 0)
128                         return ret;
129         }
130 #endif
131         tdb->map_ptr = NULL;
132         return 0;
133 }
134
135 static void tdb_mmap(TDB_CONTEXT *tdb)
136 {
137         if (tdb->flags & TDB_INTERNAL)
138                 return;
139
140 #ifdef HAVE_MMAP
141         if (!(tdb->flags & TDB_NOMMAP)) {
142                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
143                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
144                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
145
146                 /*
147                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
148                  */
149
150                 if (tdb->map_ptr == MAP_FAILED) {
151                         tdb->map_ptr = NULL;
152                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
153                                  tdb->map_size, strerror(errno)));
154                 }
155         } else {
156                 tdb->map_ptr = NULL;
157         }
158 #else
159         tdb->map_ptr = NULL;
160 #endif
161 }
162
163 /* Endian conversion: we only ever deal with 4 byte quantities */
164 static void *convert(void *buf, u32 size)
165 {
166         u32 i, *p = buf;
167         for (i = 0; i < size / 4; i++)
168                 p[i] = TDB_BYTEREV(p[i]);
169         return buf;
170 }
171 #define DOCONV() (tdb->flags & TDB_CONVERT)
172 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
173
174 /* the body of the database is made of one list_struct for the free space
175    plus a separate data list for each hash value */
176 struct list_struct {
177         tdb_off next; /* offset of the next record in the list */
178         tdb_len rec_len; /* total byte length of record */
179         tdb_len key_len; /* byte length of key */
180         tdb_len data_len; /* byte length of data */
181         u32 full_hash; /* the full 32 bit hash of the key */
182         u32 magic;   /* try to catch errors */
183         /* the following union is implied:
184                 union {
185                         char record[rec_len];
186                         struct {
187                                 char key[key_len];
188                                 char data[data_len];
189                         }
190                         u32 totalsize; (tailer)
191                 }
192         */
193 };
194
195 /* a byte range locking function - return 0 on success
196    this functions locks/unlocks 1 byte at the specified offset.
197
198    On error, errno is also set so that errors are passed back properly
199    through tdb_open(). */
200 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
201                       int rw_type, int lck_type, int probe)
202 {
203         struct flock fl;
204         int ret;
205
206         if (tdb->flags & TDB_NOLOCK)
207                 return 0;
208         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
209                 errno = EACCES;
210                 return -1;
211         }
212
213         fl.l_type = rw_type;
214         fl.l_whence = SEEK_SET;
215         fl.l_start = offset;
216         fl.l_len = 1;
217         fl.l_pid = 0;
218
219         do {
220                 ret = fcntl(tdb->fd,lck_type,&fl);
221         } while (ret == -1 && errno == EINTR);
222
223         if (ret == -1) {
224                 if (!probe && lck_type != F_SETLK) {
225                         /* Ensure error code is set for log fun to examine. */
226                         tdb->ecode = TDB_ERR_LOCK;
227                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
228                                  tdb->fd, offset, rw_type, lck_type));
229                 }
230                 /* Generic lock error. errno set by fcntl.
231                  * EAGAIN is an expected return from non-blocking
232                  * locks. */
233                 if (errno != EAGAIN) {
234                 TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
235                                  tdb->fd, offset, rw_type, lck_type, 
236                                  strerror(errno)));
237                 }
238                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
239         }
240         return 0;
241 }
242
243 /* lock a list in the database. list -1 is the alloc list */
244 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
245 {
246         if (list < -1 || list >= (int)tdb->header.hash_size) {
247                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
248                            list, ltype));
249                 return -1;
250         }
251         if (tdb->flags & TDB_NOLOCK)
252                 return 0;
253
254         /* Since fcntl locks don't nest, we do a lock for the first one,
255            and simply bump the count for future ones */
256         if (tdb->locked[list+1].count == 0) {
257                 if (!tdb->read_only && tdb->header.rwlocks) {
258                         if (tdb_spinlock(tdb, list, ltype)) {
259                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list %d ltype=%d\n", 
260                                            list, ltype));
261                                 return -1;
262                         }
263                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
264                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
265                                            list, ltype, strerror(errno)));
266                         return -1;
267                 }
268                 tdb->locked[list+1].ltype = ltype;
269         }
270         tdb->locked[list+1].count++;
271         return 0;
272 }
273
274 /* unlock the database: returns void because it's too late for errors. */
275         /* changed to return int it may be interesting to know there
276            has been an error  --simo */
277 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
278 {
279         int ret = -1;
280
281         if (tdb->flags & TDB_NOLOCK)
282                 return 0;
283
284         /* Sanity checks */
285         if (list < -1 || list >= (int)tdb->header.hash_size) {
286                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
287                 return ret;
288         }
289
290         if (tdb->locked[list+1].count==0) {
291                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
292                 return ret;
293         }
294
295         if (tdb->locked[list+1].count == 1) {
296                 /* Down to last nested lock: unlock underneath */
297                 if (!tdb->read_only && tdb->header.rwlocks) {
298                         ret = tdb_spinunlock(tdb, list, ltype);
299                 } else {
300                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
301                 }
302         } else {
303                 ret = 0;
304         }
305         tdb->locked[list+1].count--;
306
307         if (ret)
308                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
309         return ret;
310 }
311
312 /* This is based on the hash algorithm from gdbm */
313 static u32 default_tdb_hash(TDB_DATA *key)
314 {
315         u32 value;      /* Used to compute the hash value.  */
316         u32   i;        /* Used to cycle through random values. */
317
318         /* Set the initial value from the key size. */
319         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
320                 value = (value + (key->dptr[i] << (i*5 % 24)));
321
322         return (1103515243 * value + 12345);  
323 }
324
325 /* check for an out of bounds access - if it is out of bounds then
326    see if the database has been expanded by someone else and expand
327    if necessary 
328    note that "len" is the minimum length needed for the db
329 */
330 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
331 {
332         struct stat st;
333         if (len <= tdb->map_size)
334                 return 0;
335         if (tdb->flags & TDB_INTERNAL) {
336                 if (!probe) {
337                         /* Ensure ecode is set for log fn. */
338                         tdb->ecode = TDB_ERR_IO;
339                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
340                                  (int)len, (int)tdb->map_size));
341                 }
342                 return TDB_ERRCODE(TDB_ERR_IO, -1);
343         }
344
345         if (fstat(tdb->fd, &st) == -1)
346                 return TDB_ERRCODE(TDB_ERR_IO, -1);
347
348         if (st.st_size < (size_t)len) {
349                 if (!probe) {
350                         /* Ensure ecode is set for log fn. */
351                         tdb->ecode = TDB_ERR_IO;
352                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
353                                  (int)len, (int)st.st_size));
354                 }
355                 return TDB_ERRCODE(TDB_ERR_IO, -1);
356         }
357
358         /* Unmap, update size, remap */
359         if (tdb_munmap(tdb) == -1)
360                 return TDB_ERRCODE(TDB_ERR_IO, -1);
361         tdb->map_size = st.st_size;
362         tdb_mmap(tdb);
363         return 0;
364 }
365
366 /* write a lump of data at a specified offset */
367 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
368 {
369         if (tdb_oob(tdb, off + len, 0) != 0)
370                 return -1;
371
372         if (tdb->map_ptr)
373                 memcpy(off + (char *)tdb->map_ptr, buf, len);
374 #ifdef HAVE_PWRITE
375         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
376 #else
377         else if (lseek(tdb->fd, off, SEEK_SET) != off
378                  || write(tdb->fd, buf, len) != (ssize_t)len) {
379 #endif
380                 /* Ensure ecode is set for log fn. */
381                 tdb->ecode = TDB_ERR_IO;
382                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
383                            off, len, strerror(errno)));
384                 return TDB_ERRCODE(TDB_ERR_IO, -1);
385         }
386         return 0;
387 }
388
389 /* read a lump of data at a specified offset, maybe convert */
390 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
391 {
392         if (tdb_oob(tdb, off + len, 0) != 0)
393                 return -1;
394
395         if (tdb->map_ptr)
396                 memcpy(buf, off + (char *)tdb->map_ptr, len);
397 #ifdef HAVE_PREAD
398         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
399 #else
400         else if (lseek(tdb->fd, off, SEEK_SET) != off
401                  || read(tdb->fd, buf, len) != (ssize_t)len) {
402 #endif
403                 /* Ensure ecode is set for log fn. */
404                 tdb->ecode = TDB_ERR_IO;
405                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
406                            off, len, strerror(errno)));
407                 return TDB_ERRCODE(TDB_ERR_IO, -1);
408         }
409         if (cv)
410                 convert(buf, len);
411         return 0;
412 }
413
414 /* read a lump of data, allocating the space for it */
415 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
416 {
417         char *buf;
418
419         if (!(buf = malloc(len))) {
420                 /* Ensure ecode is set for log fn. */
421                 tdb->ecode = TDB_ERR_OOM;
422                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
423                            len, strerror(errno)));
424                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
425         }
426         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
427                 SAFE_FREE(buf);
428                 return NULL;
429         }
430         return buf;
431 }
432
433 /* read/write a tdb_off */
434 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
435 {
436         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
437 }
438 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
439 {
440         tdb_off off = *d;
441         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
442 }
443
444 /* read/write a record */
445 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
446 {
447         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
448                 return -1;
449         if (TDB_BAD_MAGIC(rec)) {
450                 /* Ensure ecode is set for log fn. */
451                 tdb->ecode = TDB_ERR_CORRUPT;
452                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
453                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
454         }
455         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
456 }
457 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
458 {
459         struct list_struct r = *rec;
460         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
461 }
462
463 /* read a freelist record and check for simple errors */
464 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
465 {
466         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
467                 return -1;
468
469         if (rec->magic == TDB_MAGIC) {
470                 /* this happens when a app is showdown while deleting a record - we should
471                    not completely fail when this happens */
472                 TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
473                          rec->magic, off));
474                 rec->magic = TDB_FREE_MAGIC;
475                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
476                         return -1;
477         }
478
479         if (rec->magic != TDB_FREE_MAGIC) {
480                 /* Ensure ecode is set for log fn. */
481                 tdb->ecode = TDB_ERR_CORRUPT;
482                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
483                            rec->magic, off));
484                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
485         }
486         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
487                 return -1;
488         return 0;
489 }
490
491 /* update a record tailer (must hold allocation lock) */
492 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
493                          const struct list_struct *rec)
494 {
495         tdb_off totalsize;
496
497         /* Offset of tailer from record header */
498         totalsize = sizeof(*rec) + rec->rec_len;
499         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
500                          &totalsize);
501 }
502
503 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
504 {
505         struct list_struct rec;
506         tdb_off tailer_ofs, tailer;
507
508         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
509                 printf("ERROR: failed to read record at %u\n", offset);
510                 return 0;
511         }
512
513         printf(" rec: offset=0x%08x next=0x%08x rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
514                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
515
516         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
517         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
518                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
519                 return rec.next;
520         }
521
522         if (tailer != rec.rec_len + sizeof(rec)) {
523                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
524                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
525         }
526         return rec.next;
527 }
528
529 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
530 {
531         tdb_off rec_ptr, top;
532
533         top = TDB_HASH_TOP(i);
534
535         if (tdb_lock(tdb, i, F_WRLCK) != 0)
536                 return -1;
537
538         if (ofs_read(tdb, top, &rec_ptr) == -1)
539                 return tdb_unlock(tdb, i, F_WRLCK);
540
541         if (rec_ptr)
542                 printf("hash=%d\n", i);
543
544         while (rec_ptr) {
545                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
546         }
547
548         return tdb_unlock(tdb, i, F_WRLCK);
549 }
550
551 void tdb_dump_all(TDB_CONTEXT *tdb)
552 {
553         int i;
554         for (i=0;i<tdb->header.hash_size;i++) {
555                 tdb_dump_chain(tdb, i);
556         }
557         printf("freelist:\n");
558         tdb_dump_chain(tdb, -1);
559 }
560
561 int tdb_printfreelist(TDB_CONTEXT *tdb)
562 {
563         int ret;
564         long total_free = 0;
565         tdb_off offset, rec_ptr;
566         struct list_struct rec;
567
568         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
569                 return ret;
570
571         offset = FREELIST_TOP;
572
573         /* read in the freelist top */
574         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
575                 tdb_unlock(tdb, -1, F_WRLCK);
576                 return 0;
577         }
578
579         printf("freelist top=[0x%08x]\n", rec_ptr );
580         while (rec_ptr) {
581                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
582                         tdb_unlock(tdb, -1, F_WRLCK);
583                         return -1;
584                 }
585
586                 if (rec.magic != TDB_FREE_MAGIC) {
587                         printf("bad magic 0x%08x in free list\n", rec.magic);
588                         tdb_unlock(tdb, -1, F_WRLCK);
589                         return -1;
590                 }
591
592                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n", 
593                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
594                 total_free += rec.rec_len;
595
596                 /* move to the next record */
597                 rec_ptr = rec.next;
598         }
599         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
600                (int)total_free);
601
602         return tdb_unlock(tdb, -1, F_WRLCK);
603 }
604
605 /* Remove an element from the freelist.  Must have alloc lock. */
606 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
607 {
608         tdb_off last_ptr, i;
609
610         /* read in the freelist top */
611         last_ptr = FREELIST_TOP;
612         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
613                 if (i == off) {
614                         /* We've found it! */
615                         return ofs_write(tdb, last_ptr, &next);
616                 }
617                 /* Follow chain (next offset is at start of record) */
618                 last_ptr = i;
619         }
620         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
621         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
622 }
623
624 /* Add an element into the freelist. Merge adjacent records if
625    neccessary. */
626 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
627 {
628         tdb_off right, left;
629
630         /* Allocation and tailer lock */
631         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
632                 return -1;
633
634         /* set an initial tailer, so if we fail we don't leave a bogus record */
635         if (update_tailer(tdb, offset, rec) != 0) {
636                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
637                 goto fail;
638         }
639
640         /* Look right first (I'm an Australian, dammit) */
641         right = offset + sizeof(*rec) + rec->rec_len;
642         if (right + sizeof(*rec) <= tdb->map_size) {
643                 struct list_struct r;
644
645                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
646                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
647                         goto left;
648                 }
649
650                 /* If it's free, expand to include it. */
651                 if (r.magic == TDB_FREE_MAGIC) {
652                         if (remove_from_freelist(tdb, right, r.next) == -1) {
653                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
654                                 goto left;
655                         }
656                         rec->rec_len += sizeof(r) + r.rec_len;
657                 }
658         }
659
660 left:
661         /* Look left */
662         left = offset - sizeof(tdb_off);
663         if (left > TDB_DATA_START(tdb->header.hash_size)) {
664                 struct list_struct l;
665                 tdb_off leftsize;
666                 
667                 /* Read in tailer and jump back to header */
668                 if (ofs_read(tdb, left, &leftsize) == -1) {
669                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
670                         goto update;
671                 }
672                 left = offset - leftsize;
673
674                 /* Now read in record */
675                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
676                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
677                         goto update;
678                 }
679
680                 /* If it's free, expand to include it. */
681                 if (l.magic == TDB_FREE_MAGIC) {
682                         if (remove_from_freelist(tdb, left, l.next) == -1) {
683                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
684                                 goto update;
685                         } else {
686                                 offset = left;
687                                 rec->rec_len += leftsize;
688                         }
689                 }
690         }
691
692 update:
693         if (update_tailer(tdb, offset, rec) == -1) {
694                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
695                 goto fail;
696         }
697
698         /* Now, prepend to free list */
699         rec->magic = TDB_FREE_MAGIC;
700
701         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
702             rec_write(tdb, offset, rec) == -1 ||
703             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
704                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
705                 goto fail;
706         }
707
708         /* And we're done. */
709         tdb_unlock(tdb, -1, F_WRLCK);
710         return 0;
711
712  fail:
713         tdb_unlock(tdb, -1, F_WRLCK);
714         return -1;
715 }
716
717
718 /* expand a file.  we prefer to use ftruncate, as that is what posix
719   says to use for mmap expansion */
720 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
721 {
722         char buf[1024];
723 #if HAVE_FTRUNCATE_EXTEND
724         if (ftruncate(tdb->fd, size+addition) != 0) {
725                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
726                            size+addition, strerror(errno)));
727                 return -1;
728         }
729 #else
730         char b = 0;
731
732 #ifdef HAVE_PWRITE
733         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
734 #else
735         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
736             write(tdb->fd, &b, 1) != 1) {
737 #endif
738                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
739                            size+addition, strerror(errno)));
740                 return -1;
741         }
742 #endif
743
744         /* now fill the file with something. This ensures that the file isn't sparse, which would be
745            very bad if we ran out of disk. This must be done with write, not via mmap */
746         memset(buf, 0x42, sizeof(buf));
747         while (addition) {
748                 int n = addition>sizeof(buf)?sizeof(buf):addition;
749 #ifdef HAVE_PWRITE
750                 int ret = pwrite(tdb->fd, buf, n, size);
751 #else
752                 int ret;
753                 if (lseek(tdb->fd, size, SEEK_SET) != size)
754                         return -1;
755                 ret = write(tdb->fd, buf, n);
756 #endif
757                 if (ret != n) {
758                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
759                                    n, strerror(errno)));
760                         return -1;
761                 }
762                 addition -= n;
763                 size += n;
764         }
765         return 0;
766 }
767
768
769 /* expand the database at least size bytes by expanding the underlying
770    file and doing the mmap again if necessary */
771 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
772 {
773         struct list_struct rec;
774         tdb_off offset;
775
776         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
777                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
778                 return -1;
779         }
780
781         /* must know about any previous expansions by another process */
782         tdb_oob(tdb, tdb->map_size + 1, 1);
783
784         /* always make room for at least 10 more records, and round
785            the database up to a multiple of TDB_PAGE_SIZE */
786         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
787
788         if (!(tdb->flags & TDB_INTERNAL))
789                 tdb_munmap(tdb);
790
791         /*
792          * We must ensure the file is unmapped before doing this
793          * to ensure consistency with systems like OpenBSD where
794          * writes and mmaps are not consistent.
795          */
796
797         /* expand the file itself */
798         if (!(tdb->flags & TDB_INTERNAL)) {
799                 if (expand_file(tdb, tdb->map_size, size) != 0)
800                         goto fail;
801         }
802
803         tdb->map_size += size;
804
805         if (tdb->flags & TDB_INTERNAL)
806                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
807         else {
808                 /*
809                  * We must ensure the file is remapped before adding the space
810                  * to ensure consistency with systems like OpenBSD where
811                  * writes and mmaps are not consistent.
812                  */
813
814                 /* We're ok if the mmap fails as we'll fallback to read/write */
815                 tdb_mmap(tdb);
816         }
817
818         /* form a new freelist record */
819         memset(&rec,'\0',sizeof(rec));
820         rec.rec_len = size - sizeof(rec);
821
822         /* link it into the free list */
823         offset = tdb->map_size - size;
824         if (tdb_free(tdb, offset, &rec) == -1)
825                 goto fail;
826
827         tdb_unlock(tdb, -1, F_WRLCK);
828         return 0;
829  fail:
830         tdb_unlock(tdb, -1, F_WRLCK);
831         return -1;
832 }
833
834
835 /* 
836    the core of tdb_allocate - called when we have decided which
837    free list entry to use
838  */
839 static tdb_off tdb_allocate_ofs(TDB_CONTEXT *tdb, tdb_len length, tdb_off rec_ptr,
840                                 struct list_struct *rec, tdb_off last_ptr)
841 {
842         struct list_struct newrec;
843         tdb_off newrec_ptr;
844
845         memset(&newrec, '\0', sizeof(newrec));
846
847         /* found it - now possibly split it up  */
848         if (rec->rec_len > length + MIN_REC_SIZE) {
849                 /* Length of left piece */
850                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
851                 
852                 /* Right piece to go on free list */
853                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
854                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
855                 
856                 /* And left record is shortened */
857                 rec->rec_len = length;
858         } else {
859                 newrec_ptr = 0;
860         }
861         
862         /* Remove allocated record from the free list */
863         if (ofs_write(tdb, last_ptr, &rec->next) == -1) {
864                 return 0;
865         }
866         
867         /* Update header: do this before we drop alloc
868            lock, otherwise tdb_free() might try to
869            merge with us, thinking we're free.
870            (Thanks Jeremy Allison). */
871         rec->magic = TDB_MAGIC;
872         if (rec_write(tdb, rec_ptr, rec) == -1) {
873                 return 0;
874         }
875         
876         /* Did we create new block? */
877         if (newrec_ptr) {
878                 /* Update allocated record tailer (we
879                    shortened it). */
880                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
881                         return 0;
882                 }
883                 
884                 /* Free new record */
885                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
886                         return 0;
887                 }
888         }
889         
890         /* all done - return the new record offset */
891         return rec_ptr;
892 }
893
894 /* allocate some space from the free list. The offset returned points
895    to a unconnected list_struct within the database with room for at
896    least length bytes of total data
897
898    0 is returned if the space could not be allocated
899  */
900 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
901                             struct list_struct *rec)
902 {
903         tdb_off rec_ptr, last_ptr, newrec_ptr;
904         struct {
905                 tdb_off rec_ptr, last_ptr;
906                 tdb_len rec_len;
907         } bestfit;
908
909         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
910                 return 0;
911
912         /* Extra bytes required for tailer */
913         length += sizeof(tdb_off);
914
915  again:
916         last_ptr = FREELIST_TOP;
917
918         /* read in the freelist top */
919         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
920                 goto fail;
921
922         bestfit.rec_ptr = 0;
923
924         /* 
925            this is a best fit allocation strategy. Originally we used
926            a first fit strategy, but it suffered from massive fragmentation
927            issues when faced with a slowly increasing record size.
928          */
929         while (rec_ptr) {
930                 if (rec_free_read(tdb, rec_ptr, rec) == -1) {
931                         goto fail;
932                 }
933
934                 if (rec->rec_len >= length) {
935                         if (bestfit.rec_ptr == 0 ||
936                             rec->rec_len < bestfit.rec_len) {
937                                 bestfit.rec_len = rec->rec_len;
938                                 bestfit.rec_ptr = rec_ptr;
939                                 bestfit.last_ptr = last_ptr;
940                                 /* consider a fit to be good enough if we aren't wasting more than half the space */
941                                 if (bestfit.rec_len < 2*length) {
942                                         break;
943                                 }
944                         }
945                 }
946
947                 /* move to the next record */
948                 last_ptr = rec_ptr;
949                 rec_ptr = rec->next;
950         }
951
952         if (bestfit.rec_ptr != 0) {
953                 if (rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
954                         goto fail;
955                 }
956
957                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
958                 tdb_unlock(tdb, -1, F_WRLCK);
959                 return newrec_ptr;
960         }
961
962         /* we didn't find enough space. See if we can expand the
963            database and if we can then try again */
964         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
965                 goto again;
966  fail:
967         tdb_unlock(tdb, -1, F_WRLCK);
968         return 0;
969 }
970
971 /* initialise a new database with a specified hash size */
972 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
973 {
974         struct tdb_header *newdb;
975         int size, ret = -1;
976
977         /* We make it up in memory, then write it out if not internal */
978         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
979         if (!(newdb = calloc(size, 1)))
980                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
981
982         /* Fill in the header */
983         newdb->version = TDB_VERSION;
984         newdb->hash_size = hash_size;
985 #ifdef USE_SPINLOCKS
986         newdb->rwlocks = size;
987 #endif
988         if (tdb->flags & TDB_INTERNAL) {
989                 tdb->map_size = size;
990                 tdb->map_ptr = (char *)newdb;
991                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
992                 /* Convert the `ondisk' version if asked. */
993                 CONVERT(*newdb);
994                 return 0;
995         }
996         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
997                 goto fail;
998
999         if (ftruncate(tdb->fd, 0) == -1)
1000                 goto fail;
1001
1002         /* This creates an endian-converted header, as if read from disk */
1003         CONVERT(*newdb);
1004         memcpy(&tdb->header, newdb, sizeof(tdb->header));
1005         /* Don't endian-convert the magic food! */
1006         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
1007         if (write(tdb->fd, newdb, size) != size)
1008                 ret = -1;
1009         else
1010                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
1011
1012   fail:
1013         SAFE_FREE(newdb);
1014         return ret;
1015 }
1016
1017 /* Returns 0 on fail.  On success, return offset of record, and fills
1018    in rec */
1019 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
1020                         struct list_struct *r)
1021 {
1022         tdb_off rec_ptr;
1023         
1024         /* read in the hash top */
1025         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
1026                 return 0;
1027
1028         /* keep looking until we find the right record */
1029         while (rec_ptr) {
1030                 if (rec_read(tdb, rec_ptr, r) == -1)
1031                         return 0;
1032
1033                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
1034                         char *k;
1035                         /* a very likely hit - read the key */
1036                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
1037                                            r->key_len);
1038                         if (!k)
1039                                 return 0;
1040
1041                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1042                                 SAFE_FREE(k);
1043                                 return rec_ptr;
1044                         }
1045                         SAFE_FREE(k);
1046                 }
1047                 rec_ptr = r->next;
1048         }
1049         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1050 }
1051
1052 /* As tdb_find, but if you succeed, keep the lock */
1053 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1054                              struct list_struct *rec)
1055 {
1056         u32 rec_ptr;
1057
1058         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1059                 return 0;
1060         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1061                 tdb_unlock(tdb, BUCKET(hash), locktype);
1062         return rec_ptr;
1063 }
1064
1065 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1066 {
1067         return tdb->ecode;
1068 }
1069
1070 static struct tdb_errname {
1071         enum TDB_ERROR ecode; const char *estring;
1072 } emap[] = { {TDB_SUCCESS, "Success"},
1073              {TDB_ERR_CORRUPT, "Corrupt database"},
1074              {TDB_ERR_IO, "IO Error"},
1075              {TDB_ERR_LOCK, "Locking error"},
1076              {TDB_ERR_OOM, "Out of memory"},
1077              {TDB_ERR_EXISTS, "Record exists"},
1078              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1079              {TDB_ERR_NOEXIST, "Record does not exist"} };
1080
1081 /* Error string for the last tdb error */
1082 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1083 {
1084         u32 i;
1085         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1086                 if (tdb->ecode == emap[i].ecode)
1087                         return emap[i].estring;
1088         return "Invalid error code";
1089 }
1090
1091 /* update an entry in place - this only works if the new data size
1092    is <= the old data size and the key exists.
1093    on failure return -1.
1094 */
1095
1096 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1097 {
1098         struct list_struct rec;
1099         tdb_off rec_ptr;
1100
1101         /* find entry */
1102         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1103                 return -1;
1104
1105         /* must be long enough key, data and tailer */
1106         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1107                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1108                 return -1;
1109         }
1110
1111         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1112                       dbuf.dptr, dbuf.dsize) == -1)
1113                 return -1;
1114
1115         if (dbuf.dsize != rec.data_len) {
1116                 /* update size */
1117                 rec.data_len = dbuf.dsize;
1118                 return rec_write(tdb, rec_ptr, &rec);
1119         }
1120  
1121         return 0;
1122 }
1123
1124 /* find an entry in the database given a key */
1125 /* If an entry doesn't exist tdb_err will be set to
1126  * TDB_ERR_NOEXIST. If a key has no data attached
1127  * tdb_err will not be set. Both will return a
1128  * zero pptr and zero dsize.
1129  */
1130
1131 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1132 {
1133         tdb_off rec_ptr;
1134         struct list_struct rec;
1135         TDB_DATA ret;
1136         u32 hash;
1137
1138         /* find which hash bucket it is in */
1139         hash = tdb->hash_fn(&key);
1140         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1141                 return tdb_null;
1142
1143         if (rec.data_len)
1144                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1145                                           rec.data_len);
1146         else
1147                 ret.dptr = NULL;
1148         ret.dsize = rec.data_len;
1149         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1150         return ret;
1151 }
1152
1153 /* check if an entry in the database exists 
1154
1155    note that 1 is returned if the key is found and 0 is returned if not found
1156    this doesn't match the conventions in the rest of this module, but is
1157    compatible with gdbm
1158 */
1159 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1160 {
1161         struct list_struct rec;
1162         
1163         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1164                 return 0;
1165         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1166         return 1;
1167 }
1168
1169 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1170 {
1171         u32 hash = tdb->hash_fn(&key);
1172         return tdb_exists_hash(tdb, key, hash);
1173 }
1174
1175 /* record lock stops delete underneath */
1176 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1177 {
1178         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1179 }
1180 /*
1181   Write locks override our own fcntl readlocks, so check it here.
1182   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1183   an error to fail to get the lock here.
1184 */
1185  
1186 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1187 {
1188         struct tdb_traverse_lock *i;
1189         for (i = &tdb->travlocks; i; i = i->next)
1190                 if (i->off == off)
1191                         return -1;
1192         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1193 }
1194
1195 /*
1196   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1197   an error to fail to get the lock here.
1198 */
1199
1200 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1201 {
1202         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1203 }
1204 /* fcntl locks don't stack: avoid unlocking someone else's */
1205 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1206 {
1207         struct tdb_traverse_lock *i;
1208         u32 count = 0;
1209
1210         if (off == 0)
1211                 return 0;
1212         for (i = &tdb->travlocks; i; i = i->next)
1213                 if (i->off == off)
1214                         count++;
1215         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1216 }
1217
1218 /* actually delete an entry in the database given the offset */
1219 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1220 {
1221         tdb_off last_ptr, i;
1222         struct list_struct lastrec;
1223
1224         if (tdb->read_only) return -1;
1225
1226         if (write_lock_record(tdb, rec_ptr) == -1) {
1227                 /* Someone traversing here: mark it as dead */
1228                 rec->magic = TDB_DEAD_MAGIC;
1229                 return rec_write(tdb, rec_ptr, rec);
1230         }
1231         if (write_unlock_record(tdb, rec_ptr) != 0)
1232                 return -1;
1233
1234         /* find previous record in hash chain */
1235         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1236                 return -1;
1237         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1238                 if (rec_read(tdb, i, &lastrec) == -1)
1239                         return -1;
1240
1241         /* unlink it: next ptr is at start of record. */
1242         if (last_ptr == 0)
1243                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1244         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1245                 return -1;
1246
1247         /* recover the space */
1248         if (tdb_free(tdb, rec_ptr, rec) == -1)
1249                 return -1;
1250         return 0;
1251 }
1252
1253 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1254 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1255                          struct list_struct *rec)
1256 {
1257         int want_next = (tlock->off != 0);
1258
1259         /* Lock each chain from the start one. */
1260         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1261                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1262                         return -1;
1263
1264                 /* No previous record?  Start at top of chain. */
1265                 if (!tlock->off) {
1266                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1267                                      &tlock->off) == -1)
1268                                 goto fail;
1269                 } else {
1270                         /* Otherwise unlock the previous record. */
1271                         if (unlock_record(tdb, tlock->off) != 0)
1272                                 goto fail;
1273                 }
1274
1275                 if (want_next) {
1276                         /* We have offset of old record: grab next */
1277                         if (rec_read(tdb, tlock->off, rec) == -1)
1278                                 goto fail;
1279                         tlock->off = rec->next;
1280                 }
1281
1282                 /* Iterate through chain */
1283                 while( tlock->off) {
1284                         tdb_off current;
1285                         if (rec_read(tdb, tlock->off, rec) == -1)
1286                                 goto fail;
1287                         if (!TDB_DEAD(rec)) {
1288                                 /* Woohoo: we found one! */
1289                                 if (lock_record(tdb, tlock->off) != 0)
1290                                         goto fail;
1291                                 return tlock->off;
1292                         }
1293
1294                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
1295                         if (tlock->off == rec->next) {
1296                                 TDB_LOG((tdb, 0, "tdb_next_lock: loop detected.\n"));
1297                                 goto fail;
1298                         }
1299
1300                         /* Try to clean dead ones from old traverses */
1301                         current = tlock->off;
1302                         tlock->off = rec->next;
1303                         if (!tdb->read_only && 
1304                             do_delete(tdb, current, rec) != 0)
1305                                 goto fail;
1306                 }
1307                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1308                 want_next = 0;
1309         }
1310         /* We finished iteration without finding anything */
1311         return TDB_ERRCODE(TDB_SUCCESS, 0);
1312
1313  fail:
1314         tlock->off = 0;
1315         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1316                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1317         return -1;
1318 }
1319
1320 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1321    return -1 on error or the record count traversed
1322    if fn is NULL then it is not called
1323    a non-zero return value from fn() indicates that the traversal should stop
1324   */
1325 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private)
1326 {
1327         TDB_DATA key, dbuf;
1328         struct list_struct rec;
1329         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1330         int ret, count = 0;
1331
1332         /* This was in the initializaton, above, but the IRIX compiler
1333          * did not like it.  crh
1334          */
1335         tl.next = tdb->travlocks.next;
1336
1337         /* fcntl locks don't stack: beware traverse inside traverse */
1338         tdb->travlocks.next = &tl;
1339
1340         /* tdb_next_lock places locks on the record returned, and its chain */
1341         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1342                 count++;
1343                 /* now read the full record */
1344                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1345                                           rec.key_len + rec.data_len);
1346                 if (!key.dptr) {
1347                         ret = -1;
1348                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1349                                 goto out;
1350                         if (unlock_record(tdb, tl.off) != 0)
1351                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1352                         goto out;
1353                 }
1354                 key.dsize = rec.key_len;
1355                 dbuf.dptr = key.dptr + rec.key_len;
1356                 dbuf.dsize = rec.data_len;
1357
1358                 /* Drop chain lock, call out */
1359                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1360                         ret = -1;
1361                         goto out;
1362                 }
1363                 if (fn && fn(tdb, key, dbuf, private)) {
1364                         /* They want us to terminate traversal */
1365                         ret = count;
1366                         if (unlock_record(tdb, tl.off) != 0) {
1367                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1368                                 ret = -1;
1369                         }
1370                         tdb->travlocks.next = tl.next;
1371                         SAFE_FREE(key.dptr);
1372                         return count;
1373                 }
1374                 SAFE_FREE(key.dptr);
1375         }
1376 out:
1377         tdb->travlocks.next = tl.next;
1378         if (ret < 0)
1379                 return -1;
1380         else
1381                 return count;
1382 }
1383
1384 /* find the first entry in the database and return its key */
1385 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1386 {
1387         TDB_DATA key;
1388         struct list_struct rec;
1389
1390         /* release any old lock */
1391         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1392                 return tdb_null;
1393         tdb->travlocks.off = tdb->travlocks.hash = 0;
1394
1395         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1396                 return tdb_null;
1397         /* now read the key */
1398         key.dsize = rec.key_len;
1399         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1400         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1401                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1402         return key;
1403 }
1404
1405 /* find the next entry in the database, returning its key */
1406 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1407 {
1408         u32 oldhash;
1409         TDB_DATA key = tdb_null;
1410         struct list_struct rec;
1411         char *k = NULL;
1412
1413         /* Is locked key the old key?  If so, traverse will be reliable. */
1414         if (tdb->travlocks.off) {
1415                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1416                         return tdb_null;
1417                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1418                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1419                                             rec.key_len))
1420                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1421                         /* No, it wasn't: unlock it and start from scratch */
1422                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1423                                 return tdb_null;
1424                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1425                                 return tdb_null;
1426                         tdb->travlocks.off = 0;
1427                 }
1428
1429                 SAFE_FREE(k);
1430         }
1431
1432         if (!tdb->travlocks.off) {
1433                 /* No previous element: do normal find, and lock record */
1434                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
1435                 if (!tdb->travlocks.off)
1436                         return tdb_null;
1437                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1438                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1439                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1440                         return tdb_null;
1441                 }
1442         }
1443         oldhash = tdb->travlocks.hash;
1444
1445         /* Grab next record: locks chain and returned record,
1446            unlocks old record */
1447         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1448                 key.dsize = rec.key_len;
1449                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1450                                           key.dsize);
1451                 /* Unlock the chain of this new record */
1452                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1453                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1454         }
1455         /* Unlock the chain of old record */
1456         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1457                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1458         return key;
1459 }
1460
1461 /* delete an entry in the database given a key */
1462 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1463 {
1464         tdb_off rec_ptr;
1465         struct list_struct rec;
1466         int ret;
1467
1468         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1469                 return -1;
1470         ret = do_delete(tdb, rec_ptr, &rec);
1471         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1472                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1473         return ret;
1474 }
1475
1476 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1477 {
1478         u32 hash = tdb->hash_fn(&key);
1479         return tdb_delete_hash(tdb, key, hash);
1480 }
1481
1482 /* store an element in the database, replacing any existing element
1483    with the same key 
1484
1485    return 0 on success, -1 on failure
1486 */
1487 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1488 {
1489         struct list_struct rec;
1490         u32 hash;
1491         tdb_off rec_ptr;
1492         char *p = NULL;
1493         int ret = 0;
1494
1495         /* find which hash bucket it is in */
1496         hash = tdb->hash_fn(&key);
1497         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1498                 return -1;
1499
1500         /* check for it existing, on insert. */
1501         if (flag == TDB_INSERT) {
1502                 if (tdb_exists_hash(tdb, key, hash)) {
1503                         tdb->ecode = TDB_ERR_EXISTS;
1504                         goto fail;
1505                 }
1506         } else {
1507                 /* first try in-place update, on modify or replace. */
1508                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1509                         goto out;
1510                 if (tdb->ecode == TDB_ERR_NOEXIST &&
1511                     flag == TDB_MODIFY) {
1512                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
1513                          we should fail the store */
1514                         goto fail;
1515                 }
1516         }
1517         /* reset the error code potentially set by the tdb_update() */
1518         tdb->ecode = TDB_SUCCESS;
1519
1520         /* delete any existing record - if it doesn't exist we don't
1521            care.  Doing this first reduces fragmentation, and avoids
1522            coalescing with `allocated' block before it's updated. */
1523         if (flag != TDB_INSERT)
1524                 tdb_delete_hash(tdb, key, hash);
1525
1526         /* Copy key+value *before* allocating free space in case malloc
1527            fails and we are left with a dead spot in the tdb. */
1528
1529         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1530                 tdb->ecode = TDB_ERR_OOM;
1531                 goto fail;
1532         }
1533
1534         memcpy(p, key.dptr, key.dsize);
1535         if (dbuf.dsize)
1536                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1537
1538         /* we have to allocate some space */
1539         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1540                 goto fail;
1541
1542         /* Read hash top into next ptr */
1543         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1544                 goto fail;
1545
1546         rec.key_len = key.dsize;
1547         rec.data_len = dbuf.dsize;
1548         rec.full_hash = hash;
1549         rec.magic = TDB_MAGIC;
1550
1551         /* write out and point the top of the hash chain at it */
1552         if (rec_write(tdb, rec_ptr, &rec) == -1
1553             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1554             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1555                 /* Need to tdb_unallocate() here */
1556                 goto fail;
1557         }
1558  out:
1559         SAFE_FREE(p); 
1560         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1561         return ret;
1562 fail:
1563         ret = -1;
1564         goto out;
1565 }
1566
1567 /* Attempt to append data to an entry in place - this only works if the new data size
1568    is <= the old data size and the key exists.
1569    on failure return -1. Record must be locked before calling.
1570 */
1571 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1572 {
1573         struct list_struct rec;
1574         tdb_off rec_ptr;
1575
1576         /* find entry */
1577         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1578                 return -1;
1579
1580         /* Append of 0 is always ok. */
1581         if (new_dbuf.dsize == 0)
1582                 return 0;
1583
1584         /* must be long enough for key, old data + new data and tailer */
1585         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1586                 /* No room. */
1587                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1588                 return -1;
1589         }
1590
1591         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1592                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1593                 return -1;
1594
1595         /* update size */
1596         rec.data_len += new_dbuf.dsize;
1597         return rec_write(tdb, rec_ptr, &rec);
1598 }
1599
1600 /* Append to an entry. Create if not exist. */
1601
1602 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1603 {
1604         struct list_struct rec;
1605         u32 hash;
1606         tdb_off rec_ptr;
1607         char *p = NULL;
1608         int ret = 0;
1609         size_t new_data_size = 0;
1610
1611         /* find which hash bucket it is in */
1612         hash = tdb->hash_fn(&key);
1613         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1614                 return -1;
1615
1616         /* first try in-place. */
1617         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1618                 goto out;
1619
1620         /* reset the error code potentially set by the tdb_append_inplace() */
1621         tdb->ecode = TDB_SUCCESS;
1622
1623         /* find entry */
1624         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1625                 if (tdb->ecode != TDB_ERR_NOEXIST)
1626                         goto fail;
1627
1628                 /* Not found - create. */
1629
1630                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1631                 goto out;
1632         }
1633
1634         new_data_size = rec.data_len + new_dbuf.dsize;
1635
1636         /* Copy key+old_value+value *before* allocating free space in case malloc
1637            fails and we are left with a dead spot in the tdb. */
1638
1639         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1640                 tdb->ecode = TDB_ERR_OOM;
1641                 goto fail;
1642         }
1643
1644         /* Copy the key in place. */
1645         memcpy(p, key.dptr, key.dsize);
1646
1647         /* Now read the old data into place. */
1648         if (rec.data_len &&
1649                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1650                         goto fail;
1651
1652         /* Finally append the new data. */
1653         if (new_dbuf.dsize)
1654                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1655
1656         /* delete any existing record - if it doesn't exist we don't
1657            care.  Doing this first reduces fragmentation, and avoids
1658            coalescing with `allocated' block before it's updated. */
1659
1660         tdb_delete_hash(tdb, key, hash);
1661
1662         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1663                 goto fail;
1664
1665         /* Read hash top into next ptr */
1666         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1667                 goto fail;
1668
1669         rec.key_len = key.dsize;
1670         rec.data_len = new_data_size;
1671         rec.full_hash = hash;
1672         rec.magic = TDB_MAGIC;
1673
1674         /* write out and point the top of the hash chain at it */
1675         if (rec_write(tdb, rec_ptr, &rec) == -1
1676             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1677             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1678                 /* Need to tdb_unallocate() here */
1679                 goto fail;
1680         }
1681
1682  out:
1683         SAFE_FREE(p); 
1684         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1685         return ret;
1686
1687 fail:
1688         ret = -1;
1689         goto out;
1690 }
1691
1692 static int tdb_already_open(dev_t device,
1693                             ino_t ino)
1694 {
1695         TDB_CONTEXT *i;
1696         
1697         for (i = tdbs; i; i = i->next) {
1698                 if (i->device == device && i->inode == ino) {
1699                         return 1;
1700                 }
1701         }
1702
1703         return 0;
1704 }
1705
1706 /* open the database, creating it if necessary 
1707
1708    The open_flags and mode are passed straight to the open call on the
1709    database file. A flags value of O_WRONLY is invalid. The hash size
1710    is advisory, use zero for a default value.
1711
1712    Return is NULL on error, in which case errno is also set.  Don't 
1713    try to call tdb_error or tdb_errname, just do strerror(errno).
1714
1715    @param name may be NULL for internal databases. */
1716 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1717                       int open_flags, mode_t mode)
1718 {
1719         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
1720 }
1721
1722 /* a default logging function */
1723 static void null_log_fn(TDB_CONTEXT *tdb, int level, const char *fmt, ...)
1724 {
1725 }
1726
1727
1728 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1729                          int open_flags, mode_t mode,
1730                          tdb_log_func log_fn,
1731                          tdb_hash_func hash_fn)
1732 {
1733         TDB_CONTEXT *tdb;
1734         struct stat st;
1735         int rev = 0, locked = 0;
1736         uint8_t *vp;
1737         u32 vertest;
1738
1739         if (!(tdb = calloc(1, sizeof *tdb))) {
1740                 /* Can't log this */
1741                 errno = ENOMEM;
1742                 goto fail;
1743         }
1744         tdb->fd = -1;
1745         tdb->name = NULL;
1746         tdb->map_ptr = NULL;
1747         tdb->flags = tdb_flags;
1748         tdb->open_flags = open_flags;
1749         tdb->log_fn = log_fn?log_fn:null_log_fn;
1750         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
1751
1752         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1753                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1754                          name));
1755                 errno = EINVAL;
1756                 goto fail;
1757         }
1758         
1759         if (hash_size == 0)
1760                 hash_size = DEFAULT_HASH_SIZE;
1761         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1762                 tdb->read_only = 1;
1763                 /* read only databases don't do locking or clear if first */
1764                 tdb->flags |= TDB_NOLOCK;
1765                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1766         }
1767
1768         /* internal databases don't mmap or lock, and start off cleared */
1769         if (tdb->flags & TDB_INTERNAL) {
1770                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1771                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1772                 if (tdb_new_database(tdb, hash_size) != 0) {
1773                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1774                         goto fail;
1775                 }
1776                 goto internal;
1777         }
1778
1779         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1780                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1781                          name, strerror(errno)));
1782                 goto fail;      /* errno set by open(2) */
1783         }
1784
1785         /* ensure there is only one process initialising at once */
1786         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1787                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1788                          name, strerror(errno)));
1789                 goto fail;      /* errno set by tdb_brlock */
1790         }
1791
1792         /* we need to zero database if we are the only one with it open */
1793         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
1794                 (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
1795                 open_flags |= O_CREAT;
1796                 if (ftruncate(tdb->fd, 0) == -1) {
1797                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1798                                  "failed to truncate %s: %s\n",
1799                                  name, strerror(errno)));
1800                         goto fail; /* errno set by ftruncate */
1801                 }
1802         }
1803
1804         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1805             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1806             || (tdb->header.version != TDB_VERSION
1807                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1808                 /* its not a valid database - possibly initialise it */
1809                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1810                         errno = EIO; /* ie bad format or something */
1811                         goto fail;
1812                 }
1813                 rev = (tdb->flags & TDB_CONVERT);
1814         }
1815         vp = (uint8_t *)&tdb->header.version;
1816         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1817                   (((u32)vp[2]) << 8) | (u32)vp[3];
1818         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1819         if (!rev)
1820                 tdb->flags &= ~TDB_CONVERT;
1821         else {
1822                 tdb->flags |= TDB_CONVERT;
1823                 convert(&tdb->header, sizeof(tdb->header));
1824         }
1825         if (fstat(tdb->fd, &st) == -1)
1826                 goto fail;
1827
1828         /* Is it already in the open list?  If so, fail. */
1829         if (tdb_already_open(st.st_dev, st.st_ino)) {
1830                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1831                          "%s (%d,%d) is already open in this process\n",
1832                          name, (int)st.st_dev, (int)st.st_ino));
1833                 errno = EBUSY;
1834                 goto fail;
1835         }
1836
1837         if (!(tdb->name = (char *)strdup(name))) {
1838                 errno = ENOMEM;
1839                 goto fail;
1840         }
1841
1842         tdb->map_size = st.st_size;
1843         tdb->device = st.st_dev;
1844         tdb->inode = st.st_ino;
1845         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1846         if (!tdb->locked) {
1847                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1848                          "failed to allocate lock structure for %s\n",
1849                          name));
1850                 errno = ENOMEM;
1851                 goto fail;
1852         }
1853         tdb_mmap(tdb);
1854         if (locked) {
1855                 if (!tdb->read_only)
1856                         if (tdb_clear_spinlocks(tdb) != 0) {
1857                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1858                                 "failed to clear spinlock\n"));
1859                                 goto fail;
1860                         }
1861                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1862                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1863                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1864                                  name, strerror(errno)));
1865                         goto fail;
1866                 }
1867
1868         }
1869
1870         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
1871            we didn't get the initial exclusive lock as we need to let all other
1872            users know we're using it. */
1873
1874         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
1875         /* leave this lock in place to indicate it's in use */
1876         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1877                 goto fail;
1878         }
1879
1880
1881  internal:
1882         /* Internal (memory-only) databases skip all the code above to
1883          * do with disk files, and resume here by releasing their
1884          * global lock and hooking into the active list. */
1885         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1886                 goto fail;
1887         tdb->next = tdbs;
1888         tdbs = tdb;
1889         return tdb;
1890
1891  fail:
1892         { int save_errno = errno;
1893
1894         if (!tdb)
1895                 return NULL;
1896         
1897         if (tdb->map_ptr) {
1898                 if (tdb->flags & TDB_INTERNAL)
1899                         SAFE_FREE(tdb->map_ptr);
1900                 else
1901                         tdb_munmap(tdb);
1902         }
1903         SAFE_FREE(tdb->name);
1904         if (tdb->fd != -1)
1905                 if (close(tdb->fd) != 0)
1906                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1907         SAFE_FREE(tdb->locked);
1908         SAFE_FREE(tdb);
1909         errno = save_errno;
1910         return NULL;
1911         }
1912 }
1913
1914 /**
1915  * Close a database.
1916  *
1917  * @returns -1 for error; 0 for success.
1918  **/
1919 int tdb_close(TDB_CONTEXT *tdb)
1920 {
1921         TDB_CONTEXT **i;
1922         int ret = 0;
1923
1924         if (tdb->map_ptr) {
1925                 if (tdb->flags & TDB_INTERNAL)
1926                         SAFE_FREE(tdb->map_ptr);
1927                 else
1928                         tdb_munmap(tdb);
1929         }
1930         SAFE_FREE(tdb->name);
1931         if (tdb->fd != -1)
1932                 ret = close(tdb->fd);
1933         SAFE_FREE(tdb->locked);
1934
1935         /* Remove from contexts list */
1936         for (i = &tdbs; *i; i = &(*i)->next) {
1937                 if (*i == tdb) {
1938                         *i = tdb->next;
1939                         break;
1940                 }
1941         }
1942
1943         memset(tdb, 0, sizeof(*tdb));
1944         SAFE_FREE(tdb);
1945
1946         return ret;
1947 }
1948
1949 /* lock/unlock entire database */
1950 int tdb_lockall(TDB_CONTEXT *tdb)
1951 {
1952         u32 i;
1953
1954         /* There are no locks on read-only dbs */
1955         if (tdb->read_only)
1956                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1957         for (i = 0; i < tdb->header.hash_size; i++) 
1958                 if (tdb_lock(tdb, i, F_WRLCK))
1959                         break;
1960
1961         /* If error, release locks we have... */
1962         if (i < tdb->header.hash_size) {
1963                 u32 j;
1964
1965                 for ( j = 0; j < i; j++)
1966                         tdb_unlock(tdb, j, F_WRLCK);
1967                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1968         }
1969
1970         return 0;
1971 }
1972 void tdb_unlockall(TDB_CONTEXT *tdb)
1973 {
1974         u32 i;
1975         for (i=0; i < tdb->header.hash_size; i++)
1976                 tdb_unlock(tdb, i, F_WRLCK);
1977 }
1978
1979 /* lock/unlock one hash chain. This is meant to be used to reduce
1980    contention - it cannot guarantee how many records will be locked */
1981 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1982 {
1983         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1984 }
1985
1986 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1987 {
1988         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1989 }
1990
1991 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1992 {
1993         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
1994 }
1995
1996 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1997 {
1998         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
1999 }
2000
2001
2002 /* register a loging function */
2003 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2004 {
2005         tdb->log_fn = fn?fn:null_log_fn;
2006 }
2007
2008
2009 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
2010    seek pointer from our parent and to re-establish locks */
2011 int tdb_reopen(TDB_CONTEXT *tdb)
2012 {
2013         struct stat st;
2014
2015         if (tdb->flags & TDB_INTERNAL)
2016                 return 0; /* Nothing to do. */
2017         if (tdb_munmap(tdb) != 0) {
2018                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2019                 goto fail;
2020         }
2021         if (close(tdb->fd) != 0)
2022                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2023         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2024         if (tdb->fd == -1) {
2025                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2026                 goto fail;
2027         }
2028         if (fstat(tdb->fd, &st) != 0) {
2029                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2030                 goto fail;
2031         }
2032         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2033                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2034                 goto fail;
2035         }
2036         tdb_mmap(tdb);
2037         if ((tdb->flags & TDB_CLEAR_IF_FIRST) && (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
2038                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2039                 goto fail;
2040         }
2041
2042         return 0;
2043
2044 fail:
2045         tdb_close(tdb);
2046         return -1;
2047 }
2048
2049 /* reopen all tdb's */
2050 int tdb_reopen_all(void)
2051 {
2052         TDB_CONTEXT *tdb;
2053
2054         for (tdb=tdbs; tdb; tdb = tdb->next) {
2055                 /* Ensure no clear-if-first. */
2056                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
2057                 if (tdb_reopen(tdb) != 0)
2058                         return -1;
2059         }
2060
2061         return 0;
2062 }