fec0b3094d366e0176e21da820e4b57cc29d868d
[tprouty/samba.git] / source / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2004
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 2 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23    
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, write to the Free Software
26    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 */
28
29
30 /* NOTE: If you use tdbs under valgrind, and in particular if you run
31  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
32  * think this is because valgrind doesn't understand that the mmap'd
33  * area may be written to by other processes.  Memory can, from the
34  * point of view of the grinded process, spontaneously become
35  * initialized.
36  *
37  * I can think of a few solutions.  [mbp 20030311]
38  *
39  * 1 - Write suppressions for Valgrind so that it doesn't complain
40  * about this.  Probably the most reasonable but people need to
41  * remember to use them.
42  *
43  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
44  *
45  * 3 - Use the special valgrind macros to mark memory as valid at the
46  * right time.  Probably too hard -- the process just doesn't know.
47  */ 
48
49 #ifdef STANDALONE
50 #if HAVE_CONFIG_H
51 #include <config.h>
52 #endif
53
54 #include <stdlib.h>
55 #include <stdio.h>
56 #include <fcntl.h>
57 #include <unistd.h>
58 #include <string.h>
59 #include <fcntl.h>
60 #include <errno.h>
61 #include <sys/mman.h>
62 #include <sys/stat.h>
63 #include <signal.h>
64 #include "tdb.h"
65 #include "spinlock.h"
66 #else
67 #include "includes.h"
68 #endif
69
70 #define TDB_MAGIC_FOOD "TDB file\n"
71 #define TDB_VERSION (0x26011967 + 6)
72 #define TDB_MAGIC (0x26011999U)
73 #define TDB_FREE_MAGIC (~TDB_MAGIC)
74 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
75 #define TDB_ALIGNMENT 4
76 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
77 #define DEFAULT_HASH_SIZE 131
78 #define TDB_PAGE_SIZE 0x2000
79 #define FREELIST_TOP (sizeof(struct tdb_header))
80 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
81 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
82 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
83 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
84 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
85 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
86
87
88 /* NB assumes there is a local variable called "tdb" that is the
89  * current context, also takes doubly-parenthesized print-style
90  * argument. */
91 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
92
93 /* lock offsets */
94 #define GLOBAL_LOCK 0
95 #define ACTIVE_LOCK 4
96
97 #ifndef MAP_FILE
98 #define MAP_FILE 0
99 #endif
100
101 #ifndef MAP_FAILED
102 #define MAP_FAILED ((void *)-1)
103 #endif
104
105 /* free memory if the pointer is valid and zero the pointer */
106 #ifndef SAFE_FREE
107 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
108 #endif
109
110 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
111 TDB_DATA tdb_null;
112
113 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
114 static TDB_CONTEXT *tdbs = NULL;
115
116 static int tdb_munmap(TDB_CONTEXT *tdb)
117 {
118         if (tdb->flags & TDB_INTERNAL)
119                 return 0;
120
121 #ifdef HAVE_MMAP
122         if (tdb->map_ptr) {
123                 int ret = munmap(tdb->map_ptr, tdb->map_size);
124                 if (ret != 0)
125                         return ret;
126         }
127 #endif
128         tdb->map_ptr = NULL;
129         return 0;
130 }
131
132 static void tdb_mmap(TDB_CONTEXT *tdb)
133 {
134         if (tdb->flags & TDB_INTERNAL)
135                 return;
136
137 #ifdef HAVE_MMAP
138         if (!(tdb->flags & TDB_NOMMAP)) {
139                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
140                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
141                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
142
143                 /*
144                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
145                  */
146
147                 if (tdb->map_ptr == MAP_FAILED) {
148                         tdb->map_ptr = NULL;
149                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
150                                  tdb->map_size, strerror(errno)));
151                 }
152         } else {
153                 tdb->map_ptr = NULL;
154         }
155 #else
156         tdb->map_ptr = NULL;
157 #endif
158 }
159
160 /* Endian conversion: we only ever deal with 4 byte quantities */
161 static void *convert(void *buf, u32 size)
162 {
163         u32 i, *p = buf;
164         for (i = 0; i < size / 4; i++)
165                 p[i] = TDB_BYTEREV(p[i]);
166         return buf;
167 }
168 #define DOCONV() (tdb->flags & TDB_CONVERT)
169 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
170
171 /* the body of the database is made of one list_struct for the free space
172    plus a separate data list for each hash value */
173 struct list_struct {
174         tdb_off next; /* offset of the next record in the list */
175         tdb_len rec_len; /* total byte length of record */
176         tdb_len key_len; /* byte length of key */
177         tdb_len data_len; /* byte length of data */
178         u32 full_hash; /* the full 32 bit hash of the key */
179         u32 magic;   /* try to catch errors */
180         /* the following union is implied:
181                 union {
182                         char record[rec_len];
183                         struct {
184                                 char key[key_len];
185                                 char data[data_len];
186                         }
187                         u32 totalsize; (tailer)
188                 }
189         */
190 };
191
192 /***************************************************************
193  Allow a caller to set a "alarm" flag that tdb can check to abort
194  a blocking lock on SIGALRM.
195 ***************************************************************/
196
197 static sig_atomic_t *palarm_fired;
198
199 void tdb_set_lock_alarm(sig_atomic_t *palarm)
200 {
201         palarm_fired = palarm;
202 }
203
204 /* a byte range locking function - return 0 on success
205    this functions locks/unlocks 1 byte at the specified offset.
206
207    On error, errno is also set so that errors are passed back properly
208    through tdb_open(). */
209 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
210                       int rw_type, int lck_type, int probe)
211 {
212         struct flock fl;
213         int ret;
214
215         if (tdb->flags & TDB_NOLOCK)
216                 return 0;
217         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
218                 errno = EACCES;
219                 return -1;
220         }
221
222         fl.l_type = rw_type;
223         fl.l_whence = SEEK_SET;
224         fl.l_start = offset;
225         fl.l_len = 1;
226         fl.l_pid = 0;
227
228         do {
229                 ret = fcntl(tdb->fd,lck_type,&fl);
230                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
231                         break;
232         } while (ret == -1 && errno == EINTR);
233
234         if (ret == -1) {
235                 if (!probe && lck_type != F_SETLK) {
236                         /* Ensure error code is set for log fun to examine. */
237                         if (errno == EINTR && palarm_fired && *palarm_fired)
238                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
239                         else
240                                 tdb->ecode = TDB_ERR_LOCK;
241                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
242                                  tdb->fd, offset, rw_type, lck_type));
243                 }
244                 /* Was it an alarm timeout ? */
245                 if (errno == EINTR && palarm_fired && *palarm_fired) {
246                         TDB_LOG((tdb, 5, "tdb_brlock timed out (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
247                                  tdb->fd, offset, rw_type, lck_type));
248                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
249                 }
250                 /* Otherwise - generic lock error. errno set by fcntl.
251                  * EAGAIN is an expected return from non-blocking
252                  * locks. */
253                 if (errno != EAGAIN) {
254                         TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
255                                  tdb->fd, offset, rw_type, lck_type, 
256                                  strerror(errno)));
257                 }
258                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
259         }
260         return 0;
261 }
262
263 /* lock a list in the database. list -1 is the alloc list */
264 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
265 {
266         if (list < -1 || list >= (int)tdb->header.hash_size) {
267                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
268                            list, ltype));
269                 return -1;
270         }
271         if (tdb->flags & TDB_NOLOCK)
272                 return 0;
273
274         /* Since fcntl locks don't nest, we do a lock for the first one,
275            and simply bump the count for future ones */
276         if (tdb->locked[list+1].count == 0) {
277                 if (!tdb->read_only && tdb->header.rwlocks) {
278                         if (tdb_spinlock(tdb, list, ltype)) {
279                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
280                                            list, ltype));
281                                 return -1;
282                         }
283                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
284                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
285                                            list, ltype, strerror(errno)));
286                         return -1;
287                 }
288                 tdb->locked[list+1].ltype = ltype;
289         }
290         tdb->locked[list+1].count++;
291         return 0;
292 }
293
294 /* unlock the database: returns void because it's too late for errors. */
295         /* changed to return int it may be interesting to know there
296            has been an error  --simo */
297 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
298 {
299         int ret = -1;
300
301         if (tdb->flags & TDB_NOLOCK)
302                 return 0;
303
304         /* Sanity checks */
305         if (list < -1 || list >= (int)tdb->header.hash_size) {
306                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
307                 return ret;
308         }
309
310         if (tdb->locked[list+1].count==0) {
311                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
312                 return ret;
313         }
314
315         if (tdb->locked[list+1].count == 1) {
316                 /* Down to last nested lock: unlock underneath */
317                 if (!tdb->read_only && tdb->header.rwlocks) {
318                         ret = tdb_spinunlock(tdb, list, ltype);
319                 } else {
320                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
321                 }
322         } else {
323                 ret = 0;
324         }
325         tdb->locked[list+1].count--;
326
327         if (ret)
328                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
329         return ret;
330 }
331
332 /* This is based on the hash algorithm from gdbm */
333 static u32 tdb_hash(TDB_DATA *key)
334 {
335         u32 value;      /* Used to compute the hash value.  */
336         u32   i;        /* Used to cycle through random values. */
337
338         /* Set the initial value from the key size. */
339         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
340                 value = (value + (key->dptr[i] << (i*5 % 24)));
341
342         return (1103515243 * value + 12345);  
343 }
344
345 /* check for an out of bounds access - if it is out of bounds then
346    see if the database has been expanded by someone else and expand
347    if necessary 
348    note that "len" is the minimum length needed for the db
349 */
350 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
351 {
352         struct stat st;
353         if (len <= tdb->map_size)
354                 return 0;
355         if (tdb->flags & TDB_INTERNAL) {
356                 if (!probe) {
357                         /* Ensure ecode is set for log fn. */
358                         tdb->ecode = TDB_ERR_IO;
359                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
360                                  (int)len, (int)tdb->map_size));
361                 }
362                 return TDB_ERRCODE(TDB_ERR_IO, -1);
363         }
364
365         if (fstat(tdb->fd, &st) == -1)
366                 return TDB_ERRCODE(TDB_ERR_IO, -1);
367
368         if (st.st_size < (size_t)len) {
369                 if (!probe) {
370                         /* Ensure ecode is set for log fn. */
371                         tdb->ecode = TDB_ERR_IO;
372                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
373                                  (int)len, (int)st.st_size));
374                 }
375                 return TDB_ERRCODE(TDB_ERR_IO, -1);
376         }
377
378         /* Unmap, update size, remap */
379         if (tdb_munmap(tdb) == -1)
380                 return TDB_ERRCODE(TDB_ERR_IO, -1);
381         tdb->map_size = st.st_size;
382         tdb_mmap(tdb);
383         return 0;
384 }
385
386 /* write a lump of data at a specified offset */
387 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
388 {
389         if (tdb_oob(tdb, off + len, 0) != 0)
390                 return -1;
391
392         if (tdb->map_ptr)
393                 memcpy(off + (char *)tdb->map_ptr, buf, len);
394 #ifdef HAVE_PWRITE
395         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
396 #else
397         else if (lseek(tdb->fd, off, SEEK_SET) != off
398                  || write(tdb->fd, buf, len) != (ssize_t)len) {
399 #endif
400                 /* Ensure ecode is set for log fn. */
401                 tdb->ecode = TDB_ERR_IO;
402                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
403                            off, len, strerror(errno)));
404                 return TDB_ERRCODE(TDB_ERR_IO, -1);
405         }
406         return 0;
407 }
408
409 /* read a lump of data at a specified offset, maybe convert */
410 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
411 {
412         if (tdb_oob(tdb, off + len, 0) != 0)
413                 return -1;
414
415         if (tdb->map_ptr)
416                 memcpy(buf, off + (char *)tdb->map_ptr, len);
417 #ifdef HAVE_PREAD
418         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
419 #else
420         else if (lseek(tdb->fd, off, SEEK_SET) != off
421                  || read(tdb->fd, buf, len) != (ssize_t)len) {
422 #endif
423                 /* Ensure ecode is set for log fn. */
424                 tdb->ecode = TDB_ERR_IO;
425                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
426                            off, len, strerror(errno)));
427                 return TDB_ERRCODE(TDB_ERR_IO, -1);
428         }
429         if (cv)
430                 convert(buf, len);
431         return 0;
432 }
433
434 /* read a lump of data, allocating the space for it */
435 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
436 {
437         char *buf;
438
439         if (!(buf = malloc(len))) {
440                 /* Ensure ecode is set for log fn. */
441                 tdb->ecode = TDB_ERR_OOM;
442                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
443                            len, strerror(errno)));
444                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
445         }
446         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
447                 SAFE_FREE(buf);
448                 return NULL;
449         }
450         return buf;
451 }
452
453 /* read/write a tdb_off */
454 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
455 {
456         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
457 }
458 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
459 {
460         tdb_off off = *d;
461         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
462 }
463
464 /* read/write a record */
465 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
466 {
467         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
468                 return -1;
469         if (TDB_BAD_MAGIC(rec)) {
470                 /* Ensure ecode is set for log fn. */
471                 tdb->ecode = TDB_ERR_CORRUPT;
472                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
473                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
474         }
475         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
476 }
477 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
478 {
479         struct list_struct r = *rec;
480         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
481 }
482
483 /* read a freelist record and check for simple errors */
484 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
485 {
486         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
487                 return -1;
488
489         if (rec->magic == TDB_MAGIC) {
490                 /* this happens when a app is showdown while deleting a record - we should
491                    not completely fail when this happens */
492                 TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
493                          rec->magic, off));
494                 rec->magic = TDB_FREE_MAGIC;
495                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
496                         return -1;
497         }
498
499         if (rec->magic != TDB_FREE_MAGIC) {
500                 /* Ensure ecode is set for log fn. */
501                 tdb->ecode = TDB_ERR_CORRUPT;
502                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
503                            rec->magic, off));
504                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
505         }
506         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
507                 return -1;
508         return 0;
509 }
510
511 /* update a record tailer (must hold allocation lock) */
512 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
513                          const struct list_struct *rec)
514 {
515         tdb_off totalsize;
516
517         /* Offset of tailer from record header */
518         totalsize = sizeof(*rec) + rec->rec_len;
519         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
520                          &totalsize);
521 }
522
523 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
524 {
525         struct list_struct rec;
526         tdb_off tailer_ofs, tailer;
527
528         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
529                 printf("ERROR: failed to read record at %u\n", offset);
530                 return 0;
531         }
532
533         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
534                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
535
536         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
537         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
538                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
539                 return rec.next;
540         }
541
542         if (tailer != rec.rec_len + sizeof(rec)) {
543                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
544                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
545         }
546         return rec.next;
547 }
548
549 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
550 {
551         tdb_off rec_ptr, top;
552
553         top = TDB_HASH_TOP(i);
554
555         if (tdb_lock(tdb, i, F_WRLCK) != 0)
556                 return -1;
557
558         if (ofs_read(tdb, top, &rec_ptr) == -1)
559                 return tdb_unlock(tdb, i, F_WRLCK);
560
561         if (rec_ptr)
562                 printf("hash=%d\n", i);
563
564         while (rec_ptr) {
565                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
566         }
567
568         return tdb_unlock(tdb, i, F_WRLCK);
569 }
570
571 void tdb_dump_all(TDB_CONTEXT *tdb)
572 {
573         int i;
574         for (i=0;i<tdb->header.hash_size;i++) {
575                 tdb_dump_chain(tdb, i);
576         }
577         printf("freelist:\n");
578         tdb_dump_chain(tdb, -1);
579 }
580
581 int tdb_printfreelist(TDB_CONTEXT *tdb)
582 {
583         int ret;
584         long total_free = 0;
585         tdb_off offset, rec_ptr;
586         struct list_struct rec;
587
588         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
589                 return ret;
590
591         offset = FREELIST_TOP;
592
593         /* read in the freelist top */
594         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
595                 tdb_unlock(tdb, -1, F_WRLCK);
596                 return 0;
597         }
598
599         printf("freelist top=[0x%08x]\n", rec_ptr );
600         while (rec_ptr) {
601                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
602                         tdb_unlock(tdb, -1, F_WRLCK);
603                         return -1;
604                 }
605
606                 if (rec.magic != TDB_FREE_MAGIC) {
607                         printf("bad magic 0x%08x in free list\n", rec.magic);
608                         tdb_unlock(tdb, -1, F_WRLCK);
609                         return -1;
610                 }
611
612                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
613                 total_free += rec.rec_len;
614
615                 /* move to the next record */
616                 rec_ptr = rec.next;
617         }
618         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
619                (int)total_free);
620
621         return tdb_unlock(tdb, -1, F_WRLCK);
622 }
623
624 /* Remove an element from the freelist.  Must have alloc lock. */
625 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
626 {
627         tdb_off last_ptr, i;
628
629         /* read in the freelist top */
630         last_ptr = FREELIST_TOP;
631         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
632                 if (i == off) {
633                         /* We've found it! */
634                         return ofs_write(tdb, last_ptr, &next);
635                 }
636                 /* Follow chain (next offset is at start of record) */
637                 last_ptr = i;
638         }
639         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
640         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
641 }
642
643 /* Add an element into the freelist. Merge adjacent records if
644    neccessary. */
645 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
646 {
647         tdb_off right, left;
648
649         /* Allocation and tailer lock */
650         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
651                 return -1;
652
653         /* set an initial tailer, so if we fail we don't leave a bogus record */
654         if (update_tailer(tdb, offset, rec) != 0) {
655                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
656                 goto fail;
657         }
658
659         /* Look right first (I'm an Australian, dammit) */
660         right = offset + sizeof(*rec) + rec->rec_len;
661         if (right + sizeof(*rec) <= tdb->map_size) {
662                 struct list_struct r;
663
664                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
665                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
666                         goto left;
667                 }
668
669                 /* If it's free, expand to include it. */
670                 if (r.magic == TDB_FREE_MAGIC) {
671                         if (remove_from_freelist(tdb, right, r.next) == -1) {
672                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
673                                 goto left;
674                         }
675                         rec->rec_len += sizeof(r) + r.rec_len;
676                 }
677         }
678
679 left:
680         /* Look left */
681         left = offset - sizeof(tdb_off);
682         if (left > TDB_DATA_START(tdb->header.hash_size)) {
683                 struct list_struct l;
684                 tdb_off leftsize;
685                 
686                 /* Read in tailer and jump back to header */
687                 if (ofs_read(tdb, left, &leftsize) == -1) {
688                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
689                         goto update;
690                 }
691                 left = offset - leftsize;
692
693                 /* Now read in record */
694                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
695                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
696                         goto update;
697                 }
698
699                 /* If it's free, expand to include it. */
700                 if (l.magic == TDB_FREE_MAGIC) {
701                         if (remove_from_freelist(tdb, left, l.next) == -1) {
702                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
703                                 goto update;
704                         } else {
705                                 offset = left;
706                                 rec->rec_len += leftsize;
707                         }
708                 }
709         }
710
711 update:
712         if (update_tailer(tdb, offset, rec) == -1) {
713                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
714                 goto fail;
715         }
716
717         /* Now, prepend to free list */
718         rec->magic = TDB_FREE_MAGIC;
719
720         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
721             rec_write(tdb, offset, rec) == -1 ||
722             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
723                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
724                 goto fail;
725         }
726
727         /* And we're done. */
728         tdb_unlock(tdb, -1, F_WRLCK);
729         return 0;
730
731  fail:
732         tdb_unlock(tdb, -1, F_WRLCK);
733         return -1;
734 }
735
736
737 /* expand a file.  we prefer to use ftruncate, as that is what posix
738   says to use for mmap expansion */
739 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
740 {
741         char buf[1024];
742 #if HAVE_FTRUNCATE_EXTEND
743         if (ftruncate(tdb->fd, size+addition) != 0) {
744                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
745                            size+addition, strerror(errno)));
746                 return -1;
747         }
748 #else
749         char b = 0;
750
751 #ifdef HAVE_PWRITE
752         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
753 #else
754         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
755             write(tdb->fd, &b, 1) != 1) {
756 #endif
757                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
758                            size+addition, strerror(errno)));
759                 return -1;
760         }
761 #endif
762
763         /* now fill the file with something. This ensures that the file isn't sparse, which would be
764            very bad if we ran out of disk. This must be done with write, not via mmap */
765         memset(buf, 0x42, sizeof(buf));
766         while (addition) {
767                 int n = addition>sizeof(buf)?sizeof(buf):addition;
768 #ifdef HAVE_PWRITE
769                 int ret = pwrite(tdb->fd, buf, n, size);
770 #else
771                 int ret;
772                 if (lseek(tdb->fd, size, SEEK_SET) != size)
773                         return -1;
774                 ret = write(tdb->fd, buf, n);
775 #endif
776                 if (ret != n) {
777                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
778                                    n, strerror(errno)));
779                         return -1;
780                 }
781                 addition -= n;
782                 size += n;
783         }
784         return 0;
785 }
786
787
788 /* expand the database at least size bytes by expanding the underlying
789    file and doing the mmap again if necessary */
790 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
791 {
792         struct list_struct rec;
793         tdb_off offset;
794
795         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
796                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
797                 return -1;
798         }
799
800         /* must know about any previous expansions by another process */
801         tdb_oob(tdb, tdb->map_size + 1, 1);
802
803         /* always make room for at least 10 more records, and round
804            the database up to a multiple of TDB_PAGE_SIZE */
805         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
806
807         if (!(tdb->flags & TDB_INTERNAL))
808                 tdb_munmap(tdb);
809
810         /*
811          * We must ensure the file is unmapped before doing this
812          * to ensure consistency with systems like OpenBSD where
813          * writes and mmaps are not consistent.
814          */
815
816         /* expand the file itself */
817         if (!(tdb->flags & TDB_INTERNAL)) {
818                 if (expand_file(tdb, tdb->map_size, size) != 0)
819                         goto fail;
820         }
821
822         tdb->map_size += size;
823
824         if (tdb->flags & TDB_INTERNAL)
825                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
826         else {
827                 /*
828                  * We must ensure the file is remapped before adding the space
829                  * to ensure consistency with systems like OpenBSD where
830                  * writes and mmaps are not consistent.
831                  */
832
833                 /* We're ok if the mmap fails as we'll fallback to read/write */
834                 tdb_mmap(tdb);
835         }
836
837         /* form a new freelist record */
838         memset(&rec,'\0',sizeof(rec));
839         rec.rec_len = size - sizeof(rec);
840
841         /* link it into the free list */
842         offset = tdb->map_size - size;
843         if (tdb_free(tdb, offset, &rec) == -1)
844                 goto fail;
845
846         tdb_unlock(tdb, -1, F_WRLCK);
847         return 0;
848  fail:
849         tdb_unlock(tdb, -1, F_WRLCK);
850         return -1;
851 }
852
853 /* allocate some space from the free list. The offset returned points
854    to a unconnected list_struct within the database with room for at
855    least length bytes of total data
856
857    0 is returned if the space could not be allocated
858  */
859 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
860                             struct list_struct *rec)
861 {
862         tdb_off rec_ptr, last_ptr, newrec_ptr;
863         struct list_struct newrec;
864
865         memset(&newrec, '\0', sizeof(newrec));
866
867         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
868                 return 0;
869
870         /* Extra bytes required for tailer */
871         length += sizeof(tdb_off);
872
873  again:
874         last_ptr = FREELIST_TOP;
875
876         /* read in the freelist top */
877         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
878                 goto fail;
879
880         /* keep looking until we find a freelist record big enough */
881         while (rec_ptr) {
882                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
883                         goto fail;
884
885                 if (rec->rec_len >= length) {
886                         /* found it - now possibly split it up  */
887                         if (rec->rec_len > length + MIN_REC_SIZE) {
888                                 /* Length of left piece */
889                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
890
891                                 /* Right piece to go on free list */
892                                 newrec.rec_len = rec->rec_len
893                                         - (sizeof(*rec) + length);
894                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
895
896                                 /* And left record is shortened */
897                                 rec->rec_len = length;
898                         } else
899                                 newrec_ptr = 0;
900
901                         /* Remove allocated record from the free list */
902                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
903                                 goto fail;
904
905                         /* Update header: do this before we drop alloc
906                            lock, otherwise tdb_free() might try to
907                            merge with us, thinking we're free.
908                            (Thanks Jeremy Allison). */
909                         rec->magic = TDB_MAGIC;
910                         if (rec_write(tdb, rec_ptr, rec) == -1)
911                                 goto fail;
912
913                         /* Did we create new block? */
914                         if (newrec_ptr) {
915                                 /* Update allocated record tailer (we
916                                    shortened it). */
917                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
918                                         goto fail;
919
920                                 /* Free new record */
921                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
922                                         goto fail;
923                         }
924
925                         /* all done - return the new record offset */
926                         tdb_unlock(tdb, -1, F_WRLCK);
927                         return rec_ptr;
928                 }
929                 /* move to the next record */
930                 last_ptr = rec_ptr;
931                 rec_ptr = rec->next;
932         }
933         /* we didn't find enough space. See if we can expand the
934            database and if we can then try again */
935         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
936                 goto again;
937  fail:
938         tdb_unlock(tdb, -1, F_WRLCK);
939         return 0;
940 }
941
942 /* initialise a new database with a specified hash size */
943 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
944 {
945         struct tdb_header *newdb;
946         int size, ret = -1;
947
948         /* We make it up in memory, then write it out if not internal */
949         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
950         if (!(newdb = calloc(size, 1)))
951                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
952
953         /* Fill in the header */
954         newdb->version = TDB_VERSION;
955         newdb->hash_size = hash_size;
956 #ifdef USE_SPINLOCKS
957         newdb->rwlocks = size;
958 #endif
959         if (tdb->flags & TDB_INTERNAL) {
960                 tdb->map_size = size;
961                 tdb->map_ptr = (char *)newdb;
962                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
963                 /* Convert the `ondisk' version if asked. */
964                 CONVERT(*newdb);
965                 return 0;
966         }
967         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
968                 goto fail;
969
970         if (ftruncate(tdb->fd, 0) == -1)
971                 goto fail;
972
973         /* This creates an endian-converted header, as if read from disk */
974         CONVERT(*newdb);
975         memcpy(&tdb->header, newdb, sizeof(tdb->header));
976         /* Don't endian-convert the magic food! */
977         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
978         if (write(tdb->fd, newdb, size) != size)
979                 ret = -1;
980         else
981                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
982
983   fail:
984         SAFE_FREE(newdb);
985         return ret;
986 }
987
988 /* Returns 0 on fail.  On success, return offset of record, and fills
989    in rec */
990 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
991                         struct list_struct *r)
992 {
993         tdb_off rec_ptr;
994         
995         /* read in the hash top */
996         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
997                 return 0;
998
999         /* keep looking until we find the right record */
1000         while (rec_ptr) {
1001                 if (rec_read(tdb, rec_ptr, r) == -1)
1002                         return 0;
1003
1004                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
1005                         char *k;
1006                         /* a very likely hit - read the key */
1007                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
1008                                            r->key_len);
1009                         if (!k)
1010                                 return 0;
1011
1012                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1013                                 SAFE_FREE(k);
1014                                 return rec_ptr;
1015                         }
1016                         SAFE_FREE(k);
1017                 }
1018                 rec_ptr = r->next;
1019         }
1020         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1021 }
1022
1023 /* If they do lockkeys, check that this hash is one they locked */
1024 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1025 {
1026         u32 i;
1027         if (!tdb->lockedkeys)
1028                 return 1;
1029         for (i = 0; i < tdb->lockedkeys[0]; i++)
1030                 if (tdb->lockedkeys[i+1] == hash)
1031                         return 1;
1032         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1033 }
1034
1035 /* As tdb_find, but if you succeed, keep the lock */
1036 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1037                              struct list_struct *rec)
1038 {
1039         u32 rec_ptr;
1040
1041         if (!tdb_keylocked(tdb, hash))
1042                 return 0;
1043         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1044                 return 0;
1045         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1046                 tdb_unlock(tdb, BUCKET(hash), locktype);
1047         return rec_ptr;
1048 }
1049
1050 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1051 {
1052         return tdb->ecode;
1053 }
1054
1055 static struct tdb_errname {
1056         enum TDB_ERROR ecode; const char *estring;
1057 } emap[] = { {TDB_SUCCESS, "Success"},
1058              {TDB_ERR_CORRUPT, "Corrupt database"},
1059              {TDB_ERR_IO, "IO Error"},
1060              {TDB_ERR_LOCK, "Locking error"},
1061              {TDB_ERR_OOM, "Out of memory"},
1062              {TDB_ERR_EXISTS, "Record exists"},
1063              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1064              {TDB_ERR_NOEXIST, "Record does not exist"} };
1065
1066 /* Error string for the last tdb error */
1067 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1068 {
1069         u32 i;
1070         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1071                 if (tdb->ecode == emap[i].ecode)
1072                         return emap[i].estring;
1073         return "Invalid error code";
1074 }
1075
1076 /* update an entry in place - this only works if the new data size
1077    is <= the old data size and the key exists.
1078    on failure return -1.
1079 */
1080
1081 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1082 {
1083         struct list_struct rec;
1084         tdb_off rec_ptr;
1085
1086         /* find entry */
1087         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1088                 return -1;
1089
1090         /* must be long enough key, data and tailer */
1091         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1092                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1093                 return -1;
1094         }
1095
1096         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1097                       dbuf.dptr, dbuf.dsize) == -1)
1098                 return -1;
1099
1100         if (dbuf.dsize != rec.data_len) {
1101                 /* update size */
1102                 rec.data_len = dbuf.dsize;
1103                 return rec_write(tdb, rec_ptr, &rec);
1104         }
1105  
1106         return 0;
1107 }
1108
1109 /* find an entry in the database given a key */
1110 /* If an entry doesn't exist tdb_err will be set to
1111  * TDB_ERR_NOEXIST. If a key has no data attached
1112  * tdb_err will not be set. Both will return a
1113  * zero pptr and zero dsize.
1114  */
1115
1116 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1117 {
1118         tdb_off rec_ptr;
1119         struct list_struct rec;
1120         TDB_DATA ret;
1121         u32 hash;
1122
1123         /* find which hash bucket it is in */
1124         hash = tdb_hash(&key);
1125         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1126                 return tdb_null;
1127
1128         if (rec.data_len)
1129                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1130                                           rec.data_len);
1131         else
1132                 ret.dptr = NULL;
1133         ret.dsize = rec.data_len;
1134         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1135         return ret;
1136 }
1137
1138 /* check if an entry in the database exists 
1139
1140    note that 1 is returned if the key is found and 0 is returned if not found
1141    this doesn't match the conventions in the rest of this module, but is
1142    compatible with gdbm
1143 */
1144 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1145 {
1146         struct list_struct rec;
1147         
1148         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1149                 return 0;
1150         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1151         return 1;
1152 }
1153
1154 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1155 {
1156         u32 hash = tdb_hash(&key);
1157         return tdb_exists_hash(tdb, key, hash);
1158 }
1159
1160 /* record lock stops delete underneath */
1161 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1162 {
1163         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1164 }
1165 /*
1166   Write locks override our own fcntl readlocks, so check it here.
1167   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1168   an error to fail to get the lock here.
1169 */
1170  
1171 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1172 {
1173         struct tdb_traverse_lock *i;
1174         for (i = &tdb->travlocks; i; i = i->next)
1175                 if (i->off == off)
1176                         return -1;
1177         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1178 }
1179
1180 /*
1181   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1182   an error to fail to get the lock here.
1183 */
1184
1185 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1186 {
1187         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1188 }
1189 /* fcntl locks don't stack: avoid unlocking someone else's */
1190 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1191 {
1192         struct tdb_traverse_lock *i;
1193         u32 count = 0;
1194
1195         if (off == 0)
1196                 return 0;
1197         for (i = &tdb->travlocks; i; i = i->next)
1198                 if (i->off == off)
1199                         count++;
1200         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1201 }
1202
1203 /* actually delete an entry in the database given the offset */
1204 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1205 {
1206         tdb_off last_ptr, i;
1207         struct list_struct lastrec;
1208
1209         if (tdb->read_only) return -1;
1210
1211         if (write_lock_record(tdb, rec_ptr) == -1) {
1212                 /* Someone traversing here: mark it as dead */
1213                 rec->magic = TDB_DEAD_MAGIC;
1214                 return rec_write(tdb, rec_ptr, rec);
1215         }
1216         if (write_unlock_record(tdb, rec_ptr) != 0)
1217                 return -1;
1218
1219         /* find previous record in hash chain */
1220         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1221                 return -1;
1222         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1223                 if (rec_read(tdb, i, &lastrec) == -1)
1224                         return -1;
1225
1226         /* unlink it: next ptr is at start of record. */
1227         if (last_ptr == 0)
1228                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1229         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1230                 return -1;
1231
1232         /* recover the space */
1233         if (tdb_free(tdb, rec_ptr, rec) == -1)
1234                 return -1;
1235         return 0;
1236 }
1237
1238 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1239 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1240                          struct list_struct *rec)
1241 {
1242         int want_next = (tlock->off != 0);
1243
1244         /* No traversal allows if you've called tdb_lockkeys() */
1245         if (tdb->lockedkeys)
1246                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1247
1248         /* Lock each chain from the start one. */
1249         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1250                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1251                         return -1;
1252
1253                 /* No previous record?  Start at top of chain. */
1254                 if (!tlock->off) {
1255                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1256                                      &tlock->off) == -1)
1257                                 goto fail;
1258                 } else {
1259                         /* Otherwise unlock the previous record. */
1260                         if (unlock_record(tdb, tlock->off) != 0)
1261                                 goto fail;
1262                 }
1263
1264                 if (want_next) {
1265                         /* We have offset of old record: grab next */
1266                         if (rec_read(tdb, tlock->off, rec) == -1)
1267                                 goto fail;
1268                         tlock->off = rec->next;
1269                 }
1270
1271                 /* Iterate through chain */
1272                 while( tlock->off) {
1273                         tdb_off current;
1274                         if (rec_read(tdb, tlock->off, rec) == -1)
1275                                 goto fail;
1276                         if (!TDB_DEAD(rec)) {
1277                                 /* Woohoo: we found one! */
1278                                 if (lock_record(tdb, tlock->off) != 0)
1279                                         goto fail;
1280                                 return tlock->off;
1281                         }
1282                         /* Try to clean dead ones from old traverses */
1283                         current = tlock->off;
1284                         tlock->off = rec->next;
1285                         if (!tdb->read_only && 
1286                             do_delete(tdb, current, rec) != 0)
1287                                 goto fail;
1288                 }
1289                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1290                 want_next = 0;
1291         }
1292         /* We finished iteration without finding anything */
1293         return TDB_ERRCODE(TDB_SUCCESS, 0);
1294
1295  fail:
1296         tlock->off = 0;
1297         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1298                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1299         return -1;
1300 }
1301
1302 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1303    return -1 on error or the record count traversed
1304    if fn is NULL then it is not called
1305    a non-zero return value from fn() indicates that the traversal should stop
1306   */
1307 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private)
1308 {
1309         TDB_DATA key, dbuf;
1310         struct list_struct rec;
1311         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1312         int ret, count = 0;
1313
1314         /* This was in the initializaton, above, but the IRIX compiler
1315          * did not like it.  crh
1316          */
1317         tl.next = tdb->travlocks.next;
1318
1319         /* fcntl locks don't stack: beware traverse inside traverse */
1320         tdb->travlocks.next = &tl;
1321
1322         /* tdb_next_lock places locks on the record returned, and its chain */
1323         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1324                 count++;
1325                 /* now read the full record */
1326                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1327                                           rec.key_len + rec.data_len);
1328                 if (!key.dptr) {
1329                         ret = -1;
1330                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1331                                 goto out;
1332                         if (unlock_record(tdb, tl.off) != 0)
1333                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1334                         goto out;
1335                 }
1336                 key.dsize = rec.key_len;
1337                 dbuf.dptr = key.dptr + rec.key_len;
1338                 dbuf.dsize = rec.data_len;
1339
1340                 /* Drop chain lock, call out */
1341                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1342                         ret = -1;
1343                         goto out;
1344                 }
1345                 if (fn && fn(tdb, key, dbuf, private)) {
1346                         /* They want us to terminate traversal */
1347                         ret = count;
1348                         if (unlock_record(tdb, tl.off) != 0) {
1349                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1350                                 ret = -1;
1351                         }
1352                         tdb->travlocks.next = tl.next;
1353                         SAFE_FREE(key.dptr);
1354                         return count;
1355                 }
1356                 SAFE_FREE(key.dptr);
1357         }
1358 out:
1359         tdb->travlocks.next = tl.next;
1360         if (ret < 0)
1361                 return -1;
1362         else
1363                 return count;
1364 }
1365
1366 /* find the first entry in the database and return its key */
1367 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1368 {
1369         TDB_DATA key;
1370         struct list_struct rec;
1371
1372         /* release any old lock */
1373         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1374                 return tdb_null;
1375         tdb->travlocks.off = tdb->travlocks.hash = 0;
1376
1377         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1378                 return tdb_null;
1379         /* now read the key */
1380         key.dsize = rec.key_len;
1381         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1382         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1383                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1384         return key;
1385 }
1386
1387 /* find the next entry in the database, returning its key */
1388 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1389 {
1390         u32 oldhash;
1391         TDB_DATA key = tdb_null;
1392         struct list_struct rec;
1393         char *k = NULL;
1394
1395         /* Is locked key the old key?  If so, traverse will be reliable. */
1396         if (tdb->travlocks.off) {
1397                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1398                         return tdb_null;
1399                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1400                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1401                                             rec.key_len))
1402                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1403                         /* No, it wasn't: unlock it and start from scratch */
1404                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1405                                 return tdb_null;
1406                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1407                                 return tdb_null;
1408                         tdb->travlocks.off = 0;
1409                 }
1410
1411                 SAFE_FREE(k);
1412         }
1413
1414         if (!tdb->travlocks.off) {
1415                 /* No previous element: do normal find, and lock record */
1416                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb_hash(&oldkey), F_WRLCK, &rec);
1417                 if (!tdb->travlocks.off)
1418                         return tdb_null;
1419                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1420                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1421                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1422                         return tdb_null;
1423                 }
1424         }
1425         oldhash = tdb->travlocks.hash;
1426
1427         /* Grab next record: locks chain and returned record,
1428            unlocks old record */
1429         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1430                 key.dsize = rec.key_len;
1431                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1432                                           key.dsize);
1433                 /* Unlock the chain of this new record */
1434                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1435                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1436         }
1437         /* Unlock the chain of old record */
1438         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1439                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1440         return key;
1441 }
1442
1443 /* delete an entry in the database given a key */
1444 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1445 {
1446         tdb_off rec_ptr;
1447         struct list_struct rec;
1448         int ret;
1449
1450         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1451                 return -1;
1452         ret = do_delete(tdb, rec_ptr, &rec);
1453         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1454                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1455         return ret;
1456 }
1457
1458 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1459 {
1460         u32 hash = tdb_hash(&key);
1461         return tdb_delete_hash(tdb, key, hash);
1462 }
1463
1464 /* store an element in the database, replacing any existing element
1465    with the same key 
1466
1467    return 0 on success, -1 on failure
1468 */
1469 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1470 {
1471         struct list_struct rec;
1472         u32 hash;
1473         tdb_off rec_ptr;
1474         char *p = NULL;
1475         int ret = 0;
1476
1477         /* find which hash bucket it is in */
1478         hash = tdb_hash(&key);
1479         if (!tdb_keylocked(tdb, hash))
1480                 return -1;
1481         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1482                 return -1;
1483
1484         /* check for it existing, on insert. */
1485         if (flag == TDB_INSERT) {
1486                 if (tdb_exists_hash(tdb, key, hash)) {
1487                         tdb->ecode = TDB_ERR_EXISTS;
1488                         goto fail;
1489                 }
1490         } else {
1491                 /* first try in-place update, on modify or replace. */
1492                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1493                         goto out;
1494                 if (tdb->ecode == TDB_ERR_NOEXIST &&
1495                     flag == TDB_MODIFY) {
1496                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
1497                          we should fail the store */
1498                         goto fail;
1499         }
1500         }
1501         /* reset the error code potentially set by the tdb_update() */
1502         tdb->ecode = TDB_SUCCESS;
1503
1504         /* delete any existing record - if it doesn't exist we don't
1505            care.  Doing this first reduces fragmentation, and avoids
1506            coalescing with `allocated' block before it's updated. */
1507         if (flag != TDB_INSERT)
1508                 tdb_delete_hash(tdb, key, hash);
1509
1510         /* Copy key+value *before* allocating free space in case malloc
1511            fails and we are left with a dead spot in the tdb. */
1512
1513         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1514                 tdb->ecode = TDB_ERR_OOM;
1515                 goto fail;
1516         }
1517
1518         memcpy(p, key.dptr, key.dsize);
1519         if (dbuf.dsize)
1520                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1521
1522         /* we have to allocate some space */
1523         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1524                 goto fail;
1525
1526         /* Read hash top into next ptr */
1527         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1528                 goto fail;
1529
1530         rec.key_len = key.dsize;
1531         rec.data_len = dbuf.dsize;
1532         rec.full_hash = hash;
1533         rec.magic = TDB_MAGIC;
1534
1535         /* write out and point the top of the hash chain at it */
1536         if (rec_write(tdb, rec_ptr, &rec) == -1
1537             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1538             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1539                 /* Need to tdb_unallocate() here */
1540                 goto fail;
1541         }
1542  out:
1543         SAFE_FREE(p); 
1544         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1545         return ret;
1546 fail:
1547         ret = -1;
1548         goto out;
1549 }
1550
1551 /* Attempt to append data to an entry in place - this only works if the new data size
1552    is <= the old data size and the key exists.
1553    on failure return -1. Record must be locked before calling.
1554 */
1555 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1556 {
1557         struct list_struct rec;
1558         tdb_off rec_ptr;
1559
1560         /* find entry */
1561         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1562                 return -1;
1563
1564         /* Append of 0 is always ok. */
1565         if (new_dbuf.dsize == 0)
1566                 return 0;
1567
1568         /* must be long enough for key, old data + new data and tailer */
1569         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1570                 /* No room. */
1571                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1572                 return -1;
1573         }
1574
1575         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1576                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1577                 return -1;
1578
1579         /* update size */
1580         rec.data_len += new_dbuf.dsize;
1581         return rec_write(tdb, rec_ptr, &rec);
1582 }
1583
1584 /* Append to an entry. Create if not exist. */
1585
1586 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1587 {
1588         struct list_struct rec;
1589         u32 hash;
1590         tdb_off rec_ptr;
1591         char *p = NULL;
1592         int ret = 0;
1593         size_t new_data_size = 0;
1594
1595         /* find which hash bucket it is in */
1596         hash = tdb_hash(&key);
1597         if (!tdb_keylocked(tdb, hash))
1598                 return -1;
1599         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1600                 return -1;
1601
1602         /* first try in-place. */
1603         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1604                 goto out;
1605
1606         /* reset the error code potentially set by the tdb_append_inplace() */
1607         tdb->ecode = TDB_SUCCESS;
1608
1609         /* find entry */
1610         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1611                 if (tdb->ecode != TDB_ERR_NOEXIST)
1612                         goto fail;
1613
1614                 /* Not found - create. */
1615
1616                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1617                 goto out;
1618         }
1619
1620         new_data_size = rec.data_len + new_dbuf.dsize;
1621
1622         /* Copy key+old_value+value *before* allocating free space in case malloc
1623            fails and we are left with a dead spot in the tdb. */
1624
1625         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1626                 tdb->ecode = TDB_ERR_OOM;
1627                 goto fail;
1628         }
1629
1630         /* Copy the key in place. */
1631         memcpy(p, key.dptr, key.dsize);
1632
1633         /* Now read the old data into place. */
1634         if (rec.data_len &&
1635                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1636                         goto fail;
1637
1638         /* Finally append the new data. */
1639         if (new_dbuf.dsize)
1640                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1641
1642         /* delete any existing record - if it doesn't exist we don't
1643            care.  Doing this first reduces fragmentation, and avoids
1644            coalescing with `allocated' block before it's updated. */
1645
1646         tdb_delete_hash(tdb, key, hash);
1647
1648         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1649                 goto fail;
1650
1651         /* Read hash top into next ptr */
1652         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1653                 goto fail;
1654
1655         rec.key_len = key.dsize;
1656         rec.data_len = new_data_size;
1657         rec.full_hash = hash;
1658         rec.magic = TDB_MAGIC;
1659
1660         /* write out and point the top of the hash chain at it */
1661         if (rec_write(tdb, rec_ptr, &rec) == -1
1662             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1663             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1664                 /* Need to tdb_unallocate() here */
1665                 goto fail;
1666         }
1667
1668  out:
1669         SAFE_FREE(p); 
1670         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1671         return ret;
1672
1673 fail:
1674         ret = -1;
1675         goto out;
1676 }
1677
1678 static int tdb_already_open(dev_t device,
1679                             ino_t ino)
1680 {
1681         TDB_CONTEXT *i;
1682         
1683         for (i = tdbs; i; i = i->next) {
1684                 if (i->device == device && i->inode == ino) {
1685                         return 1;
1686                 }
1687         }
1688
1689         return 0;
1690 }
1691
1692 /* open the database, creating it if necessary 
1693
1694    The open_flags and mode are passed straight to the open call on the
1695    database file. A flags value of O_WRONLY is invalid. The hash size
1696    is advisory, use zero for a default value.
1697
1698    Return is NULL on error, in which case errno is also set.  Don't 
1699    try to call tdb_error or tdb_errname, just do strerror(errno).
1700
1701    @param name may be NULL for internal databases. */
1702 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1703                       int open_flags, mode_t mode)
1704 {
1705         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1706 }
1707
1708
1709 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1710                          int open_flags, mode_t mode,
1711                          tdb_log_func log_fn)
1712 {
1713         TDB_CONTEXT *tdb;
1714         struct stat st;
1715         int rev = 0, locked = 0;
1716         unsigned char *vp;
1717         u32 vertest;
1718
1719         if (!(tdb = calloc(1, sizeof *tdb))) {
1720                 /* Can't log this */
1721                 errno = ENOMEM;
1722                 goto fail;
1723         }
1724         tdb->fd = -1;
1725         tdb->name = NULL;
1726         tdb->map_ptr = NULL;
1727         tdb->lockedkeys = NULL;
1728         tdb->flags = tdb_flags;
1729         tdb->open_flags = open_flags;
1730         tdb->log_fn = log_fn;
1731         
1732         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1733                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1734                          name));
1735                 errno = EINVAL;
1736                 goto fail;
1737         }
1738         
1739         if (hash_size == 0)
1740                 hash_size = DEFAULT_HASH_SIZE;
1741         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1742                 tdb->read_only = 1;
1743                 /* read only databases don't do locking or clear if first */
1744                 tdb->flags |= TDB_NOLOCK;
1745                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1746         }
1747
1748         /* internal databases don't mmap or lock, and start off cleared */
1749         if (tdb->flags & TDB_INTERNAL) {
1750                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1751                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1752                 if (tdb_new_database(tdb, hash_size) != 0) {
1753                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1754                         goto fail;
1755                 }
1756                 goto internal;
1757         }
1758
1759         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1760                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1761                          name, strerror(errno)));
1762                 goto fail;      /* errno set by open(2) */
1763         }
1764
1765         /* ensure there is only one process initialising at once */
1766         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1767                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1768                          name, strerror(errno)));
1769                 goto fail;      /* errno set by tdb_brlock */
1770         }
1771
1772         /* we need to zero database if we are the only one with it open */
1773         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
1774                 (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
1775                 open_flags |= O_CREAT;
1776                 if (ftruncate(tdb->fd, 0) == -1) {
1777                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1778                                  "failed to truncate %s: %s\n",
1779                                  name, strerror(errno)));
1780                         goto fail; /* errno set by ftruncate */
1781                 }
1782         }
1783
1784         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1785             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1786             || (tdb->header.version != TDB_VERSION
1787                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1788                 /* its not a valid database - possibly initialise it */
1789                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1790                         errno = EIO; /* ie bad format or something */
1791                         goto fail;
1792                 }
1793                 rev = (tdb->flags & TDB_CONVERT);
1794         }
1795         vp = (unsigned char *)&tdb->header.version;
1796         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1797                   (((u32)vp[2]) << 8) | (u32)vp[3];
1798         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1799         if (!rev)
1800                 tdb->flags &= ~TDB_CONVERT;
1801         else {
1802                 tdb->flags |= TDB_CONVERT;
1803                 convert(&tdb->header, sizeof(tdb->header));
1804         }
1805         if (fstat(tdb->fd, &st) == -1)
1806                 goto fail;
1807
1808         /* Is it already in the open list?  If so, fail. */
1809         if (tdb_already_open(st.st_dev, st.st_ino)) {
1810                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1811                          "%s (%d,%d) is already open in this process\n",
1812                          name, st.st_dev, st.st_ino));
1813                 errno = EBUSY;
1814                 goto fail;
1815         }
1816
1817         if (!(tdb->name = (char *)strdup(name))) {
1818                 errno = ENOMEM;
1819                 goto fail;
1820         }
1821
1822         tdb->map_size = st.st_size;
1823         tdb->device = st.st_dev;
1824         tdb->inode = st.st_ino;
1825         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1826         if (!tdb->locked) {
1827                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1828                          "failed to allocate lock structure for %s\n",
1829                          name));
1830                 errno = ENOMEM;
1831                 goto fail;
1832         }
1833         tdb_mmap(tdb);
1834         if (locked) {
1835                 if (!tdb->read_only)
1836                         if (tdb_clear_spinlocks(tdb) != 0) {
1837                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1838                                 "failed to clear spinlock\n"));
1839                                 goto fail;
1840                         }
1841                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1842                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1843                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1844                                  name, strerror(errno)));
1845                         goto fail;
1846                 }
1847
1848         }
1849
1850         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
1851            we didn't get the initial exclusive lock as we need to let all other
1852            users know we're using it. */
1853
1854         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
1855                 /* leave this lock in place to indicate it's in use */
1856                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1857                         goto fail;
1858         }
1859
1860
1861  internal:
1862         /* Internal (memory-only) databases skip all the code above to
1863          * do with disk files, and resume here by releasing their
1864          * global lock and hooking into the active list. */
1865         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1866                 goto fail;
1867         tdb->next = tdbs;
1868         tdbs = tdb;
1869         return tdb;
1870
1871  fail:
1872         { int save_errno = errno;
1873
1874         if (!tdb)
1875                 return NULL;
1876         
1877         if (tdb->map_ptr) {
1878                 if (tdb->flags & TDB_INTERNAL)
1879                         SAFE_FREE(tdb->map_ptr);
1880                 else
1881                         tdb_munmap(tdb);
1882         }
1883         SAFE_FREE(tdb->name);
1884         if (tdb->fd != -1)
1885                 if (close(tdb->fd) != 0)
1886                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1887         SAFE_FREE(tdb->locked);
1888         SAFE_FREE(tdb);
1889         errno = save_errno;
1890         return NULL;
1891         }
1892 }
1893
1894 /**
1895  * Close a database.
1896  *
1897  * @returns -1 for error; 0 for success.
1898  **/
1899 int tdb_close(TDB_CONTEXT *tdb)
1900 {
1901         TDB_CONTEXT **i;
1902         int ret = 0;
1903
1904         if (tdb->map_ptr) {
1905                 if (tdb->flags & TDB_INTERNAL)
1906                         SAFE_FREE(tdb->map_ptr);
1907                 else
1908                         tdb_munmap(tdb);
1909         }
1910         SAFE_FREE(tdb->name);
1911         if (tdb->fd != -1)
1912                 ret = close(tdb->fd);
1913         SAFE_FREE(tdb->locked);
1914         SAFE_FREE(tdb->lockedkeys);
1915
1916         /* Remove from contexts list */
1917         for (i = &tdbs; *i; i = &(*i)->next) {
1918                 if (*i == tdb) {
1919                         *i = tdb->next;
1920                         break;
1921                 }
1922         }
1923
1924         memset(tdb, 0, sizeof(*tdb));
1925         SAFE_FREE(tdb);
1926
1927         return ret;
1928 }
1929
1930 /* lock/unlock entire database */
1931 int tdb_lockall(TDB_CONTEXT *tdb)
1932 {
1933         u32 i;
1934
1935         /* There are no locks on read-only dbs */
1936         if (tdb->read_only)
1937                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1938         if (tdb->lockedkeys)
1939                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1940         for (i = 0; i < tdb->header.hash_size; i++) 
1941                 if (tdb_lock(tdb, i, F_WRLCK))
1942                         break;
1943
1944         /* If error, release locks we have... */
1945         if (i < tdb->header.hash_size) {
1946                 u32 j;
1947
1948                 for ( j = 0; j < i; j++)
1949                         tdb_unlock(tdb, j, F_WRLCK);
1950                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1951         }
1952
1953         return 0;
1954 }
1955 void tdb_unlockall(TDB_CONTEXT *tdb)
1956 {
1957         u32 i;
1958         for (i=0; i < tdb->header.hash_size; i++)
1959                 tdb_unlock(tdb, i, F_WRLCK);
1960 }
1961
1962 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1963 {
1964         u32 i, j, hash;
1965
1966         /* Can't lock more keys if already locked */
1967         if (tdb->lockedkeys)
1968                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1969         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1970                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1971         /* First number in array is # keys */
1972         tdb->lockedkeys[0] = number;
1973
1974         /* Insertion sort by bucket */
1975         for (i = 0; i < number; i++) {
1976                 hash = tdb_hash(&keys[i]);
1977                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1978                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1979                 tdb->lockedkeys[j+1] = hash;
1980         }
1981         /* Finally, lock in order */
1982         for (i = 0; i < number; i++)
1983                 if (tdb_lock(tdb, i, F_WRLCK))
1984                         break;
1985
1986         /* If error, release locks we have... */
1987         if (i < number) {
1988                 for ( j = 0; j < i; j++)
1989                         tdb_unlock(tdb, j, F_WRLCK);
1990                 SAFE_FREE(tdb->lockedkeys);
1991                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1992         }
1993         return 0;
1994 }
1995
1996 /* Unlock the keys previously locked by tdb_lockkeys() */
1997 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1998 {
1999         u32 i;
2000         if (!tdb->lockedkeys)
2001                 return;
2002         for (i = 0; i < tdb->lockedkeys[0]; i++)
2003                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
2004         SAFE_FREE(tdb->lockedkeys);
2005 }
2006
2007 /* lock/unlock one hash chain. This is meant to be used to reduce
2008    contention - it cannot guarantee how many records will be locked */
2009 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
2010 {
2011         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
2012 }
2013
2014 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
2015 {
2016         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
2017 }
2018
2019 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
2020 {
2021         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
2022 }
2023
2024 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
2025 {
2026         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
2027 }
2028
2029
2030 /* register a loging function */
2031 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2032 {
2033         tdb->log_fn = fn;
2034 }
2035
2036
2037 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
2038    seek pointer from our parent and to re-establish locks */
2039 int tdb_reopen(TDB_CONTEXT *tdb)
2040 {
2041         struct stat st;
2042
2043         if (tdb->flags & TDB_INTERNAL)
2044                 return 0; /* Nothing to do. */
2045         if (tdb_munmap(tdb) != 0) {
2046                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2047                 goto fail;
2048         }
2049         if (close(tdb->fd) != 0)
2050                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2051         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2052         if (tdb->fd == -1) {
2053                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2054                 goto fail;
2055         }
2056         if (fstat(tdb->fd, &st) != 0) {
2057                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2058                 goto fail;
2059         }
2060         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2061                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2062                 goto fail;
2063         }
2064         tdb_mmap(tdb);
2065         if ((tdb->flags & TDB_CLEAR_IF_FIRST) && (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
2066                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2067                 goto fail;
2068         }
2069
2070         return 0;
2071
2072 fail:
2073         tdb_close(tdb);
2074         return -1;
2075 }
2076
2077 /* reopen all tdb's */
2078 int tdb_reopen_all(void)
2079 {
2080         TDB_CONTEXT *tdb;
2081
2082         for (tdb=tdbs; tdb; tdb = tdb->next) {
2083                 /* Ensure no clear-if-first. */
2084                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
2085                 if (tdb_reopen(tdb) != 0)
2086                         return -1;
2087         }
2088
2089         return 0;
2090 }