This commit was manufactured by cvs2svn to create branch 'SAMBA_3_0'.(This used to...
[samba.git] / source3 / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23 #ifdef STANDALONE
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <string.h>
33 #include <fcntl.h>
34 #include <errno.h>
35 #include <sys/mman.h>
36 #include <sys/stat.h>
37 #include <signal.h>
38 #include "tdb.h"
39 #include "spinlock.h"
40 #else
41 #include "includes.h"
42 #endif
43
44 #define TDB_MAGIC_FOOD "TDB file\n"
45 #define TDB_VERSION (0x26011967 + 6)
46 #define TDB_MAGIC (0x26011999U)
47 #define TDB_FREE_MAGIC (~TDB_MAGIC)
48 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
49 #define TDB_ALIGNMENT 4
50 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
51 #define DEFAULT_HASH_SIZE 131
52 #define TDB_PAGE_SIZE 0x2000
53 #define FREELIST_TOP (sizeof(struct tdb_header))
54 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
55 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
56 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
57 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
58 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
59
60 /* NB assumes there is a local variable called "tdb" that is the
61  * current context, also takes doubly-parenthesized print-style
62  * argument. */
63 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
64
65 /* lock offsets */
66 #define GLOBAL_LOCK 0
67 #define ACTIVE_LOCK 4
68
69 #ifndef MAP_FILE
70 #define MAP_FILE 0
71 #endif
72
73 #ifndef MAP_FAILED
74 #define MAP_FAILED ((void *)-1)
75 #endif
76
77 /* free memory if the pointer is valid and zero the pointer */
78 #ifndef SAFE_FREE
79 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
80 #endif
81
82 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
83 TDB_DATA tdb_null;
84
85 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
86 static TDB_CONTEXT *tdbs = NULL;
87
88 static int tdb_munmap(TDB_CONTEXT *tdb)
89 {
90         if (tdb->flags & TDB_INTERNAL)
91                 return 0;
92
93 #ifdef HAVE_MMAP
94         if (tdb->map_ptr) {
95                 int ret = munmap(tdb->map_ptr, tdb->map_size);
96                 if (ret != 0)
97                         return ret;
98         }
99 #endif
100         tdb->map_ptr = NULL;
101         return 0;
102 }
103
104 static void tdb_mmap(TDB_CONTEXT *tdb)
105 {
106         if (tdb->flags & TDB_INTERNAL)
107                 return;
108
109 #ifdef HAVE_MMAP
110         if (!(tdb->flags & TDB_NOMMAP)) {
111                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
112                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
113                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
114
115                 /*
116                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
117                  */
118
119                 if (tdb->map_ptr == MAP_FAILED) {
120                         tdb->map_ptr = NULL;
121                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
122                                  tdb->map_size, strerror(errno)));
123                 }
124         } else {
125                 tdb->map_ptr = NULL;
126         }
127 #else
128         tdb->map_ptr = NULL;
129 #endif
130 }
131
132 /* Endian conversion: we only ever deal with 4 byte quantities */
133 static void *convert(void *buf, u32 size)
134 {
135         u32 i, *p = buf;
136         for (i = 0; i < size / 4; i++)
137                 p[i] = TDB_BYTEREV(p[i]);
138         return buf;
139 }
140 #define DOCONV() (tdb->flags & TDB_CONVERT)
141 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
142
143 /* the body of the database is made of one list_struct for the free space
144    plus a separate data list for each hash value */
145 struct list_struct {
146         tdb_off next; /* offset of the next record in the list */
147         tdb_len rec_len; /* total byte length of record */
148         tdb_len key_len; /* byte length of key */
149         tdb_len data_len; /* byte length of data */
150         u32 full_hash; /* the full 32 bit hash of the key */
151         u32 magic;   /* try to catch errors */
152         /* the following union is implied:
153                 union {
154                         char record[rec_len];
155                         struct {
156                                 char key[key_len];
157                                 char data[data_len];
158                         }
159                         u32 totalsize; (tailer)
160                 }
161         */
162 };
163
164 /***************************************************************
165  Allow a caller to set a "alarm" flag that tdb can check to abort
166  a blocking lock on SIGALRM.
167 ***************************************************************/
168
169 static sig_atomic_t *palarm_fired;
170
171 void tdb_set_lock_alarm(sig_atomic_t *palarm)
172 {
173         palarm_fired = palarm;
174 }
175
176 /* a byte range locking function - return 0 on success
177    this functions locks/unlocks 1 byte at the specified offset.
178
179    On error, errno is also set so that errors are passed back properly
180    through tdb_open(). */
181 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
182                       int rw_type, int lck_type, int probe)
183 {
184         struct flock fl;
185         int ret;
186
187         if (tdb->flags & TDB_NOLOCK)
188                 return 0;
189         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
190                 errno = EACCES;
191                 return -1;
192         }
193
194         fl.l_type = rw_type;
195         fl.l_whence = SEEK_SET;
196         fl.l_start = offset;
197         fl.l_len = 1;
198         fl.l_pid = 0;
199
200         do {
201                 ret = fcntl(tdb->fd,lck_type,&fl);
202                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
203                         break;
204         } while (ret == -1 && errno == EINTR);
205
206         if (ret == -1) {
207                 if (!probe && lck_type != F_SETLK) {
208                         /* Ensure error code is set for log fun to examine. */
209                         if (errno == EINTR && palarm_fired && *palarm_fired)
210                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
211                         else
212                                 tdb->ecode = TDB_ERR_LOCK;
213                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
214                                  tdb->fd, offset, rw_type, lck_type));
215                 }
216                 /* Was it an alarm timeout ? */
217                 if (errno == EINTR && palarm_fired && *palarm_fired)
218                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
219                 /* Otherwise - generic lock error. */
220                 /* errno set by fcntl */
221                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
222         }
223         return 0;
224 }
225
226 /* lock a list in the database. list -1 is the alloc list */
227 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
228 {
229         if (list < -1 || list >= (int)tdb->header.hash_size) {
230                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
231                            list, ltype));
232                 return -1;
233         }
234         if (tdb->flags & TDB_NOLOCK)
235                 return 0;
236
237         /* Since fcntl locks don't nest, we do a lock for the first one,
238            and simply bump the count for future ones */
239         if (tdb->locked[list+1].count == 0) {
240                 if (!tdb->read_only && tdb->header.rwlocks) {
241                         if (tdb_spinlock(tdb, list, ltype)) {
242                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
243                                            list, ltype));
244                                 return -1;
245                         }
246                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
247                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
248                                            list, ltype, strerror(errno)));
249                         return -1;
250                 }
251                 tdb->locked[list+1].ltype = ltype;
252         }
253         tdb->locked[list+1].count++;
254         return 0;
255 }
256
257 /* unlock the database: returns void because it's too late for errors. */
258         /* changed to return int it may be interesting to know there
259            has been an error  --simo */
260 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
261 {
262         int ret = -1;
263
264         if (tdb->flags & TDB_NOLOCK)
265                 return 0;
266
267         /* Sanity checks */
268         if (list < -1 || list >= (int)tdb->header.hash_size) {
269                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
270                 return ret;
271         }
272
273         if (tdb->locked[list+1].count==0) {
274                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
275                 return ret;
276         }
277
278         if (tdb->locked[list+1].count == 1) {
279                 /* Down to last nested lock: unlock underneath */
280                 if (!tdb->read_only && tdb->header.rwlocks) {
281                         ret = tdb_spinunlock(tdb, list, ltype);
282                 } else {
283                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
284                 }
285         } else {
286                 ret = 0;
287         }
288         tdb->locked[list+1].count--;
289
290         if (ret)
291                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
292         return ret;
293 }
294
295 /* This is based on the hash algorithm from gdbm */
296 static u32 tdb_hash(TDB_DATA *key)
297 {
298         u32 value;      /* Used to compute the hash value.  */
299         u32   i;        /* Used to cycle through random values. */
300
301         /* Set the initial value from the key size. */
302         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
303                 value = (value + (key->dptr[i] << (i*5 % 24)));
304
305         return (1103515243 * value + 12345);  
306 }
307
308 /* check for an out of bounds access - if it is out of bounds then
309    see if the database has been expanded by someone else and expand
310    if necessary 
311    note that "len" is the minimum length needed for the db
312 */
313 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
314 {
315         struct stat st;
316         if (len <= tdb->map_size)
317                 return 0;
318         if (tdb->flags & TDB_INTERNAL) {
319                 if (!probe) {
320                         /* Ensure ecode is set for log fn. */
321                         tdb->ecode = TDB_ERR_IO;
322                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
323                                  (int)len, (int)tdb->map_size));
324                 }
325                 return TDB_ERRCODE(TDB_ERR_IO, -1);
326         }
327
328         if (fstat(tdb->fd, &st) == -1)
329                 return TDB_ERRCODE(TDB_ERR_IO, -1);
330
331         if (st.st_size < (size_t)len) {
332                 if (!probe) {
333                         /* Ensure ecode is set for log fn. */
334                         tdb->ecode = TDB_ERR_IO;
335                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
336                                  (int)len, (int)st.st_size));
337                 }
338                 return TDB_ERRCODE(TDB_ERR_IO, -1);
339         }
340
341         /* Unmap, update size, remap */
342         if (tdb_munmap(tdb) == -1)
343                 return TDB_ERRCODE(TDB_ERR_IO, -1);
344         tdb->map_size = st.st_size;
345         tdb_mmap(tdb);
346         return 0;
347 }
348
349 /* write a lump of data at a specified offset */
350 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
351 {
352         if (tdb_oob(tdb, off + len, 0) != 0)
353                 return -1;
354
355         if (tdb->map_ptr)
356                 memcpy(off + (char *)tdb->map_ptr, buf, len);
357 #ifdef HAVE_PWRITE
358         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
359 #else
360         else if (lseek(tdb->fd, off, SEEK_SET) != off
361                  || write(tdb->fd, buf, len) != (ssize_t)len) {
362 #endif
363                 /* Ensure ecode is set for log fn. */
364                 tdb->ecode = TDB_ERR_IO;
365                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
366                            off, len, strerror(errno)));
367                 return TDB_ERRCODE(TDB_ERR_IO, -1);
368         }
369         return 0;
370 }
371
372 /* read a lump of data at a specified offset, maybe convert */
373 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
374 {
375         if (tdb_oob(tdb, off + len, 0) != 0)
376                 return -1;
377
378         if (tdb->map_ptr)
379                 memcpy(buf, off + (char *)tdb->map_ptr, len);
380 #ifdef HAVE_PREAD
381         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
382 #else
383         else if (lseek(tdb->fd, off, SEEK_SET) != off
384                  || read(tdb->fd, buf, len) != (ssize_t)len) {
385 #endif
386                 /* Ensure ecode is set for log fn. */
387                 tdb->ecode = TDB_ERR_IO;
388                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
389                            off, len, strerror(errno)));
390                 return TDB_ERRCODE(TDB_ERR_IO, -1);
391         }
392         if (cv)
393                 convert(buf, len);
394         return 0;
395 }
396
397 /* read a lump of data, allocating the space for it */
398 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
399 {
400         char *buf;
401
402         if (!(buf = malloc(len))) {
403                 /* Ensure ecode is set for log fn. */
404                 tdb->ecode = TDB_ERR_OOM;
405                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
406                            len, strerror(errno)));
407                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
408         }
409         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
410                 SAFE_FREE(buf);
411                 return NULL;
412         }
413         return buf;
414 }
415
416 /* read/write a tdb_off */
417 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
418 {
419         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
420 }
421 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
422 {
423         tdb_off off = *d;
424         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
425 }
426
427 /* read/write a record */
428 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
429 {
430         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
431                 return -1;
432         if (TDB_BAD_MAGIC(rec)) {
433                 /* Ensure ecode is set for log fn. */
434                 tdb->ecode = TDB_ERR_CORRUPT;
435                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
436                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
437         }
438         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
439 }
440 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
441 {
442         struct list_struct r = *rec;
443         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
444 }
445
446 /* read a freelist record and check for simple errors */
447 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
448 {
449         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
450                 return -1;
451
452         if (rec->magic == TDB_MAGIC) {
453                 /* this happens when a app is showdown while deleting a record - we should
454                    not completely fail when this happens */
455                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
456                          rec->magic, off));
457                 rec->magic = TDB_FREE_MAGIC;
458                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
459                         return -1;
460         }
461
462         if (rec->magic != TDB_FREE_MAGIC) {
463                 /* Ensure ecode is set for log fn. */
464                 tdb->ecode = TDB_ERR_CORRUPT;
465                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
466                            rec->magic, off));
467                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
468         }
469         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
470                 return -1;
471         return 0;
472 }
473
474 /* update a record tailer (must hold allocation lock) */
475 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
476                          const struct list_struct *rec)
477 {
478         tdb_off totalsize;
479
480         /* Offset of tailer from record header */
481         totalsize = sizeof(*rec) + rec->rec_len;
482         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
483                          &totalsize);
484 }
485
486 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
487 {
488         struct list_struct rec;
489         tdb_off tailer_ofs, tailer;
490
491         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
492                 printf("ERROR: failed to read record at %u\n", offset);
493                 return 0;
494         }
495
496         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
497                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
498
499         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
500         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
501                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
502                 return rec.next;
503         }
504
505         if (tailer != rec.rec_len + sizeof(rec)) {
506                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
507                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
508         }
509         return rec.next;
510 }
511
512 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
513 {
514         tdb_off rec_ptr, top;
515
516         top = TDB_HASH_TOP(i);
517
518         if (tdb_lock(tdb, i, F_WRLCK) != 0)
519                 return -1;
520
521         if (ofs_read(tdb, top, &rec_ptr) == -1)
522                 return tdb_unlock(tdb, i, F_WRLCK);
523
524         if (rec_ptr)
525                 printf("hash=%d\n", i);
526
527         while (rec_ptr) {
528                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
529         }
530
531         return tdb_unlock(tdb, i, F_WRLCK);
532 }
533
534 void tdb_dump_all(TDB_CONTEXT *tdb)
535 {
536         int i;
537         for (i=0;i<tdb->header.hash_size;i++) {
538                 tdb_dump_chain(tdb, i);
539         }
540         printf("freelist:\n");
541         tdb_dump_chain(tdb, -1);
542 }
543
544 int tdb_printfreelist(TDB_CONTEXT *tdb)
545 {
546         int ret;
547         long total_free = 0;
548         tdb_off offset, rec_ptr;
549         struct list_struct rec;
550
551         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
552                 return ret;
553
554         offset = FREELIST_TOP;
555
556         /* read in the freelist top */
557         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
558                 tdb_unlock(tdb, -1, F_WRLCK);
559                 return 0;
560         }
561
562         printf("freelist top=[0x%08x]\n", rec_ptr );
563         while (rec_ptr) {
564                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
565                         tdb_unlock(tdb, -1, F_WRLCK);
566                         return -1;
567                 }
568
569                 if (rec.magic != TDB_FREE_MAGIC) {
570                         printf("bad magic 0x%08x in free list\n", rec.magic);
571                         tdb_unlock(tdb, -1, F_WRLCK);
572                         return -1;
573                 }
574
575                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
576                 total_free += rec.rec_len;
577
578                 /* move to the next record */
579                 rec_ptr = rec.next;
580         }
581         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
582                (int)total_free);
583
584         return tdb_unlock(tdb, -1, F_WRLCK);
585 }
586
587 /* Remove an element from the freelist.  Must have alloc lock. */
588 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
589 {
590         tdb_off last_ptr, i;
591
592         /* read in the freelist top */
593         last_ptr = FREELIST_TOP;
594         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
595                 if (i == off) {
596                         /* We've found it! */
597                         return ofs_write(tdb, last_ptr, &next);
598                 }
599                 /* Follow chain (next offset is at start of record) */
600                 last_ptr = i;
601         }
602         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
603         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
604 }
605
606 /* Add an element into the freelist. Merge adjacent records if
607    neccessary. */
608 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
609 {
610         tdb_off right, left;
611
612         /* Allocation and tailer lock */
613         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
614                 return -1;
615
616         /* set an initial tailer, so if we fail we don't leave a bogus record */
617         if (update_tailer(tdb, offset, rec) != 0) {
618                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
619                 goto fail;
620         }
621
622         /* Look right first (I'm an Australian, dammit) */
623         right = offset + sizeof(*rec) + rec->rec_len;
624         if (right + sizeof(*rec) <= tdb->map_size) {
625                 struct list_struct r;
626
627                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
628                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
629                         goto left;
630                 }
631
632                 /* If it's free, expand to include it. */
633                 if (r.magic == TDB_FREE_MAGIC) {
634                         if (remove_from_freelist(tdb, right, r.next) == -1) {
635                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
636                                 goto left;
637                         }
638                         rec->rec_len += sizeof(r) + r.rec_len;
639                 }
640         }
641
642 left:
643         /* Look left */
644         left = offset - sizeof(tdb_off);
645         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
646                 struct list_struct l;
647                 tdb_off leftsize;
648
649                 /* Read in tailer and jump back to header */
650                 if (ofs_read(tdb, left, &leftsize) == -1) {
651                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
652                         goto update;
653                 }
654                 left = offset - leftsize;
655
656                 /* Now read in record */
657                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
658                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
659                         goto update;
660                 }
661
662                 /* If it's free, expand to include it. */
663                 if (l.magic == TDB_FREE_MAGIC) {
664                         if (remove_from_freelist(tdb, left, l.next) == -1) {
665                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
666                                 goto update;
667                         } else {
668                                 offset = left;
669                                 rec->rec_len += leftsize;
670                         }
671                 }
672         }
673
674 update:
675         if (update_tailer(tdb, offset, rec) == -1) {
676                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
677                 goto fail;
678         }
679
680         /* Now, prepend to free list */
681         rec->magic = TDB_FREE_MAGIC;
682
683         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
684             rec_write(tdb, offset, rec) == -1 ||
685             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
686                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
687                 goto fail;
688         }
689
690         /* And we're done. */
691         tdb_unlock(tdb, -1, F_WRLCK);
692         return 0;
693
694  fail:
695         tdb_unlock(tdb, -1, F_WRLCK);
696         return -1;
697 }
698
699
700 /* expand a file.  we prefer to use ftruncate, as that is what posix
701   says to use for mmap expansion */
702 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
703 {
704         char buf[1024];
705 #if HAVE_FTRUNCATE_EXTEND
706         if (ftruncate(tdb->fd, size+addition) != 0) {
707                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
708                            size+addition, strerror(errno)));
709                 return -1;
710         }
711 #else
712         char b = 0;
713
714 #ifdef HAVE_PWRITE
715         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
716 #else
717         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
718             write(tdb->fd, &b, 1) != 1) {
719 #endif
720                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
721                            size+addition, strerror(errno)));
722                 return -1;
723         }
724 #endif
725
726         /* now fill the file with something. This ensures that the file isn't sparse, which would be
727            very bad if we ran out of disk. This must be done with write, not via mmap */
728         memset(buf, 0x42, sizeof(buf));
729         while (addition) {
730                 int n = addition>sizeof(buf)?sizeof(buf):addition;
731 #ifdef HAVE_PWRITE
732                 int ret = pwrite(tdb->fd, buf, n, size);
733 #else
734                 int ret;
735                 if (lseek(tdb->fd, size, SEEK_SET) != size)
736                         return -1;
737                 ret = write(tdb->fd, buf, n);
738 #endif
739                 if (ret != n) {
740                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
741                                    n, strerror(errno)));
742                         return -1;
743                 }
744                 addition -= n;
745                 size += n;
746         }
747         return 0;
748 }
749
750
751 /* expand the database at least size bytes by expanding the underlying
752    file and doing the mmap again if necessary */
753 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
754 {
755         struct list_struct rec;
756         tdb_off offset;
757
758         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
759                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
760                 return -1;
761         }
762
763         /* must know about any previous expansions by another process */
764         tdb_oob(tdb, tdb->map_size + 1, 1);
765
766         /* always make room for at least 10 more records, and round
767            the database up to a multiple of TDB_PAGE_SIZE */
768         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
769
770         if (!(tdb->flags & TDB_INTERNAL))
771                 tdb_munmap(tdb);
772
773         /*
774          * We must ensure the file is unmapped before doing this
775          * to ensure consistency with systems like OpenBSD where
776          * writes and mmaps are not consistent.
777          */
778
779         /* expand the file itself */
780         if (!(tdb->flags & TDB_INTERNAL)) {
781                 if (expand_file(tdb, tdb->map_size, size) != 0)
782                         goto fail;
783         }
784
785         tdb->map_size += size;
786
787         if (tdb->flags & TDB_INTERNAL)
788                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
789         else {
790                 /*
791                  * We must ensure the file is remapped before adding the space
792                  * to ensure consistency with systems like OpenBSD where
793                  * writes and mmaps are not consistent.
794                  */
795
796                 /* We're ok if the mmap fails as we'll fallback to read/write */
797                 tdb_mmap(tdb);
798         }
799
800         /* form a new freelist record */
801         memset(&rec,'\0',sizeof(rec));
802         rec.rec_len = size - sizeof(rec);
803
804         /* link it into the free list */
805         offset = tdb->map_size - size;
806         if (tdb_free(tdb, offset, &rec) == -1)
807                 goto fail;
808
809         tdb_unlock(tdb, -1, F_WRLCK);
810         return 0;
811  fail:
812         tdb_unlock(tdb, -1, F_WRLCK);
813         return -1;
814 }
815
816 /* allocate some space from the free list. The offset returned points
817    to a unconnected list_struct within the database with room for at
818    least length bytes of total data
819
820    0 is returned if the space could not be allocated
821  */
822 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
823                             struct list_struct *rec)
824 {
825         tdb_off rec_ptr, last_ptr, newrec_ptr;
826         struct list_struct newrec;
827
828         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
829                 return 0;
830
831         /* Extra bytes required for tailer */
832         length += sizeof(tdb_off);
833
834  again:
835         last_ptr = FREELIST_TOP;
836
837         /* read in the freelist top */
838         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
839                 goto fail;
840
841         /* keep looking until we find a freelist record big enough */
842         while (rec_ptr) {
843                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
844                         goto fail;
845
846                 if (rec->rec_len >= length) {
847                         /* found it - now possibly split it up  */
848                         if (rec->rec_len > length + MIN_REC_SIZE) {
849                                 /* Length of left piece */
850                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
851
852                                 /* Right piece to go on free list */
853                                 newrec.rec_len = rec->rec_len
854                                         - (sizeof(*rec) + length);
855                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
856
857                                 /* And left record is shortened */
858                                 rec->rec_len = length;
859                         } else
860                                 newrec_ptr = 0;
861
862                         /* Remove allocated record from the free list */
863                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
864                                 goto fail;
865
866                         /* Update header: do this before we drop alloc
867                            lock, otherwise tdb_free() might try to
868                            merge with us, thinking we're free.
869                            (Thanks Jeremy Allison). */
870                         rec->magic = TDB_MAGIC;
871                         if (rec_write(tdb, rec_ptr, rec) == -1)
872                                 goto fail;
873
874                         /* Did we create new block? */
875                         if (newrec_ptr) {
876                                 /* Update allocated record tailer (we
877                                    shortened it). */
878                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
879                                         goto fail;
880
881                                 /* Free new record */
882                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
883                                         goto fail;
884                         }
885
886                         /* all done - return the new record offset */
887                         tdb_unlock(tdb, -1, F_WRLCK);
888                         return rec_ptr;
889                 }
890                 /* move to the next record */
891                 last_ptr = rec_ptr;
892                 rec_ptr = rec->next;
893         }
894         /* we didn't find enough space. See if we can expand the
895            database and if we can then try again */
896         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
897                 goto again;
898  fail:
899         tdb_unlock(tdb, -1, F_WRLCK);
900         return 0;
901 }
902
903 /* initialise a new database with a specified hash size */
904 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
905 {
906         struct tdb_header *newdb;
907         int size, ret = -1;
908
909         /* We make it up in memory, then write it out if not internal */
910         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
911         if (!(newdb = calloc(size, 1)))
912                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
913
914         /* Fill in the header */
915         newdb->version = TDB_VERSION;
916         newdb->hash_size = hash_size;
917 #ifdef USE_SPINLOCKS
918         newdb->rwlocks = size;
919 #endif
920         if (tdb->flags & TDB_INTERNAL) {
921                 tdb->map_size = size;
922                 tdb->map_ptr = (char *)newdb;
923                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
924                 /* Convert the `ondisk' version if asked. */
925                 CONVERT(*newdb);
926                 return 0;
927         }
928         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
929                 goto fail;
930
931         if (ftruncate(tdb->fd, 0) == -1)
932                 goto fail;
933
934         /* This creates an endian-converted header, as if read from disk */
935         CONVERT(*newdb);
936         memcpy(&tdb->header, newdb, sizeof(tdb->header));
937         /* Don't endian-convert the magic food! */
938         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
939         if (write(tdb->fd, newdb, size) != size)
940                 ret = -1;
941         else
942                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
943
944   fail:
945         SAFE_FREE(newdb);
946         return ret;
947 }
948
949 /* Returns 0 on fail.  On success, return offset of record, and fills
950    in rec */
951 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
952                         struct list_struct *r)
953 {
954         tdb_off rec_ptr;
955         
956         /* read in the hash top */
957         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
958                 return 0;
959
960         /* keep looking until we find the right record */
961         while (rec_ptr) {
962                 if (rec_read(tdb, rec_ptr, r) == -1)
963                         return 0;
964
965                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
966                         char *k;
967                         /* a very likely hit - read the key */
968                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
969                                            r->key_len);
970                         if (!k)
971                                 return 0;
972
973                         if (memcmp(key.dptr, k, key.dsize) == 0) {
974                                 SAFE_FREE(k);
975                                 return rec_ptr;
976                         }
977                         SAFE_FREE(k);
978                 }
979                 rec_ptr = r->next;
980         }
981         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
982 }
983
984 /* If they do lockkeys, check that this hash is one they locked */
985 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
986 {
987         u32 i;
988         if (!tdb->lockedkeys)
989                 return 1;
990         for (i = 0; i < tdb->lockedkeys[0]; i++)
991                 if (tdb->lockedkeys[i+1] == hash)
992                         return 1;
993         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
994 }
995
996 /* As tdb_find, but if you succeed, keep the lock */
997 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
998                              struct list_struct *rec)
999 {
1000         u32 hash, rec_ptr;
1001
1002         hash = tdb_hash(&key);
1003         if (!tdb_keylocked(tdb, hash))
1004                 return 0;
1005         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1006                 return 0;
1007         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1008                 tdb_unlock(tdb, BUCKET(hash), locktype);
1009         return rec_ptr;
1010 }
1011
1012 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1013 {
1014         return tdb->ecode;
1015 }
1016
1017 static struct tdb_errname {
1018         enum TDB_ERROR ecode; const char *estring;
1019 } emap[] = { {TDB_SUCCESS, "Success"},
1020              {TDB_ERR_CORRUPT, "Corrupt database"},
1021              {TDB_ERR_IO, "IO Error"},
1022              {TDB_ERR_LOCK, "Locking error"},
1023              {TDB_ERR_OOM, "Out of memory"},
1024              {TDB_ERR_EXISTS, "Record exists"},
1025              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1026              {TDB_ERR_NOEXIST, "Record does not exist"} };
1027
1028 /* Error string for the last tdb error */
1029 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1030 {
1031         u32 i;
1032         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1033                 if (tdb->ecode == emap[i].ecode)
1034                         return emap[i].estring;
1035         return "Invalid error code";
1036 }
1037
1038 /* update an entry in place - this only works if the new data size
1039    is <= the old data size and the key exists.
1040    on failure return -1.
1041 */
1042
1043 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1044 {
1045         struct list_struct rec;
1046         tdb_off rec_ptr;
1047
1048         /* find entry */
1049         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1050                 return -1;
1051
1052         /* must be long enough key, data and tailer */
1053         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1054                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1055                 return -1;
1056         }
1057
1058         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1059                       dbuf.dptr, dbuf.dsize) == -1)
1060                 return -1;
1061
1062         if (dbuf.dsize != rec.data_len) {
1063                 /* update size */
1064                 rec.data_len = dbuf.dsize;
1065                 return rec_write(tdb, rec_ptr, &rec);
1066         }
1067  
1068         return 0;
1069 }
1070
1071 /* find an entry in the database given a key */
1072 /* If an entry doesn't exist tdb_err will be set to
1073  * TDB_ERR_NOEXIST. If a key has no data attached
1074  * tdb_err will not be set. Both will return a
1075  * zero pptr and zero dsize.
1076  */
1077
1078 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1079 {
1080         tdb_off rec_ptr;
1081         struct list_struct rec;
1082         TDB_DATA ret;
1083
1084         /* find which hash bucket it is in */
1085         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1086                 return tdb_null;
1087
1088         if (rec.data_len)
1089                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1090                                           rec.data_len);
1091         else
1092                 ret.dptr = NULL;
1093         ret.dsize = rec.data_len;
1094         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1095         return ret;
1096 }
1097
1098 /* check if an entry in the database exists 
1099
1100    note that 1 is returned if the key is found and 0 is returned if not found
1101    this doesn't match the conventions in the rest of this module, but is
1102    compatible with gdbm
1103 */
1104 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1105 {
1106         struct list_struct rec;
1107         
1108         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1109                 return 0;
1110         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1111         return 1;
1112 }
1113
1114 /* record lock stops delete underneath */
1115 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1116 {
1117         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1118 }
1119 /*
1120   Write locks override our own fcntl readlocks, so check it here.
1121   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1122   an error to fail to get the lock here.
1123 */
1124  
1125 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1126 {
1127         struct tdb_traverse_lock *i;
1128         for (i = &tdb->travlocks; i; i = i->next)
1129                 if (i->off == off)
1130                         return -1;
1131         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1132 }
1133
1134 /*
1135   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1136   an error to fail to get the lock here.
1137 */
1138
1139 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1140 {
1141         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1142 }
1143 /* fcntl locks don't stack: avoid unlocking someone else's */
1144 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1145 {
1146         struct tdb_traverse_lock *i;
1147         u32 count = 0;
1148
1149         if (off == 0)
1150                 return 0;
1151         for (i = &tdb->travlocks; i; i = i->next)
1152                 if (i->off == off)
1153                         count++;
1154         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1155 }
1156
1157 /* actually delete an entry in the database given the offset */
1158 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1159 {
1160         tdb_off last_ptr, i;
1161         struct list_struct lastrec;
1162
1163         if (tdb->read_only) return -1;
1164
1165         if (write_lock_record(tdb, rec_ptr) == -1) {
1166                 /* Someone traversing here: mark it as dead */
1167                 rec->magic = TDB_DEAD_MAGIC;
1168                 return rec_write(tdb, rec_ptr, rec);
1169         }
1170         if (write_unlock_record(tdb, rec_ptr) != 0)
1171                 return -1;
1172
1173         /* find previous record in hash chain */
1174         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1175                 return -1;
1176         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1177                 if (rec_read(tdb, i, &lastrec) == -1)
1178                         return -1;
1179
1180         /* unlink it: next ptr is at start of record. */
1181         if (last_ptr == 0)
1182                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1183         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1184                 return -1;
1185
1186         /* recover the space */
1187         if (tdb_free(tdb, rec_ptr, rec) == -1)
1188                 return -1;
1189         return 0;
1190 }
1191
1192 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1193 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1194                          struct list_struct *rec)
1195 {
1196         int want_next = (tlock->off != 0);
1197
1198         /* No traversal allows if you've called tdb_lockkeys() */
1199         if (tdb->lockedkeys)
1200                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1201
1202         /* Lock each chain from the start one. */
1203         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1204                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1205                         return -1;
1206
1207                 /* No previous record?  Start at top of chain. */
1208                 if (!tlock->off) {
1209                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1210                                      &tlock->off) == -1)
1211                                 goto fail;
1212                 } else {
1213                         /* Otherwise unlock the previous record. */
1214                         if (unlock_record(tdb, tlock->off) != 0)
1215                                 goto fail;
1216                 }
1217
1218                 if (want_next) {
1219                         /* We have offset of old record: grab next */
1220                         if (rec_read(tdb, tlock->off, rec) == -1)
1221                                 goto fail;
1222                         tlock->off = rec->next;
1223                 }
1224
1225                 /* Iterate through chain */
1226                 while( tlock->off) {
1227                         tdb_off current;
1228                         if (rec_read(tdb, tlock->off, rec) == -1)
1229                                 goto fail;
1230                         if (!TDB_DEAD(rec)) {
1231                                 /* Woohoo: we found one! */
1232                                 if (lock_record(tdb, tlock->off) != 0)
1233                                         goto fail;
1234                                 return tlock->off;
1235                         }
1236                         /* Try to clean dead ones from old traverses */
1237                         current = tlock->off;
1238                         tlock->off = rec->next;
1239                         if (do_delete(tdb, current, rec) != 0)
1240                                 goto fail;
1241                 }
1242                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1243                 want_next = 0;
1244         }
1245         /* We finished iteration without finding anything */
1246         return TDB_ERRCODE(TDB_SUCCESS, 0);
1247
1248  fail:
1249         tlock->off = 0;
1250         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1251                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1252         return -1;
1253 }
1254
1255 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1256    return -1 on error or the record count traversed
1257    if fn is NULL then it is not called
1258    a non-zero return value from fn() indicates that the traversal should stop
1259   */
1260 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1261 {
1262         TDB_DATA key, dbuf;
1263         struct list_struct rec;
1264         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1265         int ret, count = 0;
1266
1267         /* This was in the initializaton, above, but the IRIX compiler
1268          * did not like it.  crh
1269          */
1270         tl.next = tdb->travlocks.next;
1271
1272         /* fcntl locks don't stack: beware traverse inside traverse */
1273         tdb->travlocks.next = &tl;
1274
1275         /* tdb_next_lock places locks on the record returned, and its chain */
1276         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1277                 count++;
1278                 /* now read the full record */
1279                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1280                                           rec.key_len + rec.data_len);
1281                 if (!key.dptr) {
1282                         ret = -1;
1283                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1284                                 goto out;
1285                         if (unlock_record(tdb, tl.off) != 0)
1286                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1287                         goto out;
1288                 }
1289                 key.dsize = rec.key_len;
1290                 dbuf.dptr = key.dptr + rec.key_len;
1291                 dbuf.dsize = rec.data_len;
1292
1293                 /* Drop chain lock, call out */
1294                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1295                         ret = -1;
1296                         goto out;
1297                 }
1298                 if (fn && fn(tdb, key, dbuf, state)) {
1299                         /* They want us to terminate traversal */
1300                         ret = count;
1301                         if (unlock_record(tdb, tl.off) != 0) {
1302                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1303                                 ret = -1;
1304                         }
1305                         tdb->travlocks.next = tl.next;
1306                         SAFE_FREE(key.dptr);
1307                         return count;
1308                 }
1309                 SAFE_FREE(key.dptr);
1310         }
1311 out:
1312         tdb->travlocks.next = tl.next;
1313         if (ret < 0)
1314                 return -1;
1315         else
1316                 return count;
1317 }
1318
1319 /* find the first entry in the database and return its key */
1320 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1321 {
1322         TDB_DATA key;
1323         struct list_struct rec;
1324
1325         /* release any old lock */
1326         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1327                 return tdb_null;
1328         tdb->travlocks.off = tdb->travlocks.hash = 0;
1329
1330         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1331                 return tdb_null;
1332         /* now read the key */
1333         key.dsize = rec.key_len;
1334         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1335         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1336                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1337         return key;
1338 }
1339
1340 /* find the next entry in the database, returning its key */
1341 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1342 {
1343         u32 oldhash;
1344         TDB_DATA key = tdb_null;
1345         struct list_struct rec;
1346         char *k = NULL;
1347
1348         /* Is locked key the old key?  If so, traverse will be reliable. */
1349         if (tdb->travlocks.off) {
1350                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1351                         return tdb_null;
1352                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1353                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1354                                             rec.key_len))
1355                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1356                         /* No, it wasn't: unlock it and start from scratch */
1357                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1358                                 return tdb_null;
1359                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1360                                 return tdb_null;
1361                         tdb->travlocks.off = 0;
1362                 }
1363
1364                 SAFE_FREE(k);
1365         }
1366
1367         if (!tdb->travlocks.off) {
1368                 /* No previous element: do normal find, and lock record */
1369                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1370                 if (!tdb->travlocks.off)
1371                         return tdb_null;
1372                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1373                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1374                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1375                         return tdb_null;
1376                 }
1377         }
1378         oldhash = tdb->travlocks.hash;
1379
1380         /* Grab next record: locks chain and returned record,
1381            unlocks old record */
1382         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1383                 key.dsize = rec.key_len;
1384                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1385                                           key.dsize);
1386                 /* Unlock the chain of this new record */
1387                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1388                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1389         }
1390         /* Unlock the chain of old record */
1391         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1392                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1393         return key;
1394 }
1395
1396 /* delete an entry in the database given a key */
1397 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1398 {
1399         tdb_off rec_ptr;
1400         struct list_struct rec;
1401         int ret;
1402
1403         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1404                 return -1;
1405         ret = do_delete(tdb, rec_ptr, &rec);
1406         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1407                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1408         return ret;
1409 }
1410
1411 /* store an element in the database, replacing any existing element
1412    with the same key 
1413
1414    return 0 on success, -1 on failure
1415 */
1416 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1417 {
1418         struct list_struct rec;
1419         u32 hash;
1420         tdb_off rec_ptr;
1421         char *p = NULL;
1422         int ret = 0;
1423
1424         /* find which hash bucket it is in */
1425         hash = tdb_hash(&key);
1426         if (!tdb_keylocked(tdb, hash))
1427                 return -1;
1428         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1429                 return -1;
1430
1431         /* check for it existing, on insert. */
1432         if (flag == TDB_INSERT) {
1433                 if (tdb_exists(tdb, key)) {
1434                         tdb->ecode = TDB_ERR_EXISTS;
1435                         goto fail;
1436                 }
1437         } else {
1438                 /* first try in-place update, on modify or replace. */
1439                 if (tdb_update(tdb, key, dbuf) == 0)
1440                         goto out;
1441                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1442                         goto fail;
1443         }
1444         /* reset the error code potentially set by the tdb_update() */
1445         tdb->ecode = TDB_SUCCESS;
1446
1447         /* delete any existing record - if it doesn't exist we don't
1448            care.  Doing this first reduces fragmentation, and avoids
1449            coalescing with `allocated' block before it's updated. */
1450         if (flag != TDB_INSERT)
1451                 tdb_delete(tdb, key);
1452
1453         /* Copy key+value *before* allocating free space in case malloc
1454            fails and we are left with a dead spot in the tdb. */
1455
1456         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1457                 tdb->ecode = TDB_ERR_OOM;
1458                 goto fail;
1459         }
1460
1461         memcpy(p, key.dptr, key.dsize);
1462         if (dbuf.dsize)
1463                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1464
1465         /* now we're into insert / modify / replace of a record which
1466          * we know could not be optimised by an in-place store (for
1467          * various reasons).  */
1468         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1469                 goto fail;
1470
1471         /* Read hash top into next ptr */
1472         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1473                 goto fail;
1474
1475         rec.key_len = key.dsize;
1476         rec.data_len = dbuf.dsize;
1477         rec.full_hash = hash;
1478         rec.magic = TDB_MAGIC;
1479
1480         /* write out and point the top of the hash chain at it */
1481         if (rec_write(tdb, rec_ptr, &rec) == -1
1482             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1483             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1484                 /* Need to tdb_unallocate() here */
1485                 goto fail;
1486         }
1487  out:
1488         SAFE_FREE(p); 
1489         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1490         return ret;
1491 fail:
1492         ret = -1;
1493         goto out;
1494 }
1495
1496 /* Attempt to append data to an entry in place - this only works if the new data size
1497    is <= the old data size and the key exists.
1498    on failure return -1. Record must be locked before calling.
1499 */
1500 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1501 {
1502         struct list_struct rec;
1503         tdb_off rec_ptr;
1504
1505         /* find entry */
1506         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1507                 return -1;
1508
1509         /* Append of 0 is always ok. */
1510         if (new_dbuf.dsize == 0)
1511                 return 0;
1512
1513         /* must be long enough for key, old data + new data and tailer */
1514         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1515                 /* No room. */
1516                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1517                 return -1;
1518         }
1519
1520         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1521                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1522                 return -1;
1523
1524         /* update size */
1525         rec.data_len += new_dbuf.dsize;
1526         return rec_write(tdb, rec_ptr, &rec);
1527 }
1528
1529 /* Append to an entry. Create if not exist. */
1530
1531 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1532 {
1533         struct list_struct rec;
1534         u32 hash;
1535         tdb_off rec_ptr;
1536         char *p = NULL;
1537         int ret = 0;
1538         size_t new_data_size = 0;
1539
1540         /* find which hash bucket it is in */
1541         hash = tdb_hash(&key);
1542         if (!tdb_keylocked(tdb, hash))
1543                 return -1;
1544         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1545                 return -1;
1546
1547         /* first try in-place. */
1548         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1549                 goto out;
1550
1551         /* reset the error code potentially set by the tdb_append_inplace() */
1552         tdb->ecode = TDB_SUCCESS;
1553
1554         /* find entry */
1555         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1556                 if (tdb->ecode != TDB_ERR_NOEXIST)
1557                         goto fail;
1558
1559                 /* Not found - create. */
1560
1561                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1562                 goto out;
1563         }
1564
1565         new_data_size = rec.data_len + new_dbuf.dsize;
1566
1567         /* Copy key+old_value+value *before* allocating free space in case malloc
1568            fails and we are left with a dead spot in the tdb. */
1569
1570         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1571                 tdb->ecode = TDB_ERR_OOM;
1572                 goto fail;
1573         }
1574
1575         /* Copy the key in place. */
1576         memcpy(p, key.dptr, key.dsize);
1577
1578         /* Now read the old data into place. */
1579         if (rec.data_len &&
1580                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1581                         goto fail;
1582
1583         /* Finally append the new data. */
1584         if (new_dbuf.dsize)
1585                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1586
1587         /* delete any existing record - if it doesn't exist we don't
1588            care.  Doing this first reduces fragmentation, and avoids
1589            coalescing with `allocated' block before it's updated. */
1590
1591         tdb_delete(tdb, key);
1592
1593         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1594                 goto fail;
1595
1596         /* Read hash top into next ptr */
1597         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1598                 goto fail;
1599
1600         rec.key_len = key.dsize;
1601         rec.data_len = new_data_size;
1602         rec.full_hash = hash;
1603         rec.magic = TDB_MAGIC;
1604
1605         /* write out and point the top of the hash chain at it */
1606         if (rec_write(tdb, rec_ptr, &rec) == -1
1607             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1608             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1609                 /* Need to tdb_unallocate() here */
1610                 goto fail;
1611         }
1612
1613  out:
1614         SAFE_FREE(p); 
1615         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1616         return ret;
1617
1618 fail:
1619         ret = -1;
1620         goto out;
1621 }
1622
1623 static int tdb_already_open(dev_t device,
1624                             ino_t ino)
1625 {
1626         TDB_CONTEXT *i;
1627         
1628         for (i = tdbs; i; i = i->next) {
1629                 if (i->device == device && i->inode == ino) {
1630                         return 1;
1631                 }
1632         }
1633
1634         return 0;
1635 }
1636
1637 /* open the database, creating it if necessary 
1638
1639    The open_flags and mode are passed straight to the open call on the
1640    database file. A flags value of O_WRONLY is invalid. The hash size
1641    is advisory, use zero for a default value.
1642
1643    Return is NULL on error, in which case errno is also set.  Don't 
1644    try to call tdb_error or tdb_errname, just do strerror(errno).
1645
1646    @param name may be NULL for internal databases. */
1647 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1648                       int open_flags, mode_t mode)
1649 {
1650         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1651 }
1652
1653
1654 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1655                          int open_flags, mode_t mode,
1656                          tdb_log_func log_fn)
1657 {
1658         TDB_CONTEXT *tdb;
1659         struct stat st;
1660         int rev = 0, locked;
1661         unsigned char *vp;
1662         u32 vertest;
1663
1664         if (!(tdb = calloc(1, sizeof *tdb))) {
1665                 /* Can't log this */
1666                 errno = ENOMEM;
1667                 goto fail;
1668         }
1669         tdb->fd = -1;
1670         tdb->name = NULL;
1671         tdb->map_ptr = NULL;
1672         tdb->lockedkeys = NULL;
1673         tdb->flags = tdb_flags;
1674         tdb->open_flags = open_flags;
1675         tdb->log_fn = log_fn;
1676         
1677         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1678                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1679                          name));
1680                 errno = EINVAL;
1681                 goto fail;
1682         }
1683         
1684         if (hash_size == 0)
1685                 hash_size = DEFAULT_HASH_SIZE;
1686         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1687                 tdb->read_only = 1;
1688                 /* read only databases don't do locking or clear if first */
1689                 tdb->flags |= TDB_NOLOCK;
1690                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1691         }
1692
1693         /* internal databases don't mmap or lock, and start off cleared */
1694         if (tdb->flags & TDB_INTERNAL) {
1695                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1696                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1697                 if (tdb_new_database(tdb, hash_size) != 0) {
1698                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1699                         goto fail;
1700                 }
1701                 goto internal;
1702         }
1703
1704         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1705                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1706                          name, strerror(errno)));
1707                 goto fail;      /* errno set by open(2) */
1708         }
1709
1710         /* ensure there is only one process initialising at once */
1711         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1712                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1713                          name, strerror(errno)));
1714                 goto fail;      /* errno set by tdb_brlock */
1715         }
1716
1717         /* we need to zero database if we are the only one with it open */
1718         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1719             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1720                 open_flags |= O_CREAT;
1721                 if (ftruncate(tdb->fd, 0) == -1) {
1722                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1723                                  "failed to truncate %s: %s\n",
1724                                  name, strerror(errno)));
1725                         goto fail; /* errno set by ftruncate */
1726                 }
1727         }
1728
1729         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1730             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1731             || (tdb->header.version != TDB_VERSION
1732                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1733                 /* its not a valid database - possibly initialise it */
1734                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1735                         errno = EIO; /* ie bad format or something */
1736                         goto fail;
1737                 }
1738                 rev = (tdb->flags & TDB_CONVERT);
1739         }
1740         vp = (unsigned char *)&tdb->header.version;
1741         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1742                   (((u32)vp[2]) << 8) | (u32)vp[3];
1743         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1744         if (!rev)
1745                 tdb->flags &= ~TDB_CONVERT;
1746         else {
1747                 tdb->flags |= TDB_CONVERT;
1748                 convert(&tdb->header, sizeof(tdb->header));
1749         }
1750         if (fstat(tdb->fd, &st) == -1)
1751                 goto fail;
1752
1753         /* Is it already in the open list?  If so, fail. */
1754         if (tdb_already_open(st.st_dev, st.st_ino)) {
1755                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1756                          "%s (%d,%d) is already open in this process\n",
1757                          name, st.st_dev, st.st_ino));
1758                 errno = EBUSY;
1759                 goto fail;
1760         }
1761
1762         if (!(tdb->name = (char *)strdup(name))) {
1763                 errno = ENOMEM;
1764                 goto fail;
1765         }
1766
1767         tdb->map_size = st.st_size;
1768         tdb->device = st.st_dev;
1769         tdb->inode = st.st_ino;
1770         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1771         if (!tdb->locked) {
1772                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1773                          "failed to allocate lock structure for %s\n",
1774                          name));
1775                 errno = ENOMEM;
1776                 goto fail;
1777         }
1778         tdb_mmap(tdb);
1779         if (locked) {
1780                 if (!tdb->read_only)
1781                         if (tdb_clear_spinlocks(tdb) != 0) {
1782                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1783                                 "failed to clear spinlock\n"));
1784                                 goto fail;
1785                         }
1786                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1787                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1788                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1789                                  name, strerror(errno)));
1790                         goto fail;
1791                 }
1792         }
1793         /* leave this lock in place to indicate it's in use */
1794         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1795                 goto fail;
1796
1797  internal:
1798         /* Internal (memory-only) databases skip all the code above to
1799          * do with disk files, and resume here by releasing their
1800          * global lock and hooking into the active list. */
1801         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1802                 goto fail;
1803         tdb->next = tdbs;
1804         tdbs = tdb;
1805         return tdb;
1806
1807  fail:
1808         { int save_errno = errno;
1809
1810         if (!tdb)
1811                 return NULL;
1812         
1813         if (tdb->map_ptr) {
1814                 if (tdb->flags & TDB_INTERNAL)
1815                         SAFE_FREE(tdb->map_ptr);
1816                 else
1817                         tdb_munmap(tdb);
1818         }
1819         SAFE_FREE(tdb->name);
1820         if (tdb->fd != -1)
1821                 if (close(tdb->fd) != 0)
1822                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1823         SAFE_FREE(tdb->locked);
1824         SAFE_FREE(tdb);
1825         errno = save_errno;
1826         return NULL;
1827         }
1828 }
1829
1830 /**
1831  * Close a database.
1832  *
1833  * @returns -1 for error; 0 for success.
1834  **/
1835 int tdb_close(TDB_CONTEXT *tdb)
1836 {
1837         TDB_CONTEXT **i;
1838         int ret = 0;
1839
1840         if (tdb->map_ptr) {
1841                 if (tdb->flags & TDB_INTERNAL)
1842                         SAFE_FREE(tdb->map_ptr);
1843                 else
1844                         tdb_munmap(tdb);
1845         }
1846         SAFE_FREE(tdb->name);
1847         if (tdb->fd != -1)
1848                 ret = close(tdb->fd);
1849         SAFE_FREE(tdb->locked);
1850         SAFE_FREE(tdb->lockedkeys);
1851
1852         /* Remove from contexts list */
1853         for (i = &tdbs; *i; i = &(*i)->next) {
1854                 if (*i == tdb) {
1855                         *i = tdb->next;
1856                         break;
1857                 }
1858         }
1859
1860         memset(tdb, 0, sizeof(*tdb));
1861         SAFE_FREE(tdb);
1862
1863         return ret;
1864 }
1865
1866 /* lock/unlock entire database */
1867 int tdb_lockall(TDB_CONTEXT *tdb)
1868 {
1869         u32 i;
1870
1871         /* There are no locks on read-only dbs */
1872         if (tdb->read_only)
1873                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1874         if (tdb->lockedkeys)
1875                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1876         for (i = 0; i < tdb->header.hash_size; i++) 
1877                 if (tdb_lock(tdb, i, F_WRLCK))
1878                         break;
1879
1880         /* If error, release locks we have... */
1881         if (i < tdb->header.hash_size) {
1882                 u32 j;
1883
1884                 for ( j = 0; j < i; j++)
1885                         tdb_unlock(tdb, j, F_WRLCK);
1886                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1887         }
1888
1889         return 0;
1890 }
1891 void tdb_unlockall(TDB_CONTEXT *tdb)
1892 {
1893         u32 i;
1894         for (i=0; i < tdb->header.hash_size; i++)
1895                 tdb_unlock(tdb, i, F_WRLCK);
1896 }
1897
1898 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1899 {
1900         u32 i, j, hash;
1901
1902         /* Can't lock more keys if already locked */
1903         if (tdb->lockedkeys)
1904                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1905         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1906                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1907         /* First number in array is # keys */
1908         tdb->lockedkeys[0] = number;
1909
1910         /* Insertion sort by bucket */
1911         for (i = 0; i < number; i++) {
1912                 hash = tdb_hash(&keys[i]);
1913                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1914                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1915                 tdb->lockedkeys[j+1] = hash;
1916         }
1917         /* Finally, lock in order */
1918         for (i = 0; i < number; i++)
1919                 if (tdb_lock(tdb, i, F_WRLCK))
1920                         break;
1921
1922         /* If error, release locks we have... */
1923         if (i < number) {
1924                 for ( j = 0; j < i; j++)
1925                         tdb_unlock(tdb, j, F_WRLCK);
1926                 SAFE_FREE(tdb->lockedkeys);
1927                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1928         }
1929         return 0;
1930 }
1931
1932 /* Unlock the keys previously locked by tdb_lockkeys() */
1933 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1934 {
1935         u32 i;
1936         if (!tdb->lockedkeys)
1937                 return;
1938         for (i = 0; i < tdb->lockedkeys[0]; i++)
1939                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1940         SAFE_FREE(tdb->lockedkeys);
1941 }
1942
1943 /* lock/unlock one hash chain. This is meant to be used to reduce
1944    contention - it cannot guarantee how many records will be locked */
1945 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1946 {
1947         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1948 }
1949
1950 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1951 {
1952         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1953 }
1954
1955 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1956 {
1957         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1958 }
1959
1960 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1961 {
1962         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1963 }
1964
1965
1966 /* register a loging function */
1967 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1968 {
1969         tdb->log_fn = fn;
1970 }
1971
1972
1973 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1974    seek pointer from our parent and to re-establish locks */
1975 int tdb_reopen(TDB_CONTEXT *tdb)
1976 {
1977         struct stat st;
1978
1979         if (tdb_munmap(tdb) != 0) {
1980                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
1981                 goto fail;
1982         }
1983         if (close(tdb->fd) != 0)
1984                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
1985         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
1986         if (tdb->fd == -1) {
1987                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
1988                 goto fail;
1989         }
1990         if (fstat(tdb->fd, &st) != 0) {
1991                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
1992                 goto fail;
1993         }
1994         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
1995                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
1996                 goto fail;
1997         }
1998         tdb_mmap(tdb);
1999         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2000                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2001                 goto fail;
2002         }
2003
2004         return 0;
2005
2006 fail:
2007         tdb_close(tdb);
2008         return -1;
2009 }
2010
2011 /* reopen all tdb's */
2012 int tdb_reopen_all(void)
2013 {
2014         TDB_CONTEXT *tdb;
2015
2016         for (tdb=tdbs; tdb; tdb = tdb->next) {
2017                 if (tdb_reopen(tdb) != 0) return -1;
2018         }
2019
2020         return 0;
2021 }