removed the state parameter to tdb_traverse and the TDB_MODIFY flag
[kai/samba-autobuild/.git] / source4 / lib / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Paul `Rusty' Russell              2000
6    Copyright (C) Jeremy Allison                    2000-2003
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23
24 /* NOTE: If you use tdbs under valgrind, and in particular if you run
25  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
26  * think this is because valgrind doesn't understand that the mmap'd
27  * area may be written to by other processes.  Memory can, from the
28  * point of view of the grinded process, spontaneously become
29  * initialized.
30  *
31  * I can think of a few solutions.  [mbp 20030311]
32  *
33  * 1 - Write suppressions for Valgrind so that it doesn't complain
34  * about this.  Probably the most reasonable but people need to
35  * remember to use them.
36  *
37  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
38  *
39  * 3 - Use the special valgrind macros to mark memory as valid at the
40  * right time.  Probably too hard -- the process just doesn't know.
41  */ 
42
43 #ifdef STANDALONE
44 #if HAVE_CONFIG_H
45 #include <config.h>
46 #endif
47
48 #include <stdlib.h>
49 #include <stdio.h>
50 #include <fcntl.h>
51 #include <unistd.h>
52 #include <string.h>
53 #include <fcntl.h>
54 #include <errno.h>
55 #include <sys/mman.h>
56 #include <sys/stat.h>
57 #include <signal.h>
58 #include "tdb.h"
59 #include "spinlock.h"
60 #else
61 #include "includes.h"
62 #endif
63
64 #define TDB_MAGIC_FOOD "TDB file\n"
65 #define TDB_VERSION (0x26011967 + 6)
66 #define TDB_MAGIC (0x26011999U)
67 #define TDB_FREE_MAGIC (~TDB_MAGIC)
68 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
69 #define TDB_ALIGNMENT 4
70 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
71 #define DEFAULT_HASH_SIZE 131
72 #define TDB_PAGE_SIZE 0x2000
73 #define FREELIST_TOP (sizeof(struct tdb_header))
74 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
75 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
76 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
77 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
78 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
79 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
80
81
82 /* NB assumes there is a local variable called "tdb" that is the
83  * current context, also takes doubly-parenthesized print-style
84  * argument. */
85 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
86
87 /* lock offsets */
88 #define GLOBAL_LOCK 0
89 #define ACTIVE_LOCK 4
90
91 #ifndef MAP_FILE
92 #define MAP_FILE 0
93 #endif
94
95 #ifndef MAP_FAILED
96 #define MAP_FAILED ((void *)-1)
97 #endif
98
99 /* free memory if the pointer is valid and zero the pointer */
100 #ifndef SAFE_FREE
101 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
102 #endif
103
104 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
105 TDB_DATA tdb_null;
106
107 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
108 static TDB_CONTEXT *tdbs = NULL;
109
110 static int tdb_munmap(TDB_CONTEXT *tdb)
111 {
112         if (tdb->flags & TDB_INTERNAL)
113                 return 0;
114
115 #ifdef HAVE_MMAP
116         if (tdb->map_ptr) {
117                 int ret = munmap(tdb->map_ptr, tdb->map_size);
118                 if (ret != 0)
119                         return ret;
120         }
121 #endif
122         tdb->map_ptr = NULL;
123         return 0;
124 }
125
126 static void tdb_mmap(TDB_CONTEXT *tdb)
127 {
128         if (tdb->flags & TDB_INTERNAL)
129                 return;
130
131 #ifdef HAVE_MMAP
132         if (!(tdb->flags & TDB_NOMMAP)) {
133                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
134                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
135                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
136
137                 /*
138                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
139                  */
140
141                 if (tdb->map_ptr == MAP_FAILED) {
142                         tdb->map_ptr = NULL;
143                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
144                                  tdb->map_size, strerror(errno)));
145                 }
146         } else {
147                 tdb->map_ptr = NULL;
148         }
149 #else
150         tdb->map_ptr = NULL;
151 #endif
152 }
153
154 /* Endian conversion: we only ever deal with 4 byte quantities */
155 static void *convert(void *buf, u32 size)
156 {
157         u32 i, *p = buf;
158         for (i = 0; i < size / 4; i++)
159                 p[i] = TDB_BYTEREV(p[i]);
160         return buf;
161 }
162 #define DOCONV() (tdb->flags & TDB_CONVERT)
163 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
164
165 /* the body of the database is made of one list_struct for the free space
166    plus a separate data list for each hash value */
167 struct list_struct {
168         tdb_off next; /* offset of the next record in the list */
169         tdb_len rec_len; /* total byte length of record */
170         tdb_len key_len; /* byte length of key */
171         tdb_len data_len; /* byte length of data */
172         u32 full_hash; /* the full 32 bit hash of the key */
173         u32 magic;   /* try to catch errors */
174         /* the following union is implied:
175                 union {
176                         char record[rec_len];
177                         struct {
178                                 char key[key_len];
179                                 char data[data_len];
180                         }
181                         u32 totalsize; (tailer)
182                 }
183         */
184 };
185
186 /***************************************************************
187  Allow a caller to set a "alarm" flag that tdb can check to abort
188  a blocking lock on SIGALRM.
189 ***************************************************************/
190
191 static SIG_ATOMIC_T *palarm_fired;
192
193 void tdb_set_lock_alarm(SIG_ATOMIC_T *palarm)
194 {
195         palarm_fired = palarm;
196 }
197
198 /* a byte range locking function - return 0 on success
199    this functions locks/unlocks 1 byte at the specified offset.
200
201    On error, errno is also set so that errors are passed back properly
202    through tdb_open(). */
203 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
204                       int rw_type, int lck_type, int probe)
205 {
206         struct flock fl;
207         int ret;
208
209         if (tdb->flags & TDB_NOLOCK)
210                 return 0;
211         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
212                 errno = EACCES;
213                 return -1;
214         }
215
216         fl.l_type = rw_type;
217         fl.l_whence = SEEK_SET;
218         fl.l_start = offset;
219         fl.l_len = 1;
220         fl.l_pid = 0;
221
222         do {
223                 ret = fcntl(tdb->fd,lck_type,&fl);
224                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
225                         break;
226         } while (ret == -1 && errno == EINTR);
227
228         if (ret == -1) {
229                 if (!probe && lck_type != F_SETLK) {
230                         /* Ensure error code is set for log fun to examine. */
231                         if (errno == EINTR && palarm_fired && *palarm_fired)
232                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
233                         else
234                                 tdb->ecode = TDB_ERR_LOCK;
235                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
236                                  tdb->fd, offset, rw_type, lck_type));
237                 }
238                 /* Was it an alarm timeout ? */
239                 if (errno == EINTR && palarm_fired && *palarm_fired) {
240                         TDB_LOG((tdb, 5, "tdb_brlock timed out (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
241                                  tdb->fd, offset, rw_type, lck_type));
242                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
243                 }
244                 /* Otherwise - generic lock error. */
245                 /* errno set by fcntl */
246                 TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
247                          tdb->fd, offset, rw_type, lck_type, strerror(errno)));
248                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
249         }
250         return 0;
251 }
252
253 /* lock a list in the database. list -1 is the alloc list */
254 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
255 {
256         if (list < -1 || list >= (int)tdb->header.hash_size) {
257                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
258                            list, ltype));
259                 return -1;
260         }
261         if (tdb->flags & TDB_NOLOCK)
262                 return 0;
263
264         /* Since fcntl locks don't nest, we do a lock for the first one,
265            and simply bump the count for future ones */
266         if (tdb->locked[list+1].count == 0) {
267                 if (!tdb->read_only && tdb->header.rwlocks) {
268                         if (tdb_spinlock(tdb, list, ltype)) {
269                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
270                                            list, ltype));
271                                 return -1;
272                         }
273                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
274                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
275                                            list, ltype, strerror(errno)));
276                         return -1;
277                 }
278                 tdb->locked[list+1].ltype = ltype;
279         }
280         tdb->locked[list+1].count++;
281         return 0;
282 }
283
284 /* unlock the database: returns void because it's too late for errors. */
285         /* changed to return int it may be interesting to know there
286            has been an error  --simo */
287 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
288 {
289         int ret = -1;
290
291         if (tdb->flags & TDB_NOLOCK)
292                 return 0;
293
294         /* Sanity checks */
295         if (list < -1 || list >= (int)tdb->header.hash_size) {
296                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
297                 return ret;
298         }
299
300         if (tdb->locked[list+1].count==0) {
301                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
302                 return ret;
303         }
304
305         if (tdb->locked[list+1].count == 1) {
306                 /* Down to last nested lock: unlock underneath */
307                 if (!tdb->read_only && tdb->header.rwlocks) {
308                         ret = tdb_spinunlock(tdb, list, ltype);
309                 } else {
310                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
311                 }
312         } else {
313                 ret = 0;
314         }
315         tdb->locked[list+1].count--;
316
317         if (ret)
318                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
319         return ret;
320 }
321
322 /* This is based on the hash algorithm from gdbm */
323 static u32 tdb_hash(TDB_DATA *key)
324 {
325         u32 value;      /* Used to compute the hash value.  */
326         u32   i;        /* Used to cycle through random values. */
327
328         /* Set the initial value from the key size. */
329         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
330                 value = (value + (key->dptr[i] << (i*5 % 24)));
331
332         return (1103515243 * value + 12345);  
333 }
334
335 /* check for an out of bounds access - if it is out of bounds then
336    see if the database has been expanded by someone else and expand
337    if necessary 
338    note that "len" is the minimum length needed for the db
339 */
340 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
341 {
342         struct stat st;
343         if (len <= tdb->map_size)
344                 return 0;
345         if (tdb->flags & TDB_INTERNAL) {
346                 if (!probe) {
347                         /* Ensure ecode is set for log fn. */
348                         tdb->ecode = TDB_ERR_IO;
349                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
350                                  (int)len, (int)tdb->map_size));
351                 }
352                 return TDB_ERRCODE(TDB_ERR_IO, -1);
353         }
354
355         if (fstat(tdb->fd, &st) == -1)
356                 return TDB_ERRCODE(TDB_ERR_IO, -1);
357
358         if (st.st_size < (size_t)len) {
359                 if (!probe) {
360                         /* Ensure ecode is set for log fn. */
361                         tdb->ecode = TDB_ERR_IO;
362                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
363                                  (int)len, (int)st.st_size));
364                 }
365                 return TDB_ERRCODE(TDB_ERR_IO, -1);
366         }
367
368         /* Unmap, update size, remap */
369         if (tdb_munmap(tdb) == -1)
370                 return TDB_ERRCODE(TDB_ERR_IO, -1);
371         tdb->map_size = st.st_size;
372         tdb_mmap(tdb);
373         return 0;
374 }
375
376 /* write a lump of data at a specified offset */
377 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
378 {
379         if (tdb_oob(tdb, off + len, 0) != 0)
380                 return -1;
381
382         if (tdb->map_ptr)
383                 memcpy(off + (char *)tdb->map_ptr, buf, len);
384 #ifdef HAVE_PWRITE
385         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
386 #else
387         else if (lseek(tdb->fd, off, SEEK_SET) != off
388                  || write(tdb->fd, buf, len) != (ssize_t)len) {
389 #endif
390                 /* Ensure ecode is set for log fn. */
391                 tdb->ecode = TDB_ERR_IO;
392                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
393                            off, len, strerror(errno)));
394                 return TDB_ERRCODE(TDB_ERR_IO, -1);
395         }
396         return 0;
397 }
398
399 /* read a lump of data at a specified offset, maybe convert */
400 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
401 {
402         if (tdb_oob(tdb, off + len, 0) != 0)
403                 return -1;
404
405         if (tdb->map_ptr)
406                 memcpy(buf, off + (char *)tdb->map_ptr, len);
407 #ifdef HAVE_PREAD
408         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
409 #else
410         else if (lseek(tdb->fd, off, SEEK_SET) != off
411                  || read(tdb->fd, buf, len) != (ssize_t)len) {
412 #endif
413                 /* Ensure ecode is set for log fn. */
414                 tdb->ecode = TDB_ERR_IO;
415                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
416                            off, len, strerror(errno)));
417                 return TDB_ERRCODE(TDB_ERR_IO, -1);
418         }
419         if (cv)
420                 convert(buf, len);
421         return 0;
422 }
423
424 /* read a lump of data, allocating the space for it */
425 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
426 {
427         char *buf;
428
429         if (!(buf = malloc(len))) {
430                 /* Ensure ecode is set for log fn. */
431                 tdb->ecode = TDB_ERR_OOM;
432                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
433                            len, strerror(errno)));
434                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
435         }
436         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
437                 SAFE_FREE(buf);
438                 return NULL;
439         }
440         return buf;
441 }
442
443 /* read/write a tdb_off */
444 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
445 {
446         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
447 }
448 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
449 {
450         tdb_off off = *d;
451         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
452 }
453
454 /* read/write a record */
455 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
456 {
457         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
458                 return -1;
459         if (TDB_BAD_MAGIC(rec)) {
460                 /* Ensure ecode is set for log fn. */
461                 tdb->ecode = TDB_ERR_CORRUPT;
462                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
463                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
464         }
465         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
466 }
467 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
468 {
469         struct list_struct r = *rec;
470         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
471 }
472
473 /* read a freelist record and check for simple errors */
474 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
475 {
476         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
477                 return -1;
478
479         if (rec->magic == TDB_MAGIC) {
480                 /* this happens when a app is showdown while deleting a record - we should
481                    not completely fail when this happens */
482                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
483                          rec->magic, off));
484                 rec->magic = TDB_FREE_MAGIC;
485                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
486                         return -1;
487         }
488
489         if (rec->magic != TDB_FREE_MAGIC) {
490                 /* Ensure ecode is set for log fn. */
491                 tdb->ecode = TDB_ERR_CORRUPT;
492                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
493                            rec->magic, off));
494                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
495         }
496         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
497                 return -1;
498         return 0;
499 }
500
501 /* update a record tailer (must hold allocation lock) */
502 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
503                          const struct list_struct *rec)
504 {
505         tdb_off totalsize;
506
507         /* Offset of tailer from record header */
508         totalsize = sizeof(*rec) + rec->rec_len;
509         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
510                          &totalsize);
511 }
512
513 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
514 {
515         struct list_struct rec;
516         tdb_off tailer_ofs, tailer;
517
518         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
519                 printf("ERROR: failed to read record at %u\n", offset);
520                 return 0;
521         }
522
523         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
524                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
525
526         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
527         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
528                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
529                 return rec.next;
530         }
531
532         if (tailer != rec.rec_len + sizeof(rec)) {
533                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
534                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
535         }
536         return rec.next;
537 }
538
539 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
540 {
541         tdb_off rec_ptr, top;
542
543         top = TDB_HASH_TOP(i);
544
545         if (tdb_lock(tdb, i, F_WRLCK) != 0)
546                 return -1;
547
548         if (ofs_read(tdb, top, &rec_ptr) == -1)
549                 return tdb_unlock(tdb, i, F_WRLCK);
550
551         if (rec_ptr)
552                 printf("hash=%d\n", i);
553
554         while (rec_ptr) {
555                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
556         }
557
558         return tdb_unlock(tdb, i, F_WRLCK);
559 }
560
561 void tdb_dump_all(TDB_CONTEXT *tdb)
562 {
563         int i;
564         for (i=0;i<tdb->header.hash_size;i++) {
565                 tdb_dump_chain(tdb, i);
566         }
567         printf("freelist:\n");
568         tdb_dump_chain(tdb, -1);
569 }
570
571 int tdb_printfreelist(TDB_CONTEXT *tdb)
572 {
573         int ret;
574         long total_free = 0;
575         tdb_off offset, rec_ptr;
576         struct list_struct rec;
577
578         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
579                 return ret;
580
581         offset = FREELIST_TOP;
582
583         /* read in the freelist top */
584         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
585                 tdb_unlock(tdb, -1, F_WRLCK);
586                 return 0;
587         }
588
589         printf("freelist top=[0x%08x]\n", rec_ptr );
590         while (rec_ptr) {
591                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
592                         tdb_unlock(tdb, -1, F_WRLCK);
593                         return -1;
594                 }
595
596                 if (rec.magic != TDB_FREE_MAGIC) {
597                         printf("bad magic 0x%08x in free list\n", rec.magic);
598                         tdb_unlock(tdb, -1, F_WRLCK);
599                         return -1;
600                 }
601
602                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
603                 total_free += rec.rec_len;
604
605                 /* move to the next record */
606                 rec_ptr = rec.next;
607         }
608         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
609                (int)total_free);
610
611         return tdb_unlock(tdb, -1, F_WRLCK);
612 }
613
614 /* Remove an element from the freelist.  Must have alloc lock. */
615 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
616 {
617         tdb_off last_ptr, i;
618
619         /* read in the freelist top */
620         last_ptr = FREELIST_TOP;
621         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
622                 if (i == off) {
623                         /* We've found it! */
624                         return ofs_write(tdb, last_ptr, &next);
625                 }
626                 /* Follow chain (next offset is at start of record) */
627                 last_ptr = i;
628         }
629         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
630         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
631 }
632
633 /* Add an element into the freelist. Merge adjacent records if
634    neccessary. */
635 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
636 {
637         tdb_off right, left;
638
639         /* Allocation and tailer lock */
640         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
641                 return -1;
642
643         /* set an initial tailer, so if we fail we don't leave a bogus record */
644         if (update_tailer(tdb, offset, rec) != 0) {
645                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
646                 goto fail;
647         }
648
649         /* Look right first (I'm an Australian, dammit) */
650         right = offset + sizeof(*rec) + rec->rec_len;
651         if (right + sizeof(*rec) <= tdb->map_size) {
652                 struct list_struct r;
653
654                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
655                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
656                         goto left;
657                 }
658
659                 /* If it's free, expand to include it. */
660                 if (r.magic == TDB_FREE_MAGIC) {
661                         if (remove_from_freelist(tdb, right, r.next) == -1) {
662                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
663                                 goto left;
664                         }
665                         rec->rec_len += sizeof(r) + r.rec_len;
666                 }
667         }
668
669 left:
670         /* Look left */
671         left = offset - sizeof(tdb_off);
672         if (left > TDB_DATA_START(tdb->header.hash_size)) {
673                 struct list_struct l;
674                 tdb_off leftsize;
675                 
676                 /* Read in tailer and jump back to header */
677                 if (ofs_read(tdb, left, &leftsize) == -1) {
678                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
679                         goto update;
680                 }
681                 left = offset - leftsize;
682
683                 /* Now read in record */
684                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
685                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
686                         goto update;
687                 }
688
689                 /* If it's free, expand to include it. */
690                 if (l.magic == TDB_FREE_MAGIC) {
691                         if (remove_from_freelist(tdb, left, l.next) == -1) {
692                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
693                                 goto update;
694                         } else {
695                                 offset = left;
696                                 rec->rec_len += leftsize;
697                         }
698                 }
699         }
700
701 update:
702         if (update_tailer(tdb, offset, rec) == -1) {
703                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
704                 goto fail;
705         }
706
707         /* Now, prepend to free list */
708         rec->magic = TDB_FREE_MAGIC;
709
710         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
711             rec_write(tdb, offset, rec) == -1 ||
712             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
713                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
714                 goto fail;
715         }
716
717         /* And we're done. */
718         tdb_unlock(tdb, -1, F_WRLCK);
719         return 0;
720
721  fail:
722         tdb_unlock(tdb, -1, F_WRLCK);
723         return -1;
724 }
725
726
727 /* expand a file.  we prefer to use ftruncate, as that is what posix
728   says to use for mmap expansion */
729 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
730 {
731         char buf[1024];
732 #if HAVE_FTRUNCATE_EXTEND
733         if (ftruncate(tdb->fd, size+addition) != 0) {
734                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
735                            size+addition, strerror(errno)));
736                 return -1;
737         }
738 #else
739         char b = 0;
740
741 #ifdef HAVE_PWRITE
742         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
743 #else
744         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
745             write(tdb->fd, &b, 1) != 1) {
746 #endif
747                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
748                            size+addition, strerror(errno)));
749                 return -1;
750         }
751 #endif
752
753         /* now fill the file with something. This ensures that the file isn't sparse, which would be
754            very bad if we ran out of disk. This must be done with write, not via mmap */
755         memset(buf, 0x42, sizeof(buf));
756         while (addition) {
757                 int n = addition>sizeof(buf)?sizeof(buf):addition;
758 #ifdef HAVE_PWRITE
759                 int ret = pwrite(tdb->fd, buf, n, size);
760 #else
761                 int ret;
762                 if (lseek(tdb->fd, size, SEEK_SET) != size)
763                         return -1;
764                 ret = write(tdb->fd, buf, n);
765 #endif
766                 if (ret != n) {
767                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
768                                    n, strerror(errno)));
769                         return -1;
770                 }
771                 addition -= n;
772                 size += n;
773         }
774         return 0;
775 }
776
777
778 /* expand the database at least size bytes by expanding the underlying
779    file and doing the mmap again if necessary */
780 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
781 {
782         struct list_struct rec;
783         tdb_off offset;
784
785         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
786                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
787                 return -1;
788         }
789
790         /* must know about any previous expansions by another process */
791         tdb_oob(tdb, tdb->map_size + 1, 1);
792
793         /* always make room for at least 10 more records, and round
794            the database up to a multiple of TDB_PAGE_SIZE */
795         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
796
797         if (!(tdb->flags & TDB_INTERNAL))
798                 tdb_munmap(tdb);
799
800         /*
801          * We must ensure the file is unmapped before doing this
802          * to ensure consistency with systems like OpenBSD where
803          * writes and mmaps are not consistent.
804          */
805
806         /* expand the file itself */
807         if (!(tdb->flags & TDB_INTERNAL)) {
808                 if (expand_file(tdb, tdb->map_size, size) != 0)
809                         goto fail;
810         }
811
812         tdb->map_size += size;
813
814         if (tdb->flags & TDB_INTERNAL)
815                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
816         else {
817                 /*
818                  * We must ensure the file is remapped before adding the space
819                  * to ensure consistency with systems like OpenBSD where
820                  * writes and mmaps are not consistent.
821                  */
822
823                 /* We're ok if the mmap fails as we'll fallback to read/write */
824                 tdb_mmap(tdb);
825         }
826
827         /* form a new freelist record */
828         memset(&rec,'\0',sizeof(rec));
829         rec.rec_len = size - sizeof(rec);
830
831         /* link it into the free list */
832         offset = tdb->map_size - size;
833         if (tdb_free(tdb, offset, &rec) == -1)
834                 goto fail;
835
836         tdb_unlock(tdb, -1, F_WRLCK);
837         return 0;
838  fail:
839         tdb_unlock(tdb, -1, F_WRLCK);
840         return -1;
841 }
842
843 /* allocate some space from the free list. The offset returned points
844    to a unconnected list_struct within the database with room for at
845    least length bytes of total data
846
847    0 is returned if the space could not be allocated
848  */
849 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
850                             struct list_struct *rec)
851 {
852         tdb_off rec_ptr, last_ptr, newrec_ptr;
853         struct list_struct newrec;
854
855         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
856                 return 0;
857
858         /* Extra bytes required for tailer */
859         length += sizeof(tdb_off);
860
861  again:
862         last_ptr = FREELIST_TOP;
863
864         /* read in the freelist top */
865         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
866                 goto fail;
867
868         /* keep looking until we find a freelist record big enough */
869         while (rec_ptr) {
870                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
871                         goto fail;
872
873                 if (rec->rec_len >= length) {
874                         /* found it - now possibly split it up  */
875                         if (rec->rec_len > length + MIN_REC_SIZE) {
876                                 /* Length of left piece */
877                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
878
879                                 /* Right piece to go on free list */
880                                 newrec.rec_len = rec->rec_len
881                                         - (sizeof(*rec) + length);
882                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
883
884                                 /* And left record is shortened */
885                                 rec->rec_len = length;
886                         } else
887                                 newrec_ptr = 0;
888
889                         /* Remove allocated record from the free list */
890                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
891                                 goto fail;
892
893                         /* Update header: do this before we drop alloc
894                            lock, otherwise tdb_free() might try to
895                            merge with us, thinking we're free.
896                            (Thanks Jeremy Allison). */
897                         rec->magic = TDB_MAGIC;
898                         if (rec_write(tdb, rec_ptr, rec) == -1)
899                                 goto fail;
900
901                         /* Did we create new block? */
902                         if (newrec_ptr) {
903                                 /* Update allocated record tailer (we
904                                    shortened it). */
905                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
906                                         goto fail;
907
908                                 /* Free new record */
909                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
910                                         goto fail;
911                         }
912
913                         /* all done - return the new record offset */
914                         tdb_unlock(tdb, -1, F_WRLCK);
915                         return rec_ptr;
916                 }
917                 /* move to the next record */
918                 last_ptr = rec_ptr;
919                 rec_ptr = rec->next;
920         }
921         /* we didn't find enough space. See if we can expand the
922            database and if we can then try again */
923         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
924                 goto again;
925  fail:
926         tdb_unlock(tdb, -1, F_WRLCK);
927         return 0;
928 }
929
930 /* initialise a new database with a specified hash size */
931 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
932 {
933         struct tdb_header *newdb;
934         int size, ret = -1;
935
936         /* We make it up in memory, then write it out if not internal */
937         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
938         if (!(newdb = calloc(size, 1)))
939                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
940
941         /* Fill in the header */
942         newdb->version = TDB_VERSION;
943         newdb->hash_size = hash_size;
944 #ifdef USE_SPINLOCKS
945         newdb->rwlocks = size;
946 #endif
947         if (tdb->flags & TDB_INTERNAL) {
948                 tdb->map_size = size;
949                 tdb->map_ptr = (char *)newdb;
950                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
951                 /* Convert the `ondisk' version if asked. */
952                 CONVERT(*newdb);
953                 return 0;
954         }
955         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
956                 goto fail;
957
958         if (ftruncate(tdb->fd, 0) == -1)
959                 goto fail;
960
961         /* This creates an endian-converted header, as if read from disk */
962         CONVERT(*newdb);
963         memcpy(&tdb->header, newdb, sizeof(tdb->header));
964         /* Don't endian-convert the magic food! */
965         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
966         if (write(tdb->fd, newdb, size) != size)
967                 ret = -1;
968         else
969                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
970
971   fail:
972         SAFE_FREE(newdb);
973         return ret;
974 }
975
976 /* Returns 0 on fail.  On success, return offset of record, and fills
977    in rec */
978 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
979                         struct list_struct *r)
980 {
981         tdb_off rec_ptr;
982         
983         /* read in the hash top */
984         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
985                 return 0;
986
987         /* keep looking until we find the right record */
988         while (rec_ptr) {
989                 if (rec_read(tdb, rec_ptr, r) == -1)
990                         return 0;
991
992                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
993                         char *k;
994                         /* a very likely hit - read the key */
995                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
996                                            r->key_len);
997                         if (!k)
998                                 return 0;
999
1000                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1001                                 SAFE_FREE(k);
1002                                 return rec_ptr;
1003                         }
1004                         SAFE_FREE(k);
1005                 }
1006                 rec_ptr = r->next;
1007         }
1008         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1009 }
1010
1011 /* If they do lockkeys, check that this hash is one they locked */
1012 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1013 {
1014         u32 i;
1015         if (!tdb->lockedkeys)
1016                 return 1;
1017         for (i = 0; i < tdb->lockedkeys[0]; i++)
1018                 if (tdb->lockedkeys[i+1] == hash)
1019                         return 1;
1020         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1021 }
1022
1023 /* As tdb_find, but if you succeed, keep the lock */
1024 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1025                              struct list_struct *rec)
1026 {
1027         u32 rec_ptr;
1028
1029         if (!tdb_keylocked(tdb, hash))
1030                 return 0;
1031         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1032                 return 0;
1033         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1034                 tdb_unlock(tdb, BUCKET(hash), locktype);
1035         return rec_ptr;
1036 }
1037
1038 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1039 {
1040         return tdb->ecode;
1041 }
1042
1043 static struct tdb_errname {
1044         enum TDB_ERROR ecode; const char *estring;
1045 } emap[] = { {TDB_SUCCESS, "Success"},
1046              {TDB_ERR_CORRUPT, "Corrupt database"},
1047              {TDB_ERR_IO, "IO Error"},
1048              {TDB_ERR_LOCK, "Locking error"},
1049              {TDB_ERR_OOM, "Out of memory"},
1050              {TDB_ERR_EXISTS, "Record exists"},
1051              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1052              {TDB_ERR_NOEXIST, "Record does not exist"} };
1053
1054 /* Error string for the last tdb error */
1055 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1056 {
1057         u32 i;
1058         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1059                 if (tdb->ecode == emap[i].ecode)
1060                         return emap[i].estring;
1061         return "Invalid error code";
1062 }
1063
1064 /* update an entry in place - this only works if the new data size
1065    is <= the old data size and the key exists.
1066    on failure return -1.
1067 */
1068
1069 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1070 {
1071         struct list_struct rec;
1072         tdb_off rec_ptr;
1073
1074         /* find entry */
1075         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1076                 return -1;
1077
1078         /* must be long enough key, data and tailer */
1079         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1080                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1081                 return -1;
1082         }
1083
1084         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1085                       dbuf.dptr, dbuf.dsize) == -1)
1086                 return -1;
1087
1088         if (dbuf.dsize != rec.data_len) {
1089                 /* update size */
1090                 rec.data_len = dbuf.dsize;
1091                 return rec_write(tdb, rec_ptr, &rec);
1092         }
1093  
1094         return 0;
1095 }
1096
1097 /* find an entry in the database given a key */
1098 /* If an entry doesn't exist tdb_err will be set to
1099  * TDB_ERR_NOEXIST. If a key has no data attached
1100  * tdb_err will not be set. Both will return a
1101  * zero pptr and zero dsize.
1102  */
1103
1104 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1105 {
1106         tdb_off rec_ptr;
1107         struct list_struct rec;
1108         TDB_DATA ret;
1109         u32 hash;
1110
1111         /* find which hash bucket it is in */
1112         hash = tdb_hash(&key);
1113         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1114                 return tdb_null;
1115
1116         if (rec.data_len)
1117                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1118                                           rec.data_len);
1119         else
1120                 ret.dptr = NULL;
1121         ret.dsize = rec.data_len;
1122         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1123         return ret;
1124 }
1125
1126 /* check if an entry in the database exists 
1127
1128    note that 1 is returned if the key is found and 0 is returned if not found
1129    this doesn't match the conventions in the rest of this module, but is
1130    compatible with gdbm
1131 */
1132 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1133 {
1134         struct list_struct rec;
1135         
1136         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1137                 return 0;
1138         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1139         return 1;
1140 }
1141
1142 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1143 {
1144         u32 hash = tdb_hash(&key);
1145         return tdb_exists_hash(tdb, key, hash);
1146 }
1147
1148 /* record lock stops delete underneath */
1149 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1150 {
1151         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1152 }
1153 /*
1154   Write locks override our own fcntl readlocks, so check it here.
1155   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1156   an error to fail to get the lock here.
1157 */
1158  
1159 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1160 {
1161         struct tdb_traverse_lock *i;
1162         for (i = &tdb->travlocks; i; i = i->next)
1163                 if (i->off == off)
1164                         return -1;
1165         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1166 }
1167
1168 /*
1169   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1170   an error to fail to get the lock here.
1171 */
1172
1173 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1174 {
1175         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1176 }
1177 /* fcntl locks don't stack: avoid unlocking someone else's */
1178 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1179 {
1180         struct tdb_traverse_lock *i;
1181         u32 count = 0;
1182
1183         if (off == 0)
1184                 return 0;
1185         for (i = &tdb->travlocks; i; i = i->next)
1186                 if (i->off == off)
1187                         count++;
1188         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1189 }
1190
1191 /* actually delete an entry in the database given the offset */
1192 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1193 {
1194         tdb_off last_ptr, i;
1195         struct list_struct lastrec;
1196
1197         if (tdb->read_only) return -1;
1198
1199         if (write_lock_record(tdb, rec_ptr) == -1) {
1200                 /* Someone traversing here: mark it as dead */
1201                 rec->magic = TDB_DEAD_MAGIC;
1202                 return rec_write(tdb, rec_ptr, rec);
1203         }
1204         if (write_unlock_record(tdb, rec_ptr) != 0)
1205                 return -1;
1206
1207         /* find previous record in hash chain */
1208         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1209                 return -1;
1210         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1211                 if (rec_read(tdb, i, &lastrec) == -1)
1212                         return -1;
1213
1214         /* unlink it: next ptr is at start of record. */
1215         if (last_ptr == 0)
1216                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1217         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1218                 return -1;
1219
1220         /* recover the space */
1221         if (tdb_free(tdb, rec_ptr, rec) == -1)
1222                 return -1;
1223         return 0;
1224 }
1225
1226 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1227 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1228                          struct list_struct *rec)
1229 {
1230         int want_next = (tlock->off != 0);
1231
1232         /* No traversal allows if you've called tdb_lockkeys() */
1233         if (tdb->lockedkeys)
1234                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1235
1236         /* Lock each chain from the start one. */
1237         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1238                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1239                         return -1;
1240
1241                 /* No previous record?  Start at top of chain. */
1242                 if (!tlock->off) {
1243                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1244                                      &tlock->off) == -1)
1245                                 goto fail;
1246                 } else {
1247                         /* Otherwise unlock the previous record. */
1248                         if (unlock_record(tdb, tlock->off) != 0)
1249                                 goto fail;
1250                 }
1251
1252                 if (want_next) {
1253                         /* We have offset of old record: grab next */
1254                         if (rec_read(tdb, tlock->off, rec) == -1)
1255                                 goto fail;
1256                         tlock->off = rec->next;
1257                 }
1258
1259                 /* Iterate through chain */
1260                 while( tlock->off) {
1261                         tdb_off current;
1262                         if (rec_read(tdb, tlock->off, rec) == -1)
1263                                 goto fail;
1264                         if (!TDB_DEAD(rec)) {
1265                                 /* Woohoo: we found one! */
1266                                 if (lock_record(tdb, tlock->off) != 0)
1267                                         goto fail;
1268                                 return tlock->off;
1269                         }
1270                         /* Try to clean dead ones from old traverses */
1271                         current = tlock->off;
1272                         tlock->off = rec->next;
1273                         if (!tdb->read_only && 
1274                             do_delete(tdb, current, rec) != 0)
1275                                 goto fail;
1276                 }
1277                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1278                 want_next = 0;
1279         }
1280         /* We finished iteration without finding anything */
1281         return TDB_ERRCODE(TDB_SUCCESS, 0);
1282
1283  fail:
1284         tlock->off = 0;
1285         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1286                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1287         return -1;
1288 }
1289
1290 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1291    return -1 on error or the record count traversed
1292    if fn is NULL then it is not called
1293    a non-zero return value from fn() indicates that the traversal should stop
1294   */
1295 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn)
1296 {
1297         TDB_DATA key, dbuf;
1298         struct list_struct rec;
1299         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1300         int ret, count = 0;
1301
1302         /* This was in the initializaton, above, but the IRIX compiler
1303          * did not like it.  crh
1304          */
1305         tl.next = tdb->travlocks.next;
1306
1307         /* fcntl locks don't stack: beware traverse inside traverse */
1308         tdb->travlocks.next = &tl;
1309
1310         /* tdb_next_lock places locks on the record returned, and its chain */
1311         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1312                 count++;
1313                 /* now read the full record */
1314                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1315                                           rec.key_len + rec.data_len);
1316                 if (!key.dptr) {
1317                         ret = -1;
1318                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1319                                 goto out;
1320                         if (unlock_record(tdb, tl.off) != 0)
1321                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1322                         goto out;
1323                 }
1324                 key.dsize = rec.key_len;
1325                 dbuf.dptr = key.dptr + rec.key_len;
1326                 dbuf.dsize = rec.data_len;
1327
1328                 /* Drop chain lock, call out */
1329                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1330                         ret = -1;
1331                         goto out;
1332                 }
1333                 if (fn && fn(tdb, key, dbuf)) {
1334                         /* They want us to terminate traversal */
1335                         ret = count;
1336                         if (unlock_record(tdb, tl.off) != 0) {
1337                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1338                                 ret = -1;
1339                         }
1340                         tdb->travlocks.next = tl.next;
1341                         SAFE_FREE(key.dptr);
1342                         return count;
1343                 }
1344                 SAFE_FREE(key.dptr);
1345         }
1346 out:
1347         tdb->travlocks.next = tl.next;
1348         if (ret < 0)
1349                 return -1;
1350         else
1351                 return count;
1352 }
1353
1354 /* find the first entry in the database and return its key */
1355 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1356 {
1357         TDB_DATA key;
1358         struct list_struct rec;
1359
1360         /* release any old lock */
1361         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1362                 return tdb_null;
1363         tdb->travlocks.off = tdb->travlocks.hash = 0;
1364
1365         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1366                 return tdb_null;
1367         /* now read the key */
1368         key.dsize = rec.key_len;
1369         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1370         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1371                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1372         return key;
1373 }
1374
1375 /* find the next entry in the database, returning its key */
1376 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1377 {
1378         u32 oldhash;
1379         TDB_DATA key = tdb_null;
1380         struct list_struct rec;
1381         char *k = NULL;
1382
1383         /* Is locked key the old key?  If so, traverse will be reliable. */
1384         if (tdb->travlocks.off) {
1385                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1386                         return tdb_null;
1387                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1388                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1389                                             rec.key_len))
1390                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1391                         /* No, it wasn't: unlock it and start from scratch */
1392                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1393                                 return tdb_null;
1394                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1395                                 return tdb_null;
1396                         tdb->travlocks.off = 0;
1397                 }
1398
1399                 SAFE_FREE(k);
1400         }
1401
1402         if (!tdb->travlocks.off) {
1403                 /* No previous element: do normal find, and lock record */
1404                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb_hash(&oldkey), F_WRLCK, &rec);
1405                 if (!tdb->travlocks.off)
1406                         return tdb_null;
1407                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1408                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1409                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1410                         return tdb_null;
1411                 }
1412         }
1413         oldhash = tdb->travlocks.hash;
1414
1415         /* Grab next record: locks chain and returned record,
1416            unlocks old record */
1417         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1418                 key.dsize = rec.key_len;
1419                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1420                                           key.dsize);
1421                 /* Unlock the chain of this new record */
1422                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1423                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1424         }
1425         /* Unlock the chain of old record */
1426         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1427                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1428         return key;
1429 }
1430
1431 /* delete an entry in the database given a key */
1432 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1433 {
1434         tdb_off rec_ptr;
1435         struct list_struct rec;
1436         int ret;
1437
1438         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1439                 return -1;
1440         ret = do_delete(tdb, rec_ptr, &rec);
1441         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1442                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1443         return ret;
1444 }
1445
1446 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1447 {
1448         u32 hash = tdb_hash(&key);
1449         return tdb_delete_hash(tdb, key, hash);
1450 }
1451
1452 /* store an element in the database, replacing any existing element
1453    with the same key 
1454
1455    return 0 on success, -1 on failure
1456 */
1457 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1458 {
1459         struct list_struct rec;
1460         u32 hash;
1461         tdb_off rec_ptr;
1462         char *p = NULL;
1463         int ret = 0;
1464
1465         /* find which hash bucket it is in */
1466         hash = tdb_hash(&key);
1467         if (!tdb_keylocked(tdb, hash))
1468                 return -1;
1469         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1470                 return -1;
1471
1472         /* check for it existing, on insert. */
1473         if (flag == TDB_INSERT) {
1474                 if (tdb_exists_hash(tdb, key, hash)) {
1475                         tdb->ecode = TDB_ERR_EXISTS;
1476                         goto fail;
1477                 }
1478         } else {
1479                 /* first try in-place update, on modify or replace. */
1480                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1481                         goto out;
1482         }
1483         /* reset the error code potentially set by the tdb_update() */
1484         tdb->ecode = TDB_SUCCESS;
1485
1486         /* delete any existing record - if it doesn't exist we don't
1487            care.  Doing this first reduces fragmentation, and avoids
1488            coalescing with `allocated' block before it's updated. */
1489         if (flag != TDB_INSERT)
1490                 tdb_delete_hash(tdb, key, hash);
1491
1492         /* Copy key+value *before* allocating free space in case malloc
1493            fails and we are left with a dead spot in the tdb. */
1494
1495         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1496                 tdb->ecode = TDB_ERR_OOM;
1497                 goto fail;
1498         }
1499
1500         memcpy(p, key.dptr, key.dsize);
1501         if (dbuf.dsize)
1502                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1503
1504         /* we have to allocate some space */
1505         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1506                 goto fail;
1507
1508         /* Read hash top into next ptr */
1509         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1510                 goto fail;
1511
1512         rec.key_len = key.dsize;
1513         rec.data_len = dbuf.dsize;
1514         rec.full_hash = hash;
1515         rec.magic = TDB_MAGIC;
1516
1517         /* write out and point the top of the hash chain at it */
1518         if (rec_write(tdb, rec_ptr, &rec) == -1
1519             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1520             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1521                 /* Need to tdb_unallocate() here */
1522                 goto fail;
1523         }
1524  out:
1525         SAFE_FREE(p); 
1526         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1527         return ret;
1528 fail:
1529         ret = -1;
1530         goto out;
1531 }
1532
1533 /* Attempt to append data to an entry in place - this only works if the new data size
1534    is <= the old data size and the key exists.
1535    on failure return -1. Record must be locked before calling.
1536 */
1537 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1538 {
1539         struct list_struct rec;
1540         tdb_off rec_ptr;
1541
1542         /* find entry */
1543         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1544                 return -1;
1545
1546         /* Append of 0 is always ok. */
1547         if (new_dbuf.dsize == 0)
1548                 return 0;
1549
1550         /* must be long enough for key, old data + new data and tailer */
1551         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1552                 /* No room. */
1553                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1554                 return -1;
1555         }
1556
1557         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1558                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1559                 return -1;
1560
1561         /* update size */
1562         rec.data_len += new_dbuf.dsize;
1563         return rec_write(tdb, rec_ptr, &rec);
1564 }
1565
1566 /* Append to an entry. Create if not exist. */
1567
1568 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1569 {
1570         struct list_struct rec;
1571         u32 hash;
1572         tdb_off rec_ptr;
1573         char *p = NULL;
1574         int ret = 0;
1575         size_t new_data_size = 0;
1576
1577         /* find which hash bucket it is in */
1578         hash = tdb_hash(&key);
1579         if (!tdb_keylocked(tdb, hash))
1580                 return -1;
1581         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1582                 return -1;
1583
1584         /* first try in-place. */
1585         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1586                 goto out;
1587
1588         /* reset the error code potentially set by the tdb_append_inplace() */
1589         tdb->ecode = TDB_SUCCESS;
1590
1591         /* find entry */
1592         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1593                 if (tdb->ecode != TDB_ERR_NOEXIST)
1594                         goto fail;
1595
1596                 /* Not found - create. */
1597
1598                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1599                 goto out;
1600         }
1601
1602         new_data_size = rec.data_len + new_dbuf.dsize;
1603
1604         /* Copy key+old_value+value *before* allocating free space in case malloc
1605            fails and we are left with a dead spot in the tdb. */
1606
1607         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1608                 tdb->ecode = TDB_ERR_OOM;
1609                 goto fail;
1610         }
1611
1612         /* Copy the key in place. */
1613         memcpy(p, key.dptr, key.dsize);
1614
1615         /* Now read the old data into place. */
1616         if (rec.data_len &&
1617                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1618                         goto fail;
1619
1620         /* Finally append the new data. */
1621         if (new_dbuf.dsize)
1622                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1623
1624         /* delete any existing record - if it doesn't exist we don't
1625            care.  Doing this first reduces fragmentation, and avoids
1626            coalescing with `allocated' block before it's updated. */
1627
1628         tdb_delete_hash(tdb, key, hash);
1629
1630         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1631                 goto fail;
1632
1633         /* Read hash top into next ptr */
1634         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1635                 goto fail;
1636
1637         rec.key_len = key.dsize;
1638         rec.data_len = new_data_size;
1639         rec.full_hash = hash;
1640         rec.magic = TDB_MAGIC;
1641
1642         /* write out and point the top of the hash chain at it */
1643         if (rec_write(tdb, rec_ptr, &rec) == -1
1644             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1645             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1646                 /* Need to tdb_unallocate() here */
1647                 goto fail;
1648         }
1649
1650  out:
1651         SAFE_FREE(p); 
1652         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1653         return ret;
1654
1655 fail:
1656         ret = -1;
1657         goto out;
1658 }
1659
1660 static int tdb_already_open(dev_t device,
1661                             ino_t ino)
1662 {
1663         TDB_CONTEXT *i;
1664         
1665         for (i = tdbs; i; i = i->next) {
1666                 if (i->device == device && i->inode == ino) {
1667                         return 1;
1668                 }
1669         }
1670
1671         return 0;
1672 }
1673
1674 /* open the database, creating it if necessary 
1675
1676    The open_flags and mode are passed straight to the open call on the
1677    database file. A flags value of O_WRONLY is invalid. The hash size
1678    is advisory, use zero for a default value.
1679
1680    Return is NULL on error, in which case errno is also set.  Don't 
1681    try to call tdb_error or tdb_errname, just do strerror(errno).
1682
1683    @param name may be NULL for internal databases. */
1684 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1685                       int open_flags, mode_t mode)
1686 {
1687         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1688 }
1689
1690
1691 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1692                          int open_flags, mode_t mode,
1693                          tdb_log_func log_fn)
1694 {
1695         TDB_CONTEXT *tdb;
1696         struct stat st;
1697         int rev = 0, locked;
1698         unsigned char *vp;
1699         u32 vertest;
1700
1701         if (!(tdb = calloc(1, sizeof *tdb))) {
1702                 /* Can't log this */
1703                 errno = ENOMEM;
1704                 goto fail;
1705         }
1706         tdb->fd = -1;
1707         tdb->name = NULL;
1708         tdb->map_ptr = NULL;
1709         tdb->lockedkeys = NULL;
1710         tdb->flags = tdb_flags;
1711         tdb->open_flags = open_flags;
1712         tdb->log_fn = log_fn;
1713         
1714         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1715                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1716                          name));
1717                 errno = EINVAL;
1718                 goto fail;
1719         }
1720         
1721         if (hash_size == 0)
1722                 hash_size = DEFAULT_HASH_SIZE;
1723         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1724                 tdb->read_only = 1;
1725                 /* read only databases don't do locking or clear if first */
1726                 tdb->flags |= TDB_NOLOCK;
1727                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1728         }
1729
1730         /* internal databases don't mmap or lock, and start off cleared */
1731         if (tdb->flags & TDB_INTERNAL) {
1732                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1733                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1734                 if (tdb_new_database(tdb, hash_size) != 0) {
1735                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1736                         goto fail;
1737                 }
1738                 goto internal;
1739         }
1740
1741         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1742                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1743                          name, strerror(errno)));
1744                 goto fail;      /* errno set by open(2) */
1745         }
1746
1747         /* ensure there is only one process initialising at once */
1748         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1749                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1750                          name, strerror(errno)));
1751                 goto fail;      /* errno set by tdb_brlock */
1752         }
1753
1754         /* we need to zero database if we are the only one with it open */
1755         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1756             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1757                 open_flags |= O_CREAT;
1758                 if (ftruncate(tdb->fd, 0) == -1) {
1759                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1760                                  "failed to truncate %s: %s\n",
1761                                  name, strerror(errno)));
1762                         goto fail; /* errno set by ftruncate */
1763                 }
1764         }
1765
1766         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1767             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1768             || (tdb->header.version != TDB_VERSION
1769                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1770                 /* its not a valid database - possibly initialise it */
1771                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1772                         errno = EIO; /* ie bad format or something */
1773                         goto fail;
1774                 }
1775                 rev = (tdb->flags & TDB_CONVERT);
1776         }
1777         vp = (unsigned char *)&tdb->header.version;
1778         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1779                   (((u32)vp[2]) << 8) | (u32)vp[3];
1780         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1781         if (!rev)
1782                 tdb->flags &= ~TDB_CONVERT;
1783         else {
1784                 tdb->flags |= TDB_CONVERT;
1785                 convert(&tdb->header, sizeof(tdb->header));
1786         }
1787         if (fstat(tdb->fd, &st) == -1)
1788                 goto fail;
1789
1790         /* Is it already in the open list?  If so, fail. */
1791         if (tdb_already_open(st.st_dev, st.st_ino)) {
1792                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1793                          "%s (%d,%d) is already open in this process\n",
1794                          name, st.st_dev, st.st_ino));
1795                 errno = EBUSY;
1796                 goto fail;
1797         }
1798
1799         if (!(tdb->name = (char *)strdup(name))) {
1800                 errno = ENOMEM;
1801                 goto fail;
1802         }
1803
1804         tdb->map_size = st.st_size;
1805         tdb->device = st.st_dev;
1806         tdb->inode = st.st_ino;
1807         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1808         if (!tdb->locked) {
1809                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1810                          "failed to allocate lock structure for %s\n",
1811                          name));
1812                 errno = ENOMEM;
1813                 goto fail;
1814         }
1815         tdb_mmap(tdb);
1816         if (locked) {
1817                 if (!tdb->read_only)
1818                         if (tdb_clear_spinlocks(tdb) != 0) {
1819                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1820                                 "failed to clear spinlock\n"));
1821                                 goto fail;
1822                         }
1823                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1824                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1825                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1826                                  name, strerror(errno)));
1827                         goto fail;
1828                 }
1829         }
1830         /* leave this lock in place to indicate it's in use */
1831         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1832                 goto fail;
1833
1834  internal:
1835         /* Internal (memory-only) databases skip all the code above to
1836          * do with disk files, and resume here by releasing their
1837          * global lock and hooking into the active list. */
1838         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1839                 goto fail;
1840         tdb->next = tdbs;
1841         tdbs = tdb;
1842         return tdb;
1843
1844  fail:
1845         { int save_errno = errno;
1846
1847         if (!tdb)
1848                 return NULL;
1849         
1850         if (tdb->map_ptr) {
1851                 if (tdb->flags & TDB_INTERNAL)
1852                         SAFE_FREE(tdb->map_ptr);
1853                 else
1854                         tdb_munmap(tdb);
1855         }
1856         SAFE_FREE(tdb->name);
1857         if (tdb->fd != -1)
1858                 if (close(tdb->fd) != 0)
1859                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1860         SAFE_FREE(tdb->locked);
1861         SAFE_FREE(tdb);
1862         errno = save_errno;
1863         return NULL;
1864         }
1865 }
1866
1867 /**
1868  * Close a database.
1869  *
1870  * @returns -1 for error; 0 for success.
1871  **/
1872 int tdb_close(TDB_CONTEXT *tdb)
1873 {
1874         TDB_CONTEXT **i;
1875         int ret = 0;
1876
1877         if (tdb->map_ptr) {
1878                 if (tdb->flags & TDB_INTERNAL)
1879                         SAFE_FREE(tdb->map_ptr);
1880                 else
1881                         tdb_munmap(tdb);
1882         }
1883         SAFE_FREE(tdb->name);
1884         if (tdb->fd != -1)
1885                 ret = close(tdb->fd);
1886         SAFE_FREE(tdb->locked);
1887         SAFE_FREE(tdb->lockedkeys);
1888
1889         /* Remove from contexts list */
1890         for (i = &tdbs; *i; i = &(*i)->next) {
1891                 if (*i == tdb) {
1892                         *i = tdb->next;
1893                         break;
1894                 }
1895         }
1896
1897         memset(tdb, 0, sizeof(*tdb));
1898         SAFE_FREE(tdb);
1899
1900         return ret;
1901 }
1902
1903 /* lock/unlock entire database */
1904 int tdb_lockall(TDB_CONTEXT *tdb)
1905 {
1906         u32 i;
1907
1908         /* There are no locks on read-only dbs */
1909         if (tdb->read_only)
1910                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1911         if (tdb->lockedkeys)
1912                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1913         for (i = 0; i < tdb->header.hash_size; i++) 
1914                 if (tdb_lock(tdb, i, F_WRLCK))
1915                         break;
1916
1917         /* If error, release locks we have... */
1918         if (i < tdb->header.hash_size) {
1919                 u32 j;
1920
1921                 for ( j = 0; j < i; j++)
1922                         tdb_unlock(tdb, j, F_WRLCK);
1923                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1924         }
1925
1926         return 0;
1927 }
1928 void tdb_unlockall(TDB_CONTEXT *tdb)
1929 {
1930         u32 i;
1931         for (i=0; i < tdb->header.hash_size; i++)
1932                 tdb_unlock(tdb, i, F_WRLCK);
1933 }
1934
1935 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1936 {
1937         u32 i, j, hash;
1938
1939         /* Can't lock more keys if already locked */
1940         if (tdb->lockedkeys)
1941                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1942         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1943                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1944         /* First number in array is # keys */
1945         tdb->lockedkeys[0] = number;
1946
1947         /* Insertion sort by bucket */
1948         for (i = 0; i < number; i++) {
1949                 hash = tdb_hash(&keys[i]);
1950                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1951                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1952                 tdb->lockedkeys[j+1] = hash;
1953         }
1954         /* Finally, lock in order */
1955         for (i = 0; i < number; i++)
1956                 if (tdb_lock(tdb, i, F_WRLCK))
1957                         break;
1958
1959         /* If error, release locks we have... */
1960         if (i < number) {
1961                 for ( j = 0; j < i; j++)
1962                         tdb_unlock(tdb, j, F_WRLCK);
1963                 SAFE_FREE(tdb->lockedkeys);
1964                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1965         }
1966         return 0;
1967 }
1968
1969 /* Unlock the keys previously locked by tdb_lockkeys() */
1970 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1971 {
1972         u32 i;
1973         if (!tdb->lockedkeys)
1974                 return;
1975         for (i = 0; i < tdb->lockedkeys[0]; i++)
1976                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1977         SAFE_FREE(tdb->lockedkeys);
1978 }
1979
1980 /* lock/unlock one hash chain. This is meant to be used to reduce
1981    contention - it cannot guarantee how many records will be locked */
1982 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1983 {
1984         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1985 }
1986
1987 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1988 {
1989         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1990 }
1991
1992 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1993 {
1994         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1995 }
1996
1997 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1998 {
1999         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
2000 }
2001
2002
2003 /* register a loging function */
2004 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2005 {
2006         tdb->log_fn = fn;
2007 }
2008
2009
2010 /* reopen a tdb - this is used after a fork to ensure that we have an independent
2011    seek pointer from our parent and to re-establish locks */
2012 int tdb_reopen(TDB_CONTEXT *tdb)
2013 {
2014         struct stat st;
2015
2016         if (tdb_munmap(tdb) != 0) {
2017                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2018                 goto fail;
2019         }
2020         if (close(tdb->fd) != 0)
2021                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2022         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2023         if (tdb->fd == -1) {
2024                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2025                 goto fail;
2026         }
2027         if (fstat(tdb->fd, &st) != 0) {
2028                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2029                 goto fail;
2030         }
2031         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2032                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2033                 goto fail;
2034         }
2035         tdb_mmap(tdb);
2036         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2037                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2038                 goto fail;
2039         }
2040
2041         return 0;
2042
2043 fail:
2044         tdb_close(tdb);
2045         return -1;
2046 }
2047
2048 /* reopen all tdb's */
2049 int tdb_reopen_all(void)
2050 {
2051         TDB_CONTEXT *tdb;
2052
2053         for (tdb=tdbs; tdb; tdb = tdb->next) {
2054                 if (tdb_reopen(tdb) != 0) return -1;
2055         }
2056
2057         return 0;
2058 }