Fixed crash bug when calling tdb_unlockkeys() with no locked keys.
[gd/samba-autobuild/.git] / source3 / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24
25 /* NOTE: If you use tdbs under valgrind, and in particular if you run
26  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
27  * think this is because valgrind doesn't understand that the mmap'd
28  * area may be written to by other processes.  Memory can, from the
29  * point of view of the grinded process, spontaneously become
30  * initialized.
31  *
32  * I can think of a few solutions.  [mbp 20030311]
33  *
34  * 1 - Write suppressions for Valgrind so that it doesn't complain
35  * about this.  Probably the most reasonable but people need to
36  * remember to use them.
37  *
38  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
39  *
40  * 3 - Use the special valgrind macros to mark memory as valid at the
41  * right time.  Probably too hard -- the process just doesn't know.
42  */ 
43
44 #ifdef STANDALONE
45 #if HAVE_CONFIG_H
46 #include <config.h>
47 #endif
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <fcntl.h>
52 #include <unistd.h>
53 #include <string.h>
54 #include <fcntl.h>
55 #include <errno.h>
56 #include <sys/mman.h>
57 #include <sys/stat.h>
58 #include <signal.h>
59 #include "tdb.h"
60 #include "spinlock.h"
61 #else
62 #include "includes.h"
63 #endif
64
65 #define TDB_MAGIC_FOOD "TDB file\n"
66 #define TDB_VERSION (0x26011967 + 6)
67 #define TDB_MAGIC (0x26011999U)
68 #define TDB_FREE_MAGIC (~TDB_MAGIC)
69 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
70 #define TDB_ALIGNMENT 4
71 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
72 #define DEFAULT_HASH_SIZE 131
73 #define TDB_PAGE_SIZE 0x2000
74 #define FREELIST_TOP (sizeof(struct tdb_header))
75 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
76 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
77 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
78 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
79 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
80
81 /* NB assumes there is a local variable called "tdb" that is the
82  * current context, also takes doubly-parenthesized print-style
83  * argument. */
84 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
85
86 /* lock offsets */
87 #define GLOBAL_LOCK 0
88 #define ACTIVE_LOCK 4
89
90 #ifndef MAP_FILE
91 #define MAP_FILE 0
92 #endif
93
94 #ifndef MAP_FAILED
95 #define MAP_FAILED ((void *)-1)
96 #endif
97
98 /* free memory if the pointer is valid and zero the pointer */
99 #ifndef SAFE_FREE
100 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
101 #endif
102
103 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
104 TDB_DATA tdb_null;
105
106 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
107 static TDB_CONTEXT *tdbs = NULL;
108
109 static int tdb_munmap(TDB_CONTEXT *tdb)
110 {
111         if (tdb->flags & TDB_INTERNAL)
112                 return 0;
113
114 #ifdef HAVE_MMAP
115         if (tdb->map_ptr) {
116                 int ret = munmap(tdb->map_ptr, tdb->map_size);
117                 if (ret != 0)
118                         return ret;
119         }
120 #endif
121         tdb->map_ptr = NULL;
122         return 0;
123 }
124
125 static void tdb_mmap(TDB_CONTEXT *tdb)
126 {
127         if (tdb->flags & TDB_INTERNAL)
128                 return;
129
130 #ifdef HAVE_MMAP
131         if (!(tdb->flags & TDB_NOMMAP)) {
132                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
133                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
134                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
135
136                 /*
137                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
138                  */
139
140                 if (tdb->map_ptr == MAP_FAILED) {
141                         tdb->map_ptr = NULL;
142                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
143                                  tdb->map_size, strerror(errno)));
144                 }
145         } else {
146                 tdb->map_ptr = NULL;
147         }
148 #else
149         tdb->map_ptr = NULL;
150 #endif
151 }
152
153 /* Endian conversion: we only ever deal with 4 byte quantities */
154 static void *convert(void *buf, u32 size)
155 {
156         u32 i, *p = buf;
157         for (i = 0; i < size / 4; i++)
158                 p[i] = TDB_BYTEREV(p[i]);
159         return buf;
160 }
161 #define DOCONV() (tdb->flags & TDB_CONVERT)
162 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
163
164 /* the body of the database is made of one list_struct for the free space
165    plus a separate data list for each hash value */
166 struct list_struct {
167         tdb_off next; /* offset of the next record in the list */
168         tdb_len rec_len; /* total byte length of record */
169         tdb_len key_len; /* byte length of key */
170         tdb_len data_len; /* byte length of data */
171         u32 full_hash; /* the full 32 bit hash of the key */
172         u32 magic;   /* try to catch errors */
173         /* the following union is implied:
174                 union {
175                         char record[rec_len];
176                         struct {
177                                 char key[key_len];
178                                 char data[data_len];
179                         }
180                         u32 totalsize; (tailer)
181                 }
182         */
183 };
184
185 /***************************************************************
186  Allow a caller to set a "alarm" flag that tdb can check to abort
187  a blocking lock on SIGALRM.
188 ***************************************************************/
189
190 static sig_atomic_t *palarm_fired;
191
192 void tdb_set_lock_alarm(sig_atomic_t *palarm)
193 {
194         palarm_fired = palarm;
195 }
196
197 /* a byte range locking function - return 0 on success
198    this functions locks/unlocks 1 byte at the specified offset.
199
200    On error, errno is also set so that errors are passed back properly
201    through tdb_open(). */
202 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
203                       int rw_type, int lck_type, int probe)
204 {
205         struct flock fl;
206         int ret;
207
208         if (tdb->flags & TDB_NOLOCK)
209                 return 0;
210         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
211                 errno = EACCES;
212                 return -1;
213         }
214
215         fl.l_type = rw_type;
216         fl.l_whence = SEEK_SET;
217         fl.l_start = offset;
218         fl.l_len = 1;
219         fl.l_pid = 0;
220
221         do {
222                 ret = fcntl(tdb->fd,lck_type,&fl);
223                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
224                         break;
225         } while (ret == -1 && errno == EINTR);
226
227         if (ret == -1) {
228                 if (!probe && lck_type != F_SETLK) {
229                         /* Ensure error code is set for log fun to examine. */
230                         if (errno == EINTR && palarm_fired && *palarm_fired)
231                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
232                         else
233                                 tdb->ecode = TDB_ERR_LOCK;
234                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
235                                  tdb->fd, offset, rw_type, lck_type));
236                 }
237                 /* Was it an alarm timeout ? */
238                 if (errno == EINTR && palarm_fired && *palarm_fired)
239                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
240                 /* Otherwise - generic lock error. */
241                 /* errno set by fcntl */
242                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
243         }
244         return 0;
245 }
246
247 /* lock a list in the database. list -1 is the alloc list */
248 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
249 {
250         if (list < -1 || list >= (int)tdb->header.hash_size) {
251                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
252                            list, ltype));
253                 return -1;
254         }
255         if (tdb->flags & TDB_NOLOCK)
256                 return 0;
257
258         /* Since fcntl locks don't nest, we do a lock for the first one,
259            and simply bump the count for future ones */
260         if (tdb->locked[list+1].count == 0) {
261                 if (!tdb->read_only && tdb->header.rwlocks) {
262                         if (tdb_spinlock(tdb, list, ltype)) {
263                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
264                                            list, ltype));
265                                 return -1;
266                         }
267                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
268                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
269                                            list, ltype, strerror(errno)));
270                         return -1;
271                 }
272                 tdb->locked[list+1].ltype = ltype;
273         }
274         tdb->locked[list+1].count++;
275         return 0;
276 }
277
278 /* unlock the database: returns void because it's too late for errors. */
279         /* changed to return int it may be interesting to know there
280            has been an error  --simo */
281 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
282 {
283         int ret = -1;
284
285         if (tdb->flags & TDB_NOLOCK)
286                 return 0;
287
288         /* Sanity checks */
289         if (list < -1 || list >= (int)tdb->header.hash_size) {
290                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
291                 return ret;
292         }
293
294         if (tdb->locked[list+1].count==0) {
295                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
296                 return ret;
297         }
298
299         if (tdb->locked[list+1].count == 1) {
300                 /* Down to last nested lock: unlock underneath */
301                 if (!tdb->read_only && tdb->header.rwlocks) {
302                         ret = tdb_spinunlock(tdb, list, ltype);
303                 } else {
304                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
305                 }
306         } else {
307                 ret = 0;
308         }
309         tdb->locked[list+1].count--;
310
311         if (ret)
312                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
313         return ret;
314 }
315
316 /* This is based on the hash algorithm from gdbm */
317 static u32 tdb_hash(TDB_DATA *key)
318 {
319         u32 value;      /* Used to compute the hash value.  */
320         u32   i;        /* Used to cycle through random values. */
321
322         /* Set the initial value from the key size. */
323         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
324                 value = (value + (key->dptr[i] << (i*5 % 24)));
325
326         return (1103515243 * value + 12345);  
327 }
328
329 /* check for an out of bounds access - if it is out of bounds then
330    see if the database has been expanded by someone else and expand
331    if necessary 
332    note that "len" is the minimum length needed for the db
333 */
334 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
335 {
336         struct stat st;
337         if (len <= tdb->map_size)
338                 return 0;
339         if (tdb->flags & TDB_INTERNAL) {
340                 if (!probe) {
341                         /* Ensure ecode is set for log fn. */
342                         tdb->ecode = TDB_ERR_IO;
343                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
344                                  (int)len, (int)tdb->map_size));
345                 }
346                 return TDB_ERRCODE(TDB_ERR_IO, -1);
347         }
348
349         if (fstat(tdb->fd, &st) == -1)
350                 return TDB_ERRCODE(TDB_ERR_IO, -1);
351
352         if (st.st_size < (size_t)len) {
353                 if (!probe) {
354                         /* Ensure ecode is set for log fn. */
355                         tdb->ecode = TDB_ERR_IO;
356                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
357                                  (int)len, (int)st.st_size));
358                 }
359                 return TDB_ERRCODE(TDB_ERR_IO, -1);
360         }
361
362         /* Unmap, update size, remap */
363         if (tdb_munmap(tdb) == -1)
364                 return TDB_ERRCODE(TDB_ERR_IO, -1);
365         tdb->map_size = st.st_size;
366         tdb_mmap(tdb);
367         return 0;
368 }
369
370 /* write a lump of data at a specified offset */
371 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
372 {
373         if (tdb_oob(tdb, off + len, 0) != 0)
374                 return -1;
375
376         if (tdb->map_ptr)
377                 memcpy(off + (char *)tdb->map_ptr, buf, len);
378 #ifdef HAVE_PWRITE
379         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
380 #else
381         else if (lseek(tdb->fd, off, SEEK_SET) != off
382                  || write(tdb->fd, buf, len) != (ssize_t)len) {
383 #endif
384                 /* Ensure ecode is set for log fn. */
385                 tdb->ecode = TDB_ERR_IO;
386                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
387                            off, len, strerror(errno)));
388                 return TDB_ERRCODE(TDB_ERR_IO, -1);
389         }
390         return 0;
391 }
392
393 /* read a lump of data at a specified offset, maybe convert */
394 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
395 {
396         if (tdb_oob(tdb, off + len, 0) != 0)
397                 return -1;
398
399         if (tdb->map_ptr)
400                 memcpy(buf, off + (char *)tdb->map_ptr, len);
401 #ifdef HAVE_PREAD
402         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
403 #else
404         else if (lseek(tdb->fd, off, SEEK_SET) != off
405                  || read(tdb->fd, buf, len) != (ssize_t)len) {
406 #endif
407                 /* Ensure ecode is set for log fn. */
408                 tdb->ecode = TDB_ERR_IO;
409                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
410                            off, len, strerror(errno)));
411                 return TDB_ERRCODE(TDB_ERR_IO, -1);
412         }
413         if (cv)
414                 convert(buf, len);
415         return 0;
416 }
417
418 /* read a lump of data, allocating the space for it */
419 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
420 {
421         char *buf;
422
423         if (!(buf = malloc(len))) {
424                 /* Ensure ecode is set for log fn. */
425                 tdb->ecode = TDB_ERR_OOM;
426                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
427                            len, strerror(errno)));
428                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
429         }
430         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
431                 SAFE_FREE(buf);
432                 return NULL;
433         }
434         return buf;
435 }
436
437 /* read/write a tdb_off */
438 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
439 {
440         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
441 }
442 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
443 {
444         tdb_off off = *d;
445         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
446 }
447
448 /* read/write a record */
449 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
450 {
451         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
452                 return -1;
453         if (TDB_BAD_MAGIC(rec)) {
454                 /* Ensure ecode is set for log fn. */
455                 tdb->ecode = TDB_ERR_CORRUPT;
456                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
457                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
458         }
459         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
460 }
461 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
462 {
463         struct list_struct r = *rec;
464         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
465 }
466
467 /* read a freelist record and check for simple errors */
468 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
469 {
470         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
471                 return -1;
472
473         if (rec->magic == TDB_MAGIC) {
474                 /* this happens when a app is showdown while deleting a record - we should
475                    not completely fail when this happens */
476                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
477                          rec->magic, off));
478                 rec->magic = TDB_FREE_MAGIC;
479                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
480                         return -1;
481         }
482
483         if (rec->magic != TDB_FREE_MAGIC) {
484                 /* Ensure ecode is set for log fn. */
485                 tdb->ecode = TDB_ERR_CORRUPT;
486                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
487                            rec->magic, off));
488                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
489         }
490         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
491                 return -1;
492         return 0;
493 }
494
495 /* update a record tailer (must hold allocation lock) */
496 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
497                          const struct list_struct *rec)
498 {
499         tdb_off totalsize;
500
501         /* Offset of tailer from record header */
502         totalsize = sizeof(*rec) + rec->rec_len;
503         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
504                          &totalsize);
505 }
506
507 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
508 {
509         struct list_struct rec;
510         tdb_off tailer_ofs, tailer;
511
512         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
513                 printf("ERROR: failed to read record at %u\n", offset);
514                 return 0;
515         }
516
517         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
518                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
519
520         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
521         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
522                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
523                 return rec.next;
524         }
525
526         if (tailer != rec.rec_len + sizeof(rec)) {
527                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
528                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
529         }
530         return rec.next;
531 }
532
533 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
534 {
535         tdb_off rec_ptr, top;
536
537         top = TDB_HASH_TOP(i);
538
539         if (tdb_lock(tdb, i, F_WRLCK) != 0)
540                 return -1;
541
542         if (ofs_read(tdb, top, &rec_ptr) == -1)
543                 return tdb_unlock(tdb, i, F_WRLCK);
544
545         if (rec_ptr)
546                 printf("hash=%d\n", i);
547
548         while (rec_ptr) {
549                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
550         }
551
552         return tdb_unlock(tdb, i, F_WRLCK);
553 }
554
555 void tdb_dump_all(TDB_CONTEXT *tdb)
556 {
557         int i;
558         for (i=0;i<tdb->header.hash_size;i++) {
559                 tdb_dump_chain(tdb, i);
560         }
561         printf("freelist:\n");
562         tdb_dump_chain(tdb, -1);
563 }
564
565 int tdb_printfreelist(TDB_CONTEXT *tdb)
566 {
567         int ret;
568         long total_free = 0;
569         tdb_off offset, rec_ptr;
570         struct list_struct rec;
571
572         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
573                 return ret;
574
575         offset = FREELIST_TOP;
576
577         /* read in the freelist top */
578         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
579                 tdb_unlock(tdb, -1, F_WRLCK);
580                 return 0;
581         }
582
583         printf("freelist top=[0x%08x]\n", rec_ptr );
584         while (rec_ptr) {
585                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
586                         tdb_unlock(tdb, -1, F_WRLCK);
587                         return -1;
588                 }
589
590                 if (rec.magic != TDB_FREE_MAGIC) {
591                         printf("bad magic 0x%08x in free list\n", rec.magic);
592                         tdb_unlock(tdb, -1, F_WRLCK);
593                         return -1;
594                 }
595
596                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
597                 total_free += rec.rec_len;
598
599                 /* move to the next record */
600                 rec_ptr = rec.next;
601         }
602         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
603                (int)total_free);
604
605         return tdb_unlock(tdb, -1, F_WRLCK);
606 }
607
608 /* Remove an element from the freelist.  Must have alloc lock. */
609 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
610 {
611         tdb_off last_ptr, i;
612
613         /* read in the freelist top */
614         last_ptr = FREELIST_TOP;
615         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
616                 if (i == off) {
617                         /* We've found it! */
618                         return ofs_write(tdb, last_ptr, &next);
619                 }
620                 /* Follow chain (next offset is at start of record) */
621                 last_ptr = i;
622         }
623         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
624         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
625 }
626
627 /* Add an element into the freelist. Merge adjacent records if
628    neccessary. */
629 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
630 {
631         tdb_off right, left;
632
633         /* Allocation and tailer lock */
634         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
635                 return -1;
636
637         /* set an initial tailer, so if we fail we don't leave a bogus record */
638         if (update_tailer(tdb, offset, rec) != 0) {
639                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
640                 goto fail;
641         }
642
643         /* Look right first (I'm an Australian, dammit) */
644         right = offset + sizeof(*rec) + rec->rec_len;
645         if (right + sizeof(*rec) <= tdb->map_size) {
646                 struct list_struct r;
647
648                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
649                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
650                         goto left;
651                 }
652
653                 /* If it's free, expand to include it. */
654                 if (r.magic == TDB_FREE_MAGIC) {
655                         if (remove_from_freelist(tdb, right, r.next) == -1) {
656                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
657                                 goto left;
658                         }
659                         rec->rec_len += sizeof(r) + r.rec_len;
660                 }
661         }
662
663 left:
664         /* Look left */
665         left = offset - sizeof(tdb_off);
666         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
667                 struct list_struct l;
668                 tdb_off leftsize;
669
670                 /* Read in tailer and jump back to header */
671                 if (ofs_read(tdb, left, &leftsize) == -1) {
672                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
673                         goto update;
674                 }
675                 left = offset - leftsize;
676
677                 /* Now read in record */
678                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
679                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
680                         goto update;
681                 }
682
683                 /* If it's free, expand to include it. */
684                 if (l.magic == TDB_FREE_MAGIC) {
685                         if (remove_from_freelist(tdb, left, l.next) == -1) {
686                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
687                                 goto update;
688                         } else {
689                                 offset = left;
690                                 rec->rec_len += leftsize;
691                         }
692                 }
693         }
694
695 update:
696         if (update_tailer(tdb, offset, rec) == -1) {
697                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
698                 goto fail;
699         }
700
701         /* Now, prepend to free list */
702         rec->magic = TDB_FREE_MAGIC;
703
704         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
705             rec_write(tdb, offset, rec) == -1 ||
706             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
707                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
708                 goto fail;
709         }
710
711         /* And we're done. */
712         tdb_unlock(tdb, -1, F_WRLCK);
713         return 0;
714
715  fail:
716         tdb_unlock(tdb, -1, F_WRLCK);
717         return -1;
718 }
719
720
721 /* expand a file.  we prefer to use ftruncate, as that is what posix
722   says to use for mmap expansion */
723 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
724 {
725         char buf[1024];
726 #if HAVE_FTRUNCATE_EXTEND
727         if (ftruncate(tdb->fd, size+addition) != 0) {
728                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
729                            size+addition, strerror(errno)));
730                 return -1;
731         }
732 #else
733         char b = 0;
734
735 #ifdef HAVE_PWRITE
736         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
737 #else
738         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
739             write(tdb->fd, &b, 1) != 1) {
740 #endif
741                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
742                            size+addition, strerror(errno)));
743                 return -1;
744         }
745 #endif
746
747         /* now fill the file with something. This ensures that the file isn't sparse, which would be
748            very bad if we ran out of disk. This must be done with write, not via mmap */
749         memset(buf, 0x42, sizeof(buf));
750         while (addition) {
751                 int n = addition>sizeof(buf)?sizeof(buf):addition;
752 #ifdef HAVE_PWRITE
753                 int ret = pwrite(tdb->fd, buf, n, size);
754 #else
755                 int ret;
756                 if (lseek(tdb->fd, size, SEEK_SET) != size)
757                         return -1;
758                 ret = write(tdb->fd, buf, n);
759 #endif
760                 if (ret != n) {
761                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
762                                    n, strerror(errno)));
763                         return -1;
764                 }
765                 addition -= n;
766                 size += n;
767         }
768         return 0;
769 }
770
771
772 /* expand the database at least size bytes by expanding the underlying
773    file and doing the mmap again if necessary */
774 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
775 {
776         struct list_struct rec;
777         tdb_off offset;
778
779         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
780                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
781                 return -1;
782         }
783
784         /* must know about any previous expansions by another process */
785         tdb_oob(tdb, tdb->map_size + 1, 1);
786
787         /* always make room for at least 10 more records, and round
788            the database up to a multiple of TDB_PAGE_SIZE */
789         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
790
791         if (!(tdb->flags & TDB_INTERNAL))
792                 tdb_munmap(tdb);
793
794         /*
795          * We must ensure the file is unmapped before doing this
796          * to ensure consistency with systems like OpenBSD where
797          * writes and mmaps are not consistent.
798          */
799
800         /* expand the file itself */
801         if (!(tdb->flags & TDB_INTERNAL)) {
802                 if (expand_file(tdb, tdb->map_size, size) != 0)
803                         goto fail;
804         }
805
806         tdb->map_size += size;
807
808         if (tdb->flags & TDB_INTERNAL)
809                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
810         else {
811                 /*
812                  * We must ensure the file is remapped before adding the space
813                  * to ensure consistency with systems like OpenBSD where
814                  * writes and mmaps are not consistent.
815                  */
816
817                 /* We're ok if the mmap fails as we'll fallback to read/write */
818                 tdb_mmap(tdb);
819         }
820
821         /* form a new freelist record */
822         memset(&rec,'\0',sizeof(rec));
823         rec.rec_len = size - sizeof(rec);
824
825         /* link it into the free list */
826         offset = tdb->map_size - size;
827         if (tdb_free(tdb, offset, &rec) == -1)
828                 goto fail;
829
830         tdb_unlock(tdb, -1, F_WRLCK);
831         return 0;
832  fail:
833         tdb_unlock(tdb, -1, F_WRLCK);
834         return -1;
835 }
836
837 /* allocate some space from the free list. The offset returned points
838    to a unconnected list_struct within the database with room for at
839    least length bytes of total data
840
841    0 is returned if the space could not be allocated
842  */
843 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
844                             struct list_struct *rec)
845 {
846         tdb_off rec_ptr, last_ptr, newrec_ptr;
847         struct list_struct newrec;
848
849         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
850                 return 0;
851
852         /* Extra bytes required for tailer */
853         length += sizeof(tdb_off);
854
855  again:
856         last_ptr = FREELIST_TOP;
857
858         /* read in the freelist top */
859         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
860                 goto fail;
861
862         /* keep looking until we find a freelist record big enough */
863         while (rec_ptr) {
864                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
865                         goto fail;
866
867                 if (rec->rec_len >= length) {
868                         /* found it - now possibly split it up  */
869                         if (rec->rec_len > length + MIN_REC_SIZE) {
870                                 /* Length of left piece */
871                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
872
873                                 /* Right piece to go on free list */
874                                 newrec.rec_len = rec->rec_len
875                                         - (sizeof(*rec) + length);
876                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
877
878                                 /* And left record is shortened */
879                                 rec->rec_len = length;
880                         } else
881                                 newrec_ptr = 0;
882
883                         /* Remove allocated record from the free list */
884                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
885                                 goto fail;
886
887                         /* Update header: do this before we drop alloc
888                            lock, otherwise tdb_free() might try to
889                            merge with us, thinking we're free.
890                            (Thanks Jeremy Allison). */
891                         rec->magic = TDB_MAGIC;
892                         if (rec_write(tdb, rec_ptr, rec) == -1)
893                                 goto fail;
894
895                         /* Did we create new block? */
896                         if (newrec_ptr) {
897                                 /* Update allocated record tailer (we
898                                    shortened it). */
899                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
900                                         goto fail;
901
902                                 /* Free new record */
903                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
904                                         goto fail;
905                         }
906
907                         /* all done - return the new record offset */
908                         tdb_unlock(tdb, -1, F_WRLCK);
909                         return rec_ptr;
910                 }
911                 /* move to the next record */
912                 last_ptr = rec_ptr;
913                 rec_ptr = rec->next;
914         }
915         /* we didn't find enough space. See if we can expand the
916            database and if we can then try again */
917         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
918                 goto again;
919  fail:
920         tdb_unlock(tdb, -1, F_WRLCK);
921         return 0;
922 }
923
924 /* initialise a new database with a specified hash size */
925 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
926 {
927         struct tdb_header *newdb;
928         int size, ret = -1;
929
930         /* We make it up in memory, then write it out if not internal */
931         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
932         if (!(newdb = calloc(size, 1)))
933                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
934
935         /* Fill in the header */
936         newdb->version = TDB_VERSION;
937         newdb->hash_size = hash_size;
938 #ifdef USE_SPINLOCKS
939         newdb->rwlocks = size;
940 #endif
941         if (tdb->flags & TDB_INTERNAL) {
942                 tdb->map_size = size;
943                 tdb->map_ptr = (char *)newdb;
944                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
945                 /* Convert the `ondisk' version if asked. */
946                 CONVERT(*newdb);
947                 return 0;
948         }
949         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
950                 goto fail;
951
952         if (ftruncate(tdb->fd, 0) == -1)
953                 goto fail;
954
955         /* This creates an endian-converted header, as if read from disk */
956         CONVERT(*newdb);
957         memcpy(&tdb->header, newdb, sizeof(tdb->header));
958         /* Don't endian-convert the magic food! */
959         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
960         if (write(tdb->fd, newdb, size) != size)
961                 ret = -1;
962         else
963                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
964
965   fail:
966         SAFE_FREE(newdb);
967         return ret;
968 }
969
970 /* Returns 0 on fail.  On success, return offset of record, and fills
971    in rec */
972 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
973                         struct list_struct *r)
974 {
975         tdb_off rec_ptr;
976         
977         /* read in the hash top */
978         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
979                 return 0;
980
981         /* keep looking until we find the right record */
982         while (rec_ptr) {
983                 if (rec_read(tdb, rec_ptr, r) == -1)
984                         return 0;
985
986                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
987                         char *k;
988                         /* a very likely hit - read the key */
989                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
990                                            r->key_len);
991                         if (!k)
992                                 return 0;
993
994                         if (memcmp(key.dptr, k, key.dsize) == 0) {
995                                 SAFE_FREE(k);
996                                 return rec_ptr;
997                         }
998                         SAFE_FREE(k);
999                 }
1000                 rec_ptr = r->next;
1001         }
1002         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1003 }
1004
1005 /* If they do lockkeys, check that this hash is one they locked */
1006 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1007 {
1008         u32 i;
1009         if (!tdb->lockedkeys)
1010                 return 1;
1011         for (i = 0; i < tdb->lockedkeys[0]; i++)
1012                 if (tdb->lockedkeys[i+1] == hash)
1013                         return 1;
1014         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1015 }
1016
1017 /* As tdb_find, but if you succeed, keep the lock */
1018 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
1019                              struct list_struct *rec)
1020 {
1021         u32 hash, rec_ptr;
1022
1023         hash = tdb_hash(&key);
1024         if (!tdb_keylocked(tdb, hash))
1025                 return 0;
1026         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1027                 return 0;
1028         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1029                 tdb_unlock(tdb, BUCKET(hash), locktype);
1030         return rec_ptr;
1031 }
1032
1033 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1034 {
1035         return tdb->ecode;
1036 }
1037
1038 static struct tdb_errname {
1039         enum TDB_ERROR ecode; const char *estring;
1040 } emap[] = { {TDB_SUCCESS, "Success"},
1041              {TDB_ERR_CORRUPT, "Corrupt database"},
1042              {TDB_ERR_IO, "IO Error"},
1043              {TDB_ERR_LOCK, "Locking error"},
1044              {TDB_ERR_OOM, "Out of memory"},
1045              {TDB_ERR_EXISTS, "Record exists"},
1046              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1047              {TDB_ERR_NOEXIST, "Record does not exist"} };
1048
1049 /* Error string for the last tdb error */
1050 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1051 {
1052         u32 i;
1053         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1054                 if (tdb->ecode == emap[i].ecode)
1055                         return emap[i].estring;
1056         return "Invalid error code";
1057 }
1058
1059 /* update an entry in place - this only works if the new data size
1060    is <= the old data size and the key exists.
1061    on failure return -1.
1062 */
1063
1064 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1065 {
1066         struct list_struct rec;
1067         tdb_off rec_ptr;
1068
1069         /* find entry */
1070         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1071                 return -1;
1072
1073         /* must be long enough key, data and tailer */
1074         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1075                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1076                 return -1;
1077         }
1078
1079         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1080                       dbuf.dptr, dbuf.dsize) == -1)
1081                 return -1;
1082
1083         if (dbuf.dsize != rec.data_len) {
1084                 /* update size */
1085                 rec.data_len = dbuf.dsize;
1086                 return rec_write(tdb, rec_ptr, &rec);
1087         }
1088  
1089         return 0;
1090 }
1091
1092 /* find an entry in the database given a key */
1093 /* If an entry doesn't exist tdb_err will be set to
1094  * TDB_ERR_NOEXIST. If a key has no data attached
1095  * tdb_err will not be set. Both will return a
1096  * zero pptr and zero dsize.
1097  */
1098
1099 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1100 {
1101         tdb_off rec_ptr;
1102         struct list_struct rec;
1103         TDB_DATA ret;
1104
1105         /* find which hash bucket it is in */
1106         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1107                 return tdb_null;
1108
1109         if (rec.data_len)
1110                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1111                                           rec.data_len);
1112         else
1113                 ret.dptr = NULL;
1114         ret.dsize = rec.data_len;
1115         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1116         return ret;
1117 }
1118
1119 /* check if an entry in the database exists 
1120
1121    note that 1 is returned if the key is found and 0 is returned if not found
1122    this doesn't match the conventions in the rest of this module, but is
1123    compatible with gdbm
1124 */
1125 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1126 {
1127         struct list_struct rec;
1128         
1129         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1130                 return 0;
1131         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1132         return 1;
1133 }
1134
1135 /* record lock stops delete underneath */
1136 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1137 {
1138         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1139 }
1140 /*
1141   Write locks override our own fcntl readlocks, so check it here.
1142   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1143   an error to fail to get the lock here.
1144 */
1145  
1146 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1147 {
1148         struct tdb_traverse_lock *i;
1149         for (i = &tdb->travlocks; i; i = i->next)
1150                 if (i->off == off)
1151                         return -1;
1152         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1153 }
1154
1155 /*
1156   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1157   an error to fail to get the lock here.
1158 */
1159
1160 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1161 {
1162         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1163 }
1164 /* fcntl locks don't stack: avoid unlocking someone else's */
1165 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1166 {
1167         struct tdb_traverse_lock *i;
1168         u32 count = 0;
1169
1170         if (off == 0)
1171                 return 0;
1172         for (i = &tdb->travlocks; i; i = i->next)
1173                 if (i->off == off)
1174                         count++;
1175         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1176 }
1177
1178 /* actually delete an entry in the database given the offset */
1179 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1180 {
1181         tdb_off last_ptr, i;
1182         struct list_struct lastrec;
1183
1184         if (tdb->read_only) return -1;
1185
1186         if (write_lock_record(tdb, rec_ptr) == -1) {
1187                 /* Someone traversing here: mark it as dead */
1188                 rec->magic = TDB_DEAD_MAGIC;
1189                 return rec_write(tdb, rec_ptr, rec);
1190         }
1191         if (write_unlock_record(tdb, rec_ptr) != 0)
1192                 return -1;
1193
1194         /* find previous record in hash chain */
1195         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1196                 return -1;
1197         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1198                 if (rec_read(tdb, i, &lastrec) == -1)
1199                         return -1;
1200
1201         /* unlink it: next ptr is at start of record. */
1202         if (last_ptr == 0)
1203                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1204         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1205                 return -1;
1206
1207         /* recover the space */
1208         if (tdb_free(tdb, rec_ptr, rec) == -1)
1209                 return -1;
1210         return 0;
1211 }
1212
1213 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1214 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1215                          struct list_struct *rec)
1216 {
1217         int want_next = (tlock->off != 0);
1218
1219         /* No traversal allows if you've called tdb_lockkeys() */
1220         if (tdb->lockedkeys)
1221                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1222
1223         /* Lock each chain from the start one. */
1224         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1225                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1226                         return -1;
1227
1228                 /* No previous record?  Start at top of chain. */
1229                 if (!tlock->off) {
1230                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1231                                      &tlock->off) == -1)
1232                                 goto fail;
1233                 } else {
1234                         /* Otherwise unlock the previous record. */
1235                         if (unlock_record(tdb, tlock->off) != 0)
1236                                 goto fail;
1237                 }
1238
1239                 if (want_next) {
1240                         /* We have offset of old record: grab next */
1241                         if (rec_read(tdb, tlock->off, rec) == -1)
1242                                 goto fail;
1243                         tlock->off = rec->next;
1244                 }
1245
1246                 /* Iterate through chain */
1247                 while( tlock->off) {
1248                         tdb_off current;
1249                         if (rec_read(tdb, tlock->off, rec) == -1)
1250                                 goto fail;
1251                         if (!TDB_DEAD(rec)) {
1252                                 /* Woohoo: we found one! */
1253                                 if (lock_record(tdb, tlock->off) != 0)
1254                                         goto fail;
1255                                 return tlock->off;
1256                         }
1257                         /* Try to clean dead ones from old traverses */
1258                         current = tlock->off;
1259                         tlock->off = rec->next;
1260                         if (do_delete(tdb, current, rec) != 0)
1261                                 goto fail;
1262                 }
1263                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1264                 want_next = 0;
1265         }
1266         /* We finished iteration without finding anything */
1267         return TDB_ERRCODE(TDB_SUCCESS, 0);
1268
1269  fail:
1270         tlock->off = 0;
1271         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1272                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1273         return -1;
1274 }
1275
1276 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1277    return -1 on error or the record count traversed
1278    if fn is NULL then it is not called
1279    a non-zero return value from fn() indicates that the traversal should stop
1280   */
1281 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1282 {
1283         TDB_DATA key, dbuf;
1284         struct list_struct rec;
1285         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1286         int ret, count = 0;
1287
1288         /* This was in the initializaton, above, but the IRIX compiler
1289          * did not like it.  crh
1290          */
1291         tl.next = tdb->travlocks.next;
1292
1293         /* fcntl locks don't stack: beware traverse inside traverse */
1294         tdb->travlocks.next = &tl;
1295
1296         /* tdb_next_lock places locks on the record returned, and its chain */
1297         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1298                 count++;
1299                 /* now read the full record */
1300                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1301                                           rec.key_len + rec.data_len);
1302                 if (!key.dptr) {
1303                         ret = -1;
1304                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1305                                 goto out;
1306                         if (unlock_record(tdb, tl.off) != 0)
1307                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1308                         goto out;
1309                 }
1310                 key.dsize = rec.key_len;
1311                 dbuf.dptr = key.dptr + rec.key_len;
1312                 dbuf.dsize = rec.data_len;
1313
1314                 /* Drop chain lock, call out */
1315                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1316                         ret = -1;
1317                         goto out;
1318                 }
1319                 if (fn && fn(tdb, key, dbuf, state)) {
1320                         /* They want us to terminate traversal */
1321                         ret = count;
1322                         if (unlock_record(tdb, tl.off) != 0) {
1323                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1324                                 ret = -1;
1325                         }
1326                         tdb->travlocks.next = tl.next;
1327                         SAFE_FREE(key.dptr);
1328                         return count;
1329                 }
1330                 SAFE_FREE(key.dptr);
1331         }
1332 out:
1333         tdb->travlocks.next = tl.next;
1334         if (ret < 0)
1335                 return -1;
1336         else
1337                 return count;
1338 }
1339
1340 /* find the first entry in the database and return its key */
1341 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1342 {
1343         TDB_DATA key;
1344         struct list_struct rec;
1345
1346         /* release any old lock */
1347         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1348                 return tdb_null;
1349         tdb->travlocks.off = tdb->travlocks.hash = 0;
1350
1351         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1352                 return tdb_null;
1353         /* now read the key */
1354         key.dsize = rec.key_len;
1355         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1356         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1357                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1358         return key;
1359 }
1360
1361 /* find the next entry in the database, returning its key */
1362 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1363 {
1364         u32 oldhash;
1365         TDB_DATA key = tdb_null;
1366         struct list_struct rec;
1367         char *k = NULL;
1368
1369         /* Is locked key the old key?  If so, traverse will be reliable. */
1370         if (tdb->travlocks.off) {
1371                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1372                         return tdb_null;
1373                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1374                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1375                                             rec.key_len))
1376                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1377                         /* No, it wasn't: unlock it and start from scratch */
1378                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1379                                 return tdb_null;
1380                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1381                                 return tdb_null;
1382                         tdb->travlocks.off = 0;
1383                 }
1384
1385                 SAFE_FREE(k);
1386         }
1387
1388         if (!tdb->travlocks.off) {
1389                 /* No previous element: do normal find, and lock record */
1390                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1391                 if (!tdb->travlocks.off)
1392                         return tdb_null;
1393                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1394                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1395                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1396                         return tdb_null;
1397                 }
1398         }
1399         oldhash = tdb->travlocks.hash;
1400
1401         /* Grab next record: locks chain and returned record,
1402            unlocks old record */
1403         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1404                 key.dsize = rec.key_len;
1405                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1406                                           key.dsize);
1407                 /* Unlock the chain of this new record */
1408                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1409                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1410         }
1411         /* Unlock the chain of old record */
1412         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1413                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1414         return key;
1415 }
1416
1417 /* delete an entry in the database given a key */
1418 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1419 {
1420         tdb_off rec_ptr;
1421         struct list_struct rec;
1422         int ret;
1423
1424         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1425                 return -1;
1426         ret = do_delete(tdb, rec_ptr, &rec);
1427         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1428                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1429         return ret;
1430 }
1431
1432 /* store an element in the database, replacing any existing element
1433    with the same key 
1434
1435    return 0 on success, -1 on failure
1436 */
1437 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1438 {
1439         struct list_struct rec;
1440         u32 hash;
1441         tdb_off rec_ptr;
1442         char *p = NULL;
1443         int ret = 0;
1444
1445         /* find which hash bucket it is in */
1446         hash = tdb_hash(&key);
1447         if (!tdb_keylocked(tdb, hash))
1448                 return -1;
1449         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1450                 return -1;
1451
1452         /* check for it existing, on insert. */
1453         if (flag == TDB_INSERT) {
1454                 if (tdb_exists(tdb, key)) {
1455                         tdb->ecode = TDB_ERR_EXISTS;
1456                         goto fail;
1457                 }
1458         } else {
1459                 /* first try in-place update, on modify or replace. */
1460                 if (tdb_update(tdb, key, dbuf) == 0)
1461                         goto out;
1462                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1463                         goto fail;
1464         }
1465         /* reset the error code potentially set by the tdb_update() */
1466         tdb->ecode = TDB_SUCCESS;
1467
1468         /* delete any existing record - if it doesn't exist we don't
1469            care.  Doing this first reduces fragmentation, and avoids
1470            coalescing with `allocated' block before it's updated. */
1471         if (flag != TDB_INSERT)
1472                 tdb_delete(tdb, key);
1473
1474         /* Copy key+value *before* allocating free space in case malloc
1475            fails and we are left with a dead spot in the tdb. */
1476
1477         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1478                 tdb->ecode = TDB_ERR_OOM;
1479                 goto fail;
1480         }
1481
1482         memcpy(p, key.dptr, key.dsize);
1483         if (dbuf.dsize)
1484                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1485
1486         /* now we're into insert / modify / replace of a record which
1487          * we know could not be optimised by an in-place store (for
1488          * various reasons).  */
1489         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1490                 goto fail;
1491
1492         /* Read hash top into next ptr */
1493         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1494                 goto fail;
1495
1496         rec.key_len = key.dsize;
1497         rec.data_len = dbuf.dsize;
1498         rec.full_hash = hash;
1499         rec.magic = TDB_MAGIC;
1500
1501         /* write out and point the top of the hash chain at it */
1502         if (rec_write(tdb, rec_ptr, &rec) == -1
1503             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1504             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1505                 /* Need to tdb_unallocate() here */
1506                 goto fail;
1507         }
1508  out:
1509         SAFE_FREE(p); 
1510         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1511         return ret;
1512 fail:
1513         ret = -1;
1514         goto out;
1515 }
1516
1517 /* Attempt to append data to an entry in place - this only works if the new data size
1518    is <= the old data size and the key exists.
1519    on failure return -1. Record must be locked before calling.
1520 */
1521 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1522 {
1523         struct list_struct rec;
1524         tdb_off rec_ptr;
1525
1526         /* find entry */
1527         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1528                 return -1;
1529
1530         /* Append of 0 is always ok. */
1531         if (new_dbuf.dsize == 0)
1532                 return 0;
1533
1534         /* must be long enough for key, old data + new data and tailer */
1535         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1536                 /* No room. */
1537                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1538                 return -1;
1539         }
1540
1541         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1542                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1543                 return -1;
1544
1545         /* update size */
1546         rec.data_len += new_dbuf.dsize;
1547         return rec_write(tdb, rec_ptr, &rec);
1548 }
1549
1550 /* Append to an entry. Create if not exist. */
1551
1552 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1553 {
1554         struct list_struct rec;
1555         u32 hash;
1556         tdb_off rec_ptr;
1557         char *p = NULL;
1558         int ret = 0;
1559         size_t new_data_size = 0;
1560
1561         /* find which hash bucket it is in */
1562         hash = tdb_hash(&key);
1563         if (!tdb_keylocked(tdb, hash))
1564                 return -1;
1565         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1566                 return -1;
1567
1568         /* first try in-place. */
1569         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1570                 goto out;
1571
1572         /* reset the error code potentially set by the tdb_append_inplace() */
1573         tdb->ecode = TDB_SUCCESS;
1574
1575         /* find entry */
1576         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1577                 if (tdb->ecode != TDB_ERR_NOEXIST)
1578                         goto fail;
1579
1580                 /* Not found - create. */
1581
1582                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1583                 goto out;
1584         }
1585
1586         new_data_size = rec.data_len + new_dbuf.dsize;
1587
1588         /* Copy key+old_value+value *before* allocating free space in case malloc
1589            fails and we are left with a dead spot in the tdb. */
1590
1591         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1592                 tdb->ecode = TDB_ERR_OOM;
1593                 goto fail;
1594         }
1595
1596         /* Copy the key in place. */
1597         memcpy(p, key.dptr, key.dsize);
1598
1599         /* Now read the old data into place. */
1600         if (rec.data_len &&
1601                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1602                         goto fail;
1603
1604         /* Finally append the new data. */
1605         if (new_dbuf.dsize)
1606                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1607
1608         /* delete any existing record - if it doesn't exist we don't
1609            care.  Doing this first reduces fragmentation, and avoids
1610            coalescing with `allocated' block before it's updated. */
1611
1612         tdb_delete(tdb, key);
1613
1614         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1615                 goto fail;
1616
1617         /* Read hash top into next ptr */
1618         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1619                 goto fail;
1620
1621         rec.key_len = key.dsize;
1622         rec.data_len = new_data_size;
1623         rec.full_hash = hash;
1624         rec.magic = TDB_MAGIC;
1625
1626         /* write out and point the top of the hash chain at it */
1627         if (rec_write(tdb, rec_ptr, &rec) == -1
1628             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1629             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1630                 /* Need to tdb_unallocate() here */
1631                 goto fail;
1632         }
1633
1634  out:
1635         SAFE_FREE(p); 
1636         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1637         return ret;
1638
1639 fail:
1640         ret = -1;
1641         goto out;
1642 }
1643
1644 static int tdb_already_open(dev_t device,
1645                             ino_t ino)
1646 {
1647         TDB_CONTEXT *i;
1648         
1649         for (i = tdbs; i; i = i->next) {
1650                 if (i->device == device && i->inode == ino) {
1651                         return 1;
1652                 }
1653         }
1654
1655         return 0;
1656 }
1657
1658 /* open the database, creating it if necessary 
1659
1660    The open_flags and mode are passed straight to the open call on the
1661    database file. A flags value of O_WRONLY is invalid. The hash size
1662    is advisory, use zero for a default value.
1663
1664    Return is NULL on error, in which case errno is also set.  Don't 
1665    try to call tdb_error or tdb_errname, just do strerror(errno).
1666
1667    @param name may be NULL for internal databases. */
1668 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1669                       int open_flags, mode_t mode)
1670 {
1671         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1672 }
1673
1674
1675 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1676                          int open_flags, mode_t mode,
1677                          tdb_log_func log_fn)
1678 {
1679         TDB_CONTEXT *tdb;
1680         struct stat st;
1681         int rev = 0, locked;
1682         unsigned char *vp;
1683         u32 vertest;
1684
1685         if (!(tdb = calloc(1, sizeof *tdb))) {
1686                 /* Can't log this */
1687                 errno = ENOMEM;
1688                 goto fail;
1689         }
1690         tdb->fd = -1;
1691         tdb->name = NULL;
1692         tdb->map_ptr = NULL;
1693         tdb->lockedkeys = NULL;
1694         tdb->flags = tdb_flags;
1695         tdb->open_flags = open_flags;
1696         tdb->log_fn = log_fn;
1697         
1698         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1699                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1700                          name));
1701                 errno = EINVAL;
1702                 goto fail;
1703         }
1704         
1705         if (hash_size == 0)
1706                 hash_size = DEFAULT_HASH_SIZE;
1707         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1708                 tdb->read_only = 1;
1709                 /* read only databases don't do locking or clear if first */
1710                 tdb->flags |= TDB_NOLOCK;
1711                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1712         }
1713
1714         /* internal databases don't mmap or lock, and start off cleared */
1715         if (tdb->flags & TDB_INTERNAL) {
1716                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1717                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1718                 if (tdb_new_database(tdb, hash_size) != 0) {
1719                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1720                         goto fail;
1721                 }
1722                 goto internal;
1723         }
1724
1725         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1726                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1727                          name, strerror(errno)));
1728                 goto fail;      /* errno set by open(2) */
1729         }
1730
1731         /* ensure there is only one process initialising at once */
1732         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1733                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1734                          name, strerror(errno)));
1735                 goto fail;      /* errno set by tdb_brlock */
1736         }
1737
1738         /* we need to zero database if we are the only one with it open */
1739         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1740             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1741                 open_flags |= O_CREAT;
1742                 if (ftruncate(tdb->fd, 0) == -1) {
1743                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1744                                  "failed to truncate %s: %s\n",
1745                                  name, strerror(errno)));
1746                         goto fail; /* errno set by ftruncate */
1747                 }
1748         }
1749
1750         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1751             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1752             || (tdb->header.version != TDB_VERSION
1753                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1754                 /* its not a valid database - possibly initialise it */
1755                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1756                         errno = EIO; /* ie bad format or something */
1757                         goto fail;
1758                 }
1759                 rev = (tdb->flags & TDB_CONVERT);
1760         }
1761         vp = (unsigned char *)&tdb->header.version;
1762         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1763                   (((u32)vp[2]) << 8) | (u32)vp[3];
1764         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1765         if (!rev)
1766                 tdb->flags &= ~TDB_CONVERT;
1767         else {
1768                 tdb->flags |= TDB_CONVERT;
1769                 convert(&tdb->header, sizeof(tdb->header));
1770         }
1771         if (fstat(tdb->fd, &st) == -1)
1772                 goto fail;
1773
1774         /* Is it already in the open list?  If so, fail. */
1775         if (tdb_already_open(st.st_dev, st.st_ino)) {
1776                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1777                          "%s (%d,%d) is already open in this process\n",
1778                          name, st.st_dev, st.st_ino));
1779                 errno = EBUSY;
1780                 goto fail;
1781         }
1782
1783         if (!(tdb->name = (char *)strdup(name))) {
1784                 errno = ENOMEM;
1785                 goto fail;
1786         }
1787
1788         tdb->map_size = st.st_size;
1789         tdb->device = st.st_dev;
1790         tdb->inode = st.st_ino;
1791         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1792         if (!tdb->locked) {
1793                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1794                          "failed to allocate lock structure for %s\n",
1795                          name));
1796                 errno = ENOMEM;
1797                 goto fail;
1798         }
1799         tdb_mmap(tdb);
1800         if (locked) {
1801                 if (!tdb->read_only)
1802                         if (tdb_clear_spinlocks(tdb) != 0) {
1803                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1804                                 "failed to clear spinlock\n"));
1805                                 goto fail;
1806                         }
1807                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1808                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1809                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1810                                  name, strerror(errno)));
1811                         goto fail;
1812                 }
1813         }
1814         /* leave this lock in place to indicate it's in use */
1815         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1816                 goto fail;
1817
1818  internal:
1819         /* Internal (memory-only) databases skip all the code above to
1820          * do with disk files, and resume here by releasing their
1821          * global lock and hooking into the active list. */
1822         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1823                 goto fail;
1824         tdb->next = tdbs;
1825         tdbs = tdb;
1826         return tdb;
1827
1828  fail:
1829         { int save_errno = errno;
1830
1831         if (!tdb)
1832                 return NULL;
1833         
1834         if (tdb->map_ptr) {
1835                 if (tdb->flags & TDB_INTERNAL)
1836                         SAFE_FREE(tdb->map_ptr);
1837                 else
1838                         tdb_munmap(tdb);
1839         }
1840         SAFE_FREE(tdb->name);
1841         if (tdb->fd != -1)
1842                 if (close(tdb->fd) != 0)
1843                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1844         SAFE_FREE(tdb->locked);
1845         SAFE_FREE(tdb);
1846         errno = save_errno;
1847         return NULL;
1848         }
1849 }
1850
1851 /**
1852  * Close a database.
1853  *
1854  * @returns -1 for error; 0 for success.
1855  **/
1856 int tdb_close(TDB_CONTEXT *tdb)
1857 {
1858         TDB_CONTEXT **i;
1859         int ret = 0;
1860
1861         if (tdb->map_ptr) {
1862                 if (tdb->flags & TDB_INTERNAL)
1863                         SAFE_FREE(tdb->map_ptr);
1864                 else
1865                         tdb_munmap(tdb);
1866         }
1867         SAFE_FREE(tdb->name);
1868         if (tdb->fd != -1)
1869                 ret = close(tdb->fd);
1870         SAFE_FREE(tdb->locked);
1871         SAFE_FREE(tdb->lockedkeys);
1872
1873         /* Remove from contexts list */
1874         for (i = &tdbs; *i; i = &(*i)->next) {
1875                 if (*i == tdb) {
1876                         *i = tdb->next;
1877                         break;
1878                 }
1879         }
1880
1881         memset(tdb, 0, sizeof(*tdb));
1882         SAFE_FREE(tdb);
1883
1884         return ret;
1885 }
1886
1887 /* lock/unlock entire database */
1888 int tdb_lockall(TDB_CONTEXT *tdb)
1889 {
1890         u32 i;
1891
1892         /* There are no locks on read-only dbs */
1893         if (tdb->read_only)
1894                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1895         if (tdb->lockedkeys)
1896                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1897         for (i = 0; i < tdb->header.hash_size; i++) 
1898                 if (tdb_lock(tdb, i, F_WRLCK))
1899                         break;
1900
1901         /* If error, release locks we have... */
1902         if (i < tdb->header.hash_size) {
1903                 u32 j;
1904
1905                 for ( j = 0; j < i; j++)
1906                         tdb_unlock(tdb, j, F_WRLCK);
1907                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1908         }
1909
1910         return 0;
1911 }
1912 void tdb_unlockall(TDB_CONTEXT *tdb)
1913 {
1914         u32 i;
1915         for (i=0; i < tdb->header.hash_size; i++)
1916                 tdb_unlock(tdb, i, F_WRLCK);
1917 }
1918
1919 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1920 {
1921         u32 i, j, hash;
1922
1923         /* Can't lock more keys if already locked */
1924         if (tdb->lockedkeys)
1925                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1926         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1927                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1928         /* First number in array is # keys */
1929         tdb->lockedkeys[0] = number;
1930
1931         /* Insertion sort by bucket */
1932         for (i = 0; i < number; i++) {
1933                 hash = tdb_hash(&keys[i]);
1934                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1935                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1936                 tdb->lockedkeys[j+1] = hash;
1937         }
1938         /* Finally, lock in order */
1939         for (i = 0; i < number; i++)
1940                 if (tdb_lock(tdb, i, F_WRLCK))
1941                         break;
1942
1943         /* If error, release locks we have... */
1944         if (i < number) {
1945                 for ( j = 0; j < i; j++)
1946                         tdb_unlock(tdb, j, F_WRLCK);
1947                 SAFE_FREE(tdb->lockedkeys);
1948                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1949         }
1950         return 0;
1951 }
1952
1953 /* Unlock the keys previously locked by tdb_lockkeys() */
1954 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1955 {
1956         u32 i;
1957         if (!tdb->lockedkeys)
1958                 return;
1959         for (i = 0; i < tdb->lockedkeys[0]; i++)
1960                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1961         SAFE_FREE(tdb->lockedkeys);
1962 }
1963
1964 /* lock/unlock one hash chain. This is meant to be used to reduce
1965    contention - it cannot guarantee how many records will be locked */
1966 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1967 {
1968         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1969 }
1970
1971 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1972 {
1973         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1974 }
1975
1976 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1977 {
1978         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1979 }
1980
1981 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1982 {
1983         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1984 }
1985
1986
1987 /* register a loging function */
1988 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1989 {
1990         tdb->log_fn = fn;
1991 }
1992
1993
1994 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1995    seek pointer from our parent and to re-establish locks */
1996 int tdb_reopen(TDB_CONTEXT *tdb)
1997 {
1998         struct stat st;
1999
2000         if (tdb_munmap(tdb) != 0) {
2001                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2002                 goto fail;
2003         }
2004         if (close(tdb->fd) != 0)
2005                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2006         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2007         if (tdb->fd == -1) {
2008                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2009                 goto fail;
2010         }
2011         if (fstat(tdb->fd, &st) != 0) {
2012                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2013                 goto fail;
2014         }
2015         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2016                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2017                 goto fail;
2018         }
2019         tdb_mmap(tdb);
2020         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2021                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2022                 goto fail;
2023         }
2024
2025         return 0;
2026
2027 fail:
2028         tdb_close(tdb);
2029         return -1;
2030 }
2031
2032 /* reopen all tdb's */
2033 int tdb_reopen_all(void)
2034 {
2035         TDB_CONTEXT *tdb;
2036
2037         for (tdb=tdbs; tdb; tdb = tdb->next) {
2038                 if (tdb_reopen(tdb) != 0) return -1;
2039         }
2040
2041         return 0;
2042 }