ensure that we unlock in case we hit a tdb error
[samba.git] / source3 / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23 #ifdef STANDALONE
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <string.h>
33 #include <fcntl.h>
34 #include <errno.h>
35 #include <sys/mman.h>
36 #include <sys/stat.h>
37 #include "tdb.h"
38 #include "spinlock.h"
39 #else
40 #include "includes.h"
41 #endif
42
43 #define TDB_MAGIC_FOOD "TDB file\n"
44 #define TDB_VERSION (0x26011967 + 6)
45 #define TDB_MAGIC (0x26011999U)
46 #define TDB_FREE_MAGIC (~TDB_MAGIC)
47 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
48 #define TDB_ALIGNMENT 4
49 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
50 #define DEFAULT_HASH_SIZE 131
51 #define TDB_PAGE_SIZE 0x2000
52 #define FREELIST_TOP (sizeof(struct tdb_header))
53 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
54 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
55 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
56 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
57 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
58
59 /* NB assumes there is a local variable called "tdb" that is the
60  * current context, also takes doubly-parenthesized print-style
61  * argument. */
62 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
63
64 /* lock offsets */
65 #define GLOBAL_LOCK 0
66 #define ACTIVE_LOCK 4
67
68 #ifndef MAP_FILE
69 #define MAP_FILE 0
70 #endif
71
72 #ifndef MAP_FAILED
73 #define MAP_FAILED ((void *)-1)
74 #endif
75
76 /* free memory if the pointer is valid and zero the pointer */
77 #ifndef SAFE_FREE
78 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
79 #endif
80
81 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
82 TDB_DATA tdb_null;
83
84 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
85 static TDB_CONTEXT *tdbs = NULL;
86
87 static int tdb_munmap(TDB_CONTEXT *tdb)
88 {
89         if (tdb->flags & TDB_INTERNAL)
90                 return 0;
91
92 #ifdef HAVE_MMAP
93         if (tdb->map_ptr) {
94                 int ret = munmap(tdb->map_ptr, tdb->map_size);
95                 if (ret != 0)
96                         return ret;
97         }
98 #endif
99         tdb->map_ptr = NULL;
100         return 0;
101 }
102
103 static void tdb_mmap(TDB_CONTEXT *tdb)
104 {
105         if (tdb->flags & TDB_INTERNAL)
106                 return;
107
108 #ifdef HAVE_MMAP
109         if (!(tdb->flags & TDB_NOMMAP)) {
110                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
111                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
112                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
113
114                 /*
115                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
116                  */
117
118                 if (tdb->map_ptr == MAP_FAILED) {
119                         tdb->map_ptr = NULL;
120                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
121                                  tdb->map_size, strerror(errno)));
122                 }
123         } else {
124                 tdb->map_ptr = NULL;
125         }
126 #else
127         tdb->map_ptr = NULL;
128 #endif
129 }
130
131 /* Endian conversion: we only ever deal with 4 byte quantities */
132 static void *convert(void *buf, u32 size)
133 {
134         u32 i, *p = buf;
135         for (i = 0; i < size / 4; i++)
136                 p[i] = TDB_BYTEREV(p[i]);
137         return buf;
138 }
139 #define DOCONV() (tdb->flags & TDB_CONVERT)
140 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
141
142 /* the body of the database is made of one list_struct for the free space
143    plus a separate data list for each hash value */
144 struct list_struct {
145         tdb_off next; /* offset of the next record in the list */
146         tdb_len rec_len; /* total byte length of record */
147         tdb_len key_len; /* byte length of key */
148         tdb_len data_len; /* byte length of data */
149         u32 full_hash; /* the full 32 bit hash of the key */
150         u32 magic;   /* try to catch errors */
151         /* the following union is implied:
152                 union {
153                         char record[rec_len];
154                         struct {
155                                 char key[key_len];
156                                 char data[data_len];
157                         }
158                         u32 totalsize; (tailer)
159                 }
160         */
161 };
162
163 /* a byte range locking function - return 0 on success
164    this functions locks/unlocks 1 byte at the specified offset.
165
166    On error, errno is also set so that errors are passed back properly
167    through tdb_open(). */
168 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
169                       int rw_type, int lck_type, int probe)
170 {
171         struct flock fl;
172         int ret;
173
174         if (tdb->flags & TDB_NOLOCK)
175                 return 0;
176         if (tdb->read_only) {
177                 errno = EACCES;
178                 return -1;
179         }
180
181         fl.l_type = rw_type;
182         fl.l_whence = SEEK_SET;
183         fl.l_start = offset;
184         fl.l_len = 1;
185         fl.l_pid = 0;
186
187         do {
188                 ret = fcntl(tdb->fd,lck_type,&fl);
189         } while (ret == -1 && errno == EINTR);
190
191         if (ret == -1) {
192                 if (!probe && lck_type != F_SETLK) {
193                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
194                                  tdb->fd, offset, rw_type, lck_type));
195                 }
196                 /* errno set by fcntl */
197                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
198         }
199         return 0;
200 }
201
202 /* lock a list in the database. list -1 is the alloc list */
203 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
204 {
205         if (list < -1 || list >= (int)tdb->header.hash_size) {
206                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
207                            list, ltype));
208                 return -1;
209         }
210         if (tdb->flags & TDB_NOLOCK)
211                 return 0;
212
213         /* Since fcntl locks don't nest, we do a lock for the first one,
214            and simply bump the count for future ones */
215         if (tdb->locked[list+1].count == 0) {
216                 if (!tdb->read_only && tdb->header.rwlocks) {
217                         if (tdb_spinlock(tdb, list, ltype)) {
218                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
219                                            list, ltype));
220                                 return -1;
221                         }
222                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
223                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
224                                            list, ltype, strerror(errno)));
225                         return -1;
226                 }
227                 tdb->locked[list+1].ltype = ltype;
228         }
229         tdb->locked[list+1].count++;
230         return 0;
231 }
232
233 /* unlock the database: returns void because it's too late for errors. */
234         /* changed to return int it may be interesting to know there
235            has been an error  --simo */
236 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
237 {
238         int ret = -1;
239
240         if (tdb->flags & TDB_NOLOCK)
241                 return 0;
242
243         /* Sanity checks */
244         if (list < -1 || list >= (int)tdb->header.hash_size) {
245                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
246                 return ret;
247         }
248
249         if (tdb->locked[list+1].count==0) {
250                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
251                 return ret;
252         }
253
254         if (tdb->locked[list+1].count == 1) {
255                 /* Down to last nested lock: unlock underneath */
256                 if (!tdb->read_only && tdb->header.rwlocks) {
257                         ret = tdb_spinunlock(tdb, list, ltype);
258                 } else {
259                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
260                 }
261         } else {
262                 ret = 0;
263         }
264         tdb->locked[list+1].count--;
265
266         if (ret)
267                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
268         return ret;
269 }
270
271 /* This is based on the hash algorithm from gdbm */
272 static u32 tdb_hash(TDB_DATA *key)
273 {
274         u32 value;      /* Used to compute the hash value.  */
275         u32   i;        /* Used to cycle through random values. */
276
277         /* Set the initial value from the key size. */
278         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
279                 value = (value + (key->dptr[i] << (i*5 % 24)));
280
281         return (1103515243 * value + 12345);  
282 }
283
284 /* check for an out of bounds access - if it is out of bounds then
285    see if the database has been expanded by someone else and expand
286    if necessary 
287    note that "len" is the minimum length needed for the db
288 */
289 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
290 {
291         struct stat st;
292         if (len <= tdb->map_size)
293                 return 0;
294         if (tdb->flags & TDB_INTERNAL) {
295                 if (!probe) {
296                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
297                                  (int)len, (int)tdb->map_size));
298                 }
299                 return TDB_ERRCODE(TDB_ERR_IO, -1);
300         }
301
302         if (fstat(tdb->fd, &st) == -1)
303                 return TDB_ERRCODE(TDB_ERR_IO, -1);
304
305         if (st.st_size < (size_t)len) {
306                 if (!probe) {
307                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
308                                  (int)len, (int)st.st_size));
309                 }
310                 return TDB_ERRCODE(TDB_ERR_IO, -1);
311         }
312
313         /* Unmap, update size, remap */
314         if (tdb_munmap(tdb) == -1)
315                 return TDB_ERRCODE(TDB_ERR_IO, -1);
316         tdb->map_size = st.st_size;
317         tdb_mmap(tdb);
318         return 0;
319 }
320
321 /* write a lump of data at a specified offset */
322 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
323 {
324         if (tdb_oob(tdb, off + len, 0) != 0)
325                 return -1;
326
327         if (tdb->map_ptr)
328                 memcpy(off + (char *)tdb->map_ptr, buf, len);
329 #ifdef HAVE_PWRITE
330         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
331 #else
332         else if (lseek(tdb->fd, off, SEEK_SET) != off
333                  || write(tdb->fd, buf, len) != (ssize_t)len) {
334 #endif
335                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
336                            off, len, strerror(errno)));
337                 return TDB_ERRCODE(TDB_ERR_IO, -1);
338         }
339         return 0;
340 }
341
342 /* read a lump of data at a specified offset, maybe convert */
343 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
344 {
345         if (tdb_oob(tdb, off + len, 0) != 0)
346                 return -1;
347
348         if (tdb->map_ptr)
349                 memcpy(buf, off + (char *)tdb->map_ptr, len);
350 #ifdef HAVE_PREAD
351         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
352 #else
353         else if (lseek(tdb->fd, off, SEEK_SET) != off
354                  || read(tdb->fd, buf, len) != (ssize_t)len) {
355 #endif
356                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
357                            off, len, strerror(errno)));
358                 return TDB_ERRCODE(TDB_ERR_IO, -1);
359         }
360         if (cv)
361                 convert(buf, len);
362         return 0;
363 }
364
365 /* read a lump of data, allocating the space for it */
366 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
367 {
368         char *buf;
369
370         if (!(buf = malloc(len))) {
371                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
372                            len, strerror(errno)));
373                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
374         }
375         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
376                 SAFE_FREE(buf);
377                 return NULL;
378         }
379         return buf;
380 }
381
382 /* read/write a tdb_off */
383 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
384 {
385         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
386 }
387 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
388 {
389         tdb_off off = *d;
390         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
391 }
392
393 /* read/write a record */
394 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
395 {
396         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
397                 return -1;
398         if (TDB_BAD_MAGIC(rec)) {
399                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
400                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
401         }
402         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
403 }
404 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
405 {
406         struct list_struct r = *rec;
407         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
408 }
409
410 /* read a freelist record and check for simple errors */
411 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
412 {
413         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
414                 return -1;
415
416         if (rec->magic == TDB_MAGIC) {
417                 /* this happens when a app is showdown while deleting a record - we should
418                    not completely fail when this happens */
419                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
420                          rec->magic, off));
421                 rec->magic = TDB_FREE_MAGIC;
422                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
423                         return -1;
424         }
425
426         if (rec->magic != TDB_FREE_MAGIC) {
427                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
428                            rec->magic, off));
429                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
430         }
431         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
432                 return -1;
433         return 0;
434 }
435
436 /* update a record tailer (must hold allocation lock) */
437 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
438                          const struct list_struct *rec)
439 {
440         tdb_off totalsize;
441
442         /* Offset of tailer from record header */
443         totalsize = sizeof(*rec) + rec->rec_len;
444         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
445                          &totalsize);
446 }
447
448 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
449 {
450         struct list_struct rec;
451         tdb_off tailer_ofs, tailer;
452
453         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
454                 printf("ERROR: failed to read record at %u\n", offset);
455                 return 0;
456         }
457
458         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
459                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
460
461         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
462         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
463                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
464                 return rec.next;
465         }
466
467         if (tailer != rec.rec_len + sizeof(rec)) {
468                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
469                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
470         }
471         return rec.next;
472 }
473
474 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
475 {
476         tdb_off rec_ptr, top;
477
478         top = TDB_HASH_TOP(i);
479
480         if (tdb_lock(tdb, i, F_WRLCK) != 0)
481                 return -1;
482
483         if (ofs_read(tdb, top, &rec_ptr) == -1)
484                 return tdb_unlock(tdb, i, F_WRLCK);
485
486         if (rec_ptr)
487                 printf("hash=%d\n", i);
488
489         while (rec_ptr) {
490                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
491         }
492
493         return tdb_unlock(tdb, i, F_WRLCK);
494 }
495
496 void tdb_dump_all(TDB_CONTEXT *tdb)
497 {
498         int i;
499         for (i=0;i<tdb->header.hash_size;i++) {
500                 tdb_dump_chain(tdb, i);
501         }
502         printf("freelist:\n");
503         tdb_dump_chain(tdb, -1);
504 }
505
506 int tdb_printfreelist(TDB_CONTEXT *tdb)
507 {
508         int ret;
509         long total_free = 0;
510         tdb_off offset, rec_ptr;
511         struct list_struct rec;
512
513         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
514                 return ret;
515
516         offset = FREELIST_TOP;
517
518         /* read in the freelist top */
519         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
520                 tdb_unlock(tdb, -1, F_WRLCK);
521                 return 0;
522         }
523
524         printf("freelist top=[0x%08x]\n", rec_ptr );
525         while (rec_ptr) {
526                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
527                         tdb_unlock(tdb, -1, F_WRLCK);
528                         return -1;
529                 }
530
531                 if (rec.magic != TDB_FREE_MAGIC) {
532                         printf("bad magic 0x%08x in free list\n", rec.magic);
533                         tdb_unlock(tdb, -1, F_WRLCK);
534                         return -1;
535                 }
536
537                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
538                 total_free += rec.rec_len;
539
540                 /* move to the next record */
541                 rec_ptr = rec.next;
542         }
543         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
544                (int)total_free);
545
546         return tdb_unlock(tdb, -1, F_WRLCK);
547 }
548
549 /* Remove an element from the freelist.  Must have alloc lock. */
550 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
551 {
552         tdb_off last_ptr, i;
553
554         /* read in the freelist top */
555         last_ptr = FREELIST_TOP;
556         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
557                 if (i == off) {
558                         /* We've found it! */
559                         return ofs_write(tdb, last_ptr, &next);
560                 }
561                 /* Follow chain (next offset is at start of record) */
562                 last_ptr = i;
563         }
564         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
565         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
566 }
567
568 /* Add an element into the freelist. Merge adjacent records if
569    neccessary. */
570 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
571 {
572         tdb_off right, left;
573
574         /* Allocation and tailer lock */
575         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
576                 return -1;
577
578         /* set an initial tailer, so if we fail we don't leave a bogus record */
579         if (update_tailer(tdb, offset, rec) != 0) {
580                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
581                 goto fail;
582         }
583
584         /* Look right first (I'm an Australian, dammit) */
585         right = offset + sizeof(*rec) + rec->rec_len;
586         if (right + sizeof(*rec) <= tdb->map_size) {
587                 struct list_struct r;
588
589                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
590                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
591                         goto left;
592                 }
593
594                 /* If it's free, expand to include it. */
595                 if (r.magic == TDB_FREE_MAGIC) {
596                         if (remove_from_freelist(tdb, right, r.next) == -1) {
597                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
598                                 goto left;
599                         }
600                         rec->rec_len += sizeof(r) + r.rec_len;
601                 }
602         }
603
604 left:
605         /* Look left */
606         left = offset - sizeof(tdb_off);
607         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
608                 struct list_struct l;
609                 tdb_off leftsize;
610
611                 /* Read in tailer and jump back to header */
612                 if (ofs_read(tdb, left, &leftsize) == -1) {
613                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
614                         goto update;
615                 }
616                 left = offset - leftsize;
617
618                 /* Now read in record */
619                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
620                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
621                         goto update;
622                 }
623
624                 /* If it's free, expand to include it. */
625                 if (l.magic == TDB_FREE_MAGIC) {
626                         if (remove_from_freelist(tdb, left, l.next) == -1) {
627                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
628                                 goto update;
629                         } else {
630                                 offset = left;
631                                 rec->rec_len += leftsize;
632                         }
633                 }
634         }
635
636 update:
637         if (update_tailer(tdb, offset, rec) == -1) {
638                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
639                 goto fail;
640         }
641
642         /* Now, prepend to free list */
643         rec->magic = TDB_FREE_MAGIC;
644
645         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
646             rec_write(tdb, offset, rec) == -1 ||
647             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
648                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
649                 goto fail;
650         }
651
652         /* And we're done. */
653         tdb_unlock(tdb, -1, F_WRLCK);
654         return 0;
655
656  fail:
657         tdb_unlock(tdb, -1, F_WRLCK);
658         return -1;
659 }
660
661
662 /* expand a file.  we prefer to use ftruncate, as that is what posix
663   says to use for mmap expansion */
664 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
665 {
666         char buf[1024];
667 #if HAVE_FTRUNCATE_EXTEND
668         if (ftruncate(tdb->fd, size+addition) != 0) {
669                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
670                            size+addition, strerror(errno)));
671                 return -1;
672         }
673 #else
674         char b = 0;
675
676 #ifdef HAVE_PWRITE
677         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
678 #else
679         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
680             write(tdb->fd, &b, 1) != 1) {
681 #endif
682                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
683                            size+addition, strerror(errno)));
684                 return -1;
685         }
686 #endif
687
688         /* now fill the file with something. This ensures that the file isn't sparse, which would be
689            very bad if we ran out of disk. This must be done with write, not via mmap */
690         memset(buf, 0x42, sizeof(buf));
691         while (addition) {
692                 int n = addition>sizeof(buf)?sizeof(buf):addition;
693 #ifdef HAVE_PWRITE
694                 int ret = pwrite(tdb->fd, buf, n, size);
695 #else
696                 int ret;
697                 if (lseek(tdb->fd, size, SEEK_SET) != size)
698                         return -1;
699                 ret = write(tdb->fd, buf, n);
700 #endif
701                 if (ret != n) {
702                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
703                                    n, strerror(errno)));
704                         return -1;
705                 }
706                 addition -= n;
707                 size += n;
708         }
709         return 0;
710 }
711
712
713 /* expand the database at least size bytes by expanding the underlying
714    file and doing the mmap again if necessary */
715 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
716 {
717         struct list_struct rec;
718         tdb_off offset;
719
720         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
721                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
722                 return -1;
723         }
724
725         /* must know about any previous expansions by another process */
726         tdb_oob(tdb, tdb->map_size + 1, 1);
727
728         /* always make room for at least 10 more records, and round
729            the database up to a multiple of TDB_PAGE_SIZE */
730         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
731
732         if (!(tdb->flags & TDB_INTERNAL))
733                 tdb_munmap(tdb);
734
735         /*
736          * We must ensure the file is unmapped before doing this
737          * to ensure consistency with systems like OpenBSD where
738          * writes and mmaps are not consistent.
739          */
740
741         /* expand the file itself */
742         if (!(tdb->flags & TDB_INTERNAL)) {
743                 if (expand_file(tdb, tdb->map_size, size) != 0)
744                         goto fail;
745         }
746
747         tdb->map_size += size;
748
749         if (tdb->flags & TDB_INTERNAL)
750                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
751         else {
752                 /*
753                  * We must ensure the file is remapped before adding the space
754                  * to ensure consistency with systems like OpenBSD where
755                  * writes and mmaps are not consistent.
756                  */
757
758                 /* We're ok if the mmap fails as we'll fallback to read/write */
759                 tdb_mmap(tdb);
760         }
761
762         /* form a new freelist record */
763         memset(&rec,'\0',sizeof(rec));
764         rec.rec_len = size - sizeof(rec);
765
766         /* link it into the free list */
767         offset = tdb->map_size - size;
768         if (tdb_free(tdb, offset, &rec) == -1)
769                 goto fail;
770
771         tdb_unlock(tdb, -1, F_WRLCK);
772         return 0;
773  fail:
774         tdb_unlock(tdb, -1, F_WRLCK);
775         return -1;
776 }
777
778 /* allocate some space from the free list. The offset returned points
779    to a unconnected list_struct within the database with room for at
780    least length bytes of total data
781
782    0 is returned if the space could not be allocated
783  */
784 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
785                             struct list_struct *rec)
786 {
787         tdb_off rec_ptr, last_ptr, newrec_ptr;
788         struct list_struct newrec;
789
790         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
791                 return 0;
792
793         /* Extra bytes required for tailer */
794         length += sizeof(tdb_off);
795
796  again:
797         last_ptr = FREELIST_TOP;
798
799         /* read in the freelist top */
800         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
801                 goto fail;
802
803         /* keep looking until we find a freelist record big enough */
804         while (rec_ptr) {
805                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
806                         goto fail;
807
808                 if (rec->rec_len >= length) {
809                         /* found it - now possibly split it up  */
810                         if (rec->rec_len > length + MIN_REC_SIZE) {
811                                 /* Length of left piece */
812                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
813
814                                 /* Right piece to go on free list */
815                                 newrec.rec_len = rec->rec_len
816                                         - (sizeof(*rec) + length);
817                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
818
819                                 /* And left record is shortened */
820                                 rec->rec_len = length;
821                         } else
822                                 newrec_ptr = 0;
823
824                         /* Remove allocated record from the free list */
825                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
826                                 goto fail;
827
828                         /* Update header: do this before we drop alloc
829                            lock, otherwise tdb_free() might try to
830                            merge with us, thinking we're free.
831                            (Thanks Jeremy Allison). */
832                         rec->magic = TDB_MAGIC;
833                         if (rec_write(tdb, rec_ptr, rec) == -1)
834                                 goto fail;
835
836                         /* Did we create new block? */
837                         if (newrec_ptr) {
838                                 /* Update allocated record tailer (we
839                                    shortened it). */
840                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
841                                         goto fail;
842
843                                 /* Free new record */
844                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
845                                         goto fail;
846                         }
847
848                         /* all done - return the new record offset */
849                         tdb_unlock(tdb, -1, F_WRLCK);
850                         return rec_ptr;
851                 }
852                 /* move to the next record */
853                 last_ptr = rec_ptr;
854                 rec_ptr = rec->next;
855         }
856         /* we didn't find enough space. See if we can expand the
857            database and if we can then try again */
858         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
859                 goto again;
860  fail:
861         tdb_unlock(tdb, -1, F_WRLCK);
862         return 0;
863 }
864
865 /* initialise a new database with a specified hash size */
866 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
867 {
868         struct tdb_header *newdb;
869         int size, ret = -1;
870
871         /* We make it up in memory, then write it out if not internal */
872         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
873         if (!(newdb = calloc(size, 1)))
874                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
875
876         /* Fill in the header */
877         newdb->version = TDB_VERSION;
878         newdb->hash_size = hash_size;
879 #ifdef USE_SPINLOCKS
880         newdb->rwlocks = size;
881 #endif
882         if (tdb->flags & TDB_INTERNAL) {
883                 tdb->map_size = size;
884                 tdb->map_ptr = (char *)newdb;
885                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
886                 /* Convert the `ondisk' version if asked. */
887                 CONVERT(*newdb);
888                 return 0;
889         }
890         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
891                 goto fail;
892
893         if (ftruncate(tdb->fd, 0) == -1)
894                 goto fail;
895
896         /* This creates an endian-converted header, as if read from disk */
897         CONVERT(*newdb);
898         memcpy(&tdb->header, newdb, sizeof(tdb->header));
899         /* Don't endian-convert the magic food! */
900         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
901         if (write(tdb->fd, newdb, size) != size)
902                 ret = -1;
903         else
904                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
905
906   fail:
907         SAFE_FREE(newdb);
908         return ret;
909 }
910
911 /* Returns 0 on fail.  On success, return offset of record, and fills
912    in rec */
913 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
914                         struct list_struct *r)
915 {
916         tdb_off rec_ptr;
917         
918         /* read in the hash top */
919         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
920                 return 0;
921
922         /* keep looking until we find the right record */
923         while (rec_ptr) {
924                 if (rec_read(tdb, rec_ptr, r) == -1)
925                         return 0;
926
927                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
928                         char *k;
929                         /* a very likely hit - read the key */
930                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
931                                            r->key_len);
932                         if (!k)
933                                 return 0;
934
935                         if (memcmp(key.dptr, k, key.dsize) == 0) {
936                                 SAFE_FREE(k);
937                                 return rec_ptr;
938                         }
939                         SAFE_FREE(k);
940                 }
941                 rec_ptr = r->next;
942         }
943         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
944 }
945
946 /* If they do lockkeys, check that this hash is one they locked */
947 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
948 {
949         u32 i;
950         if (!tdb->lockedkeys)
951                 return 1;
952         for (i = 0; i < tdb->lockedkeys[0]; i++)
953                 if (tdb->lockedkeys[i+1] == hash)
954                         return 1;
955         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
956 }
957
958 /* As tdb_find, but if you succeed, keep the lock */
959 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
960                              struct list_struct *rec)
961 {
962         u32 hash, rec_ptr;
963
964         hash = tdb_hash(&key);
965         if (!tdb_keylocked(tdb, hash))
966                 return 0;
967         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
968                 return 0;
969         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
970                 tdb_unlock(tdb, BUCKET(hash), locktype);
971         return rec_ptr;
972 }
973
974 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
975 {
976         return tdb->ecode;
977 }
978
979 static struct tdb_errname {
980         enum TDB_ERROR ecode; const char *estring;
981 } emap[] = { {TDB_SUCCESS, "Success"},
982              {TDB_ERR_CORRUPT, "Corrupt database"},
983              {TDB_ERR_IO, "IO Error"},
984              {TDB_ERR_LOCK, "Locking error"},
985              {TDB_ERR_OOM, "Out of memory"},
986              {TDB_ERR_EXISTS, "Record exists"},
987              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
988              {TDB_ERR_NOEXIST, "Record does not exist"} };
989
990 /* Error string for the last tdb error */
991 const char *tdb_errorstr(TDB_CONTEXT *tdb)
992 {
993         u32 i;
994         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
995                 if (tdb->ecode == emap[i].ecode)
996                         return emap[i].estring;
997         return "Invalid error code";
998 }
999
1000 /* update an entry in place - this only works if the new data size
1001    is <= the old data size and the key exists.
1002    on failure return -1
1003 */
1004 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1005 {
1006         struct list_struct rec;
1007         tdb_off rec_ptr;
1008         int ret = -1;
1009
1010         /* find entry */
1011         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1012                 return -1;
1013
1014         /* must be long enough key, data and tailer */
1015         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1016                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1017                 goto out;
1018         }
1019
1020         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1021                       dbuf.dptr, dbuf.dsize) == -1)
1022                 goto out;
1023
1024         if (dbuf.dsize != rec.data_len) {
1025                 /* update size */
1026                 rec.data_len = dbuf.dsize;
1027                 ret = rec_write(tdb, rec_ptr, &rec);
1028         } else
1029                 ret = 0;
1030  out:
1031         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1032         return ret;
1033 }
1034
1035 /* find an entry in the database given a key */
1036 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1037 {
1038         tdb_off rec_ptr;
1039         struct list_struct rec;
1040         TDB_DATA ret;
1041
1042         /* find which hash bucket it is in */
1043         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1044                 return tdb_null;
1045
1046         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1047                                   rec.data_len);
1048         ret.dsize = rec.data_len;
1049         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1050         return ret;
1051 }
1052
1053 /* check if an entry in the database exists 
1054
1055    note that 1 is returned if the key is found and 0 is returned if not found
1056    this doesn't match the conventions in the rest of this module, but is
1057    compatible with gdbm
1058 */
1059 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1060 {
1061         struct list_struct rec;
1062         
1063         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1064                 return 0;
1065         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1066         return 1;
1067 }
1068
1069 /* record lock stops delete underneath */
1070 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1071 {
1072         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1073 }
1074 /*
1075   Write locks override our own fcntl readlocks, so check it here.
1076   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1077   an error to fail to get the lock here.
1078 */
1079  
1080 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1081 {
1082         struct tdb_traverse_lock *i;
1083         for (i = &tdb->travlocks; i; i = i->next)
1084                 if (i->off == off)
1085                         return -1;
1086         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1087 }
1088
1089 /*
1090   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1091   an error to fail to get the lock here.
1092 */
1093
1094 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1095 {
1096         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1097 }
1098 /* fcntl locks don't stack: avoid unlocking someone else's */
1099 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1100 {
1101         struct tdb_traverse_lock *i;
1102         u32 count = 0;
1103
1104         if (off == 0)
1105                 return 0;
1106         for (i = &tdb->travlocks; i; i = i->next)
1107                 if (i->off == off)
1108                         count++;
1109         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1110 }
1111
1112 /* actually delete an entry in the database given the offset */
1113 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1114 {
1115         tdb_off last_ptr, i;
1116         struct list_struct lastrec;
1117
1118         if (tdb->read_only) return -1;
1119
1120         if (write_lock_record(tdb, rec_ptr) == -1) {
1121                 /* Someone traversing here: mark it as dead */
1122                 rec->magic = TDB_DEAD_MAGIC;
1123                 return rec_write(tdb, rec_ptr, rec);
1124         }
1125         if (write_unlock_record(tdb, rec_ptr) != 0)
1126                 return -1;
1127
1128         /* find previous record in hash chain */
1129         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1130                 return -1;
1131         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1132                 if (rec_read(tdb, i, &lastrec) == -1)
1133                         return -1;
1134
1135         /* unlink it: next ptr is at start of record. */
1136         if (last_ptr == 0)
1137                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1138         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1139                 return -1;
1140
1141         /* recover the space */
1142         if (tdb_free(tdb, rec_ptr, rec) == -1)
1143                 return -1;
1144         return 0;
1145 }
1146
1147 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1148 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1149                          struct list_struct *rec)
1150 {
1151         int want_next = (tlock->off != 0);
1152
1153         /* No traversal allows if you've called tdb_lockkeys() */
1154         if (tdb->lockedkeys)
1155                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1156
1157         /* Lock each chain from the start one. */
1158         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1159                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1160                         return -1;
1161
1162                 /* No previous record?  Start at top of chain. */
1163                 if (!tlock->off) {
1164                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1165                                      &tlock->off) == -1)
1166                                 goto fail;
1167                 } else {
1168                         /* Otherwise unlock the previous record. */
1169                         if (unlock_record(tdb, tlock->off) != 0)
1170                                 goto fail;
1171                 }
1172
1173                 if (want_next) {
1174                         /* We have offset of old record: grab next */
1175                         if (rec_read(tdb, tlock->off, rec) == -1)
1176                                 goto fail;
1177                         tlock->off = rec->next;
1178                 }
1179
1180                 /* Iterate through chain */
1181                 while( tlock->off) {
1182                         tdb_off current;
1183                         if (rec_read(tdb, tlock->off, rec) == -1)
1184                                 goto fail;
1185                         if (!TDB_DEAD(rec)) {
1186                                 /* Woohoo: we found one! */
1187                                 if (lock_record(tdb, tlock->off) != 0)
1188                                         goto fail;
1189                                 return tlock->off;
1190                         }
1191                         /* Try to clean dead ones from old traverses */
1192                         current = tlock->off;
1193                         tlock->off = rec->next;
1194                         if (do_delete(tdb, current, rec) != 0)
1195                                 goto fail;
1196                 }
1197                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1198                 want_next = 0;
1199         }
1200         /* We finished iteration without finding anything */
1201         return TDB_ERRCODE(TDB_SUCCESS, 0);
1202
1203  fail:
1204         tlock->off = 0;
1205         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1206                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1207         return -1;
1208 }
1209
1210 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1211    return -1 on error or the record count traversed
1212    if fn is NULL then it is not called
1213    a non-zero return value from fn() indicates that the traversal should stop
1214   */
1215 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1216 {
1217         TDB_DATA key, dbuf;
1218         struct list_struct rec;
1219         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1220         int ret, count = 0;
1221
1222         /* This was in the initializaton, above, but the IRIX compiler
1223          * did not like it.  crh
1224          */
1225         tl.next = tdb->travlocks.next;
1226
1227         /* fcntl locks don't stack: beware traverse inside traverse */
1228         tdb->travlocks.next = &tl;
1229
1230         /* tdb_next_lock places locks on the record returned, and its chain */
1231         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1232                 count++;
1233                 /* now read the full record */
1234                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1235                                           rec.key_len + rec.data_len);
1236                 if (!key.dptr) {
1237                         ret = -1;
1238                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1239                                 goto out;
1240                         if (unlock_record(tdb, tl.off) != 0)
1241                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1242                         goto out;
1243                 }
1244                 key.dsize = rec.key_len;
1245                 dbuf.dptr = key.dptr + rec.key_len;
1246                 dbuf.dsize = rec.data_len;
1247
1248                 /* Drop chain lock, call out */
1249                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1250                         ret = -1;
1251                         goto out;
1252                 }
1253                 if (fn && fn(tdb, key, dbuf, state)) {
1254                         /* They want us to terminate traversal */
1255                         ret = count;
1256                         if (unlock_record(tdb, tl.off) != 0) {
1257                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1258                                 ret = -1;
1259                         }
1260                         tdb->travlocks.next = tl.next;
1261                         SAFE_FREE(key.dptr);
1262                         return count;
1263                 }
1264                 SAFE_FREE(key.dptr);
1265         }
1266 out:
1267         tdb->travlocks.next = tl.next;
1268         if (ret < 0)
1269                 return -1;
1270         else
1271                 return count;
1272 }
1273
1274 /* find the first entry in the database and return its key */
1275 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1276 {
1277         TDB_DATA key;
1278         struct list_struct rec;
1279
1280         /* release any old lock */
1281         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1282                 return tdb_null;
1283         tdb->travlocks.off = tdb->travlocks.hash = 0;
1284
1285         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1286                 return tdb_null;
1287         /* now read the key */
1288         key.dsize = rec.key_len;
1289         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1290         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1291                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1292         return key;
1293 }
1294
1295 /* find the next entry in the database, returning its key */
1296 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1297 {
1298         u32 oldhash;
1299         TDB_DATA key = tdb_null;
1300         struct list_struct rec;
1301         char *k = NULL;
1302
1303         /* Is locked key the old key?  If so, traverse will be reliable. */
1304         if (tdb->travlocks.off) {
1305                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1306                         return tdb_null;
1307                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1308                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1309                                             rec.key_len))
1310                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1311                         /* No, it wasn't: unlock it and start from scratch */
1312                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1313                                 return tdb_null;
1314                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1315                                 return tdb_null;
1316                         tdb->travlocks.off = 0;
1317                 }
1318
1319                 SAFE_FREE(k);
1320         }
1321
1322         if (!tdb->travlocks.off) {
1323                 /* No previous element: do normal find, and lock record */
1324                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1325                 if (!tdb->travlocks.off)
1326                         return tdb_null;
1327                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1328                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1329                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1330                         return tdb_null;
1331                 }
1332         }
1333         oldhash = tdb->travlocks.hash;
1334
1335         /* Grab next record: locks chain and returned record,
1336            unlocks old record */
1337         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1338                 key.dsize = rec.key_len;
1339                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1340                                           key.dsize);
1341                 /* Unlock the chain of this new record */
1342                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1343                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1344         }
1345         /* Unlock the chain of old record */
1346         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1347                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1348         return key;
1349 }
1350
1351 /* delete an entry in the database given a key */
1352 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1353 {
1354         tdb_off rec_ptr;
1355         struct list_struct rec;
1356         int ret;
1357
1358         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1359                 return -1;
1360         ret = do_delete(tdb, rec_ptr, &rec);
1361         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1362                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1363         return ret;
1364 }
1365
1366 /* store an element in the database, replacing any existing element
1367    with the same key 
1368
1369    return 0 on success, -1 on failure
1370 */
1371 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1372 {
1373         struct list_struct rec;
1374         u32 hash;
1375         tdb_off rec_ptr;
1376         char *p = NULL;
1377         int ret = 0;
1378
1379         /* find which hash bucket it is in */
1380         hash = tdb_hash(&key);
1381         if (!tdb_keylocked(tdb, hash))
1382                 return -1;
1383         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1384                 return -1;
1385
1386         /* check for it existing, on insert. */
1387         if (flag == TDB_INSERT) {
1388                 if (tdb_exists(tdb, key)) {
1389                         tdb->ecode = TDB_ERR_EXISTS;
1390                         goto fail;
1391                 }
1392         } else {
1393                 /* first try in-place update, on modify or replace. */
1394                 if (tdb_update(tdb, key, dbuf) == 0)
1395                         goto out;
1396                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1397                         goto fail;
1398         }
1399         /* reset the error code potentially set by the tdb_update() */
1400         tdb->ecode = TDB_SUCCESS;
1401
1402         /* delete any existing record - if it doesn't exist we don't
1403            care.  Doing this first reduces fragmentation, and avoids
1404            coalescing with `allocated' block before it's updated. */
1405         if (flag != TDB_INSERT)
1406                 tdb_delete(tdb, key);
1407
1408         /* Copy key+value *before* allocating free space in case malloc
1409            fails and we are left with a dead spot in the tdb. */
1410
1411         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1412                 tdb->ecode = TDB_ERR_OOM;
1413                 goto fail;
1414         }
1415
1416         memcpy(p, key.dptr, key.dsize);
1417         memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1418
1419         /* now we're into insert / modify / replace of a record which
1420          * we know could not be optimised by an in-place store (for
1421          * various reasons).  */
1422         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1423                 goto fail;
1424
1425         /* Read hash top into next ptr */
1426         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1427                 goto fail;
1428
1429         rec.key_len = key.dsize;
1430         rec.data_len = dbuf.dsize;
1431         rec.full_hash = hash;
1432         rec.magic = TDB_MAGIC;
1433
1434         /* write out and point the top of the hash chain at it */
1435         if (rec_write(tdb, rec_ptr, &rec) == -1
1436             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1437             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1438                 /* Need to tdb_unallocate() here */
1439                 goto fail;
1440         }
1441  out:
1442         SAFE_FREE(p); 
1443         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1444         return ret;
1445 fail:
1446         ret = -1;
1447         goto out;
1448 }
1449
1450 static int tdb_already_open(dev_t device,
1451                             ino_t ino)
1452 {
1453         TDB_CONTEXT *i;
1454         
1455         for (i = tdbs; i; i = i->next) {
1456                 if (i->device == device && i->inode == ino) {
1457                         return 1;
1458                 }
1459         }
1460
1461         return 0;
1462 }
1463
1464 /* open the database, creating it if necessary 
1465
1466    The open_flags and mode are passed straight to the open call on the
1467    database file. A flags value of O_WRONLY is invalid. The hash size
1468    is advisory, use zero for a default value.
1469
1470    Return is NULL on error, in which case errno is also set.  Don't 
1471    try to call tdb_error or tdb_errname, just do strerror(errno).
1472
1473    @param name may be NULL for internal databases. */
1474 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1475                       int open_flags, mode_t mode)
1476 {
1477         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1478 }
1479
1480
1481 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1482                          int open_flags, mode_t mode,
1483                          tdb_log_func log_fn)
1484 {
1485         TDB_CONTEXT *tdb;
1486         struct stat st;
1487         int rev = 0, locked;
1488         unsigned char *vp;
1489         u32 vertest;
1490
1491         if (!(tdb = calloc(1, sizeof *tdb))) {
1492                 /* Can't log this */
1493                 errno = ENOMEM;
1494                 goto fail;
1495         }
1496         tdb->fd = -1;
1497         tdb->name = NULL;
1498         tdb->map_ptr = NULL;
1499         tdb->lockedkeys = NULL;
1500         tdb->flags = tdb_flags;
1501         tdb->open_flags = open_flags;
1502         tdb->log_fn = log_fn;
1503         
1504         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1505                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1506                          name));
1507                 errno = EINVAL;
1508                 goto fail;
1509         }
1510         
1511         if (hash_size == 0)
1512                 hash_size = DEFAULT_HASH_SIZE;
1513         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1514                 tdb->read_only = 1;
1515                 /* read only databases don't do locking or clear if first */
1516                 tdb->flags |= TDB_NOLOCK;
1517                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1518         }
1519
1520         /* internal databases don't mmap or lock, and start off cleared */
1521         if (tdb->flags & TDB_INTERNAL) {
1522                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1523                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1524                 if (tdb_new_database(tdb, hash_size) != 0) {
1525                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1526                         goto fail;
1527                 }
1528                 goto internal;
1529         }
1530
1531         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1532                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1533                          name, strerror(errno)));
1534                 goto fail;      /* errno set by open(2) */
1535         }
1536
1537         /* ensure there is only one process initialising at once */
1538         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1539                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1540                          name, strerror(errno)));
1541                 goto fail;      /* errno set by tdb_brlock */
1542         }
1543
1544         /* we need to zero database if we are the only one with it open */
1545         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1546             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1547                 open_flags |= O_CREAT;
1548                 if (ftruncate(tdb->fd, 0) == -1) {
1549                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1550                                  "failed to truncate %s: %s\n",
1551                                  name, strerror(errno)));
1552                         goto fail; /* errno set by ftruncate */
1553                 }
1554         }
1555
1556         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1557             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1558             || (tdb->header.version != TDB_VERSION
1559                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1560                 /* its not a valid database - possibly initialise it */
1561                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1562                         errno = EIO; /* ie bad format or something */
1563                         goto fail;
1564                 }
1565                 rev = (tdb->flags & TDB_CONVERT);
1566         }
1567         vp = (unsigned char *)&tdb->header.version;
1568         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1569                   (((u32)vp[2]) << 8) | (u32)vp[3];
1570         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1571         if (!rev)
1572                 tdb->flags &= ~TDB_CONVERT;
1573         else {
1574                 tdb->flags |= TDB_CONVERT;
1575                 convert(&tdb->header, sizeof(tdb->header));
1576         }
1577         if (fstat(tdb->fd, &st) == -1)
1578                 goto fail;
1579
1580         /* Is it already in the open list?  If so, fail. */
1581         if (tdb_already_open(st.st_dev, st.st_ino)) {
1582                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1583                          "%s (%d,%d) is already open in this process\n",
1584                          name, st.st_dev, st.st_ino));
1585                 errno = EBUSY;
1586                 goto fail;
1587         }
1588
1589         if (!(tdb->name = (char *)strdup(name))) {
1590                 errno = ENOMEM;
1591                 goto fail;
1592         }
1593
1594         tdb->map_size = st.st_size;
1595         tdb->device = st.st_dev;
1596         tdb->inode = st.st_ino;
1597         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1598         if (!tdb->locked) {
1599                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1600                          "failed to allocate lock structure for %s\n",
1601                          name));
1602                 errno = ENOMEM;
1603                 goto fail;
1604         }
1605         tdb_mmap(tdb);
1606         if (locked) {
1607                 if (!tdb->read_only)
1608                         if (tdb_clear_spinlocks(tdb) != 0) {
1609                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1610                                 "failed to clear spinlock\n"));
1611                                 goto fail;
1612                         }
1613                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1614                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1615                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1616                                  name, strerror(errno)));
1617                         goto fail;
1618                 }
1619         }
1620         /* leave this lock in place to indicate it's in use */
1621         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1622                 goto fail;
1623
1624  internal:
1625         /* Internal (memory-only) databases skip all the code above to
1626          * do with disk files, and resume here by releasing their
1627          * global lock and hooking into the active list. */
1628         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1629                 goto fail;
1630         tdb->next = tdbs;
1631         tdbs = tdb;
1632         return tdb;
1633
1634  fail:
1635         { int save_errno = errno;
1636
1637         if (!tdb)
1638                 return NULL;
1639         
1640         if (tdb->map_ptr) {
1641                 if (tdb->flags & TDB_INTERNAL)
1642                         SAFE_FREE(tdb->map_ptr);
1643                 else
1644                         tdb_munmap(tdb);
1645         }
1646         SAFE_FREE(tdb->name);
1647         if (tdb->fd != -1)
1648                 if (close(tdb->fd) != 0)
1649                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1650         SAFE_FREE(tdb->locked);
1651         SAFE_FREE(tdb);
1652         errno = save_errno;
1653         return NULL;
1654         }
1655 }
1656
1657 /* close a database */
1658 int tdb_close(TDB_CONTEXT *tdb)
1659 {
1660         TDB_CONTEXT **i;
1661         int ret = 0;
1662
1663         if (tdb->map_ptr) {
1664                 if (tdb->flags & TDB_INTERNAL)
1665                         SAFE_FREE(tdb->map_ptr);
1666                 else
1667                         tdb_munmap(tdb);
1668         }
1669         SAFE_FREE(tdb->name);
1670         if (tdb->fd != -1)
1671                 ret = close(tdb->fd);
1672         SAFE_FREE(tdb->locked);
1673         SAFE_FREE(tdb->lockedkeys);
1674
1675         /* Remove from contexts list */
1676         for (i = &tdbs; *i; i = &(*i)->next) {
1677                 if (*i == tdb) {
1678                         *i = tdb->next;
1679                         break;
1680                 }
1681         }
1682
1683         memset(tdb, 0, sizeof(*tdb));
1684         SAFE_FREE(tdb);
1685
1686         return ret;
1687 }
1688
1689 /* lock/unlock entire database */
1690 int tdb_lockall(TDB_CONTEXT *tdb)
1691 {
1692         u32 i;
1693
1694         /* There are no locks on read-only dbs */
1695         if (tdb->read_only)
1696                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1697         if (tdb->lockedkeys)
1698                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1699         for (i = 0; i < tdb->header.hash_size; i++) 
1700                 if (tdb_lock(tdb, i, F_WRLCK))
1701                         break;
1702
1703         /* If error, release locks we have... */
1704         if (i < tdb->header.hash_size) {
1705                 u32 j;
1706
1707                 for ( j = 0; j < i; j++)
1708                         tdb_unlock(tdb, j, F_WRLCK);
1709                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1710         }
1711
1712         return 0;
1713 }
1714 void tdb_unlockall(TDB_CONTEXT *tdb)
1715 {
1716         u32 i;
1717         for (i=0; i < tdb->header.hash_size; i++)
1718                 tdb_unlock(tdb, i, F_WRLCK);
1719 }
1720
1721 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1722 {
1723         u32 i, j, hash;
1724
1725         /* Can't lock more keys if already locked */
1726         if (tdb->lockedkeys)
1727                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1728         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1729                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1730         /* First number in array is # keys */
1731         tdb->lockedkeys[0] = number;
1732
1733         /* Insertion sort by bucket */
1734         for (i = 0; i < number; i++) {
1735                 hash = tdb_hash(&keys[i]);
1736                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1737                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1738                 tdb->lockedkeys[j+1] = hash;
1739         }
1740         /* Finally, lock in order */
1741         for (i = 0; i < number; i++)
1742                 if (tdb_lock(tdb, i, F_WRLCK))
1743                         break;
1744
1745         /* If error, release locks we have... */
1746         if (i < number) {
1747                 for ( j = 0; j < i; j++)
1748                         tdb_unlock(tdb, j, F_WRLCK);
1749                 SAFE_FREE(tdb->lockedkeys);
1750                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1751         }
1752         return 0;
1753 }
1754
1755 /* Unlock the keys previously locked by tdb_lockkeys() */
1756 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1757 {
1758         u32 i;
1759         for (i = 0; i < tdb->lockedkeys[0]; i++)
1760                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1761         SAFE_FREE(tdb->lockedkeys);
1762 }
1763
1764 /* lock/unlock one hash chain. This is meant to be used to reduce
1765    contention - it cannot guarantee how many records will be locked */
1766 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1767 {
1768         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1769 }
1770
1771 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1772 {
1773         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1774 }
1775
1776
1777 /* register a loging function */
1778 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1779 {
1780         tdb->log_fn = fn;
1781 }
1782
1783
1784 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1785    seek pointer from our parent and to re-establish locks */
1786 int tdb_reopen(TDB_CONTEXT *tdb)
1787 {
1788         struct stat st;
1789
1790         if (tdb_munmap(tdb) != 0) {
1791                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
1792                 goto fail;
1793         }
1794         if (close(tdb->fd) != 0)
1795                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
1796         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
1797         if (tdb->fd == -1) {
1798                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
1799                 goto fail;
1800         }
1801         if (fstat(tdb->fd, &st) != 0) {
1802                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
1803                 goto fail;
1804         }
1805         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
1806                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
1807                 goto fail;
1808         }
1809         tdb_mmap(tdb);
1810         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
1811                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
1812                 goto fail;
1813         }
1814
1815         return 0;
1816
1817 fail:
1818         tdb_close(tdb);
1819         return -1;
1820 }
1821
1822 /* reopen all tdb's */
1823 int tdb_reopen_all(void)
1824 {
1825         TDB_CONTEXT *tdb;
1826
1827         for (tdb=tdbs; tdb; tdb = tdb->next) {
1828                 if (tdb_reopen(tdb) != 0) return -1;
1829         }
1830
1831         return 0;
1832 }