r6528: - in tdb_fetch() we effectively disallowed zero length records by
[samba.git] / source / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2004
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 2 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, write to the Free Software
26    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 */
28
29
30 /* NOTE: If you use tdbs under valgrind, and in particular if you run
31  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
32  * think this is because valgrind doesn't understand that the mmap'd
33  * area may be written to by other processes.  Memory can, from the
34  * point of view of the grinded process, spontaneously become
35  * initialized.
36  *
37  * I can think of a few solutions.  [mbp 20030311]
38  *
39  * 1 - Write suppressions for Valgrind so that it doesn't complain
40  * about this.  Probably the most reasonable but people need to
41  * remember to use them.
42  *
43  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
44  *
45  * 3 - Use the special valgrind macros to mark memory as valid at the
46  * right time.  Probably too hard -- the process just doesn't know.
47  */ 
48
49 #ifndef _SAMBA_BUILD_
50 #if HAVE_CONFIG_H
51 #include <config.h>
52 #endif
53
54 #include <stdlib.h>
55 #include <stdio.h>
56 #include <stdint.h>
57 #include <fcntl.h>
58 #include <unistd.h>
59 #include <string.h>
60 #include <fcntl.h>
61 #include <errno.h>
62 #include <sys/mman.h>
63 #include <sys/stat.h>
64 #include "tdb.h"
65 #else
66 #include "includes.h"
67 #include "lib/tdb/include/tdb.h"
68 #include "system/time.h"
69 #include "system/shmem.h"
70 #include "system/filesys.h"
71 #endif
72
73 #define TDB_MAGIC_FOOD "TDB file\n"
74 #define TDB_VERSION (0x26011967 + 6)
75 #define TDB_MAGIC (0x26011999U)
76 #define TDB_FREE_MAGIC (~TDB_MAGIC)
77 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
78 #define TDB_ALIGNMENT 4
79 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
80 #define DEFAULT_HASH_SIZE 131
81 #define TDB_PAGE_SIZE 0x2000
82 #define FREELIST_TOP (sizeof(struct tdb_header))
83 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
84 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
85 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
86 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
87 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
88 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
89
90
91 /* NB assumes there is a local variable called "tdb" that is the
92  * current context, also takes doubly-parenthesized print-style
93  * argument. */
94 #define TDB_LOG(x) tdb->log_fn x
95
96 /* lock offsets */
97 #define GLOBAL_LOCK 0
98 #define ACTIVE_LOCK 4
99
100 #ifndef MAP_FILE
101 #define MAP_FILE 0
102 #endif
103
104 #ifndef MAP_FAILED
105 #define MAP_FAILED ((void *)-1)
106 #endif
107
108 /* free memory if the pointer is valid and zero the pointer */
109 #ifndef SAFE_FREE
110 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
111 #endif
112
113 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
114 TDB_DATA tdb_null;
115
116 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
117 static TDB_CONTEXT *tdbs = NULL;
118
119 static int tdb_munmap(TDB_CONTEXT *tdb)
120 {
121         if (tdb->flags & TDB_INTERNAL)
122                 return 0;
123
124 #ifdef HAVE_MMAP
125         if (tdb->map_ptr) {
126                 int ret = munmap(tdb->map_ptr, tdb->map_size);
127                 if (ret != 0)
128                         return ret;
129         }
130 #endif
131         tdb->map_ptr = NULL;
132         return 0;
133 }
134
135 static void tdb_mmap(TDB_CONTEXT *tdb)
136 {
137         if (tdb->flags & TDB_INTERNAL)
138                 return;
139
140 #ifdef HAVE_MMAP
141         if (!(tdb->flags & TDB_NOMMAP)) {
142                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
143                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
144                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
145
146                 /*
147                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
148                  */
149
150                 if (tdb->map_ptr == MAP_FAILED) {
151                         tdb->map_ptr = NULL;
152                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
153                                  tdb->map_size, strerror(errno)));
154                 }
155         } else {
156                 tdb->map_ptr = NULL;
157         }
158 #else
159         tdb->map_ptr = NULL;
160 #endif
161 }
162
163 /* Endian conversion: we only ever deal with 4 byte quantities */
164 static void *convert(void *buf, u32 size)
165 {
166         u32 i, *p = buf;
167         for (i = 0; i < size / 4; i++)
168                 p[i] = TDB_BYTEREV(p[i]);
169         return buf;
170 }
171 #define DOCONV() (tdb->flags & TDB_CONVERT)
172 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
173
174 /* the body of the database is made of one list_struct for the free space
175    plus a separate data list for each hash value */
176 struct list_struct {
177         tdb_off next; /* offset of the next record in the list */
178         tdb_len rec_len; /* total byte length of record */
179         tdb_len key_len; /* byte length of key */
180         tdb_len data_len; /* byte length of data */
181         u32 full_hash; /* the full 32 bit hash of the key */
182         u32 magic;   /* try to catch errors */
183         /* the following union is implied:
184                 union {
185                         char record[rec_len];
186                         struct {
187                                 char key[key_len];
188                                 char data[data_len];
189                         }
190                         u32 totalsize; (tailer)
191                 }
192         */
193 };
194
195 /* a byte range locking function - return 0 on success
196    this functions locks/unlocks 1 byte at the specified offset.
197
198    On error, errno is also set so that errors are passed back properly
199    through tdb_open(). */
200 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
201                       int rw_type, int lck_type, int probe)
202 {
203         struct flock fl;
204         int ret;
205
206         if (tdb->flags & TDB_NOLOCK)
207                 return 0;
208         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
209                 errno = EACCES;
210                 return -1;
211         }
212
213         fl.l_type = rw_type;
214         fl.l_whence = SEEK_SET;
215         fl.l_start = offset;
216         fl.l_len = 1;
217         fl.l_pid = 0;
218
219         do {
220                 ret = fcntl(tdb->fd,lck_type,&fl);
221         } while (ret == -1 && errno == EINTR);
222
223         if (ret == -1) {
224                 if (!probe && lck_type != F_SETLK) {
225                         /* Ensure error code is set for log fun to examine. */
226                         tdb->ecode = TDB_ERR_LOCK;
227                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
228                                  tdb->fd, offset, rw_type, lck_type));
229                 }
230                 /* Generic lock error. errno set by fcntl.
231                  * EAGAIN is an expected return from non-blocking
232                  * locks. */
233                 if (errno != EAGAIN) {
234                 TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
235                                  tdb->fd, offset, rw_type, lck_type, 
236                                  strerror(errno)));
237                 }
238                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
239         }
240         return 0;
241 }
242
243 /* lock a list in the database. list -1 is the alloc list */
244 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
245 {
246         if (list < -1 || list >= (int)tdb->header.hash_size) {
247                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
248                            list, ltype));
249                 return -1;
250         }
251         if (tdb->flags & TDB_NOLOCK)
252                 return 0;
253
254         /* Since fcntl locks don't nest, we do a lock for the first one,
255            and simply bump the count for future ones */
256         if (tdb->locked[list+1].count == 0) {
257                 if (!tdb->read_only && tdb->header.rwlocks) {
258                         if (tdb_spinlock(tdb, list, ltype)) {
259                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list %d ltype=%d\n", 
260                                            list, ltype));
261                                 return -1;
262                         }
263                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
264                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
265                                            list, ltype, strerror(errno)));
266                         return -1;
267                 }
268                 tdb->locked[list+1].ltype = ltype;
269         }
270         tdb->locked[list+1].count++;
271         return 0;
272 }
273
274 /* unlock the database: returns void because it's too late for errors. */
275         /* changed to return int it may be interesting to know there
276            has been an error  --simo */
277 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
278 {
279         int ret = -1;
280
281         if (tdb->flags & TDB_NOLOCK)
282                 return 0;
283
284         /* Sanity checks */
285         if (list < -1 || list >= (int)tdb->header.hash_size) {
286                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
287                 return ret;
288         }
289
290         if (tdb->locked[list+1].count==0) {
291                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
292                 return ret;
293         }
294
295         if (tdb->locked[list+1].count == 1) {
296                 /* Down to last nested lock: unlock underneath */
297                 if (!tdb->read_only && tdb->header.rwlocks) {
298                         ret = tdb_spinunlock(tdb, list, ltype);
299                 } else {
300                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
301                 }
302         } else {
303                 ret = 0;
304         }
305         tdb->locked[list+1].count--;
306
307         if (ret)
308                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
309         return ret;
310 }
311
312 /* This is based on the hash algorithm from gdbm */
313 static u32 default_tdb_hash(TDB_DATA *key)
314 {
315         u32 value;      /* Used to compute the hash value.  */
316         u32   i;        /* Used to cycle through random values. */
317
318         /* Set the initial value from the key size. */
319         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
320                 value = (value + (key->dptr[i] << (i*5 % 24)));
321
322         return (1103515243 * value + 12345);  
323 }
324
325 /* check for an out of bounds access - if it is out of bounds then
326    see if the database has been expanded by someone else and expand
327    if necessary 
328    note that "len" is the minimum length needed for the db
329 */
330 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
331 {
332         struct stat st;
333         if (len <= tdb->map_size)
334                 return 0;
335         if (tdb->flags & TDB_INTERNAL) {
336                 if (!probe) {
337                         /* Ensure ecode is set for log fn. */
338                         tdb->ecode = TDB_ERR_IO;
339                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
340                                  (int)len, (int)tdb->map_size));
341                 }
342                 return TDB_ERRCODE(TDB_ERR_IO, -1);
343         }
344
345         if (fstat(tdb->fd, &st) == -1)
346                 return TDB_ERRCODE(TDB_ERR_IO, -1);
347
348         if (st.st_size < (size_t)len) {
349                 if (!probe) {
350                         /* Ensure ecode is set for log fn. */
351                         tdb->ecode = TDB_ERR_IO;
352                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
353                                  (int)len, (int)st.st_size));
354                 }
355                 return TDB_ERRCODE(TDB_ERR_IO, -1);
356         }
357
358         /* Unmap, update size, remap */
359         if (tdb_munmap(tdb) == -1)
360                 return TDB_ERRCODE(TDB_ERR_IO, -1);
361         tdb->map_size = st.st_size;
362         tdb_mmap(tdb);
363         return 0;
364 }
365
366 /* write a lump of data at a specified offset */
367 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
368 {
369         if (tdb_oob(tdb, off + len, 0) != 0)
370                 return -1;
371
372         if (tdb->map_ptr)
373                 memcpy(off + (char *)tdb->map_ptr, buf, len);
374 #ifdef HAVE_PWRITE
375         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
376 #else
377         else if (lseek(tdb->fd, off, SEEK_SET) != off
378                  || write(tdb->fd, buf, len) != (ssize_t)len) {
379 #endif
380                 /* Ensure ecode is set for log fn. */
381                 tdb->ecode = TDB_ERR_IO;
382                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
383                            off, len, strerror(errno)));
384                 return TDB_ERRCODE(TDB_ERR_IO, -1);
385         }
386         return 0;
387 }
388
389 /* read a lump of data at a specified offset, maybe convert */
390 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
391 {
392         if (tdb_oob(tdb, off + len, 0) != 0)
393                 return -1;
394
395         if (tdb->map_ptr)
396                 memcpy(buf, off + (char *)tdb->map_ptr, len);
397 #ifdef HAVE_PREAD
398         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
399 #else
400         else if (lseek(tdb->fd, off, SEEK_SET) != off
401                  || read(tdb->fd, buf, len) != (ssize_t)len) {
402 #endif
403                 /* Ensure ecode is set for log fn. */
404                 tdb->ecode = TDB_ERR_IO;
405                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
406                            off, len, strerror(errno)));
407                 return TDB_ERRCODE(TDB_ERR_IO, -1);
408         }
409         if (cv)
410                 convert(buf, len);
411         return 0;
412 }
413
414 /* read a lump of data, allocating the space for it */
415 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
416 {
417         char *buf;
418
419         if (!(buf = malloc(len))) {
420                 /* Ensure ecode is set for log fn. */
421                 tdb->ecode = TDB_ERR_OOM;
422                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
423                            len, strerror(errno)));
424                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
425         }
426         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
427                 SAFE_FREE(buf);
428                 return NULL;
429         }
430         return buf;
431 }
432
433 /* read/write a tdb_off */
434 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
435 {
436         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
437 }
438 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
439 {
440         tdb_off off = *d;
441         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
442 }
443
444 /* read/write a record */
445 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
446 {
447         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
448                 return -1;
449         if (TDB_BAD_MAGIC(rec)) {
450                 /* Ensure ecode is set for log fn. */
451                 tdb->ecode = TDB_ERR_CORRUPT;
452                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
453                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
454         }
455         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
456 }
457 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
458 {
459         struct list_struct r = *rec;
460         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
461 }
462
463 /* read a freelist record and check for simple errors */
464 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
465 {
466         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
467                 return -1;
468
469         if (rec->magic == TDB_MAGIC) {
470                 /* this happens when a app is showdown while deleting a record - we should
471                    not completely fail when this happens */
472                 TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
473                          rec->magic, off));
474                 rec->magic = TDB_FREE_MAGIC;
475                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
476                         return -1;
477         }
478
479         if (rec->magic != TDB_FREE_MAGIC) {
480                 /* Ensure ecode is set for log fn. */
481                 tdb->ecode = TDB_ERR_CORRUPT;
482                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
483                            rec->magic, off));
484                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
485         }
486         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
487                 return -1;
488         return 0;
489 }
490
491 /* update a record tailer (must hold allocation lock) */
492 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
493                          const struct list_struct *rec)
494 {
495         tdb_off totalsize;
496
497         /* Offset of tailer from record header */
498         totalsize = sizeof(*rec) + rec->rec_len;
499         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
500                          &totalsize);
501 }
502
503 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
504 {
505         struct list_struct rec;
506         tdb_off tailer_ofs, tailer;
507
508         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
509                 printf("ERROR: failed to read record at %u\n", offset);
510                 return 0;
511         }
512
513         printf(" rec: offset=0x%08x next=0x%08x rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
514                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
515
516         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
517         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
518                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
519                 return rec.next;
520         }
521
522         if (tailer != rec.rec_len + sizeof(rec)) {
523                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
524                                 (unsigned int)tailer, (unsigned int)(rec.rec_len + sizeof(rec)));
525         }
526         return rec.next;
527 }
528
529 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
530 {
531         tdb_off rec_ptr, top;
532
533         top = TDB_HASH_TOP(i);
534
535         if (tdb_lock(tdb, i, F_WRLCK) != 0)
536                 return -1;
537
538         if (ofs_read(tdb, top, &rec_ptr) == -1)
539                 return tdb_unlock(tdb, i, F_WRLCK);
540
541         if (rec_ptr)
542                 printf("hash=%d\n", i);
543
544         while (rec_ptr) {
545                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
546         }
547
548         return tdb_unlock(tdb, i, F_WRLCK);
549 }
550
551 void tdb_dump_all(TDB_CONTEXT *tdb)
552 {
553         int i;
554         for (i=0;i<tdb->header.hash_size;i++) {
555                 tdb_dump_chain(tdb, i);
556         }
557         printf("freelist:\n");
558         tdb_dump_chain(tdb, -1);
559 }
560
561 int tdb_printfreelist(TDB_CONTEXT *tdb)
562 {
563         int ret;
564         long total_free = 0;
565         tdb_off offset, rec_ptr;
566         struct list_struct rec;
567
568         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
569                 return ret;
570
571         offset = FREELIST_TOP;
572
573         /* read in the freelist top */
574         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
575                 tdb_unlock(tdb, -1, F_WRLCK);
576                 return 0;
577         }
578
579         printf("freelist top=[0x%08x]\n", rec_ptr );
580         while (rec_ptr) {
581                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
582                         tdb_unlock(tdb, -1, F_WRLCK);
583                         return -1;
584                 }
585
586                 if (rec.magic != TDB_FREE_MAGIC) {
587                         printf("bad magic 0x%08x in free list\n", rec.magic);
588                         tdb_unlock(tdb, -1, F_WRLCK);
589                         return -1;
590                 }
591
592                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)] (end = 0x%08x)\n", 
593                        rec_ptr, rec.rec_len, rec.rec_len, rec_ptr + rec.rec_len);
594                 total_free += rec.rec_len;
595
596                 /* move to the next record */
597                 rec_ptr = rec.next;
598         }
599         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
600                (int)total_free);
601
602         return tdb_unlock(tdb, -1, F_WRLCK);
603 }
604
605 /* Remove an element from the freelist.  Must have alloc lock. */
606 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
607 {
608         tdb_off last_ptr, i;
609
610         /* read in the freelist top */
611         last_ptr = FREELIST_TOP;
612         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
613                 if (i == off) {
614                         /* We've found it! */
615                         return ofs_write(tdb, last_ptr, &next);
616                 }
617                 /* Follow chain (next offset is at start of record) */
618                 last_ptr = i;
619         }
620         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
621         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
622 }
623
624 /* Add an element into the freelist. Merge adjacent records if
625    neccessary. */
626 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
627 {
628         tdb_off right, left;
629
630         /* Allocation and tailer lock */
631         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
632                 return -1;
633
634         /* set an initial tailer, so if we fail we don't leave a bogus record */
635         if (update_tailer(tdb, offset, rec) != 0) {
636                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
637                 goto fail;
638         }
639
640         /* Look right first (I'm an Australian, dammit) */
641         right = offset + sizeof(*rec) + rec->rec_len;
642         if (right + sizeof(*rec) <= tdb->map_size) {
643                 struct list_struct r;
644
645                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
646                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
647                         goto left;
648                 }
649
650                 /* If it's free, expand to include it. */
651                 if (r.magic == TDB_FREE_MAGIC) {
652                         if (remove_from_freelist(tdb, right, r.next) == -1) {
653                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
654                                 goto left;
655                         }
656                         rec->rec_len += sizeof(r) + r.rec_len;
657                 }
658         }
659
660 left:
661         /* Look left */
662         left = offset - sizeof(tdb_off);
663         if (left > TDB_DATA_START(tdb->header.hash_size)) {
664                 struct list_struct l;
665                 tdb_off leftsize;
666                 
667                 /* Read in tailer and jump back to header */
668                 if (ofs_read(tdb, left, &leftsize) == -1) {
669                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
670                         goto update;
671                 }
672                 left = offset - leftsize;
673
674                 /* Now read in record */
675                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
676                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
677                         goto update;
678                 }
679
680                 /* If it's free, expand to include it. */
681                 if (l.magic == TDB_FREE_MAGIC) {
682                         if (remove_from_freelist(tdb, left, l.next) == -1) {
683                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
684                                 goto update;
685                         } else {
686                                 offset = left;
687                                 rec->rec_len += leftsize;
688                         }
689                 }
690         }
691
692 update:
693         if (update_tailer(tdb, offset, rec) == -1) {
694                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
695                 goto fail;
696         }
697
698         /* Now, prepend to free list */
699         rec->magic = TDB_FREE_MAGIC;
700
701         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
702             rec_write(tdb, offset, rec) == -1 ||
703             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
704                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
705                 goto fail;
706         }
707
708         /* And we're done. */
709         tdb_unlock(tdb, -1, F_WRLCK);
710         return 0;
711
712  fail:
713         tdb_unlock(tdb, -1, F_WRLCK);
714         return -1;
715 }
716
717
718 /* expand a file.  we prefer to use ftruncate, as that is what posix
719   says to use for mmap expansion */
720 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
721 {
722         char buf[1024];
723 #if HAVE_FTRUNCATE_EXTEND
724         if (ftruncate(tdb->fd, size+addition) != 0) {
725                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
726                            size+addition, strerror(errno)));
727                 return -1;
728         }
729 #else
730         char b = 0;
731
732 #ifdef HAVE_PWRITE
733         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
734 #else
735         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
736             write(tdb->fd, &b, 1) != 1) {
737 #endif
738                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
739                            size+addition, strerror(errno)));
740                 return -1;
741         }
742 #endif
743
744         /* now fill the file with something. This ensures that the file isn't sparse, which would be
745            very bad if we ran out of disk. This must be done with write, not via mmap */
746         memset(buf, 0x42, sizeof(buf));
747         while (addition) {
748                 int n = addition>sizeof(buf)?sizeof(buf):addition;
749 #ifdef HAVE_PWRITE
750                 int ret = pwrite(tdb->fd, buf, n, size);
751 #else
752                 int ret;
753                 if (lseek(tdb->fd, size, SEEK_SET) != size)
754                         return -1;
755                 ret = write(tdb->fd, buf, n);
756 #endif
757                 if (ret != n) {
758                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
759                                    n, strerror(errno)));
760                         return -1;
761                 }
762                 addition -= n;
763                 size += n;
764         }
765         return 0;
766 }
767
768
769 /* expand the database at least size bytes by expanding the underlying
770    file and doing the mmap again if necessary */
771 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
772 {
773         struct list_struct rec;
774         tdb_off offset;
775
776         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
777                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
778                 return -1;
779         }
780
781         /* must know about any previous expansions by another process */
782         tdb_oob(tdb, tdb->map_size + 1, 1);
783
784         /* always make room for at least 10 more records, and round
785            the database up to a multiple of TDB_PAGE_SIZE */
786         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
787
788         if (!(tdb->flags & TDB_INTERNAL))
789                 tdb_munmap(tdb);
790
791         /*
792          * We must ensure the file is unmapped before doing this
793          * to ensure consistency with systems like OpenBSD where
794          * writes and mmaps are not consistent.
795          */
796
797         /* expand the file itself */
798         if (!(tdb->flags & TDB_INTERNAL)) {
799                 if (expand_file(tdb, tdb->map_size, size) != 0)
800                         goto fail;
801         }
802
803         tdb->map_size += size;
804
805         if (tdb->flags & TDB_INTERNAL) {
806                 char *new_map_ptr = realloc(tdb->map_ptr, tdb->map_size);
807                 if (!new_map_ptr) {
808                         tdb->map_size -= size;
809                         goto fail;
810                 }
811                 tdb->map_ptr = new_map_ptr;
812         } else {
813                 /*
814                  * We must ensure the file is remapped before adding the space
815                  * to ensure consistency with systems like OpenBSD where
816                  * writes and mmaps are not consistent.
817                  */
818
819                 /* We're ok if the mmap fails as we'll fallback to read/write */
820                 tdb_mmap(tdb);
821         }
822
823         /* form a new freelist record */
824         memset(&rec,'\0',sizeof(rec));
825         rec.rec_len = size - sizeof(rec);
826
827         /* link it into the free list */
828         offset = tdb->map_size - size;
829         if (tdb_free(tdb, offset, &rec) == -1)
830                 goto fail;
831
832         tdb_unlock(tdb, -1, F_WRLCK);
833         return 0;
834  fail:
835         tdb_unlock(tdb, -1, F_WRLCK);
836         return -1;
837 }
838
839
840 /* 
841    the core of tdb_allocate - called when we have decided which
842    free list entry to use
843  */
844 static tdb_off tdb_allocate_ofs(TDB_CONTEXT *tdb, tdb_len length, tdb_off rec_ptr,
845                                 struct list_struct *rec, tdb_off last_ptr)
846 {
847         struct list_struct newrec;
848         tdb_off newrec_ptr;
849
850         memset(&newrec, '\0', sizeof(newrec));
851
852         /* found it - now possibly split it up  */
853         if (rec->rec_len > length + MIN_REC_SIZE) {
854                 /* Length of left piece */
855                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
856                 
857                 /* Right piece to go on free list */
858                 newrec.rec_len = rec->rec_len - (sizeof(*rec) + length);
859                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
860                 
861                 /* And left record is shortened */
862                 rec->rec_len = length;
863         } else {
864                 newrec_ptr = 0;
865         }
866         
867         /* Remove allocated record from the free list */
868         if (ofs_write(tdb, last_ptr, &rec->next) == -1) {
869                 return 0;
870         }
871         
872         /* Update header: do this before we drop alloc
873            lock, otherwise tdb_free() might try to
874            merge with us, thinking we're free.
875            (Thanks Jeremy Allison). */
876         rec->magic = TDB_MAGIC;
877         if (rec_write(tdb, rec_ptr, rec) == -1) {
878                 return 0;
879         }
880         
881         /* Did we create new block? */
882         if (newrec_ptr) {
883                 /* Update allocated record tailer (we
884                    shortened it). */
885                 if (update_tailer(tdb, rec_ptr, rec) == -1) {
886                         return 0;
887                 }
888                 
889                 /* Free new record */
890                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1) {
891                         return 0;
892                 }
893         }
894         
895         /* all done - return the new record offset */
896         return rec_ptr;
897 }
898
899 /* allocate some space from the free list. The offset returned points
900    to a unconnected list_struct within the database with room for at
901    least length bytes of total data
902
903    0 is returned if the space could not be allocated
904  */
905 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
906                             struct list_struct *rec)
907 {
908         tdb_off rec_ptr, last_ptr, newrec_ptr;
909         struct {
910                 tdb_off rec_ptr, last_ptr;
911                 tdb_len rec_len;
912         } bestfit;
913
914         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
915                 return 0;
916
917         /* Extra bytes required for tailer */
918         length += sizeof(tdb_off);
919
920  again:
921         last_ptr = FREELIST_TOP;
922
923         /* read in the freelist top */
924         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
925                 goto fail;
926
927         bestfit.rec_ptr = 0;
928
929         /* 
930            this is a best fit allocation strategy. Originally we used
931            a first fit strategy, but it suffered from massive fragmentation
932            issues when faced with a slowly increasing record size.
933          */
934         while (rec_ptr) {
935                 if (rec_free_read(tdb, rec_ptr, rec) == -1) {
936                         goto fail;
937                 }
938
939                 if (rec->rec_len >= length) {
940                         if (bestfit.rec_ptr == 0 ||
941                             rec->rec_len < bestfit.rec_len) {
942                                 bestfit.rec_len = rec->rec_len;
943                                 bestfit.rec_ptr = rec_ptr;
944                                 bestfit.last_ptr = last_ptr;
945                                 /* consider a fit to be good enough if we aren't wasting more than half the space */
946                                 if (bestfit.rec_len < 2*length) {
947                                         break;
948                                 }
949                         }
950                 }
951
952                 /* move to the next record */
953                 last_ptr = rec_ptr;
954                 rec_ptr = rec->next;
955         }
956
957         if (bestfit.rec_ptr != 0) {
958                 if (rec_free_read(tdb, bestfit.rec_ptr, rec) == -1) {
959                         goto fail;
960                 }
961
962                 newrec_ptr = tdb_allocate_ofs(tdb, length, bestfit.rec_ptr, rec, bestfit.last_ptr);
963                 tdb_unlock(tdb, -1, F_WRLCK);
964                 return newrec_ptr;
965         }
966
967         /* we didn't find enough space. See if we can expand the
968            database and if we can then try again */
969         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
970                 goto again;
971  fail:
972         tdb_unlock(tdb, -1, F_WRLCK);
973         return 0;
974 }
975
976 /* initialise a new database with a specified hash size */
977 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
978 {
979         struct tdb_header *newdb;
980         int size, ret = -1;
981
982         /* We make it up in memory, then write it out if not internal */
983         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
984         if (!(newdb = calloc(size, 1)))
985                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
986
987         /* Fill in the header */
988         newdb->version = TDB_VERSION;
989         newdb->hash_size = hash_size;
990 #ifdef USE_SPINLOCKS
991         newdb->rwlocks = size;
992 #endif
993         if (tdb->flags & TDB_INTERNAL) {
994                 tdb->map_size = size;
995                 tdb->map_ptr = (char *)newdb;
996                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
997                 /* Convert the `ondisk' version if asked. */
998                 CONVERT(*newdb);
999                 return 0;
1000         }
1001         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
1002                 goto fail;
1003
1004         if (ftruncate(tdb->fd, 0) == -1)
1005                 goto fail;
1006
1007         /* This creates an endian-converted header, as if read from disk */
1008         CONVERT(*newdb);
1009         memcpy(&tdb->header, newdb, sizeof(tdb->header));
1010         /* Don't endian-convert the magic food! */
1011         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
1012         if (write(tdb->fd, newdb, size) != size)
1013                 ret = -1;
1014         else
1015                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
1016
1017   fail:
1018         SAFE_FREE(newdb);
1019         return ret;
1020 }
1021
1022 /* Returns 0 on fail.  On success, return offset of record, and fills
1023    in rec */
1024 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
1025                         struct list_struct *r)
1026 {
1027         tdb_off rec_ptr;
1028         
1029         /* read in the hash top */
1030         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
1031                 return 0;
1032
1033         /* keep looking until we find the right record */
1034         while (rec_ptr) {
1035                 if (rec_read(tdb, rec_ptr, r) == -1)
1036                         return 0;
1037
1038                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
1039                         char *k;
1040                         /* a very likely hit - read the key */
1041                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
1042                                            r->key_len);
1043                         if (!k)
1044                                 return 0;
1045
1046                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1047                                 SAFE_FREE(k);
1048                                 return rec_ptr;
1049                         }
1050                         SAFE_FREE(k);
1051                 }
1052                 rec_ptr = r->next;
1053         }
1054         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1055 }
1056
1057 /* As tdb_find, but if you succeed, keep the lock */
1058 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1059                              struct list_struct *rec)
1060 {
1061         u32 rec_ptr;
1062
1063         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1064                 return 0;
1065         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1066                 tdb_unlock(tdb, BUCKET(hash), locktype);
1067         return rec_ptr;
1068 }
1069
1070 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1071 {
1072         return tdb->ecode;
1073 }
1074
1075 static struct tdb_errname {
1076         enum TDB_ERROR ecode; const char *estring;
1077 } emap[] = { {TDB_SUCCESS, "Success"},
1078              {TDB_ERR_CORRUPT, "Corrupt database"},
1079              {TDB_ERR_IO, "IO Error"},
1080              {TDB_ERR_LOCK, "Locking error"},
1081              {TDB_ERR_OOM, "Out of memory"},
1082              {TDB_ERR_EXISTS, "Record exists"},
1083              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1084              {TDB_ERR_NOEXIST, "Record does not exist"} };
1085
1086 /* Error string for the last tdb error */
1087 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1088 {
1089         u32 i;
1090         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1091                 if (tdb->ecode == emap[i].ecode)
1092                         return emap[i].estring;
1093         return "Invalid error code";
1094 }
1095
1096 /* update an entry in place - this only works if the new data size
1097    is <= the old data size and the key exists.
1098    on failure return -1.
1099 */
1100
1101 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1102 {
1103         struct list_struct rec;
1104         tdb_off rec_ptr;
1105
1106         /* find entry */
1107         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1108                 return -1;
1109
1110         /* must be long enough key, data and tailer */
1111         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1112                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1113                 return -1;
1114         }
1115
1116         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1117                       dbuf.dptr, dbuf.dsize) == -1)
1118                 return -1;
1119
1120         if (dbuf.dsize != rec.data_len) {
1121                 /* update size */
1122                 rec.data_len = dbuf.dsize;
1123                 return rec_write(tdb, rec_ptr, &rec);
1124         }
1125  
1126         return 0;
1127 }
1128
1129 /* find an entry in the database given a key */
1130 /* If an entry doesn't exist tdb_err will be set to
1131  * TDB_ERR_NOEXIST. If a key has no data attached
1132  * then the TDB_DATA will have zero length but
1133  * a non-zero pointer
1134  */
1135
1136 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1137 {
1138         tdb_off rec_ptr;
1139         struct list_struct rec;
1140         TDB_DATA ret;
1141         u32 hash;
1142
1143         /* find which hash bucket it is in */
1144         hash = tdb->hash_fn(&key);
1145         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1146                 return tdb_null;
1147
1148         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1149                                   rec.data_len);
1150         ret.dsize = rec.data_len;
1151         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1152         return ret;
1153 }
1154
1155 /* check if an entry in the database exists 
1156
1157    note that 1 is returned if the key is found and 0 is returned if not found
1158    this doesn't match the conventions in the rest of this module, but is
1159    compatible with gdbm
1160 */
1161 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1162 {
1163         struct list_struct rec;
1164         
1165         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1166                 return 0;
1167         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1168         return 1;
1169 }
1170
1171 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1172 {
1173         u32 hash = tdb->hash_fn(&key);
1174         return tdb_exists_hash(tdb, key, hash);
1175 }
1176
1177 /* record lock stops delete underneath */
1178 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1179 {
1180         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1181 }
1182 /*
1183   Write locks override our own fcntl readlocks, so check it here.
1184   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1185   an error to fail to get the lock here.
1186 */
1187  
1188 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1189 {
1190         struct tdb_traverse_lock *i;
1191         for (i = &tdb->travlocks; i; i = i->next)
1192                 if (i->off == off)
1193                         return -1;
1194         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1195 }
1196
1197 /*
1198   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1199   an error to fail to get the lock here.
1200 */
1201
1202 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1203 {
1204         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1205 }
1206 /* fcntl locks don't stack: avoid unlocking someone else's */
1207 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1208 {
1209         struct tdb_traverse_lock *i;
1210         u32 count = 0;
1211
1212         if (off == 0)
1213                 return 0;
1214         for (i = &tdb->travlocks; i; i = i->next)
1215                 if (i->off == off)
1216                         count++;
1217         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1218 }
1219
1220 /* actually delete an entry in the database given the offset */
1221 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1222 {
1223         tdb_off last_ptr, i;
1224         struct list_struct lastrec;
1225
1226         if (tdb->read_only) return -1;
1227
1228         if (write_lock_record(tdb, rec_ptr) == -1) {
1229                 /* Someone traversing here: mark it as dead */
1230                 rec->magic = TDB_DEAD_MAGIC;
1231                 return rec_write(tdb, rec_ptr, rec);
1232         }
1233         if (write_unlock_record(tdb, rec_ptr) != 0)
1234                 return -1;
1235
1236         /* find previous record in hash chain */
1237         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1238                 return -1;
1239         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1240                 if (rec_read(tdb, i, &lastrec) == -1)
1241                         return -1;
1242
1243         /* unlink it: next ptr is at start of record. */
1244         if (last_ptr == 0)
1245                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1246         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1247                 return -1;
1248
1249         /* recover the space */
1250         if (tdb_free(tdb, rec_ptr, rec) == -1)
1251                 return -1;
1252         return 0;
1253 }
1254
1255 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1256 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1257                          struct list_struct *rec)
1258 {
1259         int want_next = (tlock->off != 0);
1260
1261         /* Lock each chain from the start one. */
1262         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1263                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1264                         return -1;
1265
1266                 /* No previous record?  Start at top of chain. */
1267                 if (!tlock->off) {
1268                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1269                                      &tlock->off) == -1)
1270                                 goto fail;
1271                 } else {
1272                         /* Otherwise unlock the previous record. */
1273                         if (unlock_record(tdb, tlock->off) != 0)
1274                                 goto fail;
1275                 }
1276
1277                 if (want_next) {
1278                         /* We have offset of old record: grab next */
1279                         if (rec_read(tdb, tlock->off, rec) == -1)
1280                                 goto fail;
1281                         tlock->off = rec->next;
1282                 }
1283
1284                 /* Iterate through chain */
1285                 while( tlock->off) {
1286                         tdb_off current;
1287                         if (rec_read(tdb, tlock->off, rec) == -1)
1288                                 goto fail;
1289                         if (!TDB_DEAD(rec)) {
1290                                 /* Woohoo: we found one! */
1291                                 if (lock_record(tdb, tlock->off) != 0)
1292                                         goto fail;
1293                                 return tlock->off;
1294                         }
1295
1296                         /* Detect infinite loops. From "Shlomi Yaakobovich" <Shlomi@exanet.com>. */
1297                         if (tlock->off == rec->next) {
1298                                 TDB_LOG((tdb, 0, "tdb_next_lock: loop detected.\n"));
1299                                 goto fail;
1300                         }
1301
1302                         /* Try to clean dead ones from old traverses */
1303                         current = tlock->off;
1304                         tlock->off = rec->next;
1305                         if (!tdb->read_only && 
1306                             do_delete(tdb, current, rec) != 0)
1307                                 goto fail;
1308                 }
1309                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1310                 want_next = 0;
1311         }
1312         /* We finished iteration without finding anything */
1313         return TDB_ERRCODE(TDB_SUCCESS, 0);
1314
1315  fail:
1316         tlock->off = 0;
1317         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1318                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1319         return -1;
1320 }
1321
1322 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1323    return -1 on error or the record count traversed
1324    if fn is NULL then it is not called
1325    a non-zero return value from fn() indicates that the traversal should stop
1326   */
1327 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private)
1328 {
1329         TDB_DATA key, dbuf;
1330         struct list_struct rec;
1331         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1332         int ret, count = 0;
1333
1334         /* This was in the initializaton, above, but the IRIX compiler
1335          * did not like it.  crh
1336          */
1337         tl.next = tdb->travlocks.next;
1338
1339         /* fcntl locks don't stack: beware traverse inside traverse */
1340         tdb->travlocks.next = &tl;
1341
1342         /* tdb_next_lock places locks on the record returned, and its chain */
1343         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1344                 count++;
1345                 /* now read the full record */
1346                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1347                                           rec.key_len + rec.data_len);
1348                 if (!key.dptr) {
1349                         ret = -1;
1350                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1351                                 goto out;
1352                         if (unlock_record(tdb, tl.off) != 0)
1353                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1354                         goto out;
1355                 }
1356                 key.dsize = rec.key_len;
1357                 dbuf.dptr = key.dptr + rec.key_len;
1358                 dbuf.dsize = rec.data_len;
1359
1360                 /* Drop chain lock, call out */
1361                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1362                         ret = -1;
1363                         goto out;
1364                 }
1365                 if (fn && fn(tdb, key, dbuf, private)) {
1366                         /* They want us to terminate traversal */
1367                         ret = count;
1368                         if (unlock_record(tdb, tl.off) != 0) {
1369                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1370                                 ret = -1;
1371                         }
1372                         tdb->travlocks.next = tl.next;
1373                         SAFE_FREE(key.dptr);
1374                         return count;
1375                 }
1376                 SAFE_FREE(key.dptr);
1377         }
1378 out:
1379         tdb->travlocks.next = tl.next;
1380         if (ret < 0)
1381                 return -1;
1382         else
1383                 return count;
1384 }
1385
1386 /* find the first entry in the database and return its key */
1387 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1388 {
1389         TDB_DATA key;
1390         struct list_struct rec;
1391
1392         /* release any old lock */
1393         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1394                 return tdb_null;
1395         tdb->travlocks.off = tdb->travlocks.hash = 0;
1396
1397         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1398                 return tdb_null;
1399         /* now read the key */
1400         key.dsize = rec.key_len;
1401         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1402         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1403                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1404         return key;
1405 }
1406
1407 /* find the next entry in the database, returning its key */
1408 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1409 {
1410         u32 oldhash;
1411         TDB_DATA key = tdb_null;
1412         struct list_struct rec;
1413         char *k = NULL;
1414
1415         /* Is locked key the old key?  If so, traverse will be reliable. */
1416         if (tdb->travlocks.off) {
1417                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1418                         return tdb_null;
1419                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1420                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1421                                             rec.key_len))
1422                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1423                         /* No, it wasn't: unlock it and start from scratch */
1424                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1425                                 return tdb_null;
1426                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1427                                 return tdb_null;
1428                         tdb->travlocks.off = 0;
1429                 }
1430
1431                 SAFE_FREE(k);
1432         }
1433
1434         if (!tdb->travlocks.off) {
1435                 /* No previous element: do normal find, and lock record */
1436                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
1437                 if (!tdb->travlocks.off)
1438                         return tdb_null;
1439                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1440                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1441                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1442                         return tdb_null;
1443                 }
1444         }
1445         oldhash = tdb->travlocks.hash;
1446
1447         /* Grab next record: locks chain and returned record,
1448            unlocks old record */
1449         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1450                 key.dsize = rec.key_len;
1451                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1452                                           key.dsize);
1453                 /* Unlock the chain of this new record */
1454                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1455                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1456         }
1457         /* Unlock the chain of old record */
1458         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1459                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1460         return key;
1461 }
1462
1463 /* delete an entry in the database given a key */
1464 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1465 {
1466         tdb_off rec_ptr;
1467         struct list_struct rec;
1468         int ret;
1469
1470         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1471                 return -1;
1472         ret = do_delete(tdb, rec_ptr, &rec);
1473         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1474                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1475         return ret;
1476 }
1477
1478 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1479 {
1480         u32 hash = tdb->hash_fn(&key);
1481         return tdb_delete_hash(tdb, key, hash);
1482 }
1483
1484 /* store an element in the database, replacing any existing element
1485    with the same key 
1486
1487    return 0 on success, -1 on failure
1488 */
1489 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1490 {
1491         struct list_struct rec;
1492         u32 hash;
1493         tdb_off rec_ptr;
1494         char *p = NULL;
1495         int ret = 0;
1496
1497         /* find which hash bucket it is in */
1498         hash = tdb->hash_fn(&key);
1499         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1500                 return -1;
1501
1502         /* check for it existing, on insert. */
1503         if (flag == TDB_INSERT) {
1504                 if (tdb_exists_hash(tdb, key, hash)) {
1505                         tdb->ecode = TDB_ERR_EXISTS;
1506                         goto fail;
1507                 }
1508         } else {
1509                 /* first try in-place update, on modify or replace. */
1510                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1511                         goto out;
1512                 if (tdb->ecode == TDB_ERR_NOEXIST &&
1513                     flag == TDB_MODIFY) {
1514                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
1515                          we should fail the store */
1516                         goto fail;
1517                 }
1518         }
1519         /* reset the error code potentially set by the tdb_update() */
1520         tdb->ecode = TDB_SUCCESS;
1521
1522         /* delete any existing record - if it doesn't exist we don't
1523            care.  Doing this first reduces fragmentation, and avoids
1524            coalescing with `allocated' block before it's updated. */
1525         if (flag != TDB_INSERT)
1526                 tdb_delete_hash(tdb, key, hash);
1527
1528         /* Copy key+value *before* allocating free space in case malloc
1529            fails and we are left with a dead spot in the tdb. */
1530
1531         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1532                 tdb->ecode = TDB_ERR_OOM;
1533                 goto fail;
1534         }
1535
1536         memcpy(p, key.dptr, key.dsize);
1537         if (dbuf.dsize)
1538                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1539
1540         /* we have to allocate some space */
1541         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1542                 goto fail;
1543
1544         /* Read hash top into next ptr */
1545         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1546                 goto fail;
1547
1548         rec.key_len = key.dsize;
1549         rec.data_len = dbuf.dsize;
1550         rec.full_hash = hash;
1551         rec.magic = TDB_MAGIC;
1552
1553         /* write out and point the top of the hash chain at it */
1554         if (rec_write(tdb, rec_ptr, &rec) == -1
1555             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1556             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1557                 /* Need to tdb_unallocate() here */
1558                 goto fail;
1559         }
1560  out:
1561         SAFE_FREE(p); 
1562         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1563         return ret;
1564 fail:
1565         ret = -1;
1566         goto out;
1567 }
1568
1569 /* Attempt to append data to an entry in place - this only works if the new data size
1570    is <= the old data size and the key exists.
1571    on failure return -1. Record must be locked before calling.
1572 */
1573 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1574 {
1575         struct list_struct rec;
1576         tdb_off rec_ptr;
1577
1578         /* find entry */
1579         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1580                 return -1;
1581
1582         /* Append of 0 is always ok. */
1583         if (new_dbuf.dsize == 0)
1584                 return 0;
1585
1586         /* must be long enough for key, old data + new data and tailer */
1587         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1588                 /* No room. */
1589                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1590                 return -1;
1591         }
1592
1593         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1594                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1595                 return -1;
1596
1597         /* update size */
1598         rec.data_len += new_dbuf.dsize;
1599         return rec_write(tdb, rec_ptr, &rec);
1600 }
1601
1602 /* Append to an entry. Create if not exist. */
1603
1604 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1605 {
1606         struct list_struct rec;
1607         u32 hash;
1608         tdb_off rec_ptr;
1609         char *p = NULL;
1610         int ret = 0;
1611         size_t new_data_size = 0;
1612
1613         /* find which hash bucket it is in */
1614         hash = tdb->hash_fn(&key);
1615         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1616                 return -1;
1617
1618         /* first try in-place. */
1619         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1620                 goto out;
1621
1622         /* reset the error code potentially set by the tdb_append_inplace() */
1623         tdb->ecode = TDB_SUCCESS;
1624
1625         /* find entry */
1626         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1627                 if (tdb->ecode != TDB_ERR_NOEXIST)
1628                         goto fail;
1629
1630                 /* Not found - create. */
1631
1632                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1633                 goto out;
1634         }
1635
1636         new_data_size = rec.data_len + new_dbuf.dsize;
1637
1638         /* Copy key+old_value+value *before* allocating free space in case malloc
1639            fails and we are left with a dead spot in the tdb. */
1640
1641         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1642                 tdb->ecode = TDB_ERR_OOM;
1643                 goto fail;
1644         }
1645
1646         /* Copy the key in place. */
1647         memcpy(p, key.dptr, key.dsize);
1648
1649         /* Now read the old data into place. */
1650         if (rec.data_len &&
1651                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1652                         goto fail;
1653
1654         /* Finally append the new data. */
1655         if (new_dbuf.dsize)
1656                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1657
1658         /* delete any existing record - if it doesn't exist we don't
1659            care.  Doing this first reduces fragmentation, and avoids
1660            coalescing with `allocated' block before it's updated. */
1661
1662         tdb_delete_hash(tdb, key, hash);
1663
1664         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1665                 goto fail;
1666
1667         /* Read hash top into next ptr */
1668         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1669                 goto fail;
1670
1671         rec.key_len = key.dsize;
1672         rec.data_len = new_data_size;
1673         rec.full_hash = hash;
1674         rec.magic = TDB_MAGIC;
1675
1676         /* write out and point the top of the hash chain at it */
1677         if (rec_write(tdb, rec_ptr, &rec) == -1
1678             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1679             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1680                 /* Need to tdb_unallocate() here */
1681                 goto fail;
1682         }
1683
1684  out:
1685         SAFE_FREE(p); 
1686         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1687         return ret;
1688
1689 fail:
1690         ret = -1;
1691         goto out;
1692 }
1693
1694 static int tdb_already_open(dev_t device,
1695                             ino_t ino)
1696 {
1697         TDB_CONTEXT *i;
1698         
1699         for (i = tdbs; i; i = i->next) {
1700                 if (i->device == device && i->inode == ino) {
1701                         return 1;
1702                 }
1703         }
1704
1705         return 0;
1706 }
1707
1708 /* open the database, creating it if necessary 
1709
1710    The open_flags and mode are passed straight to the open call on the
1711    database file. A flags value of O_WRONLY is invalid. The hash size
1712    is advisory, use zero for a default value.
1713
1714    Return is NULL on error, in which case errno is also set.  Don't 
1715    try to call tdb_error or tdb_errname, just do strerror(errno).
1716
1717    @param name may be NULL for internal databases. */
1718 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1719                       int open_flags, mode_t mode)
1720 {
1721         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
1722 }
1723
1724 /* a default logging function */
1725 static void null_log_fn(TDB_CONTEXT *tdb, int level, const char *fmt, ...)
1726 {
1727 }
1728
1729
1730 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1731                          int open_flags, mode_t mode,
1732                          tdb_log_func log_fn,
1733                          tdb_hash_func hash_fn)
1734 {
1735         TDB_CONTEXT *tdb;
1736         struct stat st;
1737         int rev = 0, locked = 0;
1738         uint8_t *vp;
1739         u32 vertest;
1740
1741         if (!(tdb = calloc(1, sizeof *tdb))) {
1742                 /* Can't log this */
1743                 errno = ENOMEM;
1744                 goto fail;
1745         }
1746         tdb->fd = -1;
1747         tdb->name = NULL;
1748         tdb->map_ptr = NULL;
1749         tdb->flags = tdb_flags;
1750         tdb->open_flags = open_flags;
1751         tdb->log_fn = log_fn?log_fn:null_log_fn;
1752         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
1753
1754         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1755                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1756                          name));
1757                 errno = EINVAL;
1758                 goto fail;
1759         }
1760         
1761         if (hash_size == 0)
1762                 hash_size = DEFAULT_HASH_SIZE;
1763         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1764                 tdb->read_only = 1;
1765                 /* read only databases don't do locking or clear if first */
1766                 tdb->flags |= TDB_NOLOCK;
1767                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1768         }
1769
1770         /* internal databases don't mmap or lock, and start off cleared */
1771         if (tdb->flags & TDB_INTERNAL) {
1772                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1773                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1774                 if (tdb_new_database(tdb, hash_size) != 0) {
1775                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1776                         goto fail;
1777                 }
1778                 goto internal;
1779         }
1780
1781         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1782                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1783                          name, strerror(errno)));
1784                 goto fail;      /* errno set by open(2) */
1785         }
1786
1787         /* ensure there is only one process initialising at once */
1788         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1789                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1790                          name, strerror(errno)));
1791                 goto fail;      /* errno set by tdb_brlock */
1792         }
1793
1794         /* we need to zero database if we are the only one with it open */
1795         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
1796                 (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
1797                 open_flags |= O_CREAT;
1798                 if (ftruncate(tdb->fd, 0) == -1) {
1799                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1800                                  "failed to truncate %s: %s\n",
1801                                  name, strerror(errno)));
1802                         goto fail; /* errno set by ftruncate */
1803                 }
1804         }
1805
1806         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1807             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1808             || (tdb->header.version != TDB_VERSION
1809                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1810                 /* its not a valid database - possibly initialise it */
1811                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1812                         errno = EIO; /* ie bad format or something */
1813                         goto fail;
1814                 }
1815                 rev = (tdb->flags & TDB_CONVERT);
1816         }
1817         vp = (uint8_t *)&tdb->header.version;
1818         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1819                   (((u32)vp[2]) << 8) | (u32)vp[3];
1820         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1821         if (!rev)
1822                 tdb->flags &= ~TDB_CONVERT;
1823         else {
1824                 tdb->flags |= TDB_CONVERT;
1825                 convert(&tdb->header, sizeof(tdb->header));
1826         }
1827         if (fstat(tdb->fd, &st) == -1)
1828                 goto fail;
1829
1830         /* Is it already in the open list?  If so, fail. */
1831         if (tdb_already_open(st.st_dev, st.st_ino)) {
1832                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1833                          "%s (%d,%d) is already open in this process\n",
1834                          name, (int)st.st_dev, (int)st.st_ino));
1835                 errno = EBUSY;
1836                 goto fail;
1837         }
1838
1839         if (!(tdb->name = (char *)strdup(name))) {
1840                 errno = ENOMEM;
1841                 goto fail;
1842         }
1843
1844         tdb->map_size = st.st_size;
1845         tdb->device = st.st_dev;
1846         tdb->inode = st.st_ino;
1847         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1848         if (!tdb->locked) {
1849                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1850                          "failed to allocate lock structure for %s\n",
1851                          name));
1852                 errno = ENOMEM;
1853                 goto fail;
1854         }
1855         tdb_mmap(tdb);
1856         if (locked) {
1857                 if (!tdb->read_only)
1858                         if (tdb_clear_spinlocks(tdb) != 0) {
1859                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1860                                 "failed to clear spinlock\n"));
1861                                 goto fail;
1862                         }
1863                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1864                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1865                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1866                                  name, strerror(errno)));
1867                         goto fail;
1868                 }
1869
1870         }
1871
1872         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
1873            we didn't get the initial exclusive lock as we need to let all other
1874            users know we're using it. */
1875
1876         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
1877         /* leave this lock in place to indicate it's in use */
1878         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1879                 goto fail;
1880         }
1881
1882
1883  internal:
1884         /* Internal (memory-only) databases skip all the code above to
1885          * do with disk files, and resume here by releasing their
1886          * global lock and hooking into the active list. */
1887         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1888                 goto fail;
1889         tdb->next = tdbs;
1890         tdbs = tdb;
1891         return tdb;
1892
1893  fail:
1894         { int save_errno = errno;
1895
1896         if (!tdb)
1897                 return NULL;
1898         
1899         if (tdb->map_ptr) {
1900                 if (tdb->flags & TDB_INTERNAL)
1901                         SAFE_FREE(tdb->map_ptr);
1902                 else
1903                         tdb_munmap(tdb);
1904         }
1905         SAFE_FREE(tdb->name);
1906         if (tdb->fd != -1)
1907                 if (close(tdb->fd) != 0)
1908                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1909         SAFE_FREE(tdb->locked);
1910         SAFE_FREE(tdb);
1911         errno = save_errno;
1912         return NULL;
1913         }
1914 }
1915
1916 /**
1917  * Close a database.
1918  *
1919  * @returns -1 for error; 0 for success.
1920  **/
1921 int tdb_close(TDB_CONTEXT *tdb)
1922 {
1923         TDB_CONTEXT **i;
1924         int ret = 0;
1925
1926         if (tdb->map_ptr) {
1927                 if (tdb->flags & TDB_INTERNAL)
1928                         SAFE_FREE(tdb->map_ptr);
1929                 else
1930                         tdb_munmap(tdb);
1931         }
1932         SAFE_FREE(tdb->name);
1933         if (tdb->fd != -1)
1934                 ret = close(tdb->fd);
1935         SAFE_FREE(tdb->locked);
1936
1937         /* Remove from contexts list */
1938         for (i = &tdbs; *i; i = &(*i)->next) {
1939                 if (*i == tdb) {
1940                         *i = tdb->next;
1941                         break;
1942                 }
1943         }
1944
1945         memset(tdb, 0, sizeof(*tdb));
1946         SAFE_FREE(tdb);
1947
1948         return ret;
1949 }
1950
1951 /* lock/unlock entire database */
1952 int tdb_lockall(TDB_CONTEXT *tdb)
1953 {
1954         u32 i;
1955
1956         /* There are no locks on read-only dbs */
1957         if (tdb->read_only)
1958                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1959         for (i = 0; i < tdb->header.hash_size; i++) 
1960                 if (tdb_lock(tdb, i, F_WRLCK))
1961                         break;
1962
1963         /* If error, release locks we have... */
1964         if (i < tdb->header.hash_size) {
1965                 u32 j;
1966
1967                 for ( j = 0; j < i; j++)
1968                         tdb_unlock(tdb, j, F_WRLCK);
1969                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1970         }
1971
1972         return 0;
1973 }
1974 void tdb_unlockall(TDB_CONTEXT *tdb)
1975 {
1976         u32 i;
1977         for (i=0; i < tdb->header.hash_size; i++)
1978                 tdb_unlock(tdb, i, F_WRLCK);
1979 }
1980
1981 /* lock/unlock one hash chain. This is meant to be used to reduce
1982    contention - it cannot guarantee how many records will be locked */
1983 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1984 {
1985         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1986 }
1987
1988 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1989 {
1990         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1991 }
1992
1993 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1994 {
1995         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
1996 }
1997
1998 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1999 {
2000         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
2001 }
2002
2003
2004 /* register a loging function */
2005 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2006 {
2007         tdb->log_fn = fn?fn:null_log_fn;
2008 }
2009
2010
2011 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
2012    seek pointer from our parent and to re-establish locks */
2013 int tdb_reopen(TDB_CONTEXT *tdb)
2014 {
2015         struct stat st;
2016
2017         if (tdb->flags & TDB_INTERNAL)
2018                 return 0; /* Nothing to do. */
2019         if (tdb_munmap(tdb) != 0) {
2020                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2021                 goto fail;
2022         }
2023         if (close(tdb->fd) != 0)
2024                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2025         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2026         if (tdb->fd == -1) {
2027                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2028                 goto fail;
2029         }
2030         if (fstat(tdb->fd, &st) != 0) {
2031                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2032                 goto fail;
2033         }
2034         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2035                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2036                 goto fail;
2037         }
2038         tdb_mmap(tdb);
2039         if ((tdb->flags & TDB_CLEAR_IF_FIRST) && (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
2040                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2041                 goto fail;
2042         }
2043
2044         return 0;
2045
2046 fail:
2047         tdb_close(tdb);
2048         return -1;
2049 }
2050
2051 /* reopen all tdb's */
2052 int tdb_reopen_all(void)
2053 {
2054         TDB_CONTEXT *tdb;
2055
2056         for (tdb=tdbs; tdb; tdb = tdb->next) {
2057                 /* Ensure no clear-if-first. */
2058                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
2059                 if (tdb_reopen(tdb) != 0)
2060                         return -1;
2061         }
2062
2063         return 0;
2064 }