r4088: Get medieval on our ass about malloc.... :-). Take control of all our allocation
[tprouty/samba.git] / source / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2004
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 2 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23    
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, write to the Free Software
26    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 */
28
29
30 /* NOTE: If you use tdbs under valgrind, and in particular if you run
31  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
32  * think this is because valgrind doesn't understand that the mmap'd
33  * area may be written to by other processes.  Memory can, from the
34  * point of view of the grinded process, spontaneously become
35  * initialized.
36  *
37  * I can think of a few solutions.  [mbp 20030311]
38  *
39  * 1 - Write suppressions for Valgrind so that it doesn't complain
40  * about this.  Probably the most reasonable but people need to
41  * remember to use them.
42  *
43  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
44  *
45  * 3 - Use the special valgrind macros to mark memory as valid at the
46  * right time.  Probably too hard -- the process just doesn't know.
47  */ 
48
49 #ifdef STANDALONE
50 #if HAVE_CONFIG_H
51 #include <config.h>
52 #endif
53
54 #include <stdlib.h>
55 #include <stdio.h>
56 #include <fcntl.h>
57 #include <unistd.h>
58 #include <string.h>
59 #include <fcntl.h>
60 #include <errno.h>
61 #include <sys/mman.h>
62 #include <sys/stat.h>
63 #include <signal.h>
64 #include "tdb.h"
65 #include "spinlock.h"
66 #else
67 #include "includes.h"
68
69 #if defined(PARANOID_MALLOC_CHECKER)
70 #ifdef malloc
71 #undef malloc
72 #endif
73
74 #ifdef realloc
75 #undef realloc
76 #endif
77
78 #ifdef calloc
79 #undef calloc
80 #endif
81
82 #ifdef strdup
83 #undef strdup
84 #endif
85
86 #ifdef strndup
87 #undef strndup
88 #endif
89
90 #endif
91
92 #endif
93
94 #define TDB_MAGIC_FOOD "TDB file\n"
95 #define TDB_VERSION (0x26011967 + 6)
96 #define TDB_MAGIC (0x26011999U)
97 #define TDB_FREE_MAGIC (~TDB_MAGIC)
98 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
99 #define TDB_ALIGNMENT 4
100 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
101 #define DEFAULT_HASH_SIZE 131
102 #define TDB_PAGE_SIZE 0x2000
103 #define FREELIST_TOP (sizeof(struct tdb_header))
104 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
105 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
106 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
107 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
108 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
109 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
110
111
112 /* NB assumes there is a local variable called "tdb" that is the
113  * current context, also takes doubly-parenthesized print-style
114  * argument. */
115 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
116
117 /* lock offsets */
118 #define GLOBAL_LOCK 0
119 #define ACTIVE_LOCK 4
120
121 #ifndef MAP_FILE
122 #define MAP_FILE 0
123 #endif
124
125 #ifndef MAP_FAILED
126 #define MAP_FAILED ((void *)-1)
127 #endif
128
129 /* free memory if the pointer is valid and zero the pointer */
130 #ifndef SAFE_FREE
131 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
132 #endif
133
134 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
135 TDB_DATA tdb_null;
136
137 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
138 static TDB_CONTEXT *tdbs = NULL;
139
140 static int tdb_munmap(TDB_CONTEXT *tdb)
141 {
142         if (tdb->flags & TDB_INTERNAL)
143                 return 0;
144
145 #ifdef HAVE_MMAP
146         if (tdb->map_ptr) {
147                 int ret = munmap(tdb->map_ptr, tdb->map_size);
148                 if (ret != 0)
149                         return ret;
150         }
151 #endif
152         tdb->map_ptr = NULL;
153         return 0;
154 }
155
156 static void tdb_mmap(TDB_CONTEXT *tdb)
157 {
158         if (tdb->flags & TDB_INTERNAL)
159                 return;
160
161 #ifdef HAVE_MMAP
162         if (!(tdb->flags & TDB_NOMMAP)) {
163                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
164                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
165                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
166
167                 /*
168                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
169                  */
170
171                 if (tdb->map_ptr == MAP_FAILED) {
172                         tdb->map_ptr = NULL;
173                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
174                                  tdb->map_size, strerror(errno)));
175                 }
176         } else {
177                 tdb->map_ptr = NULL;
178         }
179 #else
180         tdb->map_ptr = NULL;
181 #endif
182 }
183
184 /* Endian conversion: we only ever deal with 4 byte quantities */
185 static void *convert(void *buf, u32 size)
186 {
187         u32 i, *p = buf;
188         for (i = 0; i < size / 4; i++)
189                 p[i] = TDB_BYTEREV(p[i]);
190         return buf;
191 }
192 #define DOCONV() (tdb->flags & TDB_CONVERT)
193 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
194
195 /* the body of the database is made of one list_struct for the free space
196    plus a separate data list for each hash value */
197 struct list_struct {
198         tdb_off next; /* offset of the next record in the list */
199         tdb_len rec_len; /* total byte length of record */
200         tdb_len key_len; /* byte length of key */
201         tdb_len data_len; /* byte length of data */
202         u32 full_hash; /* the full 32 bit hash of the key */
203         u32 magic;   /* try to catch errors */
204         /* the following union is implied:
205                 union {
206                         char record[rec_len];
207                         struct {
208                                 char key[key_len];
209                                 char data[data_len];
210                         }
211                         u32 totalsize; (tailer)
212                 }
213         */
214 };
215
216 /***************************************************************
217  Allow a caller to set a "alarm" flag that tdb can check to abort
218  a blocking lock on SIGALRM.
219 ***************************************************************/
220
221 static sig_atomic_t *palarm_fired;
222
223 void tdb_set_lock_alarm(sig_atomic_t *palarm)
224 {
225         palarm_fired = palarm;
226 }
227
228 /* a byte range locking function - return 0 on success
229    this functions locks/unlocks 1 byte at the specified offset.
230
231    On error, errno is also set so that errors are passed back properly
232    through tdb_open(). */
233 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
234                       int rw_type, int lck_type, int probe)
235 {
236         struct flock fl;
237         int ret;
238
239         if (tdb->flags & TDB_NOLOCK)
240                 return 0;
241         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
242                 errno = EACCES;
243                 return -1;
244         }
245
246         fl.l_type = rw_type;
247         fl.l_whence = SEEK_SET;
248         fl.l_start = offset;
249         fl.l_len = 1;
250         fl.l_pid = 0;
251
252         do {
253                 ret = fcntl(tdb->fd,lck_type,&fl);
254                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
255                         break;
256         } while (ret == -1 && errno == EINTR);
257
258         if (ret == -1) {
259                 if (!probe && lck_type != F_SETLK) {
260                         /* Ensure error code is set for log fun to examine. */
261                         if (errno == EINTR && palarm_fired && *palarm_fired)
262                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
263                         else
264                                 tdb->ecode = TDB_ERR_LOCK;
265                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
266                                  tdb->fd, offset, rw_type, lck_type));
267                 }
268                 /* Was it an alarm timeout ? */
269                 if (errno == EINTR && palarm_fired && *palarm_fired) {
270                         TDB_LOG((tdb, 5, "tdb_brlock timed out (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
271                                  tdb->fd, offset, rw_type, lck_type));
272                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
273                 }
274                 /* Otherwise - generic lock error. errno set by fcntl.
275                  * EAGAIN is an expected return from non-blocking
276                  * locks. */
277                 if (errno != EAGAIN) {
278                         TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
279                                  tdb->fd, offset, rw_type, lck_type, 
280                                  strerror(errno)));
281                 }
282                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
283         }
284         return 0;
285 }
286
287 /* lock a list in the database. list -1 is the alloc list */
288 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
289 {
290         if (list < -1 || list >= (int)tdb->header.hash_size) {
291                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
292                            list, ltype));
293                 return -1;
294         }
295         if (tdb->flags & TDB_NOLOCK)
296                 return 0;
297
298         /* Since fcntl locks don't nest, we do a lock for the first one,
299            and simply bump the count for future ones */
300         if (tdb->locked[list+1].count == 0) {
301                 if (!tdb->read_only && tdb->header.rwlocks) {
302                         if (tdb_spinlock(tdb, list, ltype)) {
303                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list %d ltype=%d\n", 
304                                            list, ltype));
305                                 return -1;
306                         }
307                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
308                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
309                                            list, ltype, strerror(errno)));
310                         return -1;
311                 }
312                 tdb->locked[list+1].ltype = ltype;
313         }
314         tdb->locked[list+1].count++;
315         return 0;
316 }
317
318 /* unlock the database: returns void because it's too late for errors. */
319         /* changed to return int it may be interesting to know there
320            has been an error  --simo */
321 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
322 {
323         int ret = -1;
324
325         if (tdb->flags & TDB_NOLOCK)
326                 return 0;
327
328         /* Sanity checks */
329         if (list < -1 || list >= (int)tdb->header.hash_size) {
330                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
331                 return ret;
332         }
333
334         if (tdb->locked[list+1].count==0) {
335                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
336                 return ret;
337         }
338
339         if (tdb->locked[list+1].count == 1) {
340                 /* Down to last nested lock: unlock underneath */
341                 if (!tdb->read_only && tdb->header.rwlocks) {
342                         ret = tdb_spinunlock(tdb, list, ltype);
343                 } else {
344                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
345                 }
346         } else {
347                 ret = 0;
348         }
349         tdb->locked[list+1].count--;
350
351         if (ret)
352                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
353         return ret;
354 }
355
356 /* check for an out of bounds access - if it is out of bounds then
357    see if the database has been expanded by someone else and expand
358    if necessary 
359    note that "len" is the minimum length needed for the db
360 */
361 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
362 {
363         struct stat st;
364         if (len <= tdb->map_size)
365                 return 0;
366         if (tdb->flags & TDB_INTERNAL) {
367                 if (!probe) {
368                         /* Ensure ecode is set for log fn. */
369                         tdb->ecode = TDB_ERR_IO;
370                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
371                                  (int)len, (int)tdb->map_size));
372                 }
373                 return TDB_ERRCODE(TDB_ERR_IO, -1);
374         }
375
376         if (fstat(tdb->fd, &st) == -1)
377                 return TDB_ERRCODE(TDB_ERR_IO, -1);
378
379         if (st.st_size < (size_t)len) {
380                 if (!probe) {
381                         /* Ensure ecode is set for log fn. */
382                         tdb->ecode = TDB_ERR_IO;
383                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
384                                  (int)len, (int)st.st_size));
385                 }
386                 return TDB_ERRCODE(TDB_ERR_IO, -1);
387         }
388
389         /* Unmap, update size, remap */
390         if (tdb_munmap(tdb) == -1)
391                 return TDB_ERRCODE(TDB_ERR_IO, -1);
392         tdb->map_size = st.st_size;
393         tdb_mmap(tdb);
394         return 0;
395 }
396
397 /* write a lump of data at a specified offset */
398 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
399 {
400         if (tdb_oob(tdb, off + len, 0) != 0)
401                 return -1;
402
403         if (tdb->map_ptr)
404                 memcpy(off + (char *)tdb->map_ptr, buf, len);
405 #ifdef HAVE_PWRITE
406         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
407 #else
408         else if (lseek(tdb->fd, off, SEEK_SET) != off
409                  || write(tdb->fd, buf, len) != (ssize_t)len) {
410 #endif
411                 /* Ensure ecode is set for log fn. */
412                 tdb->ecode = TDB_ERR_IO;
413                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
414                            off, len, strerror(errno)));
415                 return TDB_ERRCODE(TDB_ERR_IO, -1);
416         }
417         return 0;
418 }
419
420 /* read a lump of data at a specified offset, maybe convert */
421 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
422 {
423         if (tdb_oob(tdb, off + len, 0) != 0)
424                 return -1;
425
426         if (tdb->map_ptr)
427                 memcpy(buf, off + (char *)tdb->map_ptr, len);
428 #ifdef HAVE_PREAD
429         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
430 #else
431         else if (lseek(tdb->fd, off, SEEK_SET) != off
432                  || read(tdb->fd, buf, len) != (ssize_t)len) {
433 #endif
434                 /* Ensure ecode is set for log fn. */
435                 tdb->ecode = TDB_ERR_IO;
436                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
437                            off, len, strerror(errno)));
438                 return TDB_ERRCODE(TDB_ERR_IO, -1);
439         }
440         if (cv)
441                 convert(buf, len);
442         return 0;
443 }
444
445 /* read a lump of data, allocating the space for it */
446 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
447 {
448         char *buf;
449
450         if (!(buf = malloc(len))) {
451                 /* Ensure ecode is set for log fn. */
452                 tdb->ecode = TDB_ERR_OOM;
453                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
454                            len, strerror(errno)));
455                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
456         }
457         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
458                 SAFE_FREE(buf);
459                 return NULL;
460         }
461         return buf;
462 }
463
464 /* read/write a tdb_off */
465 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
466 {
467         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
468 }
469 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
470 {
471         tdb_off off = *d;
472         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
473 }
474
475 /* read/write a record */
476 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
477 {
478         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
479                 return -1;
480         if (TDB_BAD_MAGIC(rec)) {
481                 /* Ensure ecode is set for log fn. */
482                 tdb->ecode = TDB_ERR_CORRUPT;
483                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
484                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
485         }
486         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
487 }
488 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
489 {
490         struct list_struct r = *rec;
491         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
492 }
493
494 /* read a freelist record and check for simple errors */
495 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
496 {
497         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
498                 return -1;
499
500         if (rec->magic == TDB_MAGIC) {
501                 /* this happens when a app is showdown while deleting a record - we should
502                    not completely fail when this happens */
503                 TDB_LOG((tdb, 0,"rec_free_read non-free magic 0x%x at offset=%d - fixing\n", 
504                          rec->magic, off));
505                 rec->magic = TDB_FREE_MAGIC;
506                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
507                         return -1;
508         }
509
510         if (rec->magic != TDB_FREE_MAGIC) {
511                 /* Ensure ecode is set for log fn. */
512                 tdb->ecode = TDB_ERR_CORRUPT;
513                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
514                            rec->magic, off));
515                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
516         }
517         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
518                 return -1;
519         return 0;
520 }
521
522 /* update a record tailer (must hold allocation lock) */
523 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
524                          const struct list_struct *rec)
525 {
526         tdb_off totalsize;
527
528         /* Offset of tailer from record header */
529         totalsize = sizeof(*rec) + rec->rec_len;
530         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
531                          &totalsize);
532 }
533
534 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
535 {
536         struct list_struct rec;
537         tdb_off tailer_ofs, tailer;
538
539         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
540                 printf("ERROR: failed to read record at %u\n", offset);
541                 return 0;
542         }
543
544         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
545                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
546
547         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
548         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
549                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
550                 return rec.next;
551         }
552
553         if (tailer != rec.rec_len + sizeof(rec)) {
554                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
555                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
556         }
557         return rec.next;
558 }
559
560 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
561 {
562         tdb_off rec_ptr, top;
563
564         top = TDB_HASH_TOP(i);
565
566         if (tdb_lock(tdb, i, F_WRLCK) != 0)
567                 return -1;
568
569         if (ofs_read(tdb, top, &rec_ptr) == -1)
570                 return tdb_unlock(tdb, i, F_WRLCK);
571
572         if (rec_ptr)
573                 printf("hash=%d\n", i);
574
575         while (rec_ptr) {
576                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
577         }
578
579         return tdb_unlock(tdb, i, F_WRLCK);
580 }
581
582 void tdb_dump_all(TDB_CONTEXT *tdb)
583 {
584         int i;
585         for (i=0;i<tdb->header.hash_size;i++) {
586                 tdb_dump_chain(tdb, i);
587         }
588         printf("freelist:\n");
589         tdb_dump_chain(tdb, -1);
590 }
591
592 int tdb_printfreelist(TDB_CONTEXT *tdb)
593 {
594         int ret;
595         long total_free = 0;
596         tdb_off offset, rec_ptr;
597         struct list_struct rec;
598
599         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
600                 return ret;
601
602         offset = FREELIST_TOP;
603
604         /* read in the freelist top */
605         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
606                 tdb_unlock(tdb, -1, F_WRLCK);
607                 return 0;
608         }
609
610         printf("freelist top=[0x%08x]\n", rec_ptr );
611         while (rec_ptr) {
612                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
613                         tdb_unlock(tdb, -1, F_WRLCK);
614                         return -1;
615                 }
616
617                 if (rec.magic != TDB_FREE_MAGIC) {
618                         printf("bad magic 0x%08x in free list\n", rec.magic);
619                         tdb_unlock(tdb, -1, F_WRLCK);
620                         return -1;
621                 }
622
623                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
624                 total_free += rec.rec_len;
625
626                 /* move to the next record */
627                 rec_ptr = rec.next;
628         }
629         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
630                (int)total_free);
631
632         return tdb_unlock(tdb, -1, F_WRLCK);
633 }
634
635 /* Remove an element from the freelist.  Must have alloc lock. */
636 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
637 {
638         tdb_off last_ptr, i;
639
640         /* read in the freelist top */
641         last_ptr = FREELIST_TOP;
642         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
643                 if (i == off) {
644                         /* We've found it! */
645                         return ofs_write(tdb, last_ptr, &next);
646                 }
647                 /* Follow chain (next offset is at start of record) */
648                 last_ptr = i;
649         }
650         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
651         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
652 }
653
654 /* Add an element into the freelist. Merge adjacent records if
655    neccessary. */
656 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
657 {
658         tdb_off right, left;
659
660         /* Allocation and tailer lock */
661         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
662                 return -1;
663
664         /* set an initial tailer, so if we fail we don't leave a bogus record */
665         if (update_tailer(tdb, offset, rec) != 0) {
666                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
667                 goto fail;
668         }
669
670         /* Look right first (I'm an Australian, dammit) */
671         right = offset + sizeof(*rec) + rec->rec_len;
672         if (right + sizeof(*rec) <= tdb->map_size) {
673                 struct list_struct r;
674
675                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
676                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
677                         goto left;
678                 }
679
680                 /* If it's free, expand to include it. */
681                 if (r.magic == TDB_FREE_MAGIC) {
682                         if (remove_from_freelist(tdb, right, r.next) == -1) {
683                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
684                                 goto left;
685                         }
686                         rec->rec_len += sizeof(r) + r.rec_len;
687                 }
688         }
689
690 left:
691         /* Look left */
692         left = offset - sizeof(tdb_off);
693         if (left > TDB_DATA_START(tdb->header.hash_size)) {
694                 struct list_struct l;
695                 tdb_off leftsize;
696                 
697                 /* Read in tailer and jump back to header */
698                 if (ofs_read(tdb, left, &leftsize) == -1) {
699                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
700                         goto update;
701                 }
702                 left = offset - leftsize;
703
704                 /* Now read in record */
705                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
706                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
707                         goto update;
708                 }
709
710                 /* If it's free, expand to include it. */
711                 if (l.magic == TDB_FREE_MAGIC) {
712                         if (remove_from_freelist(tdb, left, l.next) == -1) {
713                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
714                                 goto update;
715                         } else {
716                                 offset = left;
717                                 rec->rec_len += leftsize;
718                         }
719                 }
720         }
721
722 update:
723         if (update_tailer(tdb, offset, rec) == -1) {
724                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
725                 goto fail;
726         }
727
728         /* Now, prepend to free list */
729         rec->magic = TDB_FREE_MAGIC;
730
731         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
732             rec_write(tdb, offset, rec) == -1 ||
733             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
734                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
735                 goto fail;
736         }
737
738         /* And we're done. */
739         tdb_unlock(tdb, -1, F_WRLCK);
740         return 0;
741
742  fail:
743         tdb_unlock(tdb, -1, F_WRLCK);
744         return -1;
745 }
746
747
748 /* expand a file.  we prefer to use ftruncate, as that is what posix
749   says to use for mmap expansion */
750 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
751 {
752         char buf[1024];
753 #if HAVE_FTRUNCATE_EXTEND
754         if (ftruncate(tdb->fd, size+addition) != 0) {
755                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
756                            size+addition, strerror(errno)));
757                 return -1;
758         }
759 #else
760         char b = 0;
761
762 #ifdef HAVE_PWRITE
763         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
764 #else
765         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
766             write(tdb->fd, &b, 1) != 1) {
767 #endif
768                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
769                            size+addition, strerror(errno)));
770                 return -1;
771         }
772 #endif
773
774         /* now fill the file with something. This ensures that the file isn't sparse, which would be
775            very bad if we ran out of disk. This must be done with write, not via mmap */
776         memset(buf, 0x42, sizeof(buf));
777         while (addition) {
778                 int n = addition>sizeof(buf)?sizeof(buf):addition;
779 #ifdef HAVE_PWRITE
780                 int ret = pwrite(tdb->fd, buf, n, size);
781 #else
782                 int ret;
783                 if (lseek(tdb->fd, size, SEEK_SET) != size)
784                         return -1;
785                 ret = write(tdb->fd, buf, n);
786 #endif
787                 if (ret != n) {
788                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
789                                    n, strerror(errno)));
790                         return -1;
791                 }
792                 addition -= n;
793                 size += n;
794         }
795         return 0;
796 }
797
798
799 /* expand the database at least size bytes by expanding the underlying
800    file and doing the mmap again if necessary */
801 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
802 {
803         struct list_struct rec;
804         tdb_off offset;
805
806         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
807                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
808                 return -1;
809         }
810
811         /* must know about any previous expansions by another process */
812         tdb_oob(tdb, tdb->map_size + 1, 1);
813
814         /* always make room for at least 10 more records, and round
815            the database up to a multiple of TDB_PAGE_SIZE */
816         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
817
818         if (!(tdb->flags & TDB_INTERNAL))
819                 tdb_munmap(tdb);
820
821         /*
822          * We must ensure the file is unmapped before doing this
823          * to ensure consistency with systems like OpenBSD where
824          * writes and mmaps are not consistent.
825          */
826
827         /* expand the file itself */
828         if (!(tdb->flags & TDB_INTERNAL)) {
829                 if (expand_file(tdb, tdb->map_size, size) != 0)
830                         goto fail;
831         }
832
833         tdb->map_size += size;
834
835         if (tdb->flags & TDB_INTERNAL)
836                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
837         else {
838                 /*
839                  * We must ensure the file is remapped before adding the space
840                  * to ensure consistency with systems like OpenBSD where
841                  * writes and mmaps are not consistent.
842                  */
843
844                 /* We're ok if the mmap fails as we'll fallback to read/write */
845                 tdb_mmap(tdb);
846         }
847
848         /* form a new freelist record */
849         memset(&rec,'\0',sizeof(rec));
850         rec.rec_len = size - sizeof(rec);
851
852         /* link it into the free list */
853         offset = tdb->map_size - size;
854         if (tdb_free(tdb, offset, &rec) == -1)
855                 goto fail;
856
857         tdb_unlock(tdb, -1, F_WRLCK);
858         return 0;
859  fail:
860         tdb_unlock(tdb, -1, F_WRLCK);
861         return -1;
862 }
863
864 /* allocate some space from the free list. The offset returned points
865    to a unconnected list_struct within the database with room for at
866    least length bytes of total data
867
868    0 is returned if the space could not be allocated
869  */
870 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
871                             struct list_struct *rec)
872 {
873         tdb_off rec_ptr, last_ptr, newrec_ptr;
874         struct list_struct newrec;
875
876         memset(&newrec, '\0', sizeof(newrec));
877
878         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
879                 return 0;
880
881         /* Extra bytes required for tailer */
882         length += sizeof(tdb_off);
883
884  again:
885         last_ptr = FREELIST_TOP;
886
887         /* read in the freelist top */
888         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
889                 goto fail;
890
891         /* keep looking until we find a freelist record big enough */
892         while (rec_ptr) {
893                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
894                         goto fail;
895
896                 if (rec->rec_len >= length) {
897                         /* found it - now possibly split it up  */
898                         if (rec->rec_len > length + MIN_REC_SIZE) {
899                                 /* Length of left piece */
900                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
901
902                                 /* Right piece to go on free list */
903                                 newrec.rec_len = rec->rec_len
904                                         - (sizeof(*rec) + length);
905                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
906
907                                 /* And left record is shortened */
908                                 rec->rec_len = length;
909                         } else
910                                 newrec_ptr = 0;
911
912                         /* Remove allocated record from the free list */
913                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
914                                 goto fail;
915
916                         /* Update header: do this before we drop alloc
917                            lock, otherwise tdb_free() might try to
918                            merge with us, thinking we're free.
919                            (Thanks Jeremy Allison). */
920                         rec->magic = TDB_MAGIC;
921                         if (rec_write(tdb, rec_ptr, rec) == -1)
922                                 goto fail;
923
924                         /* Did we create new block? */
925                         if (newrec_ptr) {
926                                 /* Update allocated record tailer (we
927                                    shortened it). */
928                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
929                                         goto fail;
930
931                                 /* Free new record */
932                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
933                                         goto fail;
934                         }
935
936                         /* all done - return the new record offset */
937                         tdb_unlock(tdb, -1, F_WRLCK);
938                         return rec_ptr;
939                 }
940                 /* move to the next record */
941                 last_ptr = rec_ptr;
942                 rec_ptr = rec->next;
943         }
944         /* we didn't find enough space. See if we can expand the
945            database and if we can then try again */
946         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
947                 goto again;
948  fail:
949         tdb_unlock(tdb, -1, F_WRLCK);
950         return 0;
951 }
952
953 /* initialise a new database with a specified hash size */
954 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
955 {
956         struct tdb_header *newdb;
957         int size, ret = -1;
958
959         /* We make it up in memory, then write it out if not internal */
960         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
961         if (!(newdb = calloc(size, 1)))
962                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
963
964         /* Fill in the header */
965         newdb->version = TDB_VERSION;
966         newdb->hash_size = hash_size;
967 #ifdef USE_SPINLOCKS
968         newdb->rwlocks = size;
969 #endif
970         if (tdb->flags & TDB_INTERNAL) {
971                 tdb->map_size = size;
972                 tdb->map_ptr = (char *)newdb;
973                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
974                 /* Convert the `ondisk' version if asked. */
975                 CONVERT(*newdb);
976                 return 0;
977         }
978         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
979                 goto fail;
980
981         if (ftruncate(tdb->fd, 0) == -1)
982                 goto fail;
983
984         /* This creates an endian-converted header, as if read from disk */
985         CONVERT(*newdb);
986         memcpy(&tdb->header, newdb, sizeof(tdb->header));
987         /* Don't endian-convert the magic food! */
988         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
989         if (write(tdb->fd, newdb, size) != size)
990                 ret = -1;
991         else
992                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
993
994   fail:
995         SAFE_FREE(newdb);
996         return ret;
997 }
998
999 /* Returns 0 on fail.  On success, return offset of record, and fills
1000    in rec */
1001 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
1002                         struct list_struct *r)
1003 {
1004         tdb_off rec_ptr;
1005         
1006         /* read in the hash top */
1007         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
1008                 return 0;
1009
1010         /* keep looking until we find the right record */
1011         while (rec_ptr) {
1012                 if (rec_read(tdb, rec_ptr, r) == -1)
1013                         return 0;
1014
1015                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
1016                         char *k;
1017                         /* a very likely hit - read the key */
1018                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
1019                                            r->key_len);
1020                         if (!k)
1021                                 return 0;
1022
1023                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1024                                 SAFE_FREE(k);
1025                                 return rec_ptr;
1026                         }
1027                         SAFE_FREE(k);
1028                 }
1029                 rec_ptr = r->next;
1030         }
1031         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1032 }
1033
1034 /* As tdb_find, but if you succeed, keep the lock */
1035 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1036                              struct list_struct *rec)
1037 {
1038         u32 rec_ptr;
1039
1040         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1041                 return 0;
1042         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1043                 tdb_unlock(tdb, BUCKET(hash), locktype);
1044         return rec_ptr;
1045 }
1046
1047 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1048 {
1049         return tdb->ecode;
1050 }
1051
1052 static struct tdb_errname {
1053         enum TDB_ERROR ecode; const char *estring;
1054 } emap[] = { {TDB_SUCCESS, "Success"},
1055              {TDB_ERR_CORRUPT, "Corrupt database"},
1056              {TDB_ERR_IO, "IO Error"},
1057              {TDB_ERR_LOCK, "Locking error"},
1058              {TDB_ERR_OOM, "Out of memory"},
1059              {TDB_ERR_EXISTS, "Record exists"},
1060              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1061              {TDB_ERR_NOEXIST, "Record does not exist"} };
1062
1063 /* Error string for the last tdb error */
1064 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1065 {
1066         u32 i;
1067         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1068                 if (tdb->ecode == emap[i].ecode)
1069                         return emap[i].estring;
1070         return "Invalid error code";
1071 }
1072
1073 /* update an entry in place - this only works if the new data size
1074    is <= the old data size and the key exists.
1075    on failure return -1.
1076 */
1077
1078 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1079 {
1080         struct list_struct rec;
1081         tdb_off rec_ptr;
1082
1083         /* find entry */
1084         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1085                 return -1;
1086
1087         /* must be long enough key, data and tailer */
1088         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1089                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1090                 return -1;
1091         }
1092
1093         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1094                       dbuf.dptr, dbuf.dsize) == -1)
1095                 return -1;
1096
1097         if (dbuf.dsize != rec.data_len) {
1098                 /* update size */
1099                 rec.data_len = dbuf.dsize;
1100                 return rec_write(tdb, rec_ptr, &rec);
1101         }
1102  
1103         return 0;
1104 }
1105
1106 /* find an entry in the database given a key */
1107 /* If an entry doesn't exist tdb_err will be set to
1108  * TDB_ERR_NOEXIST. If a key has no data attached
1109  * tdb_err will not be set. Both will return a
1110  * zero pptr and zero dsize.
1111  */
1112
1113 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1114 {
1115         tdb_off rec_ptr;
1116         struct list_struct rec;
1117         TDB_DATA ret;
1118         u32 hash;
1119
1120         /* find which hash bucket it is in */
1121         hash = tdb->hash_fn(&key);
1122         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1123                 return tdb_null;
1124
1125         if (rec.data_len)
1126                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1127                                           rec.data_len);
1128         else
1129                 ret.dptr = NULL;
1130         ret.dsize = rec.data_len;
1131         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1132         return ret;
1133 }
1134
1135 /* check if an entry in the database exists 
1136
1137    note that 1 is returned if the key is found and 0 is returned if not found
1138    this doesn't match the conventions in the rest of this module, but is
1139    compatible with gdbm
1140 */
1141 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1142 {
1143         struct list_struct rec;
1144         
1145         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1146                 return 0;
1147         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1148         return 1;
1149 }
1150
1151 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1152 {
1153         u32 hash = tdb->hash_fn(&key);
1154         return tdb_exists_hash(tdb, key, hash);
1155 }
1156
1157 /* record lock stops delete underneath */
1158 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1159 {
1160         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1161 }
1162 /*
1163   Write locks override our own fcntl readlocks, so check it here.
1164   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1165   an error to fail to get the lock here.
1166 */
1167  
1168 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1169 {
1170         struct tdb_traverse_lock *i;
1171         for (i = &tdb->travlocks; i; i = i->next)
1172                 if (i->off == off)
1173                         return -1;
1174         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1175 }
1176
1177 /*
1178   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1179   an error to fail to get the lock here.
1180 */
1181
1182 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1183 {
1184         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1185 }
1186 /* fcntl locks don't stack: avoid unlocking someone else's */
1187 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1188 {
1189         struct tdb_traverse_lock *i;
1190         u32 count = 0;
1191
1192         if (off == 0)
1193                 return 0;
1194         for (i = &tdb->travlocks; i; i = i->next)
1195                 if (i->off == off)
1196                         count++;
1197         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1198 }
1199
1200 /* actually delete an entry in the database given the offset */
1201 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1202 {
1203         tdb_off last_ptr, i;
1204         struct list_struct lastrec;
1205
1206         if (tdb->read_only) return -1;
1207
1208         if (write_lock_record(tdb, rec_ptr) == -1) {
1209                 /* Someone traversing here: mark it as dead */
1210                 rec->magic = TDB_DEAD_MAGIC;
1211                 return rec_write(tdb, rec_ptr, rec);
1212         }
1213         if (write_unlock_record(tdb, rec_ptr) != 0)
1214                 return -1;
1215
1216         /* find previous record in hash chain */
1217         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1218                 return -1;
1219         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1220                 if (rec_read(tdb, i, &lastrec) == -1)
1221                         return -1;
1222
1223         /* unlink it: next ptr is at start of record. */
1224         if (last_ptr == 0)
1225                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1226         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1227                 return -1;
1228
1229         /* recover the space */
1230         if (tdb_free(tdb, rec_ptr, rec) == -1)
1231                 return -1;
1232         return 0;
1233 }
1234
1235 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1236 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1237                          struct list_struct *rec)
1238 {
1239         int want_next = (tlock->off != 0);
1240
1241         /* Lock each chain from the start one. */
1242         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1243                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1244                         return -1;
1245
1246                 /* No previous record?  Start at top of chain. */
1247                 if (!tlock->off) {
1248                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1249                                      &tlock->off) == -1)
1250                                 goto fail;
1251                 } else {
1252                         /* Otherwise unlock the previous record. */
1253                         if (unlock_record(tdb, tlock->off) != 0)
1254                                 goto fail;
1255                 }
1256
1257                 if (want_next) {
1258                         /* We have offset of old record: grab next */
1259                         if (rec_read(tdb, tlock->off, rec) == -1)
1260                                 goto fail;
1261                         tlock->off = rec->next;
1262                 }
1263
1264                 /* Iterate through chain */
1265                 while( tlock->off) {
1266                         tdb_off current;
1267                         if (rec_read(tdb, tlock->off, rec) == -1)
1268                                 goto fail;
1269                         if (!TDB_DEAD(rec)) {
1270                                 /* Woohoo: we found one! */
1271                                 if (lock_record(tdb, tlock->off) != 0)
1272                                         goto fail;
1273                                 return tlock->off;
1274                         }
1275                         /* Try to clean dead ones from old traverses */
1276                         current = tlock->off;
1277                         tlock->off = rec->next;
1278                         if (!tdb->read_only && 
1279                             do_delete(tdb, current, rec) != 0)
1280                                 goto fail;
1281                 }
1282                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1283                 want_next = 0;
1284         }
1285         /* We finished iteration without finding anything */
1286         return TDB_ERRCODE(TDB_SUCCESS, 0);
1287
1288  fail:
1289         tlock->off = 0;
1290         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1291                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1292         return -1;
1293 }
1294
1295 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1296    return -1 on error or the record count traversed
1297    if fn is NULL then it is not called
1298    a non-zero return value from fn() indicates that the traversal should stop
1299   */
1300 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private)
1301 {
1302         TDB_DATA key, dbuf;
1303         struct list_struct rec;
1304         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1305         int ret, count = 0;
1306
1307         /* This was in the initializaton, above, but the IRIX compiler
1308          * did not like it.  crh
1309          */
1310         tl.next = tdb->travlocks.next;
1311
1312         /* fcntl locks don't stack: beware traverse inside traverse */
1313         tdb->travlocks.next = &tl;
1314
1315         /* tdb_next_lock places locks on the record returned, and its chain */
1316         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1317                 count++;
1318                 /* now read the full record */
1319                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1320                                           rec.key_len + rec.data_len);
1321                 if (!key.dptr) {
1322                         ret = -1;
1323                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1324                                 goto out;
1325                         if (unlock_record(tdb, tl.off) != 0)
1326                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1327                         goto out;
1328                 }
1329                 key.dsize = rec.key_len;
1330                 dbuf.dptr = key.dptr + rec.key_len;
1331                 dbuf.dsize = rec.data_len;
1332
1333                 /* Drop chain lock, call out */
1334                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1335                         ret = -1;
1336                         goto out;
1337                 }
1338                 if (fn && fn(tdb, key, dbuf, private)) {
1339                         /* They want us to terminate traversal */
1340                         ret = count;
1341                         if (unlock_record(tdb, tl.off) != 0) {
1342                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1343                                 ret = -1;
1344                         }
1345                         tdb->travlocks.next = tl.next;
1346                         SAFE_FREE(key.dptr);
1347                         return count;
1348                 }
1349                 SAFE_FREE(key.dptr);
1350         }
1351 out:
1352         tdb->travlocks.next = tl.next;
1353         if (ret < 0)
1354                 return -1;
1355         else
1356                 return count;
1357 }
1358
1359 /* find the first entry in the database and return its key */
1360 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1361 {
1362         TDB_DATA key;
1363         struct list_struct rec;
1364
1365         /* release any old lock */
1366         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1367                 return tdb_null;
1368         tdb->travlocks.off = tdb->travlocks.hash = 0;
1369
1370         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1371                 return tdb_null;
1372         /* now read the key */
1373         key.dsize = rec.key_len;
1374         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1375         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1376                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1377         return key;
1378 }
1379
1380 /* find the next entry in the database, returning its key */
1381 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1382 {
1383         u32 oldhash;
1384         TDB_DATA key = tdb_null;
1385         struct list_struct rec;
1386         char *k = NULL;
1387
1388         /* Is locked key the old key?  If so, traverse will be reliable. */
1389         if (tdb->travlocks.off) {
1390                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1391                         return tdb_null;
1392                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1393                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1394                                             rec.key_len))
1395                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1396                         /* No, it wasn't: unlock it and start from scratch */
1397                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1398                                 return tdb_null;
1399                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1400                                 return tdb_null;
1401                         tdb->travlocks.off = 0;
1402                 }
1403
1404                 SAFE_FREE(k);
1405         }
1406
1407         if (!tdb->travlocks.off) {
1408                 /* No previous element: do normal find, and lock record */
1409                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb->hash_fn(&oldkey), F_WRLCK, &rec);
1410                 if (!tdb->travlocks.off)
1411                         return tdb_null;
1412                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1413                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1414                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1415                         return tdb_null;
1416                 }
1417         }
1418         oldhash = tdb->travlocks.hash;
1419
1420         /* Grab next record: locks chain and returned record,
1421            unlocks old record */
1422         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1423                 key.dsize = rec.key_len;
1424                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1425                                           key.dsize);
1426                 /* Unlock the chain of this new record */
1427                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1428                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1429         }
1430         /* Unlock the chain of old record */
1431         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1432                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1433         return key;
1434 }
1435
1436 /* delete an entry in the database given a key */
1437 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1438 {
1439         tdb_off rec_ptr;
1440         struct list_struct rec;
1441         int ret;
1442
1443         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1444                 return -1;
1445         ret = do_delete(tdb, rec_ptr, &rec);
1446         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1447                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1448         return ret;
1449 }
1450
1451 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1452 {
1453         u32 hash = tdb->hash_fn(&key);
1454         return tdb_delete_hash(tdb, key, hash);
1455 }
1456
1457 /* store an element in the database, replacing any existing element
1458    with the same key 
1459
1460    return 0 on success, -1 on failure
1461 */
1462 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1463 {
1464         struct list_struct rec;
1465         u32 hash;
1466         tdb_off rec_ptr;
1467         char *p = NULL;
1468         int ret = 0;
1469
1470         /* find which hash bucket it is in */
1471         hash = tdb->hash_fn(&key);
1472         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1473                 return -1;
1474
1475         /* check for it existing, on insert. */
1476         if (flag == TDB_INSERT) {
1477                 if (tdb_exists_hash(tdb, key, hash)) {
1478                         tdb->ecode = TDB_ERR_EXISTS;
1479                         goto fail;
1480                 }
1481         } else {
1482                 /* first try in-place update, on modify or replace. */
1483                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1484                         goto out;
1485                 if (tdb->ecode == TDB_ERR_NOEXIST &&
1486                     flag == TDB_MODIFY) {
1487                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
1488                          we should fail the store */
1489                         goto fail;
1490         }
1491         }
1492         /* reset the error code potentially set by the tdb_update() */
1493         tdb->ecode = TDB_SUCCESS;
1494
1495         /* delete any existing record - if it doesn't exist we don't
1496            care.  Doing this first reduces fragmentation, and avoids
1497            coalescing with `allocated' block before it's updated. */
1498         if (flag != TDB_INSERT)
1499                 tdb_delete_hash(tdb, key, hash);
1500
1501         /* Copy key+value *before* allocating free space in case malloc
1502            fails and we are left with a dead spot in the tdb. */
1503
1504         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1505                 tdb->ecode = TDB_ERR_OOM;
1506                 goto fail;
1507         }
1508
1509         memcpy(p, key.dptr, key.dsize);
1510         if (dbuf.dsize)
1511                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1512
1513         /* we have to allocate some space */
1514         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1515                 goto fail;
1516
1517         /* Read hash top into next ptr */
1518         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1519                 goto fail;
1520
1521         rec.key_len = key.dsize;
1522         rec.data_len = dbuf.dsize;
1523         rec.full_hash = hash;
1524         rec.magic = TDB_MAGIC;
1525
1526         /* write out and point the top of the hash chain at it */
1527         if (rec_write(tdb, rec_ptr, &rec) == -1
1528             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1529             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1530                 /* Need to tdb_unallocate() here */
1531                 goto fail;
1532         }
1533  out:
1534         SAFE_FREE(p); 
1535         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1536         return ret;
1537 fail:
1538         ret = -1;
1539         goto out;
1540 }
1541
1542 /* Attempt to append data to an entry in place - this only works if the new data size
1543    is <= the old data size and the key exists.
1544    on failure return -1. Record must be locked before calling.
1545 */
1546 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1547 {
1548         struct list_struct rec;
1549         tdb_off rec_ptr;
1550
1551         /* find entry */
1552         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1553                 return -1;
1554
1555         /* Append of 0 is always ok. */
1556         if (new_dbuf.dsize == 0)
1557                 return 0;
1558
1559         /* must be long enough for key, old data + new data and tailer */
1560         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1561                 /* No room. */
1562                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1563                 return -1;
1564         }
1565
1566         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1567                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1568                 return -1;
1569
1570         /* update size */
1571         rec.data_len += new_dbuf.dsize;
1572         return rec_write(tdb, rec_ptr, &rec);
1573 }
1574
1575 /* Append to an entry. Create if not exist. */
1576
1577 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1578 {
1579         struct list_struct rec;
1580         u32 hash;
1581         tdb_off rec_ptr;
1582         char *p = NULL;
1583         int ret = 0;
1584         size_t new_data_size = 0;
1585
1586         /* find which hash bucket it is in */
1587         hash = tdb->hash_fn(&key);
1588         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1589                 return -1;
1590
1591         /* first try in-place. */
1592         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1593                 goto out;
1594
1595         /* reset the error code potentially set by the tdb_append_inplace() */
1596         tdb->ecode = TDB_SUCCESS;
1597
1598         /* find entry */
1599         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1600                 if (tdb->ecode != TDB_ERR_NOEXIST)
1601                         goto fail;
1602
1603                 /* Not found - create. */
1604
1605                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1606                 goto out;
1607         }
1608
1609         new_data_size = rec.data_len + new_dbuf.dsize;
1610
1611         /* Copy key+old_value+value *before* allocating free space in case malloc
1612            fails and we are left with a dead spot in the tdb. */
1613
1614         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1615                 tdb->ecode = TDB_ERR_OOM;
1616                 goto fail;
1617         }
1618
1619         /* Copy the key in place. */
1620         memcpy(p, key.dptr, key.dsize);
1621
1622         /* Now read the old data into place. */
1623         if (rec.data_len &&
1624                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1625                         goto fail;
1626
1627         /* Finally append the new data. */
1628         if (new_dbuf.dsize)
1629                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1630
1631         /* delete any existing record - if it doesn't exist we don't
1632            care.  Doing this first reduces fragmentation, and avoids
1633            coalescing with `allocated' block before it's updated. */
1634
1635         tdb_delete_hash(tdb, key, hash);
1636
1637         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1638                 goto fail;
1639
1640         /* Read hash top into next ptr */
1641         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1642                 goto fail;
1643
1644         rec.key_len = key.dsize;
1645         rec.data_len = new_data_size;
1646         rec.full_hash = hash;
1647         rec.magic = TDB_MAGIC;
1648
1649         /* write out and point the top of the hash chain at it */
1650         if (rec_write(tdb, rec_ptr, &rec) == -1
1651             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1652             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1653                 /* Need to tdb_unallocate() here */
1654                 goto fail;
1655         }
1656
1657  out:
1658         SAFE_FREE(p); 
1659         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1660         return ret;
1661
1662 fail:
1663         ret = -1;
1664         goto out;
1665 }
1666
1667 static int tdb_already_open(dev_t device,
1668                             ino_t ino)
1669 {
1670         TDB_CONTEXT *i;
1671         
1672         for (i = tdbs; i; i = i->next) {
1673                 if (i->device == device && i->inode == ino) {
1674                         return 1;
1675                 }
1676         }
1677
1678         return 0;
1679 }
1680
1681 /* This is based on the hash algorithm from gdbm */
1682 static u32 default_tdb_hash(TDB_DATA *key)
1683 {
1684         u32 value;      /* Used to compute the hash value.  */
1685         u32   i;        /* Used to cycle through random values. */
1686
1687         /* Set the initial value from the key size. */
1688         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
1689                 value = (value + (key->dptr[i] << (i*5 % 24)));
1690
1691         return (1103515243 * value + 12345);  
1692 }
1693
1694 /* open the database, creating it if necessary 
1695
1696    The open_flags and mode are passed straight to the open call on the
1697    database file. A flags value of O_WRONLY is invalid. The hash size
1698    is advisory, use zero for a default value.
1699
1700    Return is NULL on error, in which case errno is also set.  Don't 
1701    try to call tdb_error or tdb_errname, just do strerror(errno).
1702
1703    @param name may be NULL for internal databases. */
1704 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1705                       int open_flags, mode_t mode)
1706 {
1707         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL, NULL);
1708 }
1709
1710
1711 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1712                          int open_flags, mode_t mode,
1713                          tdb_log_func log_fn,
1714                          tdb_hash_func hash_fn)
1715 {
1716         TDB_CONTEXT *tdb;
1717         struct stat st;
1718         int rev = 0, locked = 0;
1719         unsigned char *vp;
1720         u32 vertest;
1721
1722         if (!(tdb = calloc(1, sizeof *tdb))) {
1723                 /* Can't log this */
1724                 errno = ENOMEM;
1725                 goto fail;
1726         }
1727         tdb->fd = -1;
1728         tdb->name = NULL;
1729         tdb->map_ptr = NULL;
1730         tdb->flags = tdb_flags;
1731         tdb->open_flags = open_flags;
1732         tdb->log_fn = log_fn;
1733         tdb->hash_fn = hash_fn ? hash_fn : default_tdb_hash;
1734
1735         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1736                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1737                          name));
1738                 errno = EINVAL;
1739                 goto fail;
1740         }
1741         
1742         if (hash_size == 0)
1743                 hash_size = DEFAULT_HASH_SIZE;
1744         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1745                 tdb->read_only = 1;
1746                 /* read only databases don't do locking or clear if first */
1747                 tdb->flags |= TDB_NOLOCK;
1748                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1749         }
1750
1751         /* internal databases don't mmap or lock, and start off cleared */
1752         if (tdb->flags & TDB_INTERNAL) {
1753                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1754                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1755                 if (tdb_new_database(tdb, hash_size) != 0) {
1756                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1757                         goto fail;
1758                 }
1759                 goto internal;
1760         }
1761
1762         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1763                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1764                          name, strerror(errno)));
1765                 goto fail;      /* errno set by open(2) */
1766         }
1767
1768         /* ensure there is only one process initialising at once */
1769         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1770                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1771                          name, strerror(errno)));
1772                 goto fail;      /* errno set by tdb_brlock */
1773         }
1774
1775         /* we need to zero database if we are the only one with it open */
1776         if ((tdb_flags & TDB_CLEAR_IF_FIRST) &&
1777                 (locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))) {
1778                 open_flags |= O_CREAT;
1779                 if (ftruncate(tdb->fd, 0) == -1) {
1780                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1781                                  "failed to truncate %s: %s\n",
1782                                  name, strerror(errno)));
1783                         goto fail; /* errno set by ftruncate */
1784                 }
1785         }
1786
1787         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1788             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1789             || (tdb->header.version != TDB_VERSION
1790                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1791                 /* its not a valid database - possibly initialise it */
1792                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1793                         errno = EIO; /* ie bad format or something */
1794                         goto fail;
1795                 }
1796                 rev = (tdb->flags & TDB_CONVERT);
1797         }
1798         vp = (unsigned char *)&tdb->header.version;
1799         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1800                   (((u32)vp[2]) << 8) | (u32)vp[3];
1801         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1802         if (!rev)
1803                 tdb->flags &= ~TDB_CONVERT;
1804         else {
1805                 tdb->flags |= TDB_CONVERT;
1806                 convert(&tdb->header, sizeof(tdb->header));
1807         }
1808         if (fstat(tdb->fd, &st) == -1)
1809                 goto fail;
1810
1811         /* Is it already in the open list?  If so, fail. */
1812         if (tdb_already_open(st.st_dev, st.st_ino)) {
1813                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1814                          "%s (%d,%d) is already open in this process\n",
1815                          name, (int)st.st_dev, (int)st.st_ino));
1816                 errno = EBUSY;
1817                 goto fail;
1818         }
1819
1820         if (!(tdb->name = (char *)strdup(name))) {
1821                 errno = ENOMEM;
1822                 goto fail;
1823         }
1824
1825         tdb->map_size = st.st_size;
1826         tdb->device = st.st_dev;
1827         tdb->inode = st.st_ino;
1828         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1829         if (!tdb->locked) {
1830                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1831                          "failed to allocate lock structure for %s\n",
1832                          name));
1833                 errno = ENOMEM;
1834                 goto fail;
1835         }
1836         tdb_mmap(tdb);
1837         if (locked) {
1838                 if (!tdb->read_only)
1839                         if (tdb_clear_spinlocks(tdb) != 0) {
1840                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1841                                 "failed to clear spinlock\n"));
1842                                 goto fail;
1843                         }
1844                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1845                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1846                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1847                                  name, strerror(errno)));
1848                         goto fail;
1849                 }
1850
1851         }
1852
1853         /* We always need to do this if the CLEAR_IF_FIRST flag is set, even if
1854            we didn't get the initial exclusive lock as we need to let all other
1855            users know we're using it. */
1856
1857         if (tdb_flags & TDB_CLEAR_IF_FIRST) {
1858                 /* leave this lock in place to indicate it's in use */
1859                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1860                         goto fail;
1861         }
1862
1863
1864  internal:
1865         /* Internal (memory-only) databases skip all the code above to
1866          * do with disk files, and resume here by releasing their
1867          * global lock and hooking into the active list. */
1868         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1869                 goto fail;
1870         tdb->next = tdbs;
1871         tdbs = tdb;
1872         return tdb;
1873
1874  fail:
1875         { int save_errno = errno;
1876
1877         if (!tdb)
1878                 return NULL;
1879         
1880         if (tdb->map_ptr) {
1881                 if (tdb->flags & TDB_INTERNAL)
1882                         SAFE_FREE(tdb->map_ptr);
1883                 else
1884                         tdb_munmap(tdb);
1885         }
1886         SAFE_FREE(tdb->name);
1887         if (tdb->fd != -1)
1888                 if (close(tdb->fd) != 0)
1889                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1890         SAFE_FREE(tdb->locked);
1891         SAFE_FREE(tdb);
1892         errno = save_errno;
1893         return NULL;
1894         }
1895 }
1896
1897 /**
1898  * Close a database.
1899  *
1900  * @returns -1 for error; 0 for success.
1901  **/
1902 int tdb_close(TDB_CONTEXT *tdb)
1903 {
1904         TDB_CONTEXT **i;
1905         int ret = 0;
1906
1907         if (tdb->map_ptr) {
1908                 if (tdb->flags & TDB_INTERNAL)
1909                         SAFE_FREE(tdb->map_ptr);
1910                 else
1911                         tdb_munmap(tdb);
1912         }
1913         SAFE_FREE(tdb->name);
1914         if (tdb->fd != -1)
1915                 ret = close(tdb->fd);
1916         SAFE_FREE(tdb->locked);
1917
1918         /* Remove from contexts list */
1919         for (i = &tdbs; *i; i = &(*i)->next) {
1920                 if (*i == tdb) {
1921                         *i = tdb->next;
1922                         break;
1923                 }
1924         }
1925
1926         memset(tdb, 0, sizeof(*tdb));
1927         SAFE_FREE(tdb);
1928
1929         return ret;
1930 }
1931
1932 /* lock/unlock entire database */
1933 int tdb_lockall(TDB_CONTEXT *tdb)
1934 {
1935         u32 i;
1936
1937         /* There are no locks on read-only dbs */
1938         if (tdb->read_only)
1939                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1940         for (i = 0; i < tdb->header.hash_size; i++) 
1941                 if (tdb_lock(tdb, i, F_WRLCK))
1942                         break;
1943
1944         /* If error, release locks we have... */
1945         if (i < tdb->header.hash_size) {
1946                 u32 j;
1947
1948                 for ( j = 0; j < i; j++)
1949                         tdb_unlock(tdb, j, F_WRLCK);
1950                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1951         }
1952
1953         return 0;
1954 }
1955 void tdb_unlockall(TDB_CONTEXT *tdb)
1956 {
1957         u32 i;
1958         for (i=0; i < tdb->header.hash_size; i++)
1959                 tdb_unlock(tdb, i, F_WRLCK);
1960 }
1961
1962 /* lock/unlock one hash chain. This is meant to be used to reduce
1963    contention - it cannot guarantee how many records will be locked */
1964 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1965 {
1966         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1967 }
1968
1969 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1970 {
1971         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_WRLCK);
1972 }
1973
1974 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1975 {
1976         return tdb_lock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
1977 }
1978
1979 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1980 {
1981         return tdb_unlock(tdb, BUCKET(tdb->hash_fn(&key)), F_RDLCK);
1982 }
1983
1984
1985 /* register a loging function */
1986 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1987 {
1988         tdb->log_fn = fn;
1989 }
1990
1991 /* reopen a tdb - this can be used after a fork to ensure that we have an independent
1992    seek pointer from our parent and to re-establish locks */
1993 int tdb_reopen(TDB_CONTEXT *tdb)
1994 {
1995         struct stat st;
1996
1997         if (tdb->flags & TDB_INTERNAL)
1998                 return 0; /* Nothing to do. */
1999         if (tdb_munmap(tdb) != 0) {
2000                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2001                 goto fail;
2002         }
2003         if (close(tdb->fd) != 0)
2004                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2005         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2006         if (tdb->fd == -1) {
2007                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2008                 goto fail;
2009         }
2010         if (fstat(tdb->fd, &st) != 0) {
2011                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2012                 goto fail;
2013         }
2014         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2015                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2016                 goto fail;
2017         }
2018         tdb_mmap(tdb);
2019         if ((tdb->flags & TDB_CLEAR_IF_FIRST) && (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)) {
2020                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2021                 goto fail;
2022         }
2023
2024         return 0;
2025
2026 fail:
2027         tdb_close(tdb);
2028         return -1;
2029 }
2030
2031 /* reopen all tdb's */
2032 int tdb_reopen_all(void)
2033 {
2034         TDB_CONTEXT *tdb;
2035
2036         for (tdb=tdbs; tdb; tdb = tdb->next) {
2037                 /* Ensure no clear-if-first. */
2038                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
2039                 if (tdb_reopen(tdb) != 0)
2040                         return -1;
2041         }
2042
2043         return 0;
2044 }