This commit was manufactured by cvs2svn to create branch 'SAMBA_3_0'.(This used to...
[samba.git] / source3 / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23 #ifdef STANDALONE
24 #if HAVE_CONFIG_H
25 #include <config.h>
26 #endif
27
28 #include <stdlib.h>
29 #include <stdio.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32 #include <string.h>
33 #include <fcntl.h>
34 #include <errno.h>
35 #include <sys/mman.h>
36 #include <sys/stat.h>
37 #include "tdb.h"
38 #include "spinlock.h"
39 #else
40 #include "includes.h"
41 #endif
42
43 #define TDB_MAGIC_FOOD "TDB file\n"
44 #define TDB_VERSION (0x26011967 + 6)
45 #define TDB_MAGIC (0x26011999U)
46 #define TDB_FREE_MAGIC (~TDB_MAGIC)
47 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
48 #define TDB_ALIGNMENT 4
49 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
50 #define DEFAULT_HASH_SIZE 131
51 #define TDB_PAGE_SIZE 0x2000
52 #define FREELIST_TOP (sizeof(struct tdb_header))
53 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
54 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
55 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
56 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
57 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
58
59 /* NB assumes there is a local variable called "tdb" that is the
60  * current context, also takes doubly-parenthesized print-style
61  * argument. */
62 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
63
64 /* lock offsets */
65 #define GLOBAL_LOCK 0
66 #define ACTIVE_LOCK 4
67
68 #ifndef MAP_FILE
69 #define MAP_FILE 0
70 #endif
71
72 #ifndef MAP_FAILED
73 #define MAP_FAILED ((void *)-1)
74 #endif
75
76 /* free memory if the pointer is valid and zero the pointer */
77 #ifndef SAFE_FREE
78 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
79 #endif
80
81 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
82 TDB_DATA tdb_null;
83
84 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
85 static TDB_CONTEXT *tdbs = NULL;
86
87 static int tdb_munmap(TDB_CONTEXT *tdb)
88 {
89         if (tdb->flags & TDB_INTERNAL)
90                 return 0;
91
92 #ifdef HAVE_MMAP
93         if (tdb->map_ptr) {
94                 int ret = munmap(tdb->map_ptr, tdb->map_size);
95                 if (ret != 0)
96                         return ret;
97         }
98 #endif
99         tdb->map_ptr = NULL;
100         return 0;
101 }
102
103 static void tdb_mmap(TDB_CONTEXT *tdb)
104 {
105         if (tdb->flags & TDB_INTERNAL)
106                 return;
107
108 #ifdef HAVE_MMAP
109         if (!(tdb->flags & TDB_NOMMAP)) {
110                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
111                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
112                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
113
114                 /*
115                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
116                  */
117
118                 if (tdb->map_ptr == MAP_FAILED) {
119                         tdb->map_ptr = NULL;
120                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
121                                  tdb->map_size, strerror(errno)));
122                 }
123         } else {
124                 tdb->map_ptr = NULL;
125         }
126 #else
127         tdb->map_ptr = NULL;
128 #endif
129 }
130
131 /* Endian conversion: we only ever deal with 4 byte quantities */
132 static void *convert(void *buf, u32 size)
133 {
134         u32 i, *p = buf;
135         for (i = 0; i < size / 4; i++)
136                 p[i] = TDB_BYTEREV(p[i]);
137         return buf;
138 }
139 #define DOCONV() (tdb->flags & TDB_CONVERT)
140 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
141
142 /* the body of the database is made of one list_struct for the free space
143    plus a separate data list for each hash value */
144 struct list_struct {
145         tdb_off next; /* offset of the next record in the list */
146         tdb_len rec_len; /* total byte length of record */
147         tdb_len key_len; /* byte length of key */
148         tdb_len data_len; /* byte length of data */
149         u32 full_hash; /* the full 32 bit hash of the key */
150         u32 magic;   /* try to catch errors */
151         /* the following union is implied:
152                 union {
153                         char record[rec_len];
154                         struct {
155                                 char key[key_len];
156                                 char data[data_len];
157                         }
158                         u32 totalsize; (tailer)
159                 }
160         */
161 };
162
163 /* a byte range locking function - return 0 on success
164    this functions locks/unlocks 1 byte at the specified offset.
165
166    On error, errno is also set so that errors are passed back properly
167    through tdb_open(). */
168 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
169                       int rw_type, int lck_type, int probe)
170 {
171         struct flock fl;
172         int ret;
173
174         if (tdb->flags & TDB_NOLOCK)
175                 return 0;
176         if (tdb->read_only) {
177                 errno = EACCES;
178                 return -1;
179         }
180
181         fl.l_type = rw_type;
182         fl.l_whence = SEEK_SET;
183         fl.l_start = offset;
184         fl.l_len = 1;
185         fl.l_pid = 0;
186
187         do {
188                 ret = fcntl(tdb->fd,lck_type,&fl);
189         } while (ret == -1 && errno == EINTR);
190
191         if (ret == -1) {
192                 if (!probe && lck_type != F_SETLK) {
193                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
194                                  tdb->fd, offset, rw_type, lck_type));
195                 }
196                 /* errno set by fcntl */
197                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
198         }
199         return 0;
200 }
201
202 /* lock a list in the database. list -1 is the alloc list */
203 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
204 {
205         if (list < -1 || list >= (int)tdb->header.hash_size) {
206                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
207                            list, ltype));
208                 return -1;
209         }
210         if (tdb->flags & TDB_NOLOCK)
211                 return 0;
212
213         /* Since fcntl locks don't nest, we do a lock for the first one,
214            and simply bump the count for future ones */
215         if (tdb->locked[list+1].count == 0) {
216                 if (!tdb->read_only && tdb->header.rwlocks) {
217                         if (tdb_spinlock(tdb, list, ltype)) {
218                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
219                                            list, ltype));
220                                 return -1;
221                         }
222                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
223                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
224                                            list, ltype, strerror(errno)));
225                         return -1;
226                 }
227                 tdb->locked[list+1].ltype = ltype;
228         }
229         tdb->locked[list+1].count++;
230         return 0;
231 }
232
233 /* unlock the database: returns void because it's too late for errors. */
234         /* changed to return int it may be interesting to know there
235            has been an error  --simo */
236 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
237 {
238         int ret = -1;
239
240         if (tdb->flags & TDB_NOLOCK)
241                 return 0;
242
243         /* Sanity checks */
244         if (list < -1 || list >= (int)tdb->header.hash_size) {
245                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
246                 return ret;
247         }
248
249         if (tdb->locked[list+1].count==0) {
250                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
251                 return ret;
252         }
253
254         if (tdb->locked[list+1].count == 1) {
255                 /* Down to last nested lock: unlock underneath */
256                 if (!tdb->read_only && tdb->header.rwlocks) {
257                         ret = tdb_spinunlock(tdb, list, ltype);
258                 } else {
259                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
260                 }
261         } else {
262                 ret = 0;
263         }
264         tdb->locked[list+1].count--;
265
266         if (ret)
267                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
268         return ret;
269 }
270
271 /* This is based on the hash algorithm from gdbm */
272 static u32 tdb_hash(TDB_DATA *key)
273 {
274         u32 value;      /* Used to compute the hash value.  */
275         u32   i;        /* Used to cycle through random values. */
276
277         /* Set the initial value from the key size. */
278         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
279                 value = (value + (key->dptr[i] << (i*5 % 24)));
280
281         return (1103515243 * value + 12345);  
282 }
283
284 /* check for an out of bounds access - if it is out of bounds then
285    see if the database has been expanded by someone else and expand
286    if necessary 
287    note that "len" is the minimum length needed for the db
288 */
289 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
290 {
291         struct stat st;
292         if (len <= tdb->map_size)
293                 return 0;
294         if (tdb->flags & TDB_INTERNAL) {
295                 if (!probe) {
296                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
297                                  (int)len, (int)tdb->map_size));
298                 }
299                 return TDB_ERRCODE(TDB_ERR_IO, -1);
300         }
301
302         if (fstat(tdb->fd, &st) == -1)
303                 return TDB_ERRCODE(TDB_ERR_IO, -1);
304
305         if (st.st_size < (size_t)len) {
306                 if (!probe) {
307                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
308                                  (int)len, (int)st.st_size));
309                 }
310                 return TDB_ERRCODE(TDB_ERR_IO, -1);
311         }
312
313         /* Unmap, update size, remap */
314         if (tdb_munmap(tdb) == -1)
315                 return TDB_ERRCODE(TDB_ERR_IO, -1);
316         tdb->map_size = st.st_size;
317         tdb_mmap(tdb);
318         return 0;
319 }
320
321 /* write a lump of data at a specified offset */
322 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
323 {
324         if (tdb_oob(tdb, off + len, 0) != 0)
325                 return -1;
326
327         if (tdb->map_ptr)
328                 memcpy(off + (char *)tdb->map_ptr, buf, len);
329 #ifdef HAVE_PWRITE
330         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
331 #else
332         else if (lseek(tdb->fd, off, SEEK_SET) != off
333                  || write(tdb->fd, buf, len) != (ssize_t)len) {
334 #endif
335                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
336                            off, len, strerror(errno)));
337                 return TDB_ERRCODE(TDB_ERR_IO, -1);
338         }
339         return 0;
340 }
341
342 /* read a lump of data at a specified offset, maybe convert */
343 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
344 {
345         if (tdb_oob(tdb, off + len, 0) != 0)
346                 return -1;
347
348         if (tdb->map_ptr)
349                 memcpy(buf, off + (char *)tdb->map_ptr, len);
350 #ifdef HAVE_PREAD
351         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
352 #else
353         else if (lseek(tdb->fd, off, SEEK_SET) != off
354                  || read(tdb->fd, buf, len) != (ssize_t)len) {
355 #endif
356                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
357                            off, len, strerror(errno)));
358                 return TDB_ERRCODE(TDB_ERR_IO, -1);
359         }
360         if (cv)
361                 convert(buf, len);
362         return 0;
363 }
364
365 /* read a lump of data, allocating the space for it */
366 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
367 {
368         char *buf;
369
370         if (!(buf = malloc(len))) {
371                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
372                            len, strerror(errno)));
373                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
374         }
375         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
376                 SAFE_FREE(buf);
377                 return NULL;
378         }
379         return buf;
380 }
381
382 /* read/write a tdb_off */
383 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
384 {
385         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
386 }
387 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
388 {
389         tdb_off off = *d;
390         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
391 }
392
393 /* read/write a record */
394 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
395 {
396         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
397                 return -1;
398         if (TDB_BAD_MAGIC(rec)) {
399                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
400                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
401         }
402         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
403 }
404 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
405 {
406         struct list_struct r = *rec;
407         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
408 }
409
410 /* read a freelist record and check for simple errors */
411 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
412 {
413         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
414                 return -1;
415
416         if (rec->magic == TDB_MAGIC) {
417                 /* this happens when a app is showdown while deleting a record - we should
418                    not completely fail when this happens */
419                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
420                          rec->magic, off));
421                 rec->magic = TDB_FREE_MAGIC;
422                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
423                         return -1;
424         }
425
426         if (rec->magic != TDB_FREE_MAGIC) {
427                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
428                            rec->magic, off));
429                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
430         }
431         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
432                 return -1;
433         return 0;
434 }
435
436 /* update a record tailer (must hold allocation lock) */
437 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
438                          const struct list_struct *rec)
439 {
440         tdb_off totalsize;
441
442         /* Offset of tailer from record header */
443         totalsize = sizeof(*rec) + rec->rec_len;
444         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
445                          &totalsize);
446 }
447
448 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
449 {
450         struct list_struct rec;
451         tdb_off tailer_ofs, tailer;
452
453         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
454                 printf("ERROR: failed to read record at %u\n", offset);
455                 return 0;
456         }
457
458         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
459                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
460
461         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
462         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
463                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
464                 return rec.next;
465         }
466
467         if (tailer != rec.rec_len + sizeof(rec)) {
468                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
469                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
470         }
471         return rec.next;
472 }
473
474 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
475 {
476         tdb_off rec_ptr, top;
477
478         top = TDB_HASH_TOP(i);
479
480         if (tdb_lock(tdb, i, F_WRLCK) != 0)
481                 return -1;
482
483         if (ofs_read(tdb, top, &rec_ptr) == -1)
484                 return tdb_unlock(tdb, i, F_WRLCK);
485
486         if (rec_ptr)
487                 printf("hash=%d\n", i);
488
489         while (rec_ptr) {
490                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
491         }
492
493         return tdb_unlock(tdb, i, F_WRLCK);
494 }
495
496 void tdb_dump_all(TDB_CONTEXT *tdb)
497 {
498         int i;
499         for (i=0;i<tdb->header.hash_size;i++) {
500                 tdb_dump_chain(tdb, i);
501         }
502         printf("freelist:\n");
503         tdb_dump_chain(tdb, -1);
504 }
505
506 int tdb_printfreelist(TDB_CONTEXT *tdb)
507 {
508         int ret;
509         long total_free = 0;
510         tdb_off offset, rec_ptr;
511         struct list_struct rec;
512
513         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
514                 return ret;
515
516         offset = FREELIST_TOP;
517
518         /* read in the freelist top */
519         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
520                 return 0;
521         }
522
523         printf("freelist top=[0x%08x]\n", rec_ptr );
524         while (rec_ptr) {
525                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
526                         return -1;
527                 }
528
529                 if (rec.magic != TDB_FREE_MAGIC) {
530                         printf("bad magic 0x%08x in free list\n", rec.magic);
531                         return -1;
532                 }
533
534                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
535                 total_free += rec.rec_len;
536
537                 /* move to the next record */
538                 rec_ptr = rec.next;
539         }
540         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
541                (int)total_free);
542
543         return tdb_unlock(tdb, -1, F_WRLCK);
544 }
545
546 /* Remove an element from the freelist.  Must have alloc lock. */
547 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
548 {
549         tdb_off last_ptr, i;
550
551         /* read in the freelist top */
552         last_ptr = FREELIST_TOP;
553         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
554                 if (i == off) {
555                         /* We've found it! */
556                         return ofs_write(tdb, last_ptr, &next);
557                 }
558                 /* Follow chain (next offset is at start of record) */
559                 last_ptr = i;
560         }
561         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
562         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
563 }
564
565 /* Add an element into the freelist. Merge adjacent records if
566    neccessary. */
567 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
568 {
569         tdb_off right, left;
570
571         /* Allocation and tailer lock */
572         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
573                 return -1;
574
575         /* set an initial tailer, so if we fail we don't leave a bogus record */
576         if (update_tailer(tdb, offset, rec) != 0) {
577                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
578                 goto fail;
579         }
580
581         /* Look right first (I'm an Australian, dammit) */
582         right = offset + sizeof(*rec) + rec->rec_len;
583         if (right + sizeof(*rec) <= tdb->map_size) {
584                 struct list_struct r;
585
586                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
587                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
588                         goto left;
589                 }
590
591                 /* If it's free, expand to include it. */
592                 if (r.magic == TDB_FREE_MAGIC) {
593                         if (remove_from_freelist(tdb, right, r.next) == -1) {
594                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
595                                 goto left;
596                         }
597                         rec->rec_len += sizeof(r) + r.rec_len;
598                 }
599         }
600
601 left:
602         /* Look left */
603         left = offset - sizeof(tdb_off);
604         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
605                 struct list_struct l;
606                 tdb_off leftsize;
607
608                 /* Read in tailer and jump back to header */
609                 if (ofs_read(tdb, left, &leftsize) == -1) {
610                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
611                         goto update;
612                 }
613                 left = offset - leftsize;
614
615                 /* Now read in record */
616                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
617                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
618                         goto update;
619                 }
620
621                 /* If it's free, expand to include it. */
622                 if (l.magic == TDB_FREE_MAGIC) {
623                         if (remove_from_freelist(tdb, left, l.next) == -1) {
624                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
625                                 goto update;
626                         } else {
627                                 offset = left;
628                                 rec->rec_len += leftsize;
629                         }
630                 }
631         }
632
633 update:
634         if (update_tailer(tdb, offset, rec) == -1) {
635                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
636                 goto fail;
637         }
638
639         /* Now, prepend to free list */
640         rec->magic = TDB_FREE_MAGIC;
641
642         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
643             rec_write(tdb, offset, rec) == -1 ||
644             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
645                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
646                 goto fail;
647         }
648
649         /* And we're done. */
650         tdb_unlock(tdb, -1, F_WRLCK);
651         return 0;
652
653  fail:
654         tdb_unlock(tdb, -1, F_WRLCK);
655         return -1;
656 }
657
658
659 /* expand a file.  we prefer to use ftruncate, as that is what posix
660   says to use for mmap expansion */
661 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
662 {
663         char buf[1024];
664 #if HAVE_FTRUNCATE_EXTEND
665         if (ftruncate(tdb->fd, size+addition) != 0) {
666                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
667                            size+addition, strerror(errno)));
668                 return -1;
669         }
670 #else
671         char b = 0;
672
673 #ifdef HAVE_PWRITE
674         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
675 #else
676         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
677             write(tdb->fd, &b, 1) != 1) {
678 #endif
679                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
680                            size+addition, strerror(errno)));
681                 return -1;
682         }
683 #endif
684
685         /* now fill the file with something. This ensures that the file isn't sparse, which would be
686            very bad if we ran out of disk. This must be done with write, not via mmap */
687         memset(buf, 0x42, sizeof(buf));
688         while (addition) {
689                 int n = addition>sizeof(buf)?sizeof(buf):addition;
690 #ifdef HAVE_PWRITE
691                 int ret = pwrite(tdb->fd, buf, n, size);
692 #else
693                 int ret;
694                 if (lseek(tdb->fd, size, SEEK_SET) != size)
695                         return -1;
696                 ret = write(tdb->fd, buf, n);
697 #endif
698                 if (ret != n) {
699                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
700                                    n, strerror(errno)));
701                         return -1;
702                 }
703                 addition -= n;
704                 size += n;
705         }
706         return 0;
707 }
708
709
710 /* expand the database at least size bytes by expanding the underlying
711    file and doing the mmap again if necessary */
712 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
713 {
714         struct list_struct rec;
715         tdb_off offset;
716
717         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
718                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
719                 return -1;
720         }
721
722         /* must know about any previous expansions by another process */
723         tdb_oob(tdb, tdb->map_size + 1, 1);
724
725         /* always make room for at least 10 more records, and round
726            the database up to a multiple of TDB_PAGE_SIZE */
727         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
728
729         if (!(tdb->flags & TDB_INTERNAL))
730                 tdb_munmap(tdb);
731
732         /*
733          * We must ensure the file is unmapped before doing this
734          * to ensure consistency with systems like OpenBSD where
735          * writes and mmaps are not consistent.
736          */
737
738         /* expand the file itself */
739         if (!(tdb->flags & TDB_INTERNAL)) {
740                 if (expand_file(tdb, tdb->map_size, size) != 0)
741                         goto fail;
742         }
743
744         tdb->map_size += size;
745
746         if (tdb->flags & TDB_INTERNAL)
747                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
748         else {
749                 /*
750                  * We must ensure the file is remapped before adding the space
751                  * to ensure consistency with systems like OpenBSD where
752                  * writes and mmaps are not consistent.
753                  */
754
755                 /* We're ok if the mmap fails as we'll fallback to read/write */
756                 tdb_mmap(tdb);
757         }
758
759         /* form a new freelist record */
760         memset(&rec,'\0',sizeof(rec));
761         rec.rec_len = size - sizeof(rec);
762
763         /* link it into the free list */
764         offset = tdb->map_size - size;
765         if (tdb_free(tdb, offset, &rec) == -1)
766                 goto fail;
767
768         tdb_unlock(tdb, -1, F_WRLCK);
769         return 0;
770  fail:
771         tdb_unlock(tdb, -1, F_WRLCK);
772         return -1;
773 }
774
775 /* allocate some space from the free list. The offset returned points
776    to a unconnected list_struct within the database with room for at
777    least length bytes of total data
778
779    0 is returned if the space could not be allocated
780  */
781 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
782                             struct list_struct *rec)
783 {
784         tdb_off rec_ptr, last_ptr, newrec_ptr;
785         struct list_struct newrec;
786
787         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
788                 return 0;
789
790         /* Extra bytes required for tailer */
791         length += sizeof(tdb_off);
792
793  again:
794         last_ptr = FREELIST_TOP;
795
796         /* read in the freelist top */
797         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
798                 goto fail;
799
800         /* keep looking until we find a freelist record big enough */
801         while (rec_ptr) {
802                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
803                         goto fail;
804
805                 if (rec->rec_len >= length) {
806                         /* found it - now possibly split it up  */
807                         if (rec->rec_len > length + MIN_REC_SIZE) {
808                                 /* Length of left piece */
809                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
810
811                                 /* Right piece to go on free list */
812                                 newrec.rec_len = rec->rec_len
813                                         - (sizeof(*rec) + length);
814                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
815
816                                 /* And left record is shortened */
817                                 rec->rec_len = length;
818                         } else
819                                 newrec_ptr = 0;
820
821                         /* Remove allocated record from the free list */
822                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
823                                 goto fail;
824
825                         /* Update header: do this before we drop alloc
826                            lock, otherwise tdb_free() might try to
827                            merge with us, thinking we're free.
828                            (Thanks Jeremy Allison). */
829                         rec->magic = TDB_MAGIC;
830                         if (rec_write(tdb, rec_ptr, rec) == -1)
831                                 goto fail;
832
833                         /* Did we create new block? */
834                         if (newrec_ptr) {
835                                 /* Update allocated record tailer (we
836                                    shortened it). */
837                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
838                                         goto fail;
839
840                                 /* Free new record */
841                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
842                                         goto fail;
843                         }
844
845                         /* all done - return the new record offset */
846                         tdb_unlock(tdb, -1, F_WRLCK);
847                         return rec_ptr;
848                 }
849                 /* move to the next record */
850                 last_ptr = rec_ptr;
851                 rec_ptr = rec->next;
852         }
853         /* we didn't find enough space. See if we can expand the
854            database and if we can then try again */
855         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
856                 goto again;
857  fail:
858         tdb_unlock(tdb, -1, F_WRLCK);
859         return 0;
860 }
861
862 /* initialise a new database with a specified hash size */
863 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
864 {
865         struct tdb_header *newdb;
866         int size, ret = -1;
867
868         /* We make it up in memory, then write it out if not internal */
869         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
870         if (!(newdb = calloc(size, 1)))
871                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
872
873         /* Fill in the header */
874         newdb->version = TDB_VERSION;
875         newdb->hash_size = hash_size;
876 #ifdef USE_SPINLOCKS
877         newdb->rwlocks = size;
878 #endif
879         if (tdb->flags & TDB_INTERNAL) {
880                 tdb->map_size = size;
881                 tdb->map_ptr = (char *)newdb;
882                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
883                 /* Convert the `ondisk' version if asked. */
884                 CONVERT(*newdb);
885                 return 0;
886         }
887         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
888                 goto fail;
889
890         if (ftruncate(tdb->fd, 0) == -1)
891                 goto fail;
892
893         /* This creates an endian-converted header, as if read from disk */
894         CONVERT(*newdb);
895         memcpy(&tdb->header, newdb, sizeof(tdb->header));
896         /* Don't endian-convert the magic food! */
897         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
898         if (write(tdb->fd, newdb, size) != size)
899                 ret = -1;
900         else
901                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
902
903   fail:
904         SAFE_FREE(newdb);
905         return ret;
906 }
907
908 /* Returns 0 on fail.  On success, return offset of record, and fills
909    in rec */
910 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
911                         struct list_struct *r)
912 {
913         tdb_off rec_ptr;
914         
915         /* read in the hash top */
916         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
917                 return 0;
918
919         /* keep looking until we find the right record */
920         while (rec_ptr) {
921                 if (rec_read(tdb, rec_ptr, r) == -1)
922                         return 0;
923
924                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
925                         char *k;
926                         /* a very likely hit - read the key */
927                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
928                                            r->key_len);
929                         if (!k)
930                                 return 0;
931
932                         if (memcmp(key.dptr, k, key.dsize) == 0) {
933                                 SAFE_FREE(k);
934                                 return rec_ptr;
935                         }
936                         SAFE_FREE(k);
937                 }
938                 rec_ptr = r->next;
939         }
940         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
941 }
942
943 /* If they do lockkeys, check that this hash is one they locked */
944 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
945 {
946         u32 i;
947         if (!tdb->lockedkeys)
948                 return 1;
949         for (i = 0; i < tdb->lockedkeys[0]; i++)
950                 if (tdb->lockedkeys[i+1] == hash)
951                         return 1;
952         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
953 }
954
955 /* As tdb_find, but if you succeed, keep the lock */
956 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
957                              struct list_struct *rec)
958 {
959         u32 hash, rec_ptr;
960
961         hash = tdb_hash(&key);
962         if (!tdb_keylocked(tdb, hash))
963                 return 0;
964         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
965                 return 0;
966         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
967                 tdb_unlock(tdb, BUCKET(hash), locktype);
968         return rec_ptr;
969 }
970
971 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
972 {
973         return tdb->ecode;
974 }
975
976 static struct tdb_errname {
977         enum TDB_ERROR ecode; const char *estring;
978 } emap[] = { {TDB_SUCCESS, "Success"},
979              {TDB_ERR_CORRUPT, "Corrupt database"},
980              {TDB_ERR_IO, "IO Error"},
981              {TDB_ERR_LOCK, "Locking error"},
982              {TDB_ERR_OOM, "Out of memory"},
983              {TDB_ERR_EXISTS, "Record exists"},
984              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
985              {TDB_ERR_NOEXIST, "Record does not exist"} };
986
987 /* Error string for the last tdb error */
988 const char *tdb_errorstr(TDB_CONTEXT *tdb)
989 {
990         u32 i;
991         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
992                 if (tdb->ecode == emap[i].ecode)
993                         return emap[i].estring;
994         return "Invalid error code";
995 }
996
997 /* update an entry in place - this only works if the new data size
998    is <= the old data size and the key exists.
999    on failure return -1
1000 */
1001 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1002 {
1003         struct list_struct rec;
1004         tdb_off rec_ptr;
1005         int ret = -1;
1006
1007         /* find entry */
1008         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1009                 return -1;
1010
1011         /* must be long enough key, data and tailer */
1012         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1013                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1014                 goto out;
1015         }
1016
1017         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1018                       dbuf.dptr, dbuf.dsize) == -1)
1019                 goto out;
1020
1021         if (dbuf.dsize != rec.data_len) {
1022                 /* update size */
1023                 rec.data_len = dbuf.dsize;
1024                 ret = rec_write(tdb, rec_ptr, &rec);
1025         } else
1026                 ret = 0;
1027  out:
1028         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1029         return ret;
1030 }
1031
1032 /* find an entry in the database given a key */
1033 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1034 {
1035         tdb_off rec_ptr;
1036         struct list_struct rec;
1037         TDB_DATA ret;
1038
1039         /* find which hash bucket it is in */
1040         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1041                 return tdb_null;
1042
1043         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1044                                   rec.data_len);
1045         ret.dsize = rec.data_len;
1046         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1047         return ret;
1048 }
1049
1050 /* check if an entry in the database exists 
1051
1052    note that 1 is returned if the key is found and 0 is returned if not found
1053    this doesn't match the conventions in the rest of this module, but is
1054    compatible with gdbm
1055 */
1056 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1057 {
1058         struct list_struct rec;
1059         
1060         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1061                 return 0;
1062         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1063         return 1;
1064 }
1065
1066 /* record lock stops delete underneath */
1067 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1068 {
1069         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1070 }
1071 /*
1072   Write locks override our own fcntl readlocks, so check it here.
1073   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1074   an error to fail to get the lock here.
1075 */
1076  
1077 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1078 {
1079         struct tdb_traverse_lock *i;
1080         for (i = &tdb->travlocks; i; i = i->next)
1081                 if (i->off == off)
1082                         return -1;
1083         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1084 }
1085
1086 /*
1087   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1088   an error to fail to get the lock here.
1089 */
1090
1091 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1092 {
1093         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1094 }
1095 /* fcntl locks don't stack: avoid unlocking someone else's */
1096 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1097 {
1098         struct tdb_traverse_lock *i;
1099         u32 count = 0;
1100
1101         if (off == 0)
1102                 return 0;
1103         for (i = &tdb->travlocks; i; i = i->next)
1104                 if (i->off == off)
1105                         count++;
1106         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1107 }
1108
1109 /* actually delete an entry in the database given the offset */
1110 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1111 {
1112         tdb_off last_ptr, i;
1113         struct list_struct lastrec;
1114
1115         if (tdb->read_only) return -1;
1116
1117         if (write_lock_record(tdb, rec_ptr) == -1) {
1118                 /* Someone traversing here: mark it as dead */
1119                 rec->magic = TDB_DEAD_MAGIC;
1120                 return rec_write(tdb, rec_ptr, rec);
1121         }
1122         if (write_unlock_record(tdb, rec_ptr) != 0)
1123                 return -1;
1124
1125         /* find previous record in hash chain */
1126         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1127                 return -1;
1128         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1129                 if (rec_read(tdb, i, &lastrec) == -1)
1130                         return -1;
1131
1132         /* unlink it: next ptr is at start of record. */
1133         if (last_ptr == 0)
1134                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1135         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1136                 return -1;
1137
1138         /* recover the space */
1139         if (tdb_free(tdb, rec_ptr, rec) == -1)
1140                 return -1;
1141         return 0;
1142 }
1143
1144 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1145 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1146                          struct list_struct *rec)
1147 {
1148         int want_next = (tlock->off != 0);
1149
1150         /* No traversal allows if you've called tdb_lockkeys() */
1151         if (tdb->lockedkeys)
1152                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1153
1154         /* Lock each chain from the start one. */
1155         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1156                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1157                         return -1;
1158
1159                 /* No previous record?  Start at top of chain. */
1160                 if (!tlock->off) {
1161                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1162                                      &tlock->off) == -1)
1163                                 goto fail;
1164                 } else {
1165                         /* Otherwise unlock the previous record. */
1166                         if (unlock_record(tdb, tlock->off) != 0)
1167                                 goto fail;
1168                 }
1169
1170                 if (want_next) {
1171                         /* We have offset of old record: grab next */
1172                         if (rec_read(tdb, tlock->off, rec) == -1)
1173                                 goto fail;
1174                         tlock->off = rec->next;
1175                 }
1176
1177                 /* Iterate through chain */
1178                 while( tlock->off) {
1179                         tdb_off current;
1180                         if (rec_read(tdb, tlock->off, rec) == -1)
1181                                 goto fail;
1182                         if (!TDB_DEAD(rec)) {
1183                                 /* Woohoo: we found one! */
1184                                 if (lock_record(tdb, tlock->off) != 0)
1185                                         goto fail;
1186                                 return tlock->off;
1187                         }
1188                         /* Try to clean dead ones from old traverses */
1189                         current = tlock->off;
1190                         tlock->off = rec->next;
1191                         if (do_delete(tdb, current, rec) != 0)
1192                                 goto fail;
1193                 }
1194                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1195                 want_next = 0;
1196         }
1197         /* We finished iteration without finding anything */
1198         return TDB_ERRCODE(TDB_SUCCESS, 0);
1199
1200  fail:
1201         tlock->off = 0;
1202         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1203                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1204         return -1;
1205 }
1206
1207 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1208    return -1 on error or the record count traversed
1209    if fn is NULL then it is not called
1210    a non-zero return value from fn() indicates that the traversal should stop
1211   */
1212 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1213 {
1214         TDB_DATA key, dbuf;
1215         struct list_struct rec;
1216         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1217         int ret, count = 0;
1218
1219         /* This was in the initializaton, above, but the IRIX compiler
1220          * did not like it.  crh
1221          */
1222         tl.next = tdb->travlocks.next;
1223
1224         /* fcntl locks don't stack: beware traverse inside traverse */
1225         tdb->travlocks.next = &tl;
1226
1227         /* tdb_next_lock places locks on the record returned, and its chain */
1228         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1229                 count++;
1230                 /* now read the full record */
1231                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1232                                           rec.key_len + rec.data_len);
1233                 if (!key.dptr) {
1234                         ret = -1;
1235                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1236                                 goto out;
1237                         if (unlock_record(tdb, tl.off) != 0)
1238                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1239                         goto out;
1240                 }
1241                 key.dsize = rec.key_len;
1242                 dbuf.dptr = key.dptr + rec.key_len;
1243                 dbuf.dsize = rec.data_len;
1244
1245                 /* Drop chain lock, call out */
1246                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1247                         ret = -1;
1248                         goto out;
1249                 }
1250                 if (fn && fn(tdb, key, dbuf, state)) {
1251                         /* They want us to terminate traversal */
1252                         ret = count;
1253                         if (unlock_record(tdb, tl.off) != 0) {
1254                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1255                                 ret = -1;
1256                         }
1257                         tdb->travlocks.next = tl.next;
1258                         SAFE_FREE(key.dptr);
1259                         return count;
1260                 }
1261                 SAFE_FREE(key.dptr);
1262         }
1263 out:
1264         tdb->travlocks.next = tl.next;
1265         if (ret < 0)
1266                 return -1;
1267         else
1268                 return count;
1269 }
1270
1271 /* find the first entry in the database and return its key */
1272 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1273 {
1274         TDB_DATA key;
1275         struct list_struct rec;
1276
1277         /* release any old lock */
1278         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1279                 return tdb_null;
1280         tdb->travlocks.off = tdb->travlocks.hash = 0;
1281
1282         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1283                 return tdb_null;
1284         /* now read the key */
1285         key.dsize = rec.key_len;
1286         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1287         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1288                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1289         return key;
1290 }
1291
1292 /* find the next entry in the database, returning its key */
1293 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1294 {
1295         u32 oldhash;
1296         TDB_DATA key = tdb_null;
1297         struct list_struct rec;
1298         char *k = NULL;
1299
1300         /* Is locked key the old key?  If so, traverse will be reliable. */
1301         if (tdb->travlocks.off) {
1302                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1303                         return tdb_null;
1304                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1305                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1306                                             rec.key_len))
1307                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1308                         /* No, it wasn't: unlock it and start from scratch */
1309                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1310                                 return tdb_null;
1311                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1312                                 return tdb_null;
1313                         tdb->travlocks.off = 0;
1314                 }
1315
1316                 SAFE_FREE(k);
1317         }
1318
1319         if (!tdb->travlocks.off) {
1320                 /* No previous element: do normal find, and lock record */
1321                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1322                 if (!tdb->travlocks.off)
1323                         return tdb_null;
1324                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1325                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1326                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1327                         return tdb_null;
1328                 }
1329         }
1330         oldhash = tdb->travlocks.hash;
1331
1332         /* Grab next record: locks chain and returned record,
1333            unlocks old record */
1334         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1335                 key.dsize = rec.key_len;
1336                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1337                                           key.dsize);
1338                 /* Unlock the chain of this new record */
1339                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1340                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1341         }
1342         /* Unlock the chain of old record */
1343         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1344                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1345         return key;
1346 }
1347
1348 /* delete an entry in the database given a key */
1349 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1350 {
1351         tdb_off rec_ptr;
1352         struct list_struct rec;
1353         int ret;
1354
1355         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1356                 return -1;
1357         ret = do_delete(tdb, rec_ptr, &rec);
1358         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1359                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1360         return ret;
1361 }
1362
1363 /* store an element in the database, replacing any existing element
1364    with the same key 
1365
1366    return 0 on success, -1 on failure
1367 */
1368 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1369 {
1370         struct list_struct rec;
1371         u32 hash;
1372         tdb_off rec_ptr;
1373         char *p = NULL;
1374         int ret = 0;
1375
1376         /* find which hash bucket it is in */
1377         hash = tdb_hash(&key);
1378         if (!tdb_keylocked(tdb, hash))
1379                 return -1;
1380         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1381                 return -1;
1382
1383         /* check for it existing, on insert. */
1384         if (flag == TDB_INSERT) {
1385                 if (tdb_exists(tdb, key)) {
1386                         tdb->ecode = TDB_ERR_EXISTS;
1387                         goto fail;
1388                 }
1389         } else {
1390                 /* first try in-place update, on modify or replace. */
1391                 if (tdb_update(tdb, key, dbuf) == 0)
1392                         goto out;
1393                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1394                         goto fail;
1395         }
1396         /* reset the error code potentially set by the tdb_update() */
1397         tdb->ecode = TDB_SUCCESS;
1398
1399         /* delete any existing record - if it doesn't exist we don't
1400            care.  Doing this first reduces fragmentation, and avoids
1401            coalescing with `allocated' block before it's updated. */
1402         if (flag != TDB_INSERT)
1403                 tdb_delete(tdb, key);
1404
1405         /* Copy key+value *before* allocating free space in case malloc
1406            fails and we are left with a dead spot in the tdb. */
1407
1408         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1409                 tdb->ecode = TDB_ERR_OOM;
1410                 goto fail;
1411         }
1412
1413         memcpy(p, key.dptr, key.dsize);
1414         memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1415
1416         /* now we're into insert / modify / replace of a record which
1417          * we know could not be optimised by an in-place store (for
1418          * various reasons).  */
1419         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1420                 goto fail;
1421
1422         /* Read hash top into next ptr */
1423         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1424                 goto fail;
1425
1426         rec.key_len = key.dsize;
1427         rec.data_len = dbuf.dsize;
1428         rec.full_hash = hash;
1429         rec.magic = TDB_MAGIC;
1430
1431         /* write out and point the top of the hash chain at it */
1432         if (rec_write(tdb, rec_ptr, &rec) == -1
1433             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1434             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1435                 /* Need to tdb_unallocate() here */
1436                 goto fail;
1437         }
1438  out:
1439         SAFE_FREE(p); 
1440         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1441         return ret;
1442 fail:
1443         ret = -1;
1444         goto out;
1445 }
1446
1447 static int tdb_already_open(dev_t device,
1448                             ino_t ino)
1449 {
1450         TDB_CONTEXT *i;
1451         
1452         for (i = tdbs; i; i = i->next) {
1453                 if (i->device == device && i->inode == ino) {
1454                         return 1;
1455                 }
1456         }
1457
1458         return 0;
1459 }
1460
1461 /* open the database, creating it if necessary 
1462
1463    The open_flags and mode are passed straight to the open call on the
1464    database file. A flags value of O_WRONLY is invalid. The hash size
1465    is advisory, use zero for a default value.
1466
1467    Return is NULL on error, in which case errno is also set.  Don't 
1468    try to call tdb_error or tdb_errname, just do strerror(errno).
1469
1470    @param name may be NULL for internal databases. */
1471 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1472                       int open_flags, mode_t mode)
1473 {
1474         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1475 }
1476
1477
1478 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1479                          int open_flags, mode_t mode,
1480                          tdb_log_func log_fn)
1481 {
1482         TDB_CONTEXT *tdb;
1483         struct stat st;
1484         int rev = 0, locked;
1485         unsigned char *vp;
1486         u32 vertest;
1487
1488         if (!(tdb = calloc(1, sizeof *tdb))) {
1489                 /* Can't log this */
1490                 errno = ENOMEM;
1491                 goto fail;
1492         }
1493         tdb->fd = -1;
1494         tdb->name = NULL;
1495         tdb->map_ptr = NULL;
1496         tdb->lockedkeys = NULL;
1497         tdb->flags = tdb_flags;
1498         tdb->open_flags = open_flags;
1499         tdb->log_fn = log_fn;
1500         
1501         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1502                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1503                          name));
1504                 errno = EINVAL;
1505                 goto fail;
1506         }
1507         
1508         if (hash_size == 0)
1509                 hash_size = DEFAULT_HASH_SIZE;
1510         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1511                 tdb->read_only = 1;
1512                 /* read only databases don't do locking or clear if first */
1513                 tdb->flags |= TDB_NOLOCK;
1514                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1515         }
1516
1517         /* internal databases don't mmap or lock, and start off cleared */
1518         if (tdb->flags & TDB_INTERNAL) {
1519                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1520                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1521                 if (tdb_new_database(tdb, hash_size) != 0) {
1522                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1523                         goto fail;
1524                 }
1525                 goto internal;
1526         }
1527
1528         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1529                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1530                          name, strerror(errno)));
1531                 goto fail;      /* errno set by open(2) */
1532         }
1533
1534         /* ensure there is only one process initialising at once */
1535         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1536                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1537                          name, strerror(errno)));
1538                 goto fail;      /* errno set by tdb_brlock */
1539         }
1540
1541         /* we need to zero database if we are the only one with it open */
1542         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1543             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1544                 open_flags |= O_CREAT;
1545                 if (ftruncate(tdb->fd, 0) == -1) {
1546                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1547                                  "failed to truncate %s: %s\n",
1548                                  name, strerror(errno)));
1549                         goto fail; /* errno set by ftruncate */
1550                 }
1551         }
1552
1553         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1554             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1555             || (tdb->header.version != TDB_VERSION
1556                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1557                 /* its not a valid database - possibly initialise it */
1558                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1559                         errno = EIO; /* ie bad format or something */
1560                         goto fail;
1561                 }
1562                 rev = (tdb->flags & TDB_CONVERT);
1563         }
1564         vp = (unsigned char *)&tdb->header.version;
1565         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1566                   (((u32)vp[2]) << 8) | (u32)vp[3];
1567         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1568         if (!rev)
1569                 tdb->flags &= ~TDB_CONVERT;
1570         else {
1571                 tdb->flags |= TDB_CONVERT;
1572                 convert(&tdb->header, sizeof(tdb->header));
1573         }
1574         if (fstat(tdb->fd, &st) == -1)
1575                 goto fail;
1576
1577         /* Is it already in the open list?  If so, fail. */
1578         if (tdb_already_open(st.st_dev, st.st_ino)) {
1579                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1580                          "%s (%d,%d) is already open in this process\n",
1581                          name, st.st_dev, st.st_ino));
1582                 errno = EBUSY;
1583                 goto fail;
1584         }
1585
1586         if (!(tdb->name = (char *)strdup(name))) {
1587                 errno = ENOMEM;
1588                 goto fail;
1589         }
1590
1591         tdb->map_size = st.st_size;
1592         tdb->device = st.st_dev;
1593         tdb->inode = st.st_ino;
1594         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1595         if (!tdb->locked) {
1596                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1597                          "failed to allocate lock structure for %s\n",
1598                          name));
1599                 errno = ENOMEM;
1600                 goto fail;
1601         }
1602         tdb_mmap(tdb);
1603         if (locked) {
1604                 if (!tdb->read_only)
1605                         if (tdb_clear_spinlocks(tdb) != 0) {
1606                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1607                                 "failed to clear spinlock\n"));
1608                                 goto fail;
1609                         }
1610                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1611                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1612                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1613                                  name, strerror(errno)));
1614                         goto fail;
1615                 }
1616         }
1617         /* leave this lock in place to indicate it's in use */
1618         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1619                 goto fail;
1620
1621  internal:
1622         /* Internal (memory-only) databases skip all the code above to
1623          * do with disk files, and resume here by releasing their
1624          * global lock and hooking into the active list. */
1625         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1626                 goto fail;
1627         tdb->next = tdbs;
1628         tdbs = tdb;
1629         return tdb;
1630
1631  fail:
1632         { int save_errno = errno;
1633
1634         if (!tdb)
1635                 return NULL;
1636         
1637         if (tdb->map_ptr) {
1638                 if (tdb->flags & TDB_INTERNAL)
1639                         SAFE_FREE(tdb->map_ptr);
1640                 else
1641                         tdb_munmap(tdb);
1642         }
1643         SAFE_FREE(tdb->name);
1644         if (tdb->fd != -1)
1645                 if (close(tdb->fd) != 0)
1646                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1647         SAFE_FREE(tdb->locked);
1648         SAFE_FREE(tdb);
1649         errno = save_errno;
1650         return NULL;
1651         }
1652 }
1653
1654 /* close a database */
1655 int tdb_close(TDB_CONTEXT *tdb)
1656 {
1657         TDB_CONTEXT **i;
1658         int ret = 0;
1659
1660         if (tdb->map_ptr) {
1661                 if (tdb->flags & TDB_INTERNAL)
1662                         SAFE_FREE(tdb->map_ptr);
1663                 else
1664                         tdb_munmap(tdb);
1665         }
1666         SAFE_FREE(tdb->name);
1667         if (tdb->fd != -1)
1668                 ret = close(tdb->fd);
1669         SAFE_FREE(tdb->locked);
1670         SAFE_FREE(tdb->lockedkeys);
1671
1672         /* Remove from contexts list */
1673         for (i = &tdbs; *i; i = &(*i)->next) {
1674                 if (*i == tdb) {
1675                         *i = tdb->next;
1676                         break;
1677                 }
1678         }
1679
1680         memset(tdb, 0, sizeof(*tdb));
1681         SAFE_FREE(tdb);
1682
1683         return ret;
1684 }
1685
1686 /* lock/unlock entire database */
1687 int tdb_lockall(TDB_CONTEXT *tdb)
1688 {
1689         u32 i;
1690
1691         /* There are no locks on read-only dbs */
1692         if (tdb->read_only)
1693                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1694         if (tdb->lockedkeys)
1695                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1696         for (i = 0; i < tdb->header.hash_size; i++) 
1697                 if (tdb_lock(tdb, i, F_WRLCK))
1698                         break;
1699
1700         /* If error, release locks we have... */
1701         if (i < tdb->header.hash_size) {
1702                 u32 j;
1703
1704                 for ( j = 0; j < i; j++)
1705                         tdb_unlock(tdb, j, F_WRLCK);
1706                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1707         }
1708
1709         return 0;
1710 }
1711 void tdb_unlockall(TDB_CONTEXT *tdb)
1712 {
1713         u32 i;
1714         for (i=0; i < tdb->header.hash_size; i++)
1715                 tdb_unlock(tdb, i, F_WRLCK);
1716 }
1717
1718 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1719 {
1720         u32 i, j, hash;
1721
1722         /* Can't lock more keys if already locked */
1723         if (tdb->lockedkeys)
1724                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1725         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1726                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1727         /* First number in array is # keys */
1728         tdb->lockedkeys[0] = number;
1729
1730         /* Insertion sort by bucket */
1731         for (i = 0; i < number; i++) {
1732                 hash = tdb_hash(&keys[i]);
1733                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1734                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1735                 tdb->lockedkeys[j+1] = hash;
1736         }
1737         /* Finally, lock in order */
1738         for (i = 0; i < number; i++)
1739                 if (tdb_lock(tdb, i, F_WRLCK))
1740                         break;
1741
1742         /* If error, release locks we have... */
1743         if (i < number) {
1744                 for ( j = 0; j < i; j++)
1745                         tdb_unlock(tdb, j, F_WRLCK);
1746                 SAFE_FREE(tdb->lockedkeys);
1747                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1748         }
1749         return 0;
1750 }
1751
1752 /* Unlock the keys previously locked by tdb_lockkeys() */
1753 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1754 {
1755         u32 i;
1756         for (i = 0; i < tdb->lockedkeys[0]; i++)
1757                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1758         SAFE_FREE(tdb->lockedkeys);
1759 }
1760
1761 /* lock/unlock one hash chain. This is meant to be used to reduce
1762    contention - it cannot guarantee how many records will be locked */
1763 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1764 {
1765         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1766 }
1767
1768 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1769 {
1770         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1771 }
1772
1773
1774 /* register a loging function */
1775 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1776 {
1777         tdb->log_fn = fn;
1778 }
1779
1780
1781 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1782    seek pointer from our parent and to re-establish locks */
1783 int tdb_reopen(TDB_CONTEXT *tdb)
1784 {
1785         struct stat st;
1786
1787         if (tdb_munmap(tdb) != 0) {
1788                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
1789                 goto fail;
1790         }
1791         if (close(tdb->fd) != 0)
1792                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
1793         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
1794         if (tdb->fd == -1) {
1795                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
1796                 goto fail;
1797         }
1798         if (fstat(tdb->fd, &st) != 0) {
1799                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
1800                 goto fail;
1801         }
1802         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
1803                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
1804                 goto fail;
1805         }
1806         tdb_mmap(tdb);
1807         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
1808                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
1809                 goto fail;
1810         }
1811
1812         return 0;
1813
1814 fail:
1815         tdb_close(tdb);
1816         return -1;
1817 }
1818
1819 /* reopen all tdb's */
1820 int tdb_reopen_all(void)
1821 {
1822         TDB_CONTEXT *tdb;
1823
1824         for (tdb=tdbs; tdb; tdb = tdb->next) {
1825                 if (tdb_reopen(tdb) != 0) return -1;
1826         }
1827
1828         return 0;
1829 }