added a TDB_MODIFY flag to tdb_store() that says "if the record
[samba.git] / source4 / lib / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Paul `Rusty' Russell              2000
6    Copyright (C) Jeremy Allison                    2000-2003
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 2 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program; if not, write to the Free Software
20    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23
24 /* NOTE: If you use tdbs under valgrind, and in particular if you run
25  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
26  * think this is because valgrind doesn't understand that the mmap'd
27  * area may be written to by other processes.  Memory can, from the
28  * point of view of the grinded process, spontaneously become
29  * initialized.
30  *
31  * I can think of a few solutions.  [mbp 20030311]
32  *
33  * 1 - Write suppressions for Valgrind so that it doesn't complain
34  * about this.  Probably the most reasonable but people need to
35  * remember to use them.
36  *
37  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
38  *
39  * 3 - Use the special valgrind macros to mark memory as valid at the
40  * right time.  Probably too hard -- the process just doesn't know.
41  */ 
42
43 #ifdef STANDALONE
44 #if HAVE_CONFIG_H
45 #include <config.h>
46 #endif
47
48 #include <stdlib.h>
49 #include <stdio.h>
50 #include <fcntl.h>
51 #include <unistd.h>
52 #include <string.h>
53 #include <fcntl.h>
54 #include <errno.h>
55 #include <sys/mman.h>
56 #include <sys/stat.h>
57 #include <signal.h>
58 #include "tdb.h"
59 #include "spinlock.h"
60 #else
61 #include "includes.h"
62 #endif
63
64 #define TDB_MAGIC_FOOD "TDB file\n"
65 #define TDB_VERSION (0x26011967 + 6)
66 #define TDB_MAGIC (0x26011999U)
67 #define TDB_FREE_MAGIC (~TDB_MAGIC)
68 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
69 #define TDB_ALIGNMENT 4
70 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
71 #define DEFAULT_HASH_SIZE 131
72 #define TDB_PAGE_SIZE 0x2000
73 #define FREELIST_TOP (sizeof(struct tdb_header))
74 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
75 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
76 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
77 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
78 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
79 #define TDB_DATA_START(hash_size) (TDB_HASH_TOP(hash_size-1) + TDB_SPINLOCK_SIZE(hash_size))
80
81
82 /* NB assumes there is a local variable called "tdb" that is the
83  * current context, also takes doubly-parenthesized print-style
84  * argument. */
85 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
86
87 /* lock offsets */
88 #define GLOBAL_LOCK 0
89 #define ACTIVE_LOCK 4
90
91 #ifndef MAP_FILE
92 #define MAP_FILE 0
93 #endif
94
95 #ifndef MAP_FAILED
96 #define MAP_FAILED ((void *)-1)
97 #endif
98
99 /* free memory if the pointer is valid and zero the pointer */
100 #ifndef SAFE_FREE
101 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
102 #endif
103
104 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
105 TDB_DATA tdb_null;
106
107 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
108 static TDB_CONTEXT *tdbs = NULL;
109
110 static int tdb_munmap(TDB_CONTEXT *tdb)
111 {
112         if (tdb->flags & TDB_INTERNAL)
113                 return 0;
114
115 #ifdef HAVE_MMAP
116         if (tdb->map_ptr) {
117                 int ret = munmap(tdb->map_ptr, tdb->map_size);
118                 if (ret != 0)
119                         return ret;
120         }
121 #endif
122         tdb->map_ptr = NULL;
123         return 0;
124 }
125
126 static void tdb_mmap(TDB_CONTEXT *tdb)
127 {
128         if (tdb->flags & TDB_INTERNAL)
129                 return;
130
131 #ifdef HAVE_MMAP
132         if (!(tdb->flags & TDB_NOMMAP)) {
133                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
134                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
135                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
136
137                 /*
138                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
139                  */
140
141                 if (tdb->map_ptr == MAP_FAILED) {
142                         tdb->map_ptr = NULL;
143                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
144                                  tdb->map_size, strerror(errno)));
145                 }
146         } else {
147                 tdb->map_ptr = NULL;
148         }
149 #else
150         tdb->map_ptr = NULL;
151 #endif
152 }
153
154 /* Endian conversion: we only ever deal with 4 byte quantities */
155 static void *convert(void *buf, u32 size)
156 {
157         u32 i, *p = buf;
158         for (i = 0; i < size / 4; i++)
159                 p[i] = TDB_BYTEREV(p[i]);
160         return buf;
161 }
162 #define DOCONV() (tdb->flags & TDB_CONVERT)
163 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
164
165 /* the body of the database is made of one list_struct for the free space
166    plus a separate data list for each hash value */
167 struct list_struct {
168         tdb_off next; /* offset of the next record in the list */
169         tdb_len rec_len; /* total byte length of record */
170         tdb_len key_len; /* byte length of key */
171         tdb_len data_len; /* byte length of data */
172         u32 full_hash; /* the full 32 bit hash of the key */
173         u32 magic;   /* try to catch errors */
174         /* the following union is implied:
175                 union {
176                         char record[rec_len];
177                         struct {
178                                 char key[key_len];
179                                 char data[data_len];
180                         }
181                         u32 totalsize; (tailer)
182                 }
183         */
184 };
185
186 /***************************************************************
187  Allow a caller to set a "alarm" flag that tdb can check to abort
188  a blocking lock on SIGALRM.
189 ***************************************************************/
190
191 static SIG_ATOMIC_T *palarm_fired;
192
193 void tdb_set_lock_alarm(SIG_ATOMIC_T *palarm)
194 {
195         palarm_fired = palarm;
196 }
197
198 /* a byte range locking function - return 0 on success
199    this functions locks/unlocks 1 byte at the specified offset.
200
201    On error, errno is also set so that errors are passed back properly
202    through tdb_open(). */
203 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
204                       int rw_type, int lck_type, int probe)
205 {
206         struct flock fl;
207         int ret;
208
209         if (tdb->flags & TDB_NOLOCK)
210                 return 0;
211         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
212                 errno = EACCES;
213                 return -1;
214         }
215
216         fl.l_type = rw_type;
217         fl.l_whence = SEEK_SET;
218         fl.l_start = offset;
219         fl.l_len = 1;
220         fl.l_pid = 0;
221
222         do {
223                 ret = fcntl(tdb->fd,lck_type,&fl);
224                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
225                         break;
226         } while (ret == -1 && errno == EINTR);
227
228         if (ret == -1) {
229                 if (!probe && lck_type != F_SETLK) {
230                         /* Ensure error code is set for log fun to examine. */
231                         if (errno == EINTR && palarm_fired && *palarm_fired)
232                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
233                         else
234                                 tdb->ecode = TDB_ERR_LOCK;
235                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
236                                  tdb->fd, offset, rw_type, lck_type));
237                 }
238                 /* Was it an alarm timeout ? */
239                 if (errno == EINTR && palarm_fired && *palarm_fired) {
240                         TDB_LOG((tdb, 5, "tdb_brlock timed out (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
241                                  tdb->fd, offset, rw_type, lck_type));
242                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
243                 }
244                 /* Otherwise - generic lock error. */
245                 /* errno set by fcntl */
246                 TDB_LOG((tdb, 5, "tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d: %s\n", 
247                          tdb->fd, offset, rw_type, lck_type, strerror(errno)));
248                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
249         }
250         return 0;
251 }
252
253 /* lock a list in the database. list -1 is the alloc list */
254 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
255 {
256         if (list < -1 || list >= (int)tdb->header.hash_size) {
257                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
258                            list, ltype));
259                 return -1;
260         }
261         if (tdb->flags & TDB_NOLOCK)
262                 return 0;
263
264         /* Since fcntl locks don't nest, we do a lock for the first one,
265            and simply bump the count for future ones */
266         if (tdb->locked[list+1].count == 0) {
267                 if (!tdb->read_only && tdb->header.rwlocks) {
268                         if (tdb_spinlock(tdb, list, ltype)) {
269                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
270                                            list, ltype));
271                                 return -1;
272                         }
273                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
274                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
275                                            list, ltype, strerror(errno)));
276                         return -1;
277                 }
278                 tdb->locked[list+1].ltype = ltype;
279         }
280         tdb->locked[list+1].count++;
281         return 0;
282 }
283
284 /* unlock the database: returns void because it's too late for errors. */
285         /* changed to return int it may be interesting to know there
286            has been an error  --simo */
287 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
288 {
289         int ret = -1;
290
291         if (tdb->flags & TDB_NOLOCK)
292                 return 0;
293
294         /* Sanity checks */
295         if (list < -1 || list >= (int)tdb->header.hash_size) {
296                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
297                 return ret;
298         }
299
300         if (tdb->locked[list+1].count==0) {
301                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
302                 return ret;
303         }
304
305         if (tdb->locked[list+1].count == 1) {
306                 /* Down to last nested lock: unlock underneath */
307                 if (!tdb->read_only && tdb->header.rwlocks) {
308                         ret = tdb_spinunlock(tdb, list, ltype);
309                 } else {
310                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
311                 }
312         } else {
313                 ret = 0;
314         }
315         tdb->locked[list+1].count--;
316
317         if (ret)
318                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
319         return ret;
320 }
321
322 /* This is based on the hash algorithm from gdbm */
323 static u32 tdb_hash(TDB_DATA *key)
324 {
325         u32 value;      /* Used to compute the hash value.  */
326         u32   i;        /* Used to cycle through random values. */
327
328         /* Set the initial value from the key size. */
329         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
330                 value = (value + (key->dptr[i] << (i*5 % 24)));
331
332         return (1103515243 * value + 12345);  
333 }
334
335 /* check for an out of bounds access - if it is out of bounds then
336    see if the database has been expanded by someone else and expand
337    if necessary 
338    note that "len" is the minimum length needed for the db
339 */
340 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
341 {
342         struct stat st;
343         if (len <= tdb->map_size)
344                 return 0;
345         if (tdb->flags & TDB_INTERNAL) {
346                 if (!probe) {
347                         /* Ensure ecode is set for log fn. */
348                         tdb->ecode = TDB_ERR_IO;
349                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
350                                  (int)len, (int)tdb->map_size));
351                 }
352                 return TDB_ERRCODE(TDB_ERR_IO, -1);
353         }
354
355         if (fstat(tdb->fd, &st) == -1)
356                 return TDB_ERRCODE(TDB_ERR_IO, -1);
357
358         if (st.st_size < (size_t)len) {
359                 if (!probe) {
360                         /* Ensure ecode is set for log fn. */
361                         tdb->ecode = TDB_ERR_IO;
362                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
363                                  (int)len, (int)st.st_size));
364                 }
365                 return TDB_ERRCODE(TDB_ERR_IO, -1);
366         }
367
368         /* Unmap, update size, remap */
369         if (tdb_munmap(tdb) == -1)
370                 return TDB_ERRCODE(TDB_ERR_IO, -1);
371         tdb->map_size = st.st_size;
372         tdb_mmap(tdb);
373         return 0;
374 }
375
376 /* write a lump of data at a specified offset */
377 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
378 {
379         if (tdb_oob(tdb, off + len, 0) != 0)
380                 return -1;
381
382         if (tdb->map_ptr)
383                 memcpy(off + (char *)tdb->map_ptr, buf, len);
384 #ifdef HAVE_PWRITE
385         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
386 #else
387         else if (lseek(tdb->fd, off, SEEK_SET) != off
388                  || write(tdb->fd, buf, len) != (ssize_t)len) {
389 #endif
390                 /* Ensure ecode is set for log fn. */
391                 tdb->ecode = TDB_ERR_IO;
392                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
393                            off, len, strerror(errno)));
394                 return TDB_ERRCODE(TDB_ERR_IO, -1);
395         }
396         return 0;
397 }
398
399 /* read a lump of data at a specified offset, maybe convert */
400 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
401 {
402         if (tdb_oob(tdb, off + len, 0) != 0)
403                 return -1;
404
405         if (tdb->map_ptr)
406                 memcpy(buf, off + (char *)tdb->map_ptr, len);
407 #ifdef HAVE_PREAD
408         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
409 #else
410         else if (lseek(tdb->fd, off, SEEK_SET) != off
411                  || read(tdb->fd, buf, len) != (ssize_t)len) {
412 #endif
413                 /* Ensure ecode is set for log fn. */
414                 tdb->ecode = TDB_ERR_IO;
415                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
416                            off, len, strerror(errno)));
417                 return TDB_ERRCODE(TDB_ERR_IO, -1);
418         }
419         if (cv)
420                 convert(buf, len);
421         return 0;
422 }
423
424 /* read a lump of data, allocating the space for it */
425 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
426 {
427         char *buf;
428
429         if (!(buf = malloc(len))) {
430                 /* Ensure ecode is set for log fn. */
431                 tdb->ecode = TDB_ERR_OOM;
432                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
433                            len, strerror(errno)));
434                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
435         }
436         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
437                 SAFE_FREE(buf);
438                 return NULL;
439         }
440         return buf;
441 }
442
443 /* read/write a tdb_off */
444 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
445 {
446         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
447 }
448 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
449 {
450         tdb_off off = *d;
451         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
452 }
453
454 /* read/write a record */
455 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
456 {
457         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
458                 return -1;
459         if (TDB_BAD_MAGIC(rec)) {
460                 /* Ensure ecode is set for log fn. */
461                 tdb->ecode = TDB_ERR_CORRUPT;
462                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
463                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
464         }
465         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
466 }
467 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
468 {
469         struct list_struct r = *rec;
470         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
471 }
472
473 /* read a freelist record and check for simple errors */
474 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
475 {
476         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
477                 return -1;
478
479         if (rec->magic == TDB_MAGIC) {
480                 /* this happens when a app is showdown while deleting a record - we should
481                    not completely fail when this happens */
482                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
483                          rec->magic, off));
484                 rec->magic = TDB_FREE_MAGIC;
485                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
486                         return -1;
487         }
488
489         if (rec->magic != TDB_FREE_MAGIC) {
490                 /* Ensure ecode is set for log fn. */
491                 tdb->ecode = TDB_ERR_CORRUPT;
492                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
493                            rec->magic, off));
494                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
495         }
496         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
497                 return -1;
498         return 0;
499 }
500
501 /* update a record tailer (must hold allocation lock) */
502 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
503                          const struct list_struct *rec)
504 {
505         tdb_off totalsize;
506
507         /* Offset of tailer from record header */
508         totalsize = sizeof(*rec) + rec->rec_len;
509         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
510                          &totalsize);
511 }
512
513 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
514 {
515         struct list_struct rec;
516         tdb_off tailer_ofs, tailer;
517
518         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
519                 printf("ERROR: failed to read record at %u\n", offset);
520                 return 0;
521         }
522
523         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
524                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
525
526         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
527         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
528                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
529                 return rec.next;
530         }
531
532         if (tailer != rec.rec_len + sizeof(rec)) {
533                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
534                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
535         }
536         return rec.next;
537 }
538
539 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
540 {
541         tdb_off rec_ptr, top;
542
543         top = TDB_HASH_TOP(i);
544
545         if (tdb_lock(tdb, i, F_WRLCK) != 0)
546                 return -1;
547
548         if (ofs_read(tdb, top, &rec_ptr) == -1)
549                 return tdb_unlock(tdb, i, F_WRLCK);
550
551         if (rec_ptr)
552                 printf("hash=%d\n", i);
553
554         while (rec_ptr) {
555                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
556         }
557
558         return tdb_unlock(tdb, i, F_WRLCK);
559 }
560
561 void tdb_dump_all(TDB_CONTEXT *tdb)
562 {
563         int i;
564         for (i=0;i<tdb->header.hash_size;i++) {
565                 tdb_dump_chain(tdb, i);
566         }
567         printf("freelist:\n");
568         tdb_dump_chain(tdb, -1);
569 }
570
571 int tdb_printfreelist(TDB_CONTEXT *tdb)
572 {
573         int ret;
574         long total_free = 0;
575         tdb_off offset, rec_ptr;
576         struct list_struct rec;
577
578         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
579                 return ret;
580
581         offset = FREELIST_TOP;
582
583         /* read in the freelist top */
584         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
585                 tdb_unlock(tdb, -1, F_WRLCK);
586                 return 0;
587         }
588
589         printf("freelist top=[0x%08x]\n", rec_ptr );
590         while (rec_ptr) {
591                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
592                         tdb_unlock(tdb, -1, F_WRLCK);
593                         return -1;
594                 }
595
596                 if (rec.magic != TDB_FREE_MAGIC) {
597                         printf("bad magic 0x%08x in free list\n", rec.magic);
598                         tdb_unlock(tdb, -1, F_WRLCK);
599                         return -1;
600                 }
601
602                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
603                 total_free += rec.rec_len;
604
605                 /* move to the next record */
606                 rec_ptr = rec.next;
607         }
608         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
609                (int)total_free);
610
611         return tdb_unlock(tdb, -1, F_WRLCK);
612 }
613
614 /* Remove an element from the freelist.  Must have alloc lock. */
615 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
616 {
617         tdb_off last_ptr, i;
618
619         /* read in the freelist top */
620         last_ptr = FREELIST_TOP;
621         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
622                 if (i == off) {
623                         /* We've found it! */
624                         return ofs_write(tdb, last_ptr, &next);
625                 }
626                 /* Follow chain (next offset is at start of record) */
627                 last_ptr = i;
628         }
629         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
630         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
631 }
632
633 /* Add an element into the freelist. Merge adjacent records if
634    neccessary. */
635 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
636 {
637         tdb_off right, left;
638
639         /* Allocation and tailer lock */
640         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
641                 return -1;
642
643         /* set an initial tailer, so if we fail we don't leave a bogus record */
644         if (update_tailer(tdb, offset, rec) != 0) {
645                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
646                 goto fail;
647         }
648
649         /* Look right first (I'm an Australian, dammit) */
650         right = offset + sizeof(*rec) + rec->rec_len;
651         if (right + sizeof(*rec) <= tdb->map_size) {
652                 struct list_struct r;
653
654                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
655                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
656                         goto left;
657                 }
658
659                 /* If it's free, expand to include it. */
660                 if (r.magic == TDB_FREE_MAGIC) {
661                         if (remove_from_freelist(tdb, right, r.next) == -1) {
662                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
663                                 goto left;
664                         }
665                         rec->rec_len += sizeof(r) + r.rec_len;
666                 }
667         }
668
669 left:
670         /* Look left */
671         left = offset - sizeof(tdb_off);
672         if (left > TDB_DATA_START(tdb->header.hash_size)) {
673                 struct list_struct l;
674                 tdb_off leftsize;
675                 
676                 /* Read in tailer and jump back to header */
677                 if (ofs_read(tdb, left, &leftsize) == -1) {
678                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
679                         goto update;
680                 }
681                 left = offset - leftsize;
682
683                 /* Now read in record */
684                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
685                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
686                         goto update;
687                 }
688
689                 /* If it's free, expand to include it. */
690                 if (l.magic == TDB_FREE_MAGIC) {
691                         if (remove_from_freelist(tdb, left, l.next) == -1) {
692                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
693                                 goto update;
694                         } else {
695                                 offset = left;
696                                 rec->rec_len += leftsize;
697                         }
698                 }
699         }
700
701 update:
702         if (update_tailer(tdb, offset, rec) == -1) {
703                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
704                 goto fail;
705         }
706
707         /* Now, prepend to free list */
708         rec->magic = TDB_FREE_MAGIC;
709
710         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
711             rec_write(tdb, offset, rec) == -1 ||
712             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
713                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
714                 goto fail;
715         }
716
717         /* And we're done. */
718         tdb_unlock(tdb, -1, F_WRLCK);
719         return 0;
720
721  fail:
722         tdb_unlock(tdb, -1, F_WRLCK);
723         return -1;
724 }
725
726
727 /* expand a file.  we prefer to use ftruncate, as that is what posix
728   says to use for mmap expansion */
729 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
730 {
731         char buf[1024];
732 #if HAVE_FTRUNCATE_EXTEND
733         if (ftruncate(tdb->fd, size+addition) != 0) {
734                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
735                            size+addition, strerror(errno)));
736                 return -1;
737         }
738 #else
739         char b = 0;
740
741 #ifdef HAVE_PWRITE
742         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
743 #else
744         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
745             write(tdb->fd, &b, 1) != 1) {
746 #endif
747                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
748                            size+addition, strerror(errno)));
749                 return -1;
750         }
751 #endif
752
753         /* now fill the file with something. This ensures that the file isn't sparse, which would be
754            very bad if we ran out of disk. This must be done with write, not via mmap */
755         memset(buf, 0x42, sizeof(buf));
756         while (addition) {
757                 int n = addition>sizeof(buf)?sizeof(buf):addition;
758 #ifdef HAVE_PWRITE
759                 int ret = pwrite(tdb->fd, buf, n, size);
760 #else
761                 int ret;
762                 if (lseek(tdb->fd, size, SEEK_SET) != size)
763                         return -1;
764                 ret = write(tdb->fd, buf, n);
765 #endif
766                 if (ret != n) {
767                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
768                                    n, strerror(errno)));
769                         return -1;
770                 }
771                 addition -= n;
772                 size += n;
773         }
774         return 0;
775 }
776
777
778 /* expand the database at least size bytes by expanding the underlying
779    file and doing the mmap again if necessary */
780 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
781 {
782         struct list_struct rec;
783         tdb_off offset;
784
785         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
786                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
787                 return -1;
788         }
789
790         /* must know about any previous expansions by another process */
791         tdb_oob(tdb, tdb->map_size + 1, 1);
792
793         /* always make room for at least 10 more records, and round
794            the database up to a multiple of TDB_PAGE_SIZE */
795         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
796
797         if (!(tdb->flags & TDB_INTERNAL))
798                 tdb_munmap(tdb);
799
800         /*
801          * We must ensure the file is unmapped before doing this
802          * to ensure consistency with systems like OpenBSD where
803          * writes and mmaps are not consistent.
804          */
805
806         /* expand the file itself */
807         if (!(tdb->flags & TDB_INTERNAL)) {
808                 if (expand_file(tdb, tdb->map_size, size) != 0)
809                         goto fail;
810         }
811
812         tdb->map_size += size;
813
814         if (tdb->flags & TDB_INTERNAL)
815                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
816         else {
817                 /*
818                  * We must ensure the file is remapped before adding the space
819                  * to ensure consistency with systems like OpenBSD where
820                  * writes and mmaps are not consistent.
821                  */
822
823                 /* We're ok if the mmap fails as we'll fallback to read/write */
824                 tdb_mmap(tdb);
825         }
826
827         /* form a new freelist record */
828         memset(&rec,'\0',sizeof(rec));
829         rec.rec_len = size - sizeof(rec);
830
831         /* link it into the free list */
832         offset = tdb->map_size - size;
833         if (tdb_free(tdb, offset, &rec) == -1)
834                 goto fail;
835
836         tdb_unlock(tdb, -1, F_WRLCK);
837         return 0;
838  fail:
839         tdb_unlock(tdb, -1, F_WRLCK);
840         return -1;
841 }
842
843 /* allocate some space from the free list. The offset returned points
844    to a unconnected list_struct within the database with room for at
845    least length bytes of total data
846
847    0 is returned if the space could not be allocated
848  */
849 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
850                             struct list_struct *rec)
851 {
852         tdb_off rec_ptr, last_ptr, newrec_ptr;
853         struct list_struct newrec;
854
855         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
856                 return 0;
857
858         /* Extra bytes required for tailer */
859         length += sizeof(tdb_off);
860
861  again:
862         last_ptr = FREELIST_TOP;
863
864         /* read in the freelist top */
865         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
866                 goto fail;
867
868         /* keep looking until we find a freelist record big enough */
869         while (rec_ptr) {
870                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
871                         goto fail;
872
873                 if (rec->rec_len >= length) {
874                         /* found it - now possibly split it up  */
875                         if (rec->rec_len > length + MIN_REC_SIZE) {
876                                 /* Length of left piece */
877                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
878
879                                 /* Right piece to go on free list */
880                                 newrec.rec_len = rec->rec_len
881                                         - (sizeof(*rec) + length);
882                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
883
884                                 /* And left record is shortened */
885                                 rec->rec_len = length;
886                         } else
887                                 newrec_ptr = 0;
888
889                         /* Remove allocated record from the free list */
890                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
891                                 goto fail;
892
893                         /* Update header: do this before we drop alloc
894                            lock, otherwise tdb_free() might try to
895                            merge with us, thinking we're free.
896                            (Thanks Jeremy Allison). */
897                         rec->magic = TDB_MAGIC;
898                         if (rec_write(tdb, rec_ptr, rec) == -1)
899                                 goto fail;
900
901                         /* Did we create new block? */
902                         if (newrec_ptr) {
903                                 /* Update allocated record tailer (we
904                                    shortened it). */
905                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
906                                         goto fail;
907
908                                 /* Free new record */
909                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
910                                         goto fail;
911                         }
912
913                         /* all done - return the new record offset */
914                         tdb_unlock(tdb, -1, F_WRLCK);
915                         return rec_ptr;
916                 }
917                 /* move to the next record */
918                 last_ptr = rec_ptr;
919                 rec_ptr = rec->next;
920         }
921         /* we didn't find enough space. See if we can expand the
922            database and if we can then try again */
923         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
924                 goto again;
925  fail:
926         tdb_unlock(tdb, -1, F_WRLCK);
927         return 0;
928 }
929
930 /* initialise a new database with a specified hash size */
931 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
932 {
933         struct tdb_header *newdb;
934         int size, ret = -1;
935
936         /* We make it up in memory, then write it out if not internal */
937         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
938         if (!(newdb = calloc(size, 1)))
939                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
940
941         /* Fill in the header */
942         newdb->version = TDB_VERSION;
943         newdb->hash_size = hash_size;
944 #ifdef USE_SPINLOCKS
945         newdb->rwlocks = size;
946 #endif
947         if (tdb->flags & TDB_INTERNAL) {
948                 tdb->map_size = size;
949                 tdb->map_ptr = (char *)newdb;
950                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
951                 /* Convert the `ondisk' version if asked. */
952                 CONVERT(*newdb);
953                 return 0;
954         }
955         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
956                 goto fail;
957
958         if (ftruncate(tdb->fd, 0) == -1)
959                 goto fail;
960
961         /* This creates an endian-converted header, as if read from disk */
962         CONVERT(*newdb);
963         memcpy(&tdb->header, newdb, sizeof(tdb->header));
964         /* Don't endian-convert the magic food! */
965         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
966         if (write(tdb->fd, newdb, size) != size)
967                 ret = -1;
968         else
969                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
970
971   fail:
972         SAFE_FREE(newdb);
973         return ret;
974 }
975
976 /* Returns 0 on fail.  On success, return offset of record, and fills
977    in rec */
978 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
979                         struct list_struct *r)
980 {
981         tdb_off rec_ptr;
982         
983         /* read in the hash top */
984         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
985                 return 0;
986
987         /* keep looking until we find the right record */
988         while (rec_ptr) {
989                 if (rec_read(tdb, rec_ptr, r) == -1)
990                         return 0;
991
992                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
993                         char *k;
994                         /* a very likely hit - read the key */
995                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
996                                            r->key_len);
997                         if (!k)
998                                 return 0;
999
1000                         if (memcmp(key.dptr, k, key.dsize) == 0) {
1001                                 SAFE_FREE(k);
1002                                 return rec_ptr;
1003                         }
1004                         SAFE_FREE(k);
1005                 }
1006                 rec_ptr = r->next;
1007         }
1008         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1009 }
1010
1011 /* If they do lockkeys, check that this hash is one they locked */
1012 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1013 {
1014         u32 i;
1015         if (!tdb->lockedkeys)
1016                 return 1;
1017         for (i = 0; i < tdb->lockedkeys[0]; i++)
1018                 if (tdb->lockedkeys[i+1] == hash)
1019                         return 1;
1020         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1021 }
1022
1023 /* As tdb_find, but if you succeed, keep the lock */
1024 static tdb_off tdb_find_lock_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, int locktype,
1025                              struct list_struct *rec)
1026 {
1027         u32 rec_ptr;
1028
1029         if (!tdb_keylocked(tdb, hash))
1030                 return 0;
1031         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1032                 return 0;
1033         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1034                 tdb_unlock(tdb, BUCKET(hash), locktype);
1035         return rec_ptr;
1036 }
1037
1038 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1039 {
1040         return tdb->ecode;
1041 }
1042
1043 static struct tdb_errname {
1044         enum TDB_ERROR ecode; const char *estring;
1045 } emap[] = { {TDB_SUCCESS, "Success"},
1046              {TDB_ERR_CORRUPT, "Corrupt database"},
1047              {TDB_ERR_IO, "IO Error"},
1048              {TDB_ERR_LOCK, "Locking error"},
1049              {TDB_ERR_OOM, "Out of memory"},
1050              {TDB_ERR_EXISTS, "Record exists"},
1051              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1052              {TDB_ERR_NOEXIST, "Record does not exist"} };
1053
1054 /* Error string for the last tdb error */
1055 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1056 {
1057         u32 i;
1058         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1059                 if (tdb->ecode == emap[i].ecode)
1060                         return emap[i].estring;
1061         return "Invalid error code";
1062 }
1063
1064 /* update an entry in place - this only works if the new data size
1065    is <= the old data size and the key exists.
1066    on failure return -1.
1067 */
1068
1069 static int tdb_update_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
1070 {
1071         struct list_struct rec;
1072         tdb_off rec_ptr;
1073
1074         /* find entry */
1075         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1076                 return -1;
1077
1078         /* must be long enough key, data and tailer */
1079         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1080                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1081                 return -1;
1082         }
1083
1084         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1085                       dbuf.dptr, dbuf.dsize) == -1)
1086                 return -1;
1087
1088         if (dbuf.dsize != rec.data_len) {
1089                 /* update size */
1090                 rec.data_len = dbuf.dsize;
1091                 return rec_write(tdb, rec_ptr, &rec);
1092         }
1093  
1094         return 0;
1095 }
1096
1097 /* find an entry in the database given a key */
1098 /* If an entry doesn't exist tdb_err will be set to
1099  * TDB_ERR_NOEXIST. If a key has no data attached
1100  * tdb_err will not be set. Both will return a
1101  * zero pptr and zero dsize.
1102  */
1103
1104 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1105 {
1106         tdb_off rec_ptr;
1107         struct list_struct rec;
1108         TDB_DATA ret;
1109         u32 hash;
1110
1111         /* find which hash bucket it is in */
1112         hash = tdb_hash(&key);
1113         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
1114                 return tdb_null;
1115
1116         if (rec.data_len)
1117                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1118                                           rec.data_len);
1119         else
1120                 ret.dptr = NULL;
1121         ret.dsize = rec.data_len;
1122         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1123         return ret;
1124 }
1125
1126 /* check if an entry in the database exists 
1127
1128    note that 1 is returned if the key is found and 0 is returned if not found
1129    this doesn't match the conventions in the rest of this module, but is
1130    compatible with gdbm
1131 */
1132 static int tdb_exists_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1133 {
1134         struct list_struct rec;
1135         
1136         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
1137                 return 0;
1138         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1139         return 1;
1140 }
1141
1142 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1143 {
1144         u32 hash = tdb_hash(&key);
1145         return tdb_exists_hash(tdb, key, hash);
1146 }
1147
1148 /* record lock stops delete underneath */
1149 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1150 {
1151         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1152 }
1153 /*
1154   Write locks override our own fcntl readlocks, so check it here.
1155   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1156   an error to fail to get the lock here.
1157 */
1158  
1159 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1160 {
1161         struct tdb_traverse_lock *i;
1162         for (i = &tdb->travlocks; i; i = i->next)
1163                 if (i->off == off)
1164                         return -1;
1165         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1166 }
1167
1168 /*
1169   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1170   an error to fail to get the lock here.
1171 */
1172
1173 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1174 {
1175         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1176 }
1177 /* fcntl locks don't stack: avoid unlocking someone else's */
1178 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1179 {
1180         struct tdb_traverse_lock *i;
1181         u32 count = 0;
1182
1183         if (off == 0)
1184                 return 0;
1185         for (i = &tdb->travlocks; i; i = i->next)
1186                 if (i->off == off)
1187                         count++;
1188         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1189 }
1190
1191 /* actually delete an entry in the database given the offset */
1192 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1193 {
1194         tdb_off last_ptr, i;
1195         struct list_struct lastrec;
1196
1197         if (tdb->read_only) return -1;
1198
1199         if (write_lock_record(tdb, rec_ptr) == -1) {
1200                 /* Someone traversing here: mark it as dead */
1201                 rec->magic = TDB_DEAD_MAGIC;
1202                 return rec_write(tdb, rec_ptr, rec);
1203         }
1204         if (write_unlock_record(tdb, rec_ptr) != 0)
1205                 return -1;
1206
1207         /* find previous record in hash chain */
1208         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1209                 return -1;
1210         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1211                 if (rec_read(tdb, i, &lastrec) == -1)
1212                         return -1;
1213
1214         /* unlink it: next ptr is at start of record. */
1215         if (last_ptr == 0)
1216                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1217         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1218                 return -1;
1219
1220         /* recover the space */
1221         if (tdb_free(tdb, rec_ptr, rec) == -1)
1222                 return -1;
1223         return 0;
1224 }
1225
1226 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1227 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1228                          struct list_struct *rec)
1229 {
1230         int want_next = (tlock->off != 0);
1231
1232         /* No traversal allows if you've called tdb_lockkeys() */
1233         if (tdb->lockedkeys)
1234                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1235
1236         /* Lock each chain from the start one. */
1237         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1238                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1239                         return -1;
1240
1241                 /* No previous record?  Start at top of chain. */
1242                 if (!tlock->off) {
1243                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1244                                      &tlock->off) == -1)
1245                                 goto fail;
1246                 } else {
1247                         /* Otherwise unlock the previous record. */
1248                         if (unlock_record(tdb, tlock->off) != 0)
1249                                 goto fail;
1250                 }
1251
1252                 if (want_next) {
1253                         /* We have offset of old record: grab next */
1254                         if (rec_read(tdb, tlock->off, rec) == -1)
1255                                 goto fail;
1256                         tlock->off = rec->next;
1257                 }
1258
1259                 /* Iterate through chain */
1260                 while( tlock->off) {
1261                         tdb_off current;
1262                         if (rec_read(tdb, tlock->off, rec) == -1)
1263                                 goto fail;
1264                         if (!TDB_DEAD(rec)) {
1265                                 /* Woohoo: we found one! */
1266                                 if (lock_record(tdb, tlock->off) != 0)
1267                                         goto fail;
1268                                 return tlock->off;
1269                         }
1270                         /* Try to clean dead ones from old traverses */
1271                         current = tlock->off;
1272                         tlock->off = rec->next;
1273                         if (!tdb->read_only && 
1274                             do_delete(tdb, current, rec) != 0)
1275                                 goto fail;
1276                 }
1277                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1278                 want_next = 0;
1279         }
1280         /* We finished iteration without finding anything */
1281         return TDB_ERRCODE(TDB_SUCCESS, 0);
1282
1283  fail:
1284         tlock->off = 0;
1285         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1286                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1287         return -1;
1288 }
1289
1290 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1291    return -1 on error or the record count traversed
1292    if fn is NULL then it is not called
1293    a non-zero return value from fn() indicates that the traversal should stop
1294   */
1295 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *private)
1296 {
1297         TDB_DATA key, dbuf;
1298         struct list_struct rec;
1299         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1300         int ret, count = 0;
1301
1302         /* This was in the initializaton, above, but the IRIX compiler
1303          * did not like it.  crh
1304          */
1305         tl.next = tdb->travlocks.next;
1306
1307         /* fcntl locks don't stack: beware traverse inside traverse */
1308         tdb->travlocks.next = &tl;
1309
1310         /* tdb_next_lock places locks on the record returned, and its chain */
1311         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1312                 count++;
1313                 /* now read the full record */
1314                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1315                                           rec.key_len + rec.data_len);
1316                 if (!key.dptr) {
1317                         ret = -1;
1318                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1319                                 goto out;
1320                         if (unlock_record(tdb, tl.off) != 0)
1321                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1322                         goto out;
1323                 }
1324                 key.dsize = rec.key_len;
1325                 dbuf.dptr = key.dptr + rec.key_len;
1326                 dbuf.dsize = rec.data_len;
1327
1328                 /* Drop chain lock, call out */
1329                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1330                         ret = -1;
1331                         goto out;
1332                 }
1333                 if (fn && fn(tdb, key, dbuf, private)) {
1334                         /* They want us to terminate traversal */
1335                         ret = count;
1336                         if (unlock_record(tdb, tl.off) != 0) {
1337                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1338                                 ret = -1;
1339                         }
1340                         tdb->travlocks.next = tl.next;
1341                         SAFE_FREE(key.dptr);
1342                         return count;
1343                 }
1344                 SAFE_FREE(key.dptr);
1345         }
1346 out:
1347         tdb->travlocks.next = tl.next;
1348         if (ret < 0)
1349                 return -1;
1350         else
1351                 return count;
1352 }
1353
1354 /* find the first entry in the database and return its key */
1355 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1356 {
1357         TDB_DATA key;
1358         struct list_struct rec;
1359
1360         /* release any old lock */
1361         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1362                 return tdb_null;
1363         tdb->travlocks.off = tdb->travlocks.hash = 0;
1364
1365         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1366                 return tdb_null;
1367         /* now read the key */
1368         key.dsize = rec.key_len;
1369         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1370         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1371                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1372         return key;
1373 }
1374
1375 /* find the next entry in the database, returning its key */
1376 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1377 {
1378         u32 oldhash;
1379         TDB_DATA key = tdb_null;
1380         struct list_struct rec;
1381         char *k = NULL;
1382
1383         /* Is locked key the old key?  If so, traverse will be reliable. */
1384         if (tdb->travlocks.off) {
1385                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1386                         return tdb_null;
1387                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1388                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1389                                             rec.key_len))
1390                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1391                         /* No, it wasn't: unlock it and start from scratch */
1392                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1393                                 return tdb_null;
1394                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1395                                 return tdb_null;
1396                         tdb->travlocks.off = 0;
1397                 }
1398
1399                 SAFE_FREE(k);
1400         }
1401
1402         if (!tdb->travlocks.off) {
1403                 /* No previous element: do normal find, and lock record */
1404                 tdb->travlocks.off = tdb_find_lock_hash(tdb, oldkey, tdb_hash(&oldkey), F_WRLCK, &rec);
1405                 if (!tdb->travlocks.off)
1406                         return tdb_null;
1407                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1408                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1409                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1410                         return tdb_null;
1411                 }
1412         }
1413         oldhash = tdb->travlocks.hash;
1414
1415         /* Grab next record: locks chain and returned record,
1416            unlocks old record */
1417         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1418                 key.dsize = rec.key_len;
1419                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1420                                           key.dsize);
1421                 /* Unlock the chain of this new record */
1422                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1423                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1424         }
1425         /* Unlock the chain of old record */
1426         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1427                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1428         return key;
1429 }
1430
1431 /* delete an entry in the database given a key */
1432 static int tdb_delete_hash(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash)
1433 {
1434         tdb_off rec_ptr;
1435         struct list_struct rec;
1436         int ret;
1437
1438         if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK, &rec)))
1439                 return -1;
1440         ret = do_delete(tdb, rec_ptr, &rec);
1441         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1442                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1443         return ret;
1444 }
1445
1446 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1447 {
1448         u32 hash = tdb_hash(&key);
1449         return tdb_delete_hash(tdb, key, hash);
1450 }
1451
1452 /* store an element in the database, replacing any existing element
1453    with the same key 
1454
1455    return 0 on success, -1 on failure
1456 */
1457 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1458 {
1459         struct list_struct rec;
1460         u32 hash;
1461         tdb_off rec_ptr;
1462         char *p = NULL;
1463         int ret = 0;
1464
1465         /* find which hash bucket it is in */
1466         hash = tdb_hash(&key);
1467         if (!tdb_keylocked(tdb, hash))
1468                 return -1;
1469         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1470                 return -1;
1471
1472         /* check for it existing, on insert. */
1473         if (flag == TDB_INSERT) {
1474                 if (tdb_exists_hash(tdb, key, hash)) {
1475                         tdb->ecode = TDB_ERR_EXISTS;
1476                         goto fail;
1477                 }
1478         } else {
1479                 /* first try in-place update, on modify or replace. */
1480                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0)
1481                         goto out;
1482                 if (tdb->ecode == TDB_ERR_NOEXIST &&
1483                     flag == TDB_MODIFY) {
1484                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
1485                          we should fail the store */
1486                         goto fail;
1487                 }
1488         }
1489         /* reset the error code potentially set by the tdb_update() */
1490         tdb->ecode = TDB_SUCCESS;
1491
1492         /* delete any existing record - if it doesn't exist we don't
1493            care.  Doing this first reduces fragmentation, and avoids
1494            coalescing with `allocated' block before it's updated. */
1495         if (flag != TDB_INSERT)
1496                 tdb_delete_hash(tdb, key, hash);
1497
1498         /* Copy key+value *before* allocating free space in case malloc
1499            fails and we are left with a dead spot in the tdb. */
1500
1501         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1502                 tdb->ecode = TDB_ERR_OOM;
1503                 goto fail;
1504         }
1505
1506         memcpy(p, key.dptr, key.dsize);
1507         if (dbuf.dsize)
1508                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1509
1510         /* we have to allocate some space */
1511         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1512                 goto fail;
1513
1514         /* Read hash top into next ptr */
1515         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1516                 goto fail;
1517
1518         rec.key_len = key.dsize;
1519         rec.data_len = dbuf.dsize;
1520         rec.full_hash = hash;
1521         rec.magic = TDB_MAGIC;
1522
1523         /* write out and point the top of the hash chain at it */
1524         if (rec_write(tdb, rec_ptr, &rec) == -1
1525             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1526             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1527                 /* Need to tdb_unallocate() here */
1528                 goto fail;
1529         }
1530  out:
1531         SAFE_FREE(p); 
1532         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1533         return ret;
1534 fail:
1535         ret = -1;
1536         goto out;
1537 }
1538
1539 /* Attempt to append data to an entry in place - this only works if the new data size
1540    is <= the old data size and the key exists.
1541    on failure return -1. Record must be locked before calling.
1542 */
1543 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash, TDB_DATA new_dbuf)
1544 {
1545         struct list_struct rec;
1546         tdb_off rec_ptr;
1547
1548         /* find entry */
1549         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
1550                 return -1;
1551
1552         /* Append of 0 is always ok. */
1553         if (new_dbuf.dsize == 0)
1554                 return 0;
1555
1556         /* must be long enough for key, old data + new data and tailer */
1557         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1558                 /* No room. */
1559                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1560                 return -1;
1561         }
1562
1563         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1564                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1565                 return -1;
1566
1567         /* update size */
1568         rec.data_len += new_dbuf.dsize;
1569         return rec_write(tdb, rec_ptr, &rec);
1570 }
1571
1572 /* Append to an entry. Create if not exist. */
1573
1574 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1575 {
1576         struct list_struct rec;
1577         u32 hash;
1578         tdb_off rec_ptr;
1579         char *p = NULL;
1580         int ret = 0;
1581         size_t new_data_size = 0;
1582
1583         /* find which hash bucket it is in */
1584         hash = tdb_hash(&key);
1585         if (!tdb_keylocked(tdb, hash))
1586                 return -1;
1587         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1588                 return -1;
1589
1590         /* first try in-place. */
1591         if (tdb_append_inplace(tdb, key, hash, new_dbuf) == 0)
1592                 goto out;
1593
1594         /* reset the error code potentially set by the tdb_append_inplace() */
1595         tdb->ecode = TDB_SUCCESS;
1596
1597         /* find entry */
1598         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1599                 if (tdb->ecode != TDB_ERR_NOEXIST)
1600                         goto fail;
1601
1602                 /* Not found - create. */
1603
1604                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1605                 goto out;
1606         }
1607
1608         new_data_size = rec.data_len + new_dbuf.dsize;
1609
1610         /* Copy key+old_value+value *before* allocating free space in case malloc
1611            fails and we are left with a dead spot in the tdb. */
1612
1613         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1614                 tdb->ecode = TDB_ERR_OOM;
1615                 goto fail;
1616         }
1617
1618         /* Copy the key in place. */
1619         memcpy(p, key.dptr, key.dsize);
1620
1621         /* Now read the old data into place. */
1622         if (rec.data_len &&
1623                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1624                         goto fail;
1625
1626         /* Finally append the new data. */
1627         if (new_dbuf.dsize)
1628                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1629
1630         /* delete any existing record - if it doesn't exist we don't
1631            care.  Doing this first reduces fragmentation, and avoids
1632            coalescing with `allocated' block before it's updated. */
1633
1634         tdb_delete_hash(tdb, key, hash);
1635
1636         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1637                 goto fail;
1638
1639         /* Read hash top into next ptr */
1640         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1641                 goto fail;
1642
1643         rec.key_len = key.dsize;
1644         rec.data_len = new_data_size;
1645         rec.full_hash = hash;
1646         rec.magic = TDB_MAGIC;
1647
1648         /* write out and point the top of the hash chain at it */
1649         if (rec_write(tdb, rec_ptr, &rec) == -1
1650             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1651             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1652                 /* Need to tdb_unallocate() here */
1653                 goto fail;
1654         }
1655
1656  out:
1657         SAFE_FREE(p); 
1658         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1659         return ret;
1660
1661 fail:
1662         ret = -1;
1663         goto out;
1664 }
1665
1666 static int tdb_already_open(dev_t device,
1667                             ino_t ino)
1668 {
1669         TDB_CONTEXT *i;
1670         
1671         for (i = tdbs; i; i = i->next) {
1672                 if (i->device == device && i->inode == ino) {
1673                         return 1;
1674                 }
1675         }
1676
1677         return 0;
1678 }
1679
1680 /* open the database, creating it if necessary 
1681
1682    The open_flags and mode are passed straight to the open call on the
1683    database file. A flags value of O_WRONLY is invalid. The hash size
1684    is advisory, use zero for a default value.
1685
1686    Return is NULL on error, in which case errno is also set.  Don't 
1687    try to call tdb_error or tdb_errname, just do strerror(errno).
1688
1689    @param name may be NULL for internal databases. */
1690 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1691                       int open_flags, mode_t mode)
1692 {
1693         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1694 }
1695
1696
1697 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1698                          int open_flags, mode_t mode,
1699                          tdb_log_func log_fn)
1700 {
1701         TDB_CONTEXT *tdb;
1702         struct stat st;
1703         int rev = 0, locked;
1704         unsigned char *vp;
1705         u32 vertest;
1706
1707         if (!(tdb = calloc(1, sizeof *tdb))) {
1708                 /* Can't log this */
1709                 errno = ENOMEM;
1710                 goto fail;
1711         }
1712         tdb->fd = -1;
1713         tdb->name = NULL;
1714         tdb->map_ptr = NULL;
1715         tdb->lockedkeys = NULL;
1716         tdb->flags = tdb_flags;
1717         tdb->open_flags = open_flags;
1718         tdb->log_fn = log_fn;
1719         
1720         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1721                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1722                          name));
1723                 errno = EINVAL;
1724                 goto fail;
1725         }
1726         
1727         if (hash_size == 0)
1728                 hash_size = DEFAULT_HASH_SIZE;
1729         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1730                 tdb->read_only = 1;
1731                 /* read only databases don't do locking or clear if first */
1732                 tdb->flags |= TDB_NOLOCK;
1733                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1734         }
1735
1736         /* internal databases don't mmap or lock, and start off cleared */
1737         if (tdb->flags & TDB_INTERNAL) {
1738                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1739                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1740                 if (tdb_new_database(tdb, hash_size) != 0) {
1741                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1742                         goto fail;
1743                 }
1744                 goto internal;
1745         }
1746
1747         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1748                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1749                          name, strerror(errno)));
1750                 goto fail;      /* errno set by open(2) */
1751         }
1752
1753         /* ensure there is only one process initialising at once */
1754         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1755                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1756                          name, strerror(errno)));
1757                 goto fail;      /* errno set by tdb_brlock */
1758         }
1759
1760         /* we need to zero database if we are the only one with it open */
1761         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1762             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1763                 open_flags |= O_CREAT;
1764                 if (ftruncate(tdb->fd, 0) == -1) {
1765                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1766                                  "failed to truncate %s: %s\n",
1767                                  name, strerror(errno)));
1768                         goto fail; /* errno set by ftruncate */
1769                 }
1770         }
1771
1772         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1773             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1774             || (tdb->header.version != TDB_VERSION
1775                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1776                 /* its not a valid database - possibly initialise it */
1777                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1778                         errno = EIO; /* ie bad format or something */
1779                         goto fail;
1780                 }
1781                 rev = (tdb->flags & TDB_CONVERT);
1782         }
1783         vp = (unsigned char *)&tdb->header.version;
1784         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1785                   (((u32)vp[2]) << 8) | (u32)vp[3];
1786         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1787         if (!rev)
1788                 tdb->flags &= ~TDB_CONVERT;
1789         else {
1790                 tdb->flags |= TDB_CONVERT;
1791                 convert(&tdb->header, sizeof(tdb->header));
1792         }
1793         if (fstat(tdb->fd, &st) == -1)
1794                 goto fail;
1795
1796         /* Is it already in the open list?  If so, fail. */
1797         if (tdb_already_open(st.st_dev, st.st_ino)) {
1798                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1799                          "%s (%d,%d) is already open in this process\n",
1800                          name, st.st_dev, st.st_ino));
1801                 errno = EBUSY;
1802                 goto fail;
1803         }
1804
1805         if (!(tdb->name = (char *)strdup(name))) {
1806                 errno = ENOMEM;
1807                 goto fail;
1808         }
1809
1810         tdb->map_size = st.st_size;
1811         tdb->device = st.st_dev;
1812         tdb->inode = st.st_ino;
1813         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1814         if (!tdb->locked) {
1815                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1816                          "failed to allocate lock structure for %s\n",
1817                          name));
1818                 errno = ENOMEM;
1819                 goto fail;
1820         }
1821         tdb_mmap(tdb);
1822         if (locked) {
1823                 if (!tdb->read_only)
1824                         if (tdb_clear_spinlocks(tdb) != 0) {
1825                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1826                                 "failed to clear spinlock\n"));
1827                                 goto fail;
1828                         }
1829                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1830                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1831                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1832                                  name, strerror(errno)));
1833                         goto fail;
1834                 }
1835         }
1836         /* leave this lock in place to indicate it's in use */
1837         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1838                 goto fail;
1839
1840  internal:
1841         /* Internal (memory-only) databases skip all the code above to
1842          * do with disk files, and resume here by releasing their
1843          * global lock and hooking into the active list. */
1844         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1845                 goto fail;
1846         tdb->next = tdbs;
1847         tdbs = tdb;
1848         return tdb;
1849
1850  fail:
1851         { int save_errno = errno;
1852
1853         if (!tdb)
1854                 return NULL;
1855         
1856         if (tdb->map_ptr) {
1857                 if (tdb->flags & TDB_INTERNAL)
1858                         SAFE_FREE(tdb->map_ptr);
1859                 else
1860                         tdb_munmap(tdb);
1861         }
1862         SAFE_FREE(tdb->name);
1863         if (tdb->fd != -1)
1864                 if (close(tdb->fd) != 0)
1865                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1866         SAFE_FREE(tdb->locked);
1867         SAFE_FREE(tdb);
1868         errno = save_errno;
1869         return NULL;
1870         }
1871 }
1872
1873 /**
1874  * Close a database.
1875  *
1876  * @returns -1 for error; 0 for success.
1877  **/
1878 int tdb_close(TDB_CONTEXT *tdb)
1879 {
1880         TDB_CONTEXT **i;
1881         int ret = 0;
1882
1883         if (tdb->map_ptr) {
1884                 if (tdb->flags & TDB_INTERNAL)
1885                         SAFE_FREE(tdb->map_ptr);
1886                 else
1887                         tdb_munmap(tdb);
1888         }
1889         SAFE_FREE(tdb->name);
1890         if (tdb->fd != -1)
1891                 ret = close(tdb->fd);
1892         SAFE_FREE(tdb->locked);
1893         SAFE_FREE(tdb->lockedkeys);
1894
1895         /* Remove from contexts list */
1896         for (i = &tdbs; *i; i = &(*i)->next) {
1897                 if (*i == tdb) {
1898                         *i = tdb->next;
1899                         break;
1900                 }
1901         }
1902
1903         memset(tdb, 0, sizeof(*tdb));
1904         SAFE_FREE(tdb);
1905
1906         return ret;
1907 }
1908
1909 /* lock/unlock entire database */
1910 int tdb_lockall(TDB_CONTEXT *tdb)
1911 {
1912         u32 i;
1913
1914         /* There are no locks on read-only dbs */
1915         if (tdb->read_only)
1916                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1917         if (tdb->lockedkeys)
1918                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1919         for (i = 0; i < tdb->header.hash_size; i++) 
1920                 if (tdb_lock(tdb, i, F_WRLCK))
1921                         break;
1922
1923         /* If error, release locks we have... */
1924         if (i < tdb->header.hash_size) {
1925                 u32 j;
1926
1927                 for ( j = 0; j < i; j++)
1928                         tdb_unlock(tdb, j, F_WRLCK);
1929                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1930         }
1931
1932         return 0;
1933 }
1934 void tdb_unlockall(TDB_CONTEXT *tdb)
1935 {
1936         u32 i;
1937         for (i=0; i < tdb->header.hash_size; i++)
1938                 tdb_unlock(tdb, i, F_WRLCK);
1939 }
1940
1941 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1942 {
1943         u32 i, j, hash;
1944
1945         /* Can't lock more keys if already locked */
1946         if (tdb->lockedkeys)
1947                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1948         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1949                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1950         /* First number in array is # keys */
1951         tdb->lockedkeys[0] = number;
1952
1953         /* Insertion sort by bucket */
1954         for (i = 0; i < number; i++) {
1955                 hash = tdb_hash(&keys[i]);
1956                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1957                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1958                 tdb->lockedkeys[j+1] = hash;
1959         }
1960         /* Finally, lock in order */
1961         for (i = 0; i < number; i++)
1962                 if (tdb_lock(tdb, i, F_WRLCK))
1963                         break;
1964
1965         /* If error, release locks we have... */
1966         if (i < number) {
1967                 for ( j = 0; j < i; j++)
1968                         tdb_unlock(tdb, j, F_WRLCK);
1969                 SAFE_FREE(tdb->lockedkeys);
1970                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1971         }
1972         return 0;
1973 }
1974
1975 /* Unlock the keys previously locked by tdb_lockkeys() */
1976 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1977 {
1978         u32 i;
1979         if (!tdb->lockedkeys)
1980                 return;
1981         for (i = 0; i < tdb->lockedkeys[0]; i++)
1982                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1983         SAFE_FREE(tdb->lockedkeys);
1984 }
1985
1986 /* lock/unlock one hash chain. This is meant to be used to reduce
1987    contention - it cannot guarantee how many records will be locked */
1988 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1989 {
1990         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1991 }
1992
1993 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1994 {
1995         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1996 }
1997
1998 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1999 {
2000         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
2001 }
2002
2003 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
2004 {
2005         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
2006 }
2007
2008
2009 /* register a loging function */
2010 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
2011 {
2012         tdb->log_fn = fn;
2013 }
2014
2015
2016 /* reopen a tdb - this is used after a fork to ensure that we have an independent
2017    seek pointer from our parent and to re-establish locks */
2018 int tdb_reopen(TDB_CONTEXT *tdb)
2019 {
2020         struct stat st;
2021
2022         if (tdb_munmap(tdb) != 0) {
2023                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2024                 goto fail;
2025         }
2026         if (close(tdb->fd) != 0)
2027                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2028         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2029         if (tdb->fd == -1) {
2030                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2031                 goto fail;
2032         }
2033         if (fstat(tdb->fd, &st) != 0) {
2034                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2035                 goto fail;
2036         }
2037         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2038                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2039                 goto fail;
2040         }
2041         tdb_mmap(tdb);
2042         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2043                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2044                 goto fail;
2045         }
2046
2047         return 0;
2048
2049 fail:
2050         tdb_close(tdb);
2051         return -1;
2052 }
2053
2054 /* reopen all tdb's */
2055 int tdb_reopen_all(void)
2056 {
2057         TDB_CONTEXT *tdb;
2058
2059         for (tdb=tdbs; tdb; tdb = tdb->next) {
2060                 if (tdb_reopen(tdb) != 0) return -1;
2061         }
2062
2063         return 0;
2064 }