trying to get HEAD building again. If you want the code
[samba.git] / source3 / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24
25 /* NOTE: If you use tdbs under valgrind, and in particular if you run
26  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
27  * think this is because valgrind doesn't understand that the mmap'd
28  * area may be written to by other processes.  Memory can, from the
29  * point of view of the grinded process, spontaneously become
30  * initialized.
31  *
32  * I can think of a few solutions.  [mbp 20030311]
33  *
34  * 1 - Write suppressions for Valgrind so that it doesn't complain
35  * about this.  Probably the most reasonable but people need to
36  * remember to use them.
37  *
38  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
39  *
40  * 3 - Use the special valgrind macros to mark memory as valid at the
41  * right time.  Probably too hard -- the process just doesn't know.
42  */ 
43
44 #ifdef STANDALONE
45 #if HAVE_CONFIG_H
46 #include <config.h>
47 #endif
48
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <fcntl.h>
52 #include <unistd.h>
53 #include <string.h>
54 #include <fcntl.h>
55 #include <errno.h>
56 #include <sys/mman.h>
57 #include <sys/stat.h>
58 #include <signal.h>
59 #include "tdb.h"
60 #include "spinlock.h"
61 #else
62 #include "includes.h"
63 #endif
64
65 #define TDB_MAGIC_FOOD "TDB file\n"
66 #define TDB_VERSION (0x26011967 + 6)
67 #define TDB_MAGIC (0x26011999U)
68 #define TDB_FREE_MAGIC (~TDB_MAGIC)
69 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
70 #define TDB_ALIGNMENT 4
71 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
72 #define DEFAULT_HASH_SIZE 131
73 #define TDB_PAGE_SIZE 0x2000
74 #define FREELIST_TOP (sizeof(struct tdb_header))
75 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
76 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
77 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
78 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
79 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
80
81 /* NB assumes there is a local variable called "tdb" that is the
82  * current context, also takes doubly-parenthesized print-style
83  * argument. */
84 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
85
86 /* lock offsets */
87 #define GLOBAL_LOCK 0
88 #define ACTIVE_LOCK 4
89
90 #ifndef MAP_FILE
91 #define MAP_FILE 0
92 #endif
93
94 #ifndef MAP_FAILED
95 #define MAP_FAILED ((void *)-1)
96 #endif
97
98 /* free memory if the pointer is valid and zero the pointer */
99 #ifndef SAFE_FREE
100 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
101 #endif
102
103 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
104 TDB_DATA tdb_null;
105
106 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
107 static TDB_CONTEXT *tdbs = NULL;
108
109 static int tdb_munmap(TDB_CONTEXT *tdb)
110 {
111         if (tdb->flags & TDB_INTERNAL)
112                 return 0;
113
114 #ifdef HAVE_MMAP
115         if (tdb->map_ptr) {
116                 int ret = munmap(tdb->map_ptr, tdb->map_size);
117                 if (ret != 0)
118                         return ret;
119         }
120 #endif
121         tdb->map_ptr = NULL;
122         return 0;
123 }
124
125 static void tdb_mmap(TDB_CONTEXT *tdb)
126 {
127         if (tdb->flags & TDB_INTERNAL)
128                 return;
129
130 #ifdef HAVE_MMAP
131         if (!(tdb->flags & TDB_NOMMAP)) {
132                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
133                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
134                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
135
136                 /*
137                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
138                  */
139
140                 if (tdb->map_ptr == MAP_FAILED) {
141                         tdb->map_ptr = NULL;
142                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
143                                  tdb->map_size, strerror(errno)));
144                 }
145         } else {
146                 tdb->map_ptr = NULL;
147         }
148 #else
149         tdb->map_ptr = NULL;
150 #endif
151 }
152
153 /* Endian conversion: we only ever deal with 4 byte quantities */
154 static void *convert(void *buf, u32 size)
155 {
156         u32 i, *p = buf;
157         for (i = 0; i < size / 4; i++)
158                 p[i] = TDB_BYTEREV(p[i]);
159         return buf;
160 }
161 #define DOCONV() (tdb->flags & TDB_CONVERT)
162 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
163
164 /* the body of the database is made of one list_struct for the free space
165    plus a separate data list for each hash value */
166 struct list_struct {
167         tdb_off next; /* offset of the next record in the list */
168         tdb_len rec_len; /* total byte length of record */
169         tdb_len key_len; /* byte length of key */
170         tdb_len data_len; /* byte length of data */
171         u32 full_hash; /* the full 32 bit hash of the key */
172         u32 magic;   /* try to catch errors */
173         /* the following union is implied:
174                 union {
175                         char record[rec_len];
176                         struct {
177                                 char key[key_len];
178                                 char data[data_len];
179                         }
180                         u32 totalsize; (tailer)
181                 }
182         */
183 };
184
185 /***************************************************************
186  Allow a caller to set a "alarm" flag that tdb can check to abort
187  a blocking lock on SIGALRM.
188 ***************************************************************/
189
190 static sig_atomic_t *palarm_fired;
191
192 void tdb_set_lock_alarm(sig_atomic_t *palarm)
193 {
194         palarm_fired = palarm;
195 }
196
197 /* a byte range locking function - return 0 on success
198    this functions locks/unlocks 1 byte at the specified offset.
199
200    On error, errno is also set so that errors are passed back properly
201    through tdb_open(). */
202 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
203                       int rw_type, int lck_type, int probe)
204 {
205         struct flock fl;
206         int ret;
207
208         if (tdb->flags & TDB_NOLOCK)
209                 return 0;
210         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
211                 errno = EACCES;
212                 return -1;
213         }
214
215         fl.l_type = rw_type;
216         fl.l_whence = SEEK_SET;
217         fl.l_start = offset;
218         fl.l_len = 1;
219         fl.l_pid = 0;
220
221         do {
222                 ret = fcntl(tdb->fd,lck_type,&fl);
223                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
224                         break;
225         } while (ret == -1 && errno == EINTR);
226
227         if (ret == -1) {
228                 if (!probe && lck_type != F_SETLK) {
229                         /* Ensure error code is set for log fun to examine. */
230                         if (errno == EINTR && palarm_fired && *palarm_fired)
231                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
232                         else
233                                 tdb->ecode = TDB_ERR_LOCK;
234                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
235                                  tdb->fd, offset, rw_type, lck_type));
236                 }
237                 /* Was it an alarm timeout ? */
238                 if (errno == EINTR && palarm_fired && *palarm_fired)
239                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
240                 /* Otherwise - generic lock error. */
241                 /* errno set by fcntl */
242                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
243         }
244         return 0;
245 }
246
247 /* lock a list in the database. list -1 is the alloc list */
248 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
249 {
250         if (list < -1 || list >= (int)tdb->header.hash_size) {
251                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
252                            list, ltype));
253                 return -1;
254         }
255         if (tdb->flags & TDB_NOLOCK)
256                 return 0;
257
258         /* Since fcntl locks don't nest, we do a lock for the first one,
259            and simply bump the count for future ones */
260         if (tdb->locked[list+1].count == 0) {
261                 if (!tdb->read_only && tdb->header.rwlocks) {
262                         if (tdb_spinlock(tdb, list, ltype)) {
263                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
264                                            list, ltype));
265                                 return -1;
266                         }
267                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
268                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
269                                            list, ltype, strerror(errno)));
270                         return -1;
271                 }
272                 tdb->locked[list+1].ltype = ltype;
273         }
274         tdb->locked[list+1].count++;
275         return 0;
276 }
277
278 /* unlock the database: returns void because it's too late for errors. */
279         /* changed to return int it may be interesting to know there
280            has been an error  --simo */
281 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
282 {
283         int ret = -1;
284
285         if (tdb->flags & TDB_NOLOCK)
286                 return 0;
287
288         /* Sanity checks */
289         if (list < -1 || list >= (int)tdb->header.hash_size) {
290                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
291                 return ret;
292         }
293
294         if (tdb->locked[list+1].count==0) {
295                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
296                 return ret;
297         }
298
299         if (tdb->locked[list+1].count == 1) {
300                 /* Down to last nested lock: unlock underneath */
301                 if (!tdb->read_only && tdb->header.rwlocks) {
302                         ret = tdb_spinunlock(tdb, list, ltype);
303                 } else {
304                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
305                 }
306         } else {
307                 ret = 0;
308         }
309         tdb->locked[list+1].count--;
310
311         if (ret)
312                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
313         return ret;
314 }
315
316 /* This is based on the hash algorithm from gdbm */
317 static u32 tdb_hash(TDB_DATA *key)
318 {
319         u32 value;      /* Used to compute the hash value.  */
320         u32   i;        /* Used to cycle through random values. */
321
322         /* Set the initial value from the key size. */
323         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
324                 value = (value + (key->dptr[i] << (i*5 % 24)));
325
326         return (1103515243 * value + 12345);  
327 }
328
329 /* check for an out of bounds access - if it is out of bounds then
330    see if the database has been expanded by someone else and expand
331    if necessary 
332    note that "len" is the minimum length needed for the db
333 */
334 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
335 {
336         struct stat st;
337         if (len <= tdb->map_size)
338                 return 0;
339         if (tdb->flags & TDB_INTERNAL) {
340                 if (!probe) {
341                         /* Ensure ecode is set for log fn. */
342                         tdb->ecode = TDB_ERR_IO;
343                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
344                                  (int)len, (int)tdb->map_size));
345                 }
346                 return TDB_ERRCODE(TDB_ERR_IO, -1);
347         }
348
349         if (fstat(tdb->fd, &st) == -1)
350                 return TDB_ERRCODE(TDB_ERR_IO, -1);
351
352         if (st.st_size < (size_t)len) {
353                 if (!probe) {
354                         /* Ensure ecode is set for log fn. */
355                         tdb->ecode = TDB_ERR_IO;
356                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
357                                  (int)len, (int)st.st_size));
358                 }
359                 return TDB_ERRCODE(TDB_ERR_IO, -1);
360         }
361
362         /* Unmap, update size, remap */
363         if (tdb_munmap(tdb) == -1)
364                 return TDB_ERRCODE(TDB_ERR_IO, -1);
365         tdb->map_size = st.st_size;
366         tdb_mmap(tdb);
367         return 0;
368 }
369
370 /* write a lump of data at a specified offset */
371 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
372 {
373         if (tdb_oob(tdb, off + len, 0) != 0)
374                 return -1;
375
376         if (tdb->map_ptr)
377                 memcpy(off + (char *)tdb->map_ptr, buf, len);
378 #ifdef HAVE_PWRITE
379         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
380 #else
381         else if (lseek(tdb->fd, off, SEEK_SET) != off
382                  || write(tdb->fd, buf, len) != (ssize_t)len) {
383 #endif
384                 /* Ensure ecode is set for log fn. */
385                 tdb->ecode = TDB_ERR_IO;
386                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
387                            off, len, strerror(errno)));
388                 return TDB_ERRCODE(TDB_ERR_IO, -1);
389         }
390         return 0;
391 }
392
393 /* read a lump of data at a specified offset, maybe convert */
394 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
395 {
396         if (tdb_oob(tdb, off + len, 0) != 0)
397                 return -1;
398
399         if (tdb->map_ptr)
400                 memcpy(buf, off + (char *)tdb->map_ptr, len);
401 #ifdef HAVE_PREAD
402         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
403 #else
404         else if (lseek(tdb->fd, off, SEEK_SET) != off
405                  || read(tdb->fd, buf, len) != (ssize_t)len) {
406 #endif
407                 /* Ensure ecode is set for log fn. */
408                 tdb->ecode = TDB_ERR_IO;
409                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
410                            off, len, strerror(errno)));
411                 return TDB_ERRCODE(TDB_ERR_IO, -1);
412         }
413         if (cv)
414                 convert(buf, len);
415         return 0;
416 }
417
418 /* read a lump of data, allocating the space for it */
419 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
420 {
421         char *buf;
422
423         if (!(buf = malloc(len))) {
424                 /* Ensure ecode is set for log fn. */
425                 tdb->ecode = TDB_ERR_OOM;
426                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
427                            len, strerror(errno)));
428                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
429         }
430         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
431                 SAFE_FREE(buf);
432                 return NULL;
433         }
434         return buf;
435 }
436
437 /* read/write a tdb_off */
438 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
439 {
440         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
441 }
442 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
443 {
444         tdb_off off = *d;
445         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
446 }
447
448 /* read/write a record */
449 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
450 {
451         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
452                 return -1;
453         if (TDB_BAD_MAGIC(rec)) {
454                 /* Ensure ecode is set for log fn. */
455                 tdb->ecode = TDB_ERR_CORRUPT;
456                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
457                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
458         }
459         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
460 }
461 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
462 {
463         struct list_struct r = *rec;
464         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
465 }
466
467 /* read a freelist record and check for simple errors */
468 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
469 {
470         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
471                 return -1;
472
473         if (rec->magic == TDB_MAGIC) {
474                 /* this happens when a app is showdown while deleting a record - we should
475                    not completely fail when this happens */
476                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
477                          rec->magic, off));
478                 rec->magic = TDB_FREE_MAGIC;
479                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
480                         return -1;
481         }
482
483         if (rec->magic != TDB_FREE_MAGIC) {
484                 /* Ensure ecode is set for log fn. */
485                 tdb->ecode = TDB_ERR_CORRUPT;
486                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
487                            rec->magic, off));
488                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
489         }
490         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
491                 return -1;
492         return 0;
493 }
494
495 /* update a record tailer (must hold allocation lock) */
496 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
497                          const struct list_struct *rec)
498 {
499         tdb_off totalsize;
500
501         /* Offset of tailer from record header */
502         totalsize = sizeof(*rec) + rec->rec_len;
503         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
504                          &totalsize);
505 }
506
507 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
508 {
509         struct list_struct rec;
510         tdb_off tailer_ofs, tailer;
511
512         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
513                 printf("ERROR: failed to read record at %u\n", offset);
514                 return 0;
515         }
516
517         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
518                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
519
520         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
521         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
522                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
523                 return rec.next;
524         }
525
526         if (tailer != rec.rec_len + sizeof(rec)) {
527                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
528                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
529         }
530         return rec.next;
531 }
532
533 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
534 {
535         tdb_off rec_ptr, top;
536
537         top = TDB_HASH_TOP(i);
538
539         if (tdb_lock(tdb, i, F_WRLCK) != 0)
540                 return -1;
541
542         if (ofs_read(tdb, top, &rec_ptr) == -1)
543                 return tdb_unlock(tdb, i, F_WRLCK);
544
545         if (rec_ptr)
546                 printf("hash=%d\n", i);
547
548         while (rec_ptr) {
549                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
550         }
551
552         return tdb_unlock(tdb, i, F_WRLCK);
553 }
554
555 void tdb_dump_all(TDB_CONTEXT *tdb)
556 {
557         int i;
558         for (i=0;i<tdb->header.hash_size;i++) {
559                 tdb_dump_chain(tdb, i);
560         }
561         printf("freelist:\n");
562         tdb_dump_chain(tdb, -1);
563 }
564
565 int tdb_printfreelist(TDB_CONTEXT *tdb)
566 {
567         int ret;
568         long total_free = 0;
569         tdb_off offset, rec_ptr;
570         struct list_struct rec;
571
572         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
573                 return ret;
574
575         offset = FREELIST_TOP;
576
577         /* read in the freelist top */
578         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
579                 tdb_unlock(tdb, -1, F_WRLCK);
580                 return 0;
581         }
582
583         printf("freelist top=[0x%08x]\n", rec_ptr );
584         while (rec_ptr) {
585                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
586                         tdb_unlock(tdb, -1, F_WRLCK);
587                         return -1;
588                 }
589
590                 if (rec.magic != TDB_FREE_MAGIC) {
591                         printf("bad magic 0x%08x in free list\n", rec.magic);
592                         tdb_unlock(tdb, -1, F_WRLCK);
593                         return -1;
594                 }
595
596                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
597                 total_free += rec.rec_len;
598
599                 /* move to the next record */
600                 rec_ptr = rec.next;
601         }
602         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
603                (int)total_free);
604
605         return tdb_unlock(tdb, -1, F_WRLCK);
606 }
607
608 /* Remove an element from the freelist.  Must have alloc lock. */
609 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
610 {
611         tdb_off last_ptr, i;
612
613         /* read in the freelist top */
614         last_ptr = FREELIST_TOP;
615         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
616                 if (i == off) {
617                         /* We've found it! */
618                         return ofs_write(tdb, last_ptr, &next);
619                 }
620                 /* Follow chain (next offset is at start of record) */
621                 last_ptr = i;
622         }
623         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
624         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
625 }
626
627 /* Add an element into the freelist. Merge adjacent records if
628    neccessary. */
629 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
630 {
631         tdb_off right, left;
632
633         /* Allocation and tailer lock */
634         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
635                 return -1;
636
637         /* set an initial tailer, so if we fail we don't leave a bogus record */
638         if (update_tailer(tdb, offset, rec) != 0) {
639                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
640                 goto fail;
641         }
642
643         /* Look right first (I'm an Australian, dammit) */
644         right = offset + sizeof(*rec) + rec->rec_len;
645         if (right + sizeof(*rec) <= tdb->map_size) {
646                 struct list_struct r;
647
648                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
649                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
650                         goto left;
651                 }
652
653                 /* If it's free, expand to include it. */
654                 if (r.magic == TDB_FREE_MAGIC) {
655                         if (remove_from_freelist(tdb, right, r.next) == -1) {
656                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
657                                 goto left;
658                         }
659                         rec->rec_len += sizeof(r) + r.rec_len;
660                 }
661         }
662
663 left:
664         /* Look left */
665         left = offset - sizeof(tdb_off);
666         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
667                 struct list_struct l;
668                 tdb_off leftsize;
669
670                 /* Read in tailer and jump back to header */
671                 if (ofs_read(tdb, left, &leftsize) == -1) {
672                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
673                         goto update;
674                 }
675                 left = offset - leftsize;
676
677                 /* Now read in record */
678                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
679                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
680                         goto update;
681                 }
682
683                 /* If it's free, expand to include it. */
684                 if (l.magic == TDB_FREE_MAGIC) {
685                         if (remove_from_freelist(tdb, left, l.next) == -1) {
686                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
687                                 goto update;
688                         } else {
689                                 offset = left;
690                                 rec->rec_len += leftsize;
691                         }
692                 }
693         }
694
695 update:
696         if (update_tailer(tdb, offset, rec) == -1) {
697                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
698                 goto fail;
699         }
700
701         /* Now, prepend to free list */
702         rec->magic = TDB_FREE_MAGIC;
703
704         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
705             rec_write(tdb, offset, rec) == -1 ||
706             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
707                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
708                 goto fail;
709         }
710
711         /* And we're done. */
712         tdb_unlock(tdb, -1, F_WRLCK);
713         return 0;
714
715  fail:
716         tdb_unlock(tdb, -1, F_WRLCK);
717         return -1;
718 }
719
720
721 /* expand a file.  we prefer to use ftruncate, as that is what posix
722   says to use for mmap expansion */
723 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
724 {
725         char buf[1024];
726 #if HAVE_FTRUNCATE_EXTEND
727         if (ftruncate(tdb->fd, size+addition) != 0) {
728                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
729                            size+addition, strerror(errno)));
730                 return -1;
731         }
732 #else
733         char b = 0;
734
735 #ifdef HAVE_PWRITE
736         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
737 #else
738         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
739             write(tdb->fd, &b, 1) != 1) {
740 #endif
741                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
742                            size+addition, strerror(errno)));
743                 return -1;
744         }
745 #endif
746
747         /* now fill the file with something. This ensures that the file isn't sparse, which would be
748            very bad if we ran out of disk. This must be done with write, not via mmap */
749         memset(buf, 0x42, sizeof(buf));
750         while (addition) {
751                 int n = addition>sizeof(buf)?sizeof(buf):addition;
752 #ifdef HAVE_PWRITE
753                 int ret = pwrite(tdb->fd, buf, n, size);
754 #else
755                 int ret;
756                 if (lseek(tdb->fd, size, SEEK_SET) != size)
757                         return -1;
758                 ret = write(tdb->fd, buf, n);
759 #endif
760                 if (ret != n) {
761                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
762                                    n, strerror(errno)));
763                         return -1;
764                 }
765                 addition -= n;
766                 size += n;
767         }
768         return 0;
769 }
770
771
772 /* expand the database at least size bytes by expanding the underlying
773    file and doing the mmap again if necessary */
774 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
775 {
776         struct list_struct rec;
777         tdb_off offset;
778
779         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
780                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
781                 return -1;
782         }
783
784         /* must know about any previous expansions by another process */
785         tdb_oob(tdb, tdb->map_size + 1, 1);
786
787         /* always make room for at least 10 more records, and round
788            the database up to a multiple of TDB_PAGE_SIZE */
789         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
790
791         if (!(tdb->flags & TDB_INTERNAL))
792                 tdb_munmap(tdb);
793
794         /*
795          * We must ensure the file is unmapped before doing this
796          * to ensure consistency with systems like OpenBSD where
797          * writes and mmaps are not consistent.
798          */
799
800         /* expand the file itself */
801         if (!(tdb->flags & TDB_INTERNAL)) {
802                 if (expand_file(tdb, tdb->map_size, size) != 0)
803                         goto fail;
804         }
805
806         tdb->map_size += size;
807
808         if (tdb->flags & TDB_INTERNAL)
809                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
810         else {
811                 /*
812                  * We must ensure the file is remapped before adding the space
813                  * to ensure consistency with systems like OpenBSD where
814                  * writes and mmaps are not consistent.
815                  */
816
817                 /* We're ok if the mmap fails as we'll fallback to read/write */
818                 tdb_mmap(tdb);
819         }
820
821         /* form a new freelist record */
822         memset(&rec,'\0',sizeof(rec));
823         rec.rec_len = size - sizeof(rec);
824
825         /* link it into the free list */
826         offset = tdb->map_size - size;
827         if (tdb_free(tdb, offset, &rec) == -1)
828                 goto fail;
829
830         tdb_unlock(tdb, -1, F_WRLCK);
831         return 0;
832  fail:
833         tdb_unlock(tdb, -1, F_WRLCK);
834         return -1;
835 }
836
837 /* allocate some space from the free list. The offset returned points
838    to a unconnected list_struct within the database with room for at
839    least length bytes of total data
840
841    0 is returned if the space could not be allocated
842  */
843 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
844                             struct list_struct *rec)
845 {
846         tdb_off rec_ptr, last_ptr, newrec_ptr;
847         struct list_struct newrec;
848
849         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
850                 return 0;
851
852         /* Extra bytes required for tailer */
853         length += sizeof(tdb_off);
854
855  again:
856         last_ptr = FREELIST_TOP;
857
858         /* read in the freelist top */
859         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
860                 goto fail;
861
862         /* keep looking until we find a freelist record big enough */
863         while (rec_ptr) {
864                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
865                         goto fail;
866
867                 if (rec->rec_len >= length) {
868                         /* found it - now possibly split it up  */
869                         if (rec->rec_len > length + MIN_REC_SIZE) {
870                                 /* Length of left piece */
871                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
872
873                                 /* Right piece to go on free list */
874                                 newrec.rec_len = rec->rec_len
875                                         - (sizeof(*rec) + length);
876                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
877
878                                 /* And left record is shortened */
879                                 rec->rec_len = length;
880                         } else
881                                 newrec_ptr = 0;
882
883                         /* Remove allocated record from the free list */
884                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
885                                 goto fail;
886
887                         /* Update header: do this before we drop alloc
888                            lock, otherwise tdb_free() might try to
889                            merge with us, thinking we're free.
890                            (Thanks Jeremy Allison). */
891                         rec->magic = TDB_MAGIC;
892                         if (rec_write(tdb, rec_ptr, rec) == -1)
893                                 goto fail;
894
895                         /* Did we create new block? */
896                         if (newrec_ptr) {
897                                 /* Update allocated record tailer (we
898                                    shortened it). */
899                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
900                                         goto fail;
901
902                                 /* Free new record */
903                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
904                                         goto fail;
905                         }
906
907                         /* all done - return the new record offset */
908                         tdb_unlock(tdb, -1, F_WRLCK);
909                         return rec_ptr;
910                 }
911                 /* move to the next record */
912                 last_ptr = rec_ptr;
913                 rec_ptr = rec->next;
914         }
915         /* we didn't find enough space. See if we can expand the
916            database and if we can then try again */
917         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
918                 goto again;
919  fail:
920         tdb_unlock(tdb, -1, F_WRLCK);
921         return 0;
922 }
923
924 /* initialise a new database with a specified hash size */
925 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
926 {
927         struct tdb_header *newdb;
928         int size, ret = -1;
929
930         /* We make it up in memory, then write it out if not internal */
931         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
932         if (!(newdb = calloc(size, 1)))
933                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
934
935         /* Fill in the header */
936         newdb->version = TDB_VERSION;
937         newdb->hash_size = hash_size;
938 #ifdef USE_SPINLOCKS
939         newdb->rwlocks = size;
940 #endif
941         if (tdb->flags & TDB_INTERNAL) {
942                 tdb->map_size = size;
943                 tdb->map_ptr = (char *)newdb;
944                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
945                 /* Convert the `ondisk' version if asked. */
946                 CONVERT(*newdb);
947                 return 0;
948         }
949         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
950                 goto fail;
951
952         if (ftruncate(tdb->fd, 0) == -1)
953                 goto fail;
954
955         /* This creates an endian-converted header, as if read from disk */
956         CONVERT(*newdb);
957         memcpy(&tdb->header, newdb, sizeof(tdb->header));
958         /* Don't endian-convert the magic food! */
959         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
960         if (write(tdb->fd, newdb, size) != size)
961                 ret = -1;
962         else
963                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
964
965   fail:
966         SAFE_FREE(newdb);
967         return ret;
968 }
969
970 /* Returns 0 on fail.  On success, return offset of record, and fills
971    in rec */
972 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
973                         struct list_struct *r)
974 {
975         tdb_off rec_ptr;
976         
977         /* read in the hash top */
978         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
979                 return 0;
980
981         /* keep looking until we find the right record */
982         while (rec_ptr) {
983                 if (rec_read(tdb, rec_ptr, r) == -1)
984                         return 0;
985
986                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
987                         char *k;
988                         /* a very likely hit - read the key */
989                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
990                                            r->key_len);
991                         if (!k)
992                                 return 0;
993
994                         if (memcmp(key.dptr, k, key.dsize) == 0) {
995                                 SAFE_FREE(k);
996                                 return rec_ptr;
997                         }
998                         SAFE_FREE(k);
999                 }
1000                 rec_ptr = r->next;
1001         }
1002         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1003 }
1004
1005 /* If they do lockkeys, check that this hash is one they locked */
1006 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1007 {
1008         u32 i;
1009         if (!tdb->lockedkeys)
1010                 return 1;
1011         for (i = 0; i < tdb->lockedkeys[0]; i++)
1012                 if (tdb->lockedkeys[i+1] == hash)
1013                         return 1;
1014         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1015 }
1016
1017 /* As tdb_find, but if you succeed, keep the lock */
1018 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
1019                              struct list_struct *rec)
1020 {
1021         u32 hash, rec_ptr;
1022
1023         hash = tdb_hash(&key);
1024         if (!tdb_keylocked(tdb, hash))
1025                 return 0;
1026         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1027                 return 0;
1028         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1029                 tdb_unlock(tdb, BUCKET(hash), locktype);
1030         return rec_ptr;
1031 }
1032
1033 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1034 {
1035         return tdb->ecode;
1036 }
1037
1038 static struct tdb_errname {
1039         enum TDB_ERROR ecode; const char *estring;
1040 } emap[] = { {TDB_SUCCESS, "Success"},
1041              {TDB_ERR_CORRUPT, "Corrupt database"},
1042              {TDB_ERR_IO, "IO Error"},
1043              {TDB_ERR_LOCK, "Locking error"},
1044              {TDB_ERR_OOM, "Out of memory"},
1045              {TDB_ERR_EXISTS, "Record exists"},
1046              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1047              {TDB_ERR_NOEXIST, "Record does not exist"} };
1048
1049 /* Error string for the last tdb error */
1050 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1051 {
1052         u32 i;
1053         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1054                 if (tdb->ecode == emap[i].ecode)
1055                         return emap[i].estring;
1056         return "Invalid error code";
1057 }
1058
1059 /* update an entry in place - this only works if the new data size
1060    is <= the old data size and the key exists.
1061    on failure return -1.
1062 */
1063
1064 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1065 {
1066         struct list_struct rec;
1067         tdb_off rec_ptr;
1068
1069         /* find entry */
1070         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1071                 return -1;
1072
1073         /* must be long enough key, data and tailer */
1074         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1075                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1076                 return -1;
1077         }
1078
1079         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1080                       dbuf.dptr, dbuf.dsize) == -1)
1081                 return -1;
1082
1083         if (dbuf.dsize != rec.data_len) {
1084                 /* update size */
1085                 rec.data_len = dbuf.dsize;
1086                 return rec_write(tdb, rec_ptr, &rec);
1087         }
1088  
1089         return 0;
1090 }
1091
1092 /* find an entry in the database given a key */
1093 /* If an entry doesn't exist tdb_err will be set to
1094  * TDB_ERR_NOEXIST. If a key has no data attached
1095  * tdb_err will not be set. Both will return a
1096  * zero pptr and zero dsize.
1097  */
1098
1099 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1100 {
1101         tdb_off rec_ptr;
1102         struct list_struct rec;
1103         TDB_DATA ret;
1104
1105         /* find which hash bucket it is in */
1106         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1107                 return tdb_null;
1108
1109         if (rec.data_len)
1110                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1111                                           rec.data_len);
1112         else
1113                 ret.dptr = NULL;
1114         ret.dsize = rec.data_len;
1115         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1116         return ret;
1117 }
1118
1119 /* check if an entry in the database exists 
1120
1121    note that 1 is returned if the key is found and 0 is returned if not found
1122    this doesn't match the conventions in the rest of this module, but is
1123    compatible with gdbm
1124 */
1125 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1126 {
1127         struct list_struct rec;
1128         
1129         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1130                 return 0;
1131         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1132         return 1;
1133 }
1134
1135 /* record lock stops delete underneath */
1136 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1137 {
1138         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1139 }
1140 /*
1141   Write locks override our own fcntl readlocks, so check it here.
1142   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1143   an error to fail to get the lock here.
1144 */
1145  
1146 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1147 {
1148         struct tdb_traverse_lock *i;
1149         for (i = &tdb->travlocks; i; i = i->next)
1150                 if (i->off == off)
1151                         return -1;
1152         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1153 }
1154
1155 /*
1156   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1157   an error to fail to get the lock here.
1158 */
1159
1160 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1161 {
1162         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1163 }
1164 /* fcntl locks don't stack: avoid unlocking someone else's */
1165 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1166 {
1167         struct tdb_traverse_lock *i;
1168         u32 count = 0;
1169
1170         if (off == 0)
1171                 return 0;
1172         for (i = &tdb->travlocks; i; i = i->next)
1173                 if (i->off == off)
1174                         count++;
1175         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1176 }
1177
1178 /* actually delete an entry in the database given the offset */
1179 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1180 {
1181         tdb_off last_ptr, i;
1182         struct list_struct lastrec;
1183
1184         if (tdb->read_only) return -1;
1185
1186         if (write_lock_record(tdb, rec_ptr) == -1) {
1187                 /* Someone traversing here: mark it as dead */
1188                 rec->magic = TDB_DEAD_MAGIC;
1189                 return rec_write(tdb, rec_ptr, rec);
1190         }
1191         if (write_unlock_record(tdb, rec_ptr) != 0)
1192                 return -1;
1193
1194         /* find previous record in hash chain */
1195         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1196                 return -1;
1197         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1198                 if (rec_read(tdb, i, &lastrec) == -1)
1199                         return -1;
1200
1201         /* unlink it: next ptr is at start of record. */
1202         if (last_ptr == 0)
1203                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1204         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1205                 return -1;
1206
1207         /* recover the space */
1208         if (tdb_free(tdb, rec_ptr, rec) == -1)
1209                 return -1;
1210         return 0;
1211 }
1212
1213 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1214 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1215                          struct list_struct *rec)
1216 {
1217         int want_next = (tlock->off != 0);
1218
1219         /* No traversal allows if you've called tdb_lockkeys() */
1220         if (tdb->lockedkeys)
1221                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1222
1223         /* Lock each chain from the start one. */
1224         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1225                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1226                         return -1;
1227
1228                 /* No previous record?  Start at top of chain. */
1229                 if (!tlock->off) {
1230                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1231                                      &tlock->off) == -1)
1232                                 goto fail;
1233                 } else {
1234                         /* Otherwise unlock the previous record. */
1235                         if (unlock_record(tdb, tlock->off) != 0)
1236                                 goto fail;
1237                 }
1238
1239                 if (want_next) {
1240                         /* We have offset of old record: grab next */
1241                         if (rec_read(tdb, tlock->off, rec) == -1)
1242                                 goto fail;
1243                         tlock->off = rec->next;
1244                 }
1245
1246                 /* Iterate through chain */
1247                 while( tlock->off) {
1248                         tdb_off current;
1249                         if (rec_read(tdb, tlock->off, rec) == -1)
1250                                 goto fail;
1251                         if (!TDB_DEAD(rec)) {
1252                                 /* Woohoo: we found one! */
1253                                 if (lock_record(tdb, tlock->off) != 0)
1254                                         goto fail;
1255                                 return tlock->off;
1256                         }
1257                         /* Try to clean dead ones from old traverses */
1258                         current = tlock->off;
1259                         tlock->off = rec->next;
1260                         if (!tdb->read_only && 
1261                             do_delete(tdb, current, rec) != 0)
1262                                 goto fail;
1263                 }
1264                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1265                 want_next = 0;
1266         }
1267         /* We finished iteration without finding anything */
1268         return TDB_ERRCODE(TDB_SUCCESS, 0);
1269
1270  fail:
1271         tlock->off = 0;
1272         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1273                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1274         return -1;
1275 }
1276
1277 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1278    return -1 on error or the record count traversed
1279    if fn is NULL then it is not called
1280    a non-zero return value from fn() indicates that the traversal should stop
1281   */
1282 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1283 {
1284         TDB_DATA key, dbuf;
1285         struct list_struct rec;
1286         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1287         int ret, count = 0;
1288
1289         /* This was in the initializaton, above, but the IRIX compiler
1290          * did not like it.  crh
1291          */
1292         tl.next = tdb->travlocks.next;
1293
1294         /* fcntl locks don't stack: beware traverse inside traverse */
1295         tdb->travlocks.next = &tl;
1296
1297         /* tdb_next_lock places locks on the record returned, and its chain */
1298         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1299                 count++;
1300                 /* now read the full record */
1301                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1302                                           rec.key_len + rec.data_len);
1303                 if (!key.dptr) {
1304                         ret = -1;
1305                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1306                                 goto out;
1307                         if (unlock_record(tdb, tl.off) != 0)
1308                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1309                         goto out;
1310                 }
1311                 key.dsize = rec.key_len;
1312                 dbuf.dptr = key.dptr + rec.key_len;
1313                 dbuf.dsize = rec.data_len;
1314
1315                 /* Drop chain lock, call out */
1316                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1317                         ret = -1;
1318                         goto out;
1319                 }
1320                 if (fn && fn(tdb, key, dbuf, state)) {
1321                         /* They want us to terminate traversal */
1322                         ret = count;
1323                         if (unlock_record(tdb, tl.off) != 0) {
1324                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1325                                 ret = -1;
1326                         }
1327                         tdb->travlocks.next = tl.next;
1328                         SAFE_FREE(key.dptr);
1329                         return count;
1330                 }
1331                 SAFE_FREE(key.dptr);
1332         }
1333 out:
1334         tdb->travlocks.next = tl.next;
1335         if (ret < 0)
1336                 return -1;
1337         else
1338                 return count;
1339 }
1340
1341 /* find the first entry in the database and return its key */
1342 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1343 {
1344         TDB_DATA key;
1345         struct list_struct rec;
1346
1347         /* release any old lock */
1348         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1349                 return tdb_null;
1350         tdb->travlocks.off = tdb->travlocks.hash = 0;
1351
1352         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1353                 return tdb_null;
1354         /* now read the key */
1355         key.dsize = rec.key_len;
1356         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1357         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1358                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1359         return key;
1360 }
1361
1362 /* find the next entry in the database, returning its key */
1363 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1364 {
1365         u32 oldhash;
1366         TDB_DATA key = tdb_null;
1367         struct list_struct rec;
1368         char *k = NULL;
1369
1370         /* Is locked key the old key?  If so, traverse will be reliable. */
1371         if (tdb->travlocks.off) {
1372                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1373                         return tdb_null;
1374                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1375                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1376                                             rec.key_len))
1377                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1378                         /* No, it wasn't: unlock it and start from scratch */
1379                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1380                                 return tdb_null;
1381                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1382                                 return tdb_null;
1383                         tdb->travlocks.off = 0;
1384                 }
1385
1386                 SAFE_FREE(k);
1387         }
1388
1389         if (!tdb->travlocks.off) {
1390                 /* No previous element: do normal find, and lock record */
1391                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1392                 if (!tdb->travlocks.off)
1393                         return tdb_null;
1394                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1395                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1396                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1397                         return tdb_null;
1398                 }
1399         }
1400         oldhash = tdb->travlocks.hash;
1401
1402         /* Grab next record: locks chain and returned record,
1403            unlocks old record */
1404         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1405                 key.dsize = rec.key_len;
1406                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1407                                           key.dsize);
1408                 /* Unlock the chain of this new record */
1409                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1410                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1411         }
1412         /* Unlock the chain of old record */
1413         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1414                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1415         return key;
1416 }
1417
1418 /* delete an entry in the database given a key */
1419 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1420 {
1421         tdb_off rec_ptr;
1422         struct list_struct rec;
1423         int ret;
1424
1425         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1426                 return -1;
1427         ret = do_delete(tdb, rec_ptr, &rec);
1428         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1429                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1430         return ret;
1431 }
1432
1433 /* store an element in the database, replacing any existing element
1434    with the same key 
1435
1436    return 0 on success, -1 on failure
1437 */
1438 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1439 {
1440         struct list_struct rec;
1441         u32 hash;
1442         tdb_off rec_ptr;
1443         char *p = NULL;
1444         int ret = 0;
1445
1446         /* find which hash bucket it is in */
1447         hash = tdb_hash(&key);
1448         if (!tdb_keylocked(tdb, hash))
1449                 return -1;
1450         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1451                 return -1;
1452
1453         /* check for it existing, on insert. */
1454         if (flag == TDB_INSERT) {
1455                 if (tdb_exists(tdb, key)) {
1456                         tdb->ecode = TDB_ERR_EXISTS;
1457                         goto fail;
1458                 }
1459         } else {
1460                 /* first try in-place update, on modify or replace. */
1461                 if (tdb_update(tdb, key, dbuf) == 0)
1462                         goto out;
1463                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1464                         goto fail;
1465         }
1466         /* reset the error code potentially set by the tdb_update() */
1467         tdb->ecode = TDB_SUCCESS;
1468
1469         /* delete any existing record - if it doesn't exist we don't
1470            care.  Doing this first reduces fragmentation, and avoids
1471            coalescing with `allocated' block before it's updated. */
1472         if (flag != TDB_INSERT)
1473                 tdb_delete(tdb, key);
1474
1475         /* Copy key+value *before* allocating free space in case malloc
1476            fails and we are left with a dead spot in the tdb. */
1477
1478         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1479                 tdb->ecode = TDB_ERR_OOM;
1480                 goto fail;
1481         }
1482
1483         memcpy(p, key.dptr, key.dsize);
1484         if (dbuf.dsize)
1485                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1486
1487         /* now we're into insert / modify / replace of a record which
1488          * we know could not be optimised by an in-place store (for
1489          * various reasons).  */
1490         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1491                 goto fail;
1492
1493         /* Read hash top into next ptr */
1494         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1495                 goto fail;
1496
1497         rec.key_len = key.dsize;
1498         rec.data_len = dbuf.dsize;
1499         rec.full_hash = hash;
1500         rec.magic = TDB_MAGIC;
1501
1502         /* write out and point the top of the hash chain at it */
1503         if (rec_write(tdb, rec_ptr, &rec) == -1
1504             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1505             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1506                 /* Need to tdb_unallocate() here */
1507                 goto fail;
1508         }
1509  out:
1510         SAFE_FREE(p); 
1511         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1512         return ret;
1513 fail:
1514         ret = -1;
1515         goto out;
1516 }
1517
1518 /* Attempt to append data to an entry in place - this only works if the new data size
1519    is <= the old data size and the key exists.
1520    on failure return -1. Record must be locked before calling.
1521 */
1522 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1523 {
1524         struct list_struct rec;
1525         tdb_off rec_ptr;
1526
1527         /* find entry */
1528         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1529                 return -1;
1530
1531         /* Append of 0 is always ok. */
1532         if (new_dbuf.dsize == 0)
1533                 return 0;
1534
1535         /* must be long enough for key, old data + new data and tailer */
1536         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1537                 /* No room. */
1538                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1539                 return -1;
1540         }
1541
1542         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1543                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1544                 return -1;
1545
1546         /* update size */
1547         rec.data_len += new_dbuf.dsize;
1548         return rec_write(tdb, rec_ptr, &rec);
1549 }
1550
1551 /* Append to an entry. Create if not exist. */
1552
1553 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1554 {
1555         struct list_struct rec;
1556         u32 hash;
1557         tdb_off rec_ptr;
1558         char *p = NULL;
1559         int ret = 0;
1560         size_t new_data_size = 0;
1561
1562         /* find which hash bucket it is in */
1563         hash = tdb_hash(&key);
1564         if (!tdb_keylocked(tdb, hash))
1565                 return -1;
1566         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1567                 return -1;
1568
1569         /* first try in-place. */
1570         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1571                 goto out;
1572
1573         /* reset the error code potentially set by the tdb_append_inplace() */
1574         tdb->ecode = TDB_SUCCESS;
1575
1576         /* find entry */
1577         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1578                 if (tdb->ecode != TDB_ERR_NOEXIST)
1579                         goto fail;
1580
1581                 /* Not found - create. */
1582
1583                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1584                 goto out;
1585         }
1586
1587         new_data_size = rec.data_len + new_dbuf.dsize;
1588
1589         /* Copy key+old_value+value *before* allocating free space in case malloc
1590            fails and we are left with a dead spot in the tdb. */
1591
1592         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1593                 tdb->ecode = TDB_ERR_OOM;
1594                 goto fail;
1595         }
1596
1597         /* Copy the key in place. */
1598         memcpy(p, key.dptr, key.dsize);
1599
1600         /* Now read the old data into place. */
1601         if (rec.data_len &&
1602                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1603                         goto fail;
1604
1605         /* Finally append the new data. */
1606         if (new_dbuf.dsize)
1607                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1608
1609         /* delete any existing record - if it doesn't exist we don't
1610            care.  Doing this first reduces fragmentation, and avoids
1611            coalescing with `allocated' block before it's updated. */
1612
1613         tdb_delete(tdb, key);
1614
1615         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1616                 goto fail;
1617
1618         /* Read hash top into next ptr */
1619         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1620                 goto fail;
1621
1622         rec.key_len = key.dsize;
1623         rec.data_len = new_data_size;
1624         rec.full_hash = hash;
1625         rec.magic = TDB_MAGIC;
1626
1627         /* write out and point the top of the hash chain at it */
1628         if (rec_write(tdb, rec_ptr, &rec) == -1
1629             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1630             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1631                 /* Need to tdb_unallocate() here */
1632                 goto fail;
1633         }
1634
1635  out:
1636         SAFE_FREE(p); 
1637         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1638         return ret;
1639
1640 fail:
1641         ret = -1;
1642         goto out;
1643 }
1644
1645 static int tdb_already_open(dev_t device,
1646                             ino_t ino)
1647 {
1648         TDB_CONTEXT *i;
1649         
1650         for (i = tdbs; i; i = i->next) {
1651                 if (i->device == device && i->inode == ino) {
1652                         return 1;
1653                 }
1654         }
1655
1656         return 0;
1657 }
1658
1659 /* open the database, creating it if necessary 
1660
1661    The open_flags and mode are passed straight to the open call on the
1662    database file. A flags value of O_WRONLY is invalid. The hash size
1663    is advisory, use zero for a default value.
1664
1665    Return is NULL on error, in which case errno is also set.  Don't 
1666    try to call tdb_error or tdb_errname, just do strerror(errno).
1667
1668    @param name may be NULL for internal databases. */
1669 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1670                       int open_flags, mode_t mode)
1671 {
1672         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1673 }
1674
1675
1676 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1677                          int open_flags, mode_t mode,
1678                          tdb_log_func log_fn)
1679 {
1680         TDB_CONTEXT *tdb;
1681         struct stat st;
1682         int rev = 0, locked;
1683         unsigned char *vp;
1684         u32 vertest;
1685
1686         if (!(tdb = calloc(1, sizeof *tdb))) {
1687                 /* Can't log this */
1688                 errno = ENOMEM;
1689                 goto fail;
1690         }
1691         tdb->fd = -1;
1692         tdb->name = NULL;
1693         tdb->map_ptr = NULL;
1694         tdb->lockedkeys = NULL;
1695         tdb->flags = tdb_flags;
1696         tdb->open_flags = open_flags;
1697         tdb->log_fn = log_fn;
1698         
1699         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1700                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1701                          name));
1702                 errno = EINVAL;
1703                 goto fail;
1704         }
1705         
1706         if (hash_size == 0)
1707                 hash_size = DEFAULT_HASH_SIZE;
1708         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1709                 tdb->read_only = 1;
1710                 /* read only databases don't do locking or clear if first */
1711                 tdb->flags |= TDB_NOLOCK;
1712                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1713         }
1714
1715         /* internal databases don't mmap or lock, and start off cleared */
1716         if (tdb->flags & TDB_INTERNAL) {
1717                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1718                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1719                 if (tdb_new_database(tdb, hash_size) != 0) {
1720                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1721                         goto fail;
1722                 }
1723                 goto internal;
1724         }
1725
1726         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1727                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1728                          name, strerror(errno)));
1729                 goto fail;      /* errno set by open(2) */
1730         }
1731
1732         /* ensure there is only one process initialising at once */
1733         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1734                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1735                          name, strerror(errno)));
1736                 goto fail;      /* errno set by tdb_brlock */
1737         }
1738
1739         /* we need to zero database if we are the only one with it open */
1740         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1741             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1742                 open_flags |= O_CREAT;
1743                 if (ftruncate(tdb->fd, 0) == -1) {
1744                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1745                                  "failed to truncate %s: %s\n",
1746                                  name, strerror(errno)));
1747                         goto fail; /* errno set by ftruncate */
1748                 }
1749         }
1750
1751         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1752             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1753             || (tdb->header.version != TDB_VERSION
1754                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1755                 /* its not a valid database - possibly initialise it */
1756                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1757                         errno = EIO; /* ie bad format or something */
1758                         goto fail;
1759                 }
1760                 rev = (tdb->flags & TDB_CONVERT);
1761         }
1762         vp = (unsigned char *)&tdb->header.version;
1763         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1764                   (((u32)vp[2]) << 8) | (u32)vp[3];
1765         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1766         if (!rev)
1767                 tdb->flags &= ~TDB_CONVERT;
1768         else {
1769                 tdb->flags |= TDB_CONVERT;
1770                 convert(&tdb->header, sizeof(tdb->header));
1771         }
1772         if (fstat(tdb->fd, &st) == -1)
1773                 goto fail;
1774
1775         /* Is it already in the open list?  If so, fail. */
1776         if (tdb_already_open(st.st_dev, st.st_ino)) {
1777                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1778                          "%s (%d,%d) is already open in this process\n",
1779                          name, st.st_dev, st.st_ino));
1780                 errno = EBUSY;
1781                 goto fail;
1782         }
1783
1784         if (!(tdb->name = (char *)strdup(name))) {
1785                 errno = ENOMEM;
1786                 goto fail;
1787         }
1788
1789         tdb->map_size = st.st_size;
1790         tdb->device = st.st_dev;
1791         tdb->inode = st.st_ino;
1792         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1793         if (!tdb->locked) {
1794                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1795                          "failed to allocate lock structure for %s\n",
1796                          name));
1797                 errno = ENOMEM;
1798                 goto fail;
1799         }
1800         tdb_mmap(tdb);
1801         if (locked) {
1802                 if (!tdb->read_only)
1803                         if (tdb_clear_spinlocks(tdb) != 0) {
1804                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1805                                 "failed to clear spinlock\n"));
1806                                 goto fail;
1807                         }
1808                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1809                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1810                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1811                                  name, strerror(errno)));
1812                         goto fail;
1813                 }
1814         }
1815         /* leave this lock in place to indicate it's in use */
1816         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1817                 goto fail;
1818
1819  internal:
1820         /* Internal (memory-only) databases skip all the code above to
1821          * do with disk files, and resume here by releasing their
1822          * global lock and hooking into the active list. */
1823         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1824                 goto fail;
1825         tdb->next = tdbs;
1826         tdbs = tdb;
1827         return tdb;
1828
1829  fail:
1830         { int save_errno = errno;
1831
1832         if (!tdb)
1833                 return NULL;
1834         
1835         if (tdb->map_ptr) {
1836                 if (tdb->flags & TDB_INTERNAL)
1837                         SAFE_FREE(tdb->map_ptr);
1838                 else
1839                         tdb_munmap(tdb);
1840         }
1841         SAFE_FREE(tdb->name);
1842         if (tdb->fd != -1)
1843                 if (close(tdb->fd) != 0)
1844                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1845         SAFE_FREE(tdb->locked);
1846         SAFE_FREE(tdb);
1847         errno = save_errno;
1848         return NULL;
1849         }
1850 }
1851
1852 /**
1853  * Close a database.
1854  *
1855  * @returns -1 for error; 0 for success.
1856  **/
1857 int tdb_close(TDB_CONTEXT *tdb)
1858 {
1859         TDB_CONTEXT **i;
1860         int ret = 0;
1861
1862         if (tdb->map_ptr) {
1863                 if (tdb->flags & TDB_INTERNAL)
1864                         SAFE_FREE(tdb->map_ptr);
1865                 else
1866                         tdb_munmap(tdb);
1867         }
1868         SAFE_FREE(tdb->name);
1869         if (tdb->fd != -1)
1870                 ret = close(tdb->fd);
1871         SAFE_FREE(tdb->locked);
1872         SAFE_FREE(tdb->lockedkeys);
1873
1874         /* Remove from contexts list */
1875         for (i = &tdbs; *i; i = &(*i)->next) {
1876                 if (*i == tdb) {
1877                         *i = tdb->next;
1878                         break;
1879                 }
1880         }
1881
1882         memset(tdb, 0, sizeof(*tdb));
1883         SAFE_FREE(tdb);
1884
1885         return ret;
1886 }
1887
1888 /* lock/unlock entire database */
1889 int tdb_lockall(TDB_CONTEXT *tdb)
1890 {
1891         u32 i;
1892
1893         /* There are no locks on read-only dbs */
1894         if (tdb->read_only)
1895                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1896         if (tdb->lockedkeys)
1897                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1898         for (i = 0; i < tdb->header.hash_size; i++) 
1899                 if (tdb_lock(tdb, i, F_WRLCK))
1900                         break;
1901
1902         /* If error, release locks we have... */
1903         if (i < tdb->header.hash_size) {
1904                 u32 j;
1905
1906                 for ( j = 0; j < i; j++)
1907                         tdb_unlock(tdb, j, F_WRLCK);
1908                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1909         }
1910
1911         return 0;
1912 }
1913 void tdb_unlockall(TDB_CONTEXT *tdb)
1914 {
1915         u32 i;
1916         for (i=0; i < tdb->header.hash_size; i++)
1917                 tdb_unlock(tdb, i, F_WRLCK);
1918 }
1919
1920 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1921 {
1922         u32 i, j, hash;
1923
1924         /* Can't lock more keys if already locked */
1925         if (tdb->lockedkeys)
1926                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1927         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1928                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1929         /* First number in array is # keys */
1930         tdb->lockedkeys[0] = number;
1931
1932         /* Insertion sort by bucket */
1933         for (i = 0; i < number; i++) {
1934                 hash = tdb_hash(&keys[i]);
1935                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1936                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1937                 tdb->lockedkeys[j+1] = hash;
1938         }
1939         /* Finally, lock in order */
1940         for (i = 0; i < number; i++)
1941                 if (tdb_lock(tdb, i, F_WRLCK))
1942                         break;
1943
1944         /* If error, release locks we have... */
1945         if (i < number) {
1946                 for ( j = 0; j < i; j++)
1947                         tdb_unlock(tdb, j, F_WRLCK);
1948                 SAFE_FREE(tdb->lockedkeys);
1949                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1950         }
1951         return 0;
1952 }
1953
1954 /* Unlock the keys previously locked by tdb_lockkeys() */
1955 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1956 {
1957         u32 i;
1958         if (!tdb->lockedkeys)
1959                 return;
1960         for (i = 0; i < tdb->lockedkeys[0]; i++)
1961                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1962         SAFE_FREE(tdb->lockedkeys);
1963 }
1964
1965 /* lock/unlock one hash chain. This is meant to be used to reduce
1966    contention - it cannot guarantee how many records will be locked */
1967 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1968 {
1969         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1970 }
1971
1972 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1973 {
1974         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1975 }
1976
1977 int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1978 {
1979         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1980 }
1981
1982 int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1983 {
1984         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1985 }
1986
1987
1988 /* register a loging function */
1989 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1990 {
1991         tdb->log_fn = fn;
1992 }
1993
1994
1995 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1996    seek pointer from our parent and to re-establish locks */
1997 int tdb_reopen(TDB_CONTEXT *tdb)
1998 {
1999         struct stat st;
2000
2001         if (tdb_munmap(tdb) != 0) {
2002                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2003                 goto fail;
2004         }
2005         if (close(tdb->fd) != 0)
2006                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2007         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2008         if (tdb->fd == -1) {
2009                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2010                 goto fail;
2011         }
2012         if (fstat(tdb->fd, &st) != 0) {
2013                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2014                 goto fail;
2015         }
2016         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2017                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2018                 goto fail;
2019         }
2020         tdb_mmap(tdb);
2021         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2022                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2023                 goto fail;
2024         }
2025
2026         return 0;
2027
2028 fail:
2029         tdb_close(tdb);
2030         return -1;
2031 }
2032
2033 /* reopen all tdb's */
2034 int tdb_reopen_all(void)
2035 {
2036         TDB_CONTEXT *tdb;
2037
2038         for (tdb=tdbs; tdb; tdb = tdb->next) {
2039                 if (tdb_reopen(tdb) != 0) return -1;
2040         }
2041
2042         return 0;
2043 }