Fix based on Andrew's insight as re-using a tdb after fork means
[gd/samba-autobuild/.git] / source3 / tdb / tdb.c
1 /* 
2    Unix SMB/Netbios implementation.
3    Version 3.0
4    Samba database functions
5    Copyright (C) Andrew Tridgell              1999-2000
6    Copyright (C) Luke Kenneth Casson Leighton      2000
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24 #ifdef STANDALONE
25 #if HAVE_CONFIG_H
26 #include <config.h>
27 #endif
28
29 #include <stdlib.h>
30 #include <stdio.h>
31 #include <fcntl.h>
32 #include <unistd.h>
33 #include <string.h>
34 #include <fcntl.h>
35 #include <errno.h>
36 #include <sys/mman.h>
37 #include <sys/stat.h>
38 #include "tdb.h"
39 #include "spinlock.h"
40 #else
41 #include "includes.h"
42 #endif
43
44 #define TDB_MAGIC_FOOD "TDB file\n"
45 #define TDB_VERSION (0x26011967 + 6)
46 #define TDB_MAGIC (0x26011999U)
47 #define TDB_FREE_MAGIC (~TDB_MAGIC)
48 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
49 #define TDB_ALIGNMENT 4
50 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
51 #define DEFAULT_HASH_SIZE 131
52 #define TDB_PAGE_SIZE 0x2000
53 #define FREELIST_TOP (sizeof(struct tdb_header))
54 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
55 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
56 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
57 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
58 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
59 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
60
61 /* lock offsets */
62 #define GLOBAL_LOCK 0
63 #define ACTIVE_LOCK 4
64
65 #ifndef MAP_FILE
66 #define MAP_FILE 0
67 #endif
68
69 #ifndef MAP_FAILED
70 #define MAP_FAILED ((void *)-1)
71 #endif
72
73 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
74 TDB_DATA tdb_null;
75
76 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
77 static TDB_CONTEXT *tdbs = NULL;
78
79 static void tdb_munmap(TDB_CONTEXT *tdb)
80 {
81         if (tdb->flags & TDB_INTERNAL)
82                 return;
83
84 #ifdef HAVE_MMAP
85         if (tdb->map_ptr)
86                 munmap(tdb->map_ptr, tdb->map_size);
87 #endif
88         tdb->map_ptr = NULL;
89 }
90
91 static void tdb_mmap(TDB_CONTEXT *tdb)
92 {
93         if (tdb->flags & TDB_INTERNAL)
94                 return;
95
96 #ifdef HAVE_MMAP
97         if (!(tdb->flags & TDB_NOMMAP)) {
98                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
99                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
100                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
101
102                 /*
103                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
104                  */
105
106                 if (tdb->map_ptr == MAP_FAILED) {
107                         tdb->map_ptr = NULL;
108                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
109                                  tdb->map_size, strerror(errno)));
110                 }
111         } else {
112                 tdb->map_ptr = NULL;
113         }
114 #else
115         tdb->map_ptr = NULL;
116 #endif
117 }
118
119 /* Endian conversion: we only ever deal with 4 byte quantities */
120 static void *convert(void *buf, u32 size)
121 {
122         u32 i, *p = buf;
123         for (i = 0; i < size / 4; i++)
124                 p[i] = TDB_BYTEREV(p[i]);
125         return buf;
126 }
127 #define DOCONV() (tdb->flags & TDB_CONVERT)
128 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
129
130 /* the body of the database is made of one list_struct for the free space
131    plus a separate data list for each hash value */
132 struct list_struct {
133         tdb_off next; /* offset of the next record in the list */
134         tdb_len rec_len; /* total byte length of record */
135         tdb_len key_len; /* byte length of key */
136         tdb_len data_len; /* byte length of data */
137         u32 full_hash; /* the full 32 bit hash of the key */
138         u32 magic;   /* try to catch errors */
139         /* the following union is implied:
140                 union {
141                         char record[rec_len];
142                         struct {
143                                 char key[key_len];
144                                 char data[data_len];
145                         }
146                         u32 totalsize; (tailer)
147                 }
148         */
149 };
150
151 /* a byte range locking function - return 0 on success
152    this functions locks/unlocks 1 byte at the specified offset */
153 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
154                       int rw_type, int lck_type, int probe)
155 {
156         struct flock fl;
157
158         if (tdb->flags & TDB_NOLOCK)
159                 return 0;
160         if (tdb->read_only)
161                 return -1;
162
163         fl.l_type = rw_type;
164         fl.l_whence = SEEK_SET;
165         fl.l_start = offset;
166         fl.l_len = 1;
167         fl.l_pid = 0;
168
169         if (fcntl(tdb->fd,lck_type,&fl)) {
170                 if (!probe) {
171                         TDB_LOG((tdb, 5,"tdb_brlock failed at offset %d rw_type=%d lck_type=%d\n", 
172                                  offset, rw_type, lck_type));
173                 }
174                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
175         }
176         return 0;
177 }
178
179 /* lock a list in the database. list -1 is the alloc list */
180 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
181 {
182         if (list < -1 || list >= (int)tdb->header.hash_size) {
183                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
184                            list, ltype));
185                 return -1;
186         }
187         if (tdb->flags & TDB_NOLOCK)
188                 return 0;
189
190         /* Since fcntl locks don't nest, we do a lock for the first one,
191            and simply bump the count for future ones */
192         if (tdb->locked[list+1].count == 0) {
193                 if (tdb->header.rwlocks) {
194                         if (tdb_spinlock(tdb, list, ltype)) {
195                                 TDB_LOG((tdb, 0, "tdb_lock spinlock on list ltype=%d\n", 
196                                            list, ltype));
197                                 return -1;
198                         }
199                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
200                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
201                                            list, ltype, strerror(errno)));
202                         return -1;
203                 }
204                 tdb->locked[list+1].ltype = ltype;
205         }
206         tdb->locked[list+1].count++;
207         return 0;
208 }
209
210 /* unlock the database: returns void because it's too late for errors. */
211 static void tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
212 {
213         if (tdb->flags & TDB_NOLOCK)
214                 return;
215
216         /* Sanity checks */
217         if (list < -1 || list >= (int)tdb->header.hash_size)
218                 return;
219         if (tdb->locked[list+1].count==0)
220                 return;
221
222         if (tdb->locked[list+1].count == 1) {
223                 /* Down to last nested lock: unlock underneath */
224                 if (tdb->header.rwlocks)
225                         tdb_spinunlock(tdb, list, ltype);
226                 else
227                         tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
228         }
229         tdb->locked[list+1].count--;
230 }
231
232 /* This is based on the hash agorithm from gdbm */
233 static u32 tdb_hash(TDB_DATA *key)
234 {
235         u32 value;      /* Used to compute the hash value.  */
236         u32   i;        /* Used to cycle through random values. */
237
238         /* Set the initial value from the key size. */
239         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
240                 value = (value + (key->dptr[i] << (i*5 % 24)));
241
242         return (1103515243 * value + 12345);  
243 }
244
245 /* check for an out of bounds access - if it is out of bounds then
246    see if the database has been expanded by someone else and expand
247    if necessary 
248    note that "len" is the minimum length needed for the db
249 */
250 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
251 {
252         struct stat st;
253         if (len <= tdb->map_size)
254                 return 0;
255         if (tdb->flags & TDB_INTERNAL)
256                 return 0;
257
258         if (fstat(tdb->fd, &st) == -1)
259                 return TDB_ERRCODE(TDB_ERR_IO, -1);
260
261         if (st.st_size < (size_t)len) {
262                 if (!probe) {
263                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
264                                  (int)len, (int)st.st_size));
265                 }
266                 return TDB_ERRCODE(TDB_ERR_IO, -1);
267         }
268
269         /* Unmap, update size, remap */
270         tdb_munmap(tdb);
271         tdb->map_size = st.st_size;
272         tdb_mmap(tdb);
273         return 0;
274 }
275
276 /* write a lump of data at a specified offset */
277 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
278 {
279         if (tdb_oob(tdb, off + len, 0) != 0)
280                 return -1;
281
282         if (tdb->map_ptr)
283                 memcpy(off + (char *)tdb->map_ptr, buf, len);
284 #ifdef HAVE_PWRITE
285         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
286 #else
287         else if (lseek(tdb->fd, off, SEEK_SET) != off
288                  || write(tdb->fd, buf, len) != (ssize_t)len) {
289 #endif
290                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
291                            off, len, strerror(errno)));
292                 return TDB_ERRCODE(TDB_ERR_IO, -1);
293         }
294         return 0;
295 }
296
297 /* read a lump of data at a specified offset, maybe convert */
298 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
299 {
300         if (tdb_oob(tdb, off + len, 0) != 0)
301                 return -1;
302
303         if (tdb->map_ptr)
304                 memcpy(buf, off + (char *)tdb->map_ptr, len);
305 #ifdef HAVE_PREAD
306         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
307 #else
308         else if (lseek(tdb->fd, off, SEEK_SET) != off
309                  || read(tdb->fd, buf, len) != (ssize_t)len) {
310 #endif
311                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
312                            off, len, strerror(errno)));
313                 return TDB_ERRCODE(TDB_ERR_IO, -1);
314         }
315         if (cv)
316                 convert(buf, len);
317         return 0;
318 }
319
320 /* read a lump of data, allocating the space for it */
321 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
322 {
323         char *buf;
324
325         if (!(buf = malloc(len))) {
326                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
327                            len, strerror(errno)));
328                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
329         }
330         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
331                 free(buf);
332                 return NULL;
333         }
334         return buf;
335 }
336
337 /* read/write a tdb_off */
338 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
339 {
340         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
341 }
342 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
343 {
344         tdb_off off = *d;
345         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
346 }
347
348 /* read/write a record */
349 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
350 {
351         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
352                 return -1;
353         if (TDB_BAD_MAGIC(rec)) {
354                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
355                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
356         }
357         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
358 }
359 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
360 {
361         struct list_struct r = *rec;
362         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
363 }
364
365 /* read a freelist record and check for simple errors */
366 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
367 {
368         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
369                 return -1;
370         if (rec->magic != TDB_FREE_MAGIC) {
371                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
372                            rec->magic, off));
373                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
374         }
375         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
376                 return -1;
377         return 0;
378 }
379
380 /* update a record tailer (must hold allocation lock) */
381 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
382                          const struct list_struct *rec)
383 {
384         tdb_off totalsize;
385
386         /* Offset of tailer from record header */
387         totalsize = sizeof(*rec) + rec->rec_len;
388         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
389                          &totalsize);
390 }
391
392 #ifdef TDB_DEBUG
393 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
394 {
395         struct list_struct rec;
396         tdb_off tailer_ofs, tailer;
397
398         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
399                 printf("ERROR: failed to read record at %u\n", offset);
400                 return 0;
401         }
402
403         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
404                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
405
406         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
407         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
408                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
409                 return rec.next;
410         }
411
412         if (tailer != rec.rec_len + sizeof(rec)) {
413                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n", tailer, rec.rec_len + sizeof(rec));
414         }
415         return rec.next;
416 }
417
418 static void tdb_dump_chain(TDB_CONTEXT *tdb, int i)
419 {
420         tdb_off rec_ptr, top;
421
422         top = TDB_HASH_TOP(i);
423
424         tdb_lock(tdb, i, F_WRLCK);
425
426         if (ofs_read(tdb, top, &rec_ptr) == -1) {
427                 tdb_unlock(tdb, i, F_WRLCK);
428                 return;
429         }
430
431         if (rec_ptr)
432                 printf("hash=%d\n", i);
433
434         while (rec_ptr) {
435                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
436         }
437         tdb_unlock(tdb, i, F_WRLCK);
438 }
439
440 void tdb_dump_all(TDB_CONTEXT *tdb)
441 {
442         tdb_off off;
443         int i;
444         for (i=0;i<tdb->header.hash_size;i++) {
445                 tdb_dump_chain(tdb, i);
446         }
447         printf("freelist:\n");
448         tdb_dump_chain(tdb, -1);
449 }
450
451 void tdb_printfreelist(TDB_CONTEXT *tdb)
452 {
453         long total_free = 0;
454         tdb_off offset, rec_ptr, last_ptr;
455         struct list_struct rec, lastrec, newrec;
456
457         tdb_lock(tdb, -1, F_WRLCK);
458
459         last_ptr = 0;
460         offset = FREELIST_TOP;
461
462         /* read in the freelist top */
463         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
464                 return;
465         }
466
467         printf("freelist top=[0x%08x]\n", rec_ptr );
468         while (rec_ptr) {
469                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
470                         return;
471                 }
472
473                 if (rec.magic != TDB_FREE_MAGIC) {
474                         printf("bad magic 0x%08x in free list\n", rec.magic);
475                         return;
476                 }
477
478                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
479                 total_free += rec.rec_len;
480
481                 /* move to the next record */
482                 rec_ptr = rec.next;
483         }
484         printf("total rec_len = [0x%08x (%d)]\n", total_free, total_free );
485
486         tdb_unlock(tdb, -1, F_WRLCK);
487 }
488 #endif
489
490 /* Remove an element from the freelist.  Must have alloc lock. */
491 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
492 {
493         tdb_off last_ptr, i;
494
495         /* read in the freelist top */
496         last_ptr = FREELIST_TOP;
497         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
498                 if (i == off) {
499                         /* We've found it! */
500                         return ofs_write(tdb, last_ptr, &next);
501                 }
502                 /* Follow chain (next offset is at start of record) */
503                 last_ptr = i;
504         }
505         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
506         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
507 }
508
509 /* Add an element into the freelist. Merge adjacent records if
510    neccessary. */
511 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
512 {
513         tdb_off right, left;
514
515         /* Allocation and tailer lock */
516         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
517                 return -1;
518
519         /* set an initial tailer, so if we fail we don't leave a bogus record */
520         update_tailer(tdb, offset, rec);
521
522         /* Look right first (I'm an Australian, dammit) */
523         right = offset + sizeof(*rec) + rec->rec_len;
524         if (right + sizeof(*rec) <= tdb->map_size) {
525                 struct list_struct r;
526
527                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
528                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
529                         goto left;
530                 }
531
532                 /* If it's free, expand to include it. */
533                 if (r.magic == TDB_FREE_MAGIC) {
534                         if (remove_from_freelist(tdb, right, r.next) == -1) {
535                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
536                                 goto left;
537                         }
538                         rec->rec_len += sizeof(r) + r.rec_len;
539                 }
540         }
541
542 left:
543         /* Look left */
544         left = offset - sizeof(tdb_off);
545         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
546                 struct list_struct l;
547                 tdb_off leftsize;
548
549                 /* Read in tailer and jump back to header */
550                 if (ofs_read(tdb, left, &leftsize) == -1) {
551                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
552                         goto update;
553                 }
554                 left = offset - leftsize;
555
556                 /* Now read in record */
557                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
558                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
559                         goto update;
560                 }
561
562                 /* If it's free, expand to include it. */
563                 if (l.magic == TDB_FREE_MAGIC) {
564                         if (remove_from_freelist(tdb, left, l.next) == -1) {
565                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
566                                 goto update;
567                         } else {
568                                 offset = left;
569                                 rec->rec_len += leftsize;
570                         }
571                 }
572         }
573
574 update:
575         if (update_tailer(tdb, offset, rec) == -1) {
576                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
577                 goto fail;
578         }
579
580         /* Now, prepend to free list */
581         rec->magic = TDB_FREE_MAGIC;
582
583         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
584             rec_write(tdb, offset, rec) == -1 ||
585             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
586                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
587                 goto fail;
588         }
589
590         /* And we're done. */
591         tdb_unlock(tdb, -1, F_WRLCK);
592         return 0;
593
594  fail:
595         tdb_unlock(tdb, -1, F_WRLCK);
596         return -1;
597 }
598
599
600 /* expand a file.  we prefer to use ftruncate, as that is what posix
601   says to use for mmap expansion */
602 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
603 {
604         char buf[1024];
605 #if HAVE_FTRUNCATE_EXTEND
606         if (ftruncate(tdb->fd, size+addition) != 0) {
607                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
608                            size+addition, strerror(errno)));
609                 return -1;
610         }
611 #else
612         char b = 0;
613
614 #ifdef HAVE_PWRITE
615         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
616 #else
617         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
618             write(tdb->fd, &b, 1) != 1) {
619 #endif
620                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
621                            size+addition, strerror(errno)));
622                 return -1;
623         }
624 #endif
625
626         /* now fill the file with something. This ensures that the file isn't sparse, which would be
627            very bad if we ran out of disk. This must be done with write, not via mmap */
628         memset(buf, 0x42, sizeof(buf));
629         while (addition) {
630                 int n = addition>sizeof(buf)?sizeof(buf):addition;
631 #ifdef HAVE_PWRITE
632                 int ret = pwrite(tdb->fd, buf, n, size);
633 #else
634                 int ret;
635                 if (lseek(tdb->fd, size, SEEK_SET) != size)
636                         return -1;
637                 ret = write(tdb->fd, buf, n);
638 #endif
639                 if (ret != n) {
640                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
641                                    n, strerror(errno)));
642                         return -1;
643                 }
644                 addition -= n;
645                 size += n;
646         }
647         return 0;
648 }
649
650
651 /* expand the database at least size bytes by expanding the underlying
652    file and doing the mmap again if necessary */
653 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
654 {
655         struct list_struct rec;
656         tdb_off offset;
657
658         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
659                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
660                 return -1;
661         }
662
663         /* must know about any previous expansions by another process */
664         tdb_oob(tdb, tdb->map_size + 1, 1);
665
666         /* always make room for at least 10 more records, and round
667            the database up to a multiple of TDB_PAGE_SIZE */
668         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
669
670         if (!(tdb->flags & TDB_INTERNAL))
671                 tdb_munmap(tdb);
672
673         /*
674          * We must ensure the file is unmapped before doing this
675          * to ensure consistency with systems like OpenBSD where
676          * writes and mmaps are not consistent.
677          */
678
679         /* expand the file itself */
680         if (!(tdb->flags & TDB_INTERNAL)) {
681                 if (expand_file(tdb, tdb->map_size, size) != 0)
682                         goto fail;
683         }
684
685         tdb->map_size += size;
686
687         if (tdb->flags & TDB_INTERNAL)
688                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
689         else {
690                 /*
691                  * We must ensure the file is remapped before adding the space
692                  * to ensure consistency with systems like OpenBSD where
693                  * writes and mmaps are not consistent.
694                  */
695
696                 /* We're ok if the mmap fails as we'll fallback to read/write */
697                 tdb_mmap(tdb);
698         }
699
700         /* form a new freelist record */
701         memset(&rec,'\0',sizeof(rec));
702         rec.rec_len = size - sizeof(rec);
703
704         /* link it into the free list */
705         offset = tdb->map_size - size;
706         if (tdb_free(tdb, offset, &rec) == -1)
707                 goto fail;
708
709         tdb_unlock(tdb, -1, F_WRLCK);
710         return 0;
711  fail:
712         tdb_unlock(tdb, -1, F_WRLCK);
713         return -1;
714 }
715
716 /* allocate some space from the free list. The offset returned points
717    to a unconnected list_struct within the database with room for at
718    least length bytes of total data
719
720    0 is returned if the space could not be allocated
721  */
722 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
723                             struct list_struct *rec)
724 {
725         tdb_off rec_ptr, last_ptr, newrec_ptr;
726         struct list_struct newrec;
727
728         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
729                 return 0;
730
731         /* Extra bytes required for tailer */
732         length += sizeof(tdb_off);
733
734  again:
735         last_ptr = FREELIST_TOP;
736
737         /* read in the freelist top */
738         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
739                 goto fail;
740
741         /* keep looking until we find a freelist record big enough */
742         while (rec_ptr) {
743                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
744                         goto fail;
745
746                 if (rec->rec_len >= length) {
747                         /* found it - now possibly split it up  */
748                         if (rec->rec_len > length + MIN_REC_SIZE) {
749                                 /* Length of left piece */
750                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
751
752                                 /* Right piece to go on free list */
753                                 newrec.rec_len = rec->rec_len
754                                         - (sizeof(*rec) + length);
755                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
756
757                                 /* And left record is shortened */
758                                 rec->rec_len = length;
759                         } else
760                                 newrec_ptr = 0;
761
762                         /* Remove allocated record from the free list */
763                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
764                                 goto fail;
765
766                         /* Update header: do this before we drop alloc
767                            lock, otherwise tdb_free() might try to
768                            merge with us, thinking we're free.
769                            (Thanks Jeremy Allison). */
770                         rec->magic = TDB_MAGIC;
771                         if (rec_write(tdb, rec_ptr, rec) == -1)
772                                 goto fail;
773
774                         /* Did we create new block? */
775                         if (newrec_ptr) {
776                                 /* Update allocated record tailer (we
777                                    shortened it). */
778                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
779                                         goto fail;
780
781                                 /* Free new record */
782                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
783                                         goto fail;
784                         }
785
786                         /* all done - return the new record offset */
787                         tdb_unlock(tdb, -1, F_WRLCK);
788                         return rec_ptr;
789                 }
790                 /* move to the next record */
791                 last_ptr = rec_ptr;
792                 rec_ptr = rec->next;
793         }
794         /* we didn't find enough space. See if we can expand the
795            database and if we can then try again */
796         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
797                 goto again;
798  fail:
799         tdb_unlock(tdb, -1, F_WRLCK);
800         return 0;
801 }
802
803 /* initialise a new database with a specified hash size */
804 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
805 {
806         struct tdb_header *newdb;
807         int size, ret = -1;
808
809         /* We make it up in memory, then write it out if not internal */
810         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
811         if (!(newdb = calloc(size, 1)))
812                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
813
814         /* Fill in the header */
815         newdb->version = TDB_VERSION;
816         newdb->hash_size = hash_size;
817 #ifdef USE_SPINLOCKS
818         newdb->rwlocks = size;
819 #endif
820         if (tdb->flags & TDB_INTERNAL) {
821                 tdb->map_size = size;
822                 tdb->map_ptr = (char *)newdb;
823                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
824                 /* Convert the `ondisk' version if asked. */
825                 CONVERT(*newdb);
826                 return 0;
827         }
828         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
829                 goto fail;
830
831         if (ftruncate(tdb->fd, 0) == -1)
832                 goto fail;
833
834         /* This creates an endian-converted header, as if read from disk */
835         CONVERT(*newdb);
836         memcpy(&tdb->header, newdb, sizeof(tdb->header));
837         /* Don't endian-convert the magic food! */
838         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
839         if (write(tdb->fd, newdb, size) != size)
840                 ret = -1;
841         else
842                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
843
844   fail:
845         free(newdb);
846         return ret;
847 }
848
849 /* Returns 0 on fail.  On success, return offset of record, and fills
850    in rec */
851 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
852                         struct list_struct *r)
853 {
854         tdb_off rec_ptr;
855         
856         /* read in the hash top */
857         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
858                 return 0;
859
860         /* keep looking until we find the right record */
861         while (rec_ptr) {
862                 if (rec_read(tdb, rec_ptr, r) == -1)
863                         return 0;
864
865                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
866                         char *k;
867                         /* a very likely hit - read the key */
868                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
869                                            r->key_len);
870                         if (!k)
871                                 return 0;
872
873                         if (memcmp(key.dptr, k, key.dsize) == 0) {
874                                 free(k);
875                                 return rec_ptr;
876                         }
877                         free(k);
878                 }
879                 rec_ptr = r->next;
880         }
881         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
882 }
883
884 /* If they do lockkeys, check that this hash is one they locked */
885 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
886 {
887         u32 i;
888         if (!tdb->lockedkeys)
889                 return 1;
890         for (i = 0; i < tdb->lockedkeys[0]; i++)
891                 if (tdb->lockedkeys[i+1] == hash)
892                         return 1;
893         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
894 }
895
896 /* As tdb_find, but if you succeed, keep the lock */
897 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
898                              struct list_struct *rec)
899 {
900         u32 hash, rec_ptr;
901
902         hash = tdb_hash(&key);
903         if (!tdb_keylocked(tdb, hash))
904                 return 0;
905         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
906                 return 0;
907         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
908                 tdb_unlock(tdb, BUCKET(hash), locktype);
909         return rec_ptr;
910 }
911
912 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
913 {
914         return tdb->ecode;
915 }
916
917 static struct tdb_errname {
918         enum TDB_ERROR ecode; const char *estring;
919 } emap[] = { {TDB_SUCCESS, "Success"},
920              {TDB_ERR_CORRUPT, "Corrupt database"},
921              {TDB_ERR_IO, "IO Error"},
922              {TDB_ERR_LOCK, "Locking error"},
923              {TDB_ERR_OOM, "Out of memory"},
924              {TDB_ERR_EXISTS, "Record exists"},
925              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
926              {TDB_ERR_NOEXIST, "Record does not exist"} };
927
928 /* Error string for the last tdb error */
929 const char *tdb_errorstr(TDB_CONTEXT *tdb)
930 {
931         u32 i;
932         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
933                 if (tdb->ecode == emap[i].ecode)
934                         return emap[i].estring;
935         return "Invalid error code";
936 }
937
938 /* update an entry in place - this only works if the new data size
939    is <= the old data size and the key exists.
940    on failure return -1
941 */
942 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
943 {
944         struct list_struct rec;
945         tdb_off rec_ptr;
946         int ret = -1;
947
948         /* find entry */
949         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
950                 return -1;
951
952         /* must be long enough key, data and tailer */
953         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
954                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
955                 goto out;
956         }
957
958         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
959                       dbuf.dptr, dbuf.dsize) == -1)
960                 goto out;
961
962         if (dbuf.dsize != rec.data_len) {
963                 /* update size */
964                 rec.data_len = dbuf.dsize;
965                 ret = rec_write(tdb, rec_ptr, &rec);
966         } else
967                 ret = 0;
968  out:
969         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
970         return ret;
971 }
972
973 /* find an entry in the database given a key */
974 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
975 {
976         tdb_off rec_ptr;
977         struct list_struct rec;
978         TDB_DATA ret;
979
980         /* find which hash bucket it is in */
981         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
982                 return tdb_null;
983
984         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
985                                   rec.data_len);
986         ret.dsize = rec.data_len;
987         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
988         return ret;
989 }
990
991 /* check if an entry in the database exists 
992
993    note that 1 is returned if the key is found and 0 is returned if not found
994    this doesn't match the conventions in the rest of this module, but is
995    compatible with gdbm
996 */
997 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
998 {
999         struct list_struct rec;
1000         
1001         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1002                 return 0;
1003         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1004         return 1;
1005 }
1006
1007 /* record lock stops delete underneath */
1008 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1009 {
1010         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1011 }
1012 /* write locks override our own fcntl readlocks, so check it here */
1013 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1014 {
1015         struct tdb_traverse_lock *i;
1016         for (i = &tdb->travlocks; i; i = i->next)
1017                 if (i->off == off)
1018                         return -1;
1019         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1020 }
1021 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1022 {
1023         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1024 }
1025 /* fcntl locks don't stack: avoid unlocking someone else's */
1026 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1027 {
1028         struct tdb_traverse_lock *i;
1029         u32 count = 0;
1030
1031         if (off == 0)
1032                 return 0;
1033         for (i = &tdb->travlocks; i; i = i->next)
1034                 if (i->off == off)
1035                         count++;
1036         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1037 }
1038
1039 /* actually delete an entry in the database given the offset */
1040 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1041 {
1042         tdb_off last_ptr, i;
1043         struct list_struct lastrec;
1044
1045         if (tdb->read_only) return -1;
1046
1047         if (write_lock_record(tdb, rec_ptr) == -1) {
1048                 /* Someone traversing here: mark it as dead */
1049                 rec->magic = TDB_DEAD_MAGIC;
1050                 return rec_write(tdb, rec_ptr, rec);
1051         }
1052         write_unlock_record(tdb, rec_ptr);
1053
1054         /* find previous record in hash chain */
1055         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1056                 return -1;
1057         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1058                 if (rec_read(tdb, i, &lastrec) == -1)
1059                         return -1;
1060
1061         /* unlink it: next ptr is at start of record. */
1062         if (last_ptr == 0)
1063                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1064         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1065                 return -1;
1066
1067         /* recover the space */
1068         if (tdb_free(tdb, rec_ptr, rec) == -1)
1069                 return -1;
1070         return 0;
1071 }
1072
1073 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1074 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1075                          struct list_struct *rec)
1076 {
1077         int want_next = (tlock->off != 0);
1078
1079         /* No traversal allows if you've called tdb_lockkeys() */
1080         if (tdb->lockedkeys)
1081                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1082
1083         /* Lock each chain from the start one. */
1084         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1085                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1086                         return -1;
1087
1088                 /* No previous record?  Start at top of chain. */
1089                 if (!tlock->off) {
1090                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1091                                      &tlock->off) == -1)
1092                                 goto fail;
1093                 } else {
1094                         /* Otherwise unlock the previous record. */
1095                         unlock_record(tdb, tlock->off);
1096                 }
1097
1098                 if (want_next) {
1099                         /* We have offset of old record: grab next */
1100                         if (rec_read(tdb, tlock->off, rec) == -1)
1101                                 goto fail;
1102                         tlock->off = rec->next;
1103                 }
1104
1105                 /* Iterate through chain */
1106                 while( tlock->off) {
1107                         tdb_off current;
1108                         if (rec_read(tdb, tlock->off, rec) == -1)
1109                                 goto fail;
1110                         if (!TDB_DEAD(rec)) {
1111                                 /* Woohoo: we found one! */
1112                                 lock_record(tdb, tlock->off);
1113                                 return tlock->off;
1114                         }
1115                         /* Try to clean dead ones from old traverses */
1116                         current = tlock->off;
1117                         tlock->off = rec->next;
1118                         do_delete(tdb, current, rec);
1119                 }
1120                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1121                 want_next = 0;
1122         }
1123         /* We finished iteration without finding anything */
1124         return TDB_ERRCODE(TDB_SUCCESS, 0);
1125
1126  fail:
1127         tlock->off = 0;
1128         tdb_unlock(tdb, tlock->hash, F_WRLCK);
1129         return -1;
1130 }
1131
1132 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1133    return -1 on error or the record count traversed
1134    if fn is NULL then it is not called
1135    a non-zero return value from fn() indicates that the traversal should stop
1136   */
1137 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1138 {
1139         TDB_DATA key, dbuf;
1140         struct list_struct rec;
1141         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1142         int ret, count = 0;
1143
1144         /* This was in the initializaton, above, but the IRIX compiler
1145          * did not like it.  crh
1146          */
1147         tl.next = tdb->travlocks.next;
1148
1149         /* fcntl locks don't stack: beware traverse inside traverse */
1150         tdb->travlocks.next = &tl;
1151
1152         /* tdb_next_lock places locks on the record returned, and its chain */
1153         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1154                 count++;
1155                 /* now read the full record */
1156                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1157                                           rec.key_len + rec.data_len);
1158                 if (!key.dptr) {
1159                         tdb_unlock(tdb, tl.hash, F_WRLCK);
1160                         unlock_record(tdb, tl.off);
1161                         tdb->travlocks.next = tl.next;
1162                         return -1;
1163                 }
1164                 key.dsize = rec.key_len;
1165                 dbuf.dptr = key.dptr + rec.key_len;
1166                 dbuf.dsize = rec.data_len;
1167
1168                 /* Drop chain lock, call out */
1169                 tdb_unlock(tdb, tl.hash, F_WRLCK);
1170                 if (fn && fn(tdb, key, dbuf, state)) {
1171                         /* They want us to terminate traversal */
1172                         unlock_record(tdb, tl.off);
1173                         tdb->travlocks.next = tl.next;
1174                         free(key.dptr);
1175                         return count;
1176                 }
1177                 free(key.dptr);
1178         }
1179         tdb->travlocks.next = tl.next;
1180         if (ret < 0)
1181                 return -1;
1182         else
1183                 return count;
1184 }
1185
1186 /* find the first entry in the database and return its key */
1187 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1188 {
1189         TDB_DATA key;
1190         struct list_struct rec;
1191
1192         /* release any old lock */
1193         unlock_record(tdb, tdb->travlocks.off);
1194         tdb->travlocks.off = tdb->travlocks.hash = 0;
1195
1196         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1197                 return tdb_null;
1198         /* now read the key */
1199         key.dsize = rec.key_len;
1200         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1201         tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK);
1202         return key;
1203 }
1204
1205 /* find the next entry in the database, returning its key */
1206 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1207 {
1208         u32 oldhash;
1209         TDB_DATA key = tdb_null;
1210         struct list_struct rec;
1211         char *k = NULL;
1212
1213         /* Is locked key the old key?  If so, traverse will be reliable. */
1214         if (tdb->travlocks.off) {
1215                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1216                         return tdb_null;
1217                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1218                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1219                                             rec.key_len))
1220                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1221                         /* No, it wasn't: unlock it and start from scratch */
1222                         unlock_record(tdb, tdb->travlocks.off);
1223                         tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
1224                         tdb->travlocks.off = 0;
1225                 }
1226
1227                 if (k)
1228                         free(k);
1229         }
1230
1231         if (!tdb->travlocks.off) {
1232                 /* No previous element: do normal find, and lock record */
1233                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1234                 if (!tdb->travlocks.off)
1235                         return tdb_null;
1236                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1237                 lock_record(tdb, tdb->travlocks.off);
1238         }
1239         oldhash = tdb->travlocks.hash;
1240
1241         /* Grab next record: locks chain and returned record,
1242            unlocks old record */
1243         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1244                 key.dsize = rec.key_len;
1245                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1246                                           key.dsize);
1247                 /* Unlock the chain of this new record */
1248                 tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK);
1249         }
1250         /* Unlock the chain of old record */
1251         tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK);
1252         return key;
1253 }
1254
1255 /* delete an entry in the database given a key */
1256 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1257 {
1258         tdb_off rec_ptr;
1259         struct list_struct rec;
1260         int ret;
1261
1262         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1263                 return -1;
1264         ret = do_delete(tdb, rec_ptr, &rec);
1265         tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK);
1266         return ret;
1267 }
1268
1269 /* store an element in the database, replacing any existing element
1270    with the same key 
1271
1272    return 0 on success, -1 on failure
1273 */
1274 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1275 {
1276         struct list_struct rec;
1277         u32 hash;
1278         tdb_off rec_ptr;
1279         char *p = NULL;
1280         int ret = 0;
1281
1282         /* find which hash bucket it is in */
1283         hash = tdb_hash(&key);
1284         if (!tdb_keylocked(tdb, hash))
1285                 return -1;
1286         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1287                 return -1;
1288
1289         /* check for it existing, on insert. */
1290         if (flag == TDB_INSERT) {
1291                 if (tdb_exists(tdb, key)) {
1292                         tdb->ecode = TDB_ERR_EXISTS;
1293                         goto fail;
1294                 }
1295         } else {
1296                 /* first try in-place update, on modify or replace. */
1297                 if (tdb_update(tdb, key, dbuf) == 0)
1298                         goto out;
1299                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1300                         goto fail;
1301         }
1302         /* reset the error code potentially set by the tdb_update() */
1303         tdb->ecode = TDB_SUCCESS;
1304
1305         /* delete any existing record - if it doesn't exist we don't
1306            care.  Doing this first reduces fragmentation, and avoids
1307            coalescing with `allocated' block before it's updated. */
1308         if (flag != TDB_INSERT)
1309                 tdb_delete(tdb, key);
1310
1311         /* Copy key+value *before* allocating free space in case malloc
1312            fails and we are left with a dead spot in the tdb. */
1313
1314         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1315                 tdb->ecode = TDB_ERR_OOM;
1316                 goto fail;
1317         }
1318
1319         memcpy(p, key.dptr, key.dsize);
1320         memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1321
1322         /* now we're into insert / modify / replace of a record which
1323          * we know could not be optimised by an in-place store (for
1324          * various reasons).  */
1325         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1326                 goto fail;
1327
1328         /* Read hash top into next ptr */
1329         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1330                 goto fail;
1331
1332         rec.key_len = key.dsize;
1333         rec.data_len = dbuf.dsize;
1334         rec.full_hash = hash;
1335         rec.magic = TDB_MAGIC;
1336
1337         /* write out and point the top of the hash chain at it */
1338         if (rec_write(tdb, rec_ptr, &rec) == -1
1339             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1340             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1341         fail:
1342                 /* Need to tdb_unallocate() here */
1343                 ret = -1;
1344         }
1345  out:
1346         if (p)
1347                 free(p); 
1348         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1349         return ret;
1350 }
1351
1352 /* open the database, creating it if necessary 
1353
1354    The open_flags and mode are passed straight to the open call on the
1355    database file. A flags value of O_WRONLY is invalid. The hash size
1356    is advisory, use zero for a default value.
1357
1358    return is NULL on error */
1359 TDB_CONTEXT *tdb_open(char *name, int hash_size, int tdb_flags,
1360                       int open_flags, mode_t mode)
1361 {
1362         TDB_CONTEXT tdb, *ret, *i;
1363         struct stat st;
1364         int rev = 0, locked;
1365
1366         memset(&tdb, 0, sizeof(tdb));
1367         tdb.fd = -1;
1368         tdb.name = NULL;
1369         tdb.map_ptr = NULL;
1370         tdb.lockedkeys = NULL;
1371         tdb.flags = tdb_flags;
1372
1373         if ((open_flags & O_ACCMODE) == O_WRONLY)
1374                 goto fail;
1375         if (hash_size == 0)
1376                 hash_size = DEFAULT_HASH_SIZE;
1377         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1378                 tdb.read_only = 1;
1379                 /* read only databases don't do locking */
1380                 tdb.flags |= TDB_NOLOCK;
1381         }
1382
1383         /* internal databases don't mmap or lock, and start off cleared */
1384         if (tdb.flags & TDB_INTERNAL) {
1385                 tdb.flags |= (TDB_NOLOCK | TDB_NOMMAP);
1386                 tdb.flags &= ~TDB_CLEAR_IF_FIRST;
1387                 tdb_new_database(&tdb, hash_size);
1388                 goto internal;
1389         }
1390
1391         if ((tdb.fd = open(name, open_flags, mode)) == -1)
1392                 goto fail;
1393
1394         /* ensure there is only one process initialising at once */
1395         if (tdb_brlock(&tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1)
1396                 goto fail;
1397
1398         /* we need to zero database if we are the only one with it open */
1399         if ((locked = (tdb_brlock(&tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1400             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1401                 open_flags |= O_CREAT;
1402                 if (ftruncate(tdb.fd, 0) == -1)
1403                         goto fail;
1404         }
1405
1406         if (read(tdb.fd, &tdb.header, sizeof(tdb.header)) != sizeof(tdb.header)
1407             || strcmp(tdb.header.magic_food, TDB_MAGIC_FOOD) != 0
1408             || (tdb.header.version != TDB_VERSION
1409                 && !(rev = (tdb.header.version==TDB_BYTEREV(TDB_VERSION))))) {
1410                 /* its not a valid database - possibly initialise it */
1411                 if (!(open_flags & O_CREAT) || tdb_new_database(&tdb, hash_size) == -1)
1412                         goto fail;
1413                 rev = (tdb.flags & TDB_CONVERT);
1414         }
1415         if (!rev)
1416                 tdb.flags &= ~TDB_CONVERT;
1417         else {
1418                 tdb.flags |= TDB_CONVERT;
1419                 convert(&tdb.header, sizeof(tdb.header));
1420         }
1421         if (fstat(tdb.fd, &st) == -1)
1422                 goto fail;
1423
1424         /* Is it already in the open list?  If so, fail. */
1425         for (i = tdbs; i; i = i->next) {
1426                 if (i->device == st.st_dev && i->inode == st.st_ino) {
1427                         errno = EBUSY;
1428                         close(tdb.fd);
1429                         return NULL;
1430                 }
1431         }
1432
1433         /* map the database and fill in the return structure */
1434         tdb.name = (char *)strdup(name);
1435         if (!tdb.name)
1436                 goto fail;
1437         tdb.map_size = st.st_size;
1438         tdb.device = st.st_dev;
1439         tdb.inode = st.st_ino;
1440         tdb.locked = calloc(tdb.header.hash_size+1, sizeof(tdb.locked[0]));
1441         if (!tdb.locked)
1442                 goto fail;
1443         tdb_mmap(&tdb);
1444         if (locked) {
1445                 tdb_clear_spinlocks(&tdb);
1446                 if (tdb_brlock(&tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1)
1447                         goto fail;
1448         }
1449         /* leave this lock in place to indicate it's in use */
1450         if (tdb_brlock(&tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1451                 goto fail;
1452
1453  internal:
1454         if (!(ret = malloc(sizeof(tdb))))
1455                 goto fail;
1456         *ret = tdb;
1457         if (tdb_brlock(&tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1458                 goto fail;
1459         ret->next = tdbs;
1460         tdbs = ret;
1461         return ret;
1462
1463  fail:
1464         if (tdb.map_ptr) {
1465                 if (tdb.flags & TDB_INTERNAL)
1466                         free(tdb.map_ptr);
1467                 else
1468                         tdb_munmap(&tdb);
1469         }
1470         if (tdb.name)
1471                 free(tdb.name);
1472         if (tdb.fd != -1)
1473                 close(tdb.fd);
1474         if (tdb.locked)
1475                 free(tdb.locked);
1476         return NULL;
1477 }
1478
1479 /* close a database */
1480 int tdb_close(TDB_CONTEXT *tdb)
1481 {
1482         TDB_CONTEXT **i;
1483         int ret = 0;
1484
1485         if (tdb->map_ptr) {
1486                 if (tdb->flags & TDB_INTERNAL)
1487                         free(tdb->map_ptr);
1488                 else
1489                         tdb_munmap(tdb);
1490         }
1491         if (tdb->name)
1492                 free(tdb->name);
1493         if (tdb->fd != -1)
1494                 ret = close(tdb->fd);
1495         if (tdb->locked)
1496                 free(tdb->locked);
1497         if (tdb->lockedkeys)
1498                 free(tdb->lockedkeys);
1499
1500         /* Remove from contexts list */
1501         for (i = &tdbs; *i; i = &(*i)->next) {
1502                 if (*i == tdb) {
1503                         *i = tdb->next;
1504                         break;
1505                 }
1506         }
1507
1508         memset(tdb, 0, sizeof(*tdb));
1509         free(tdb);
1510
1511         return ret;
1512 }
1513
1514 /* lock/unlock entire database */
1515 int tdb_lockall(TDB_CONTEXT *tdb)
1516 {
1517         u32 i;
1518
1519         /* There are no locks on read-only dbs */
1520         if (tdb->read_only)
1521                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1522         if (tdb->lockedkeys)
1523                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1524         for (i = 0; i < tdb->header.hash_size; i++) 
1525                 if (tdb_lock(tdb, i, F_WRLCK))
1526                         break;
1527
1528         /* If error, release locks we have... */
1529         if (i < tdb->header.hash_size) {
1530                 u32 j;
1531
1532                 for ( j = 0; j < i; j++)
1533                         tdb_unlock(tdb, j, F_WRLCK);
1534                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1535         }
1536
1537         return 0;
1538 }
1539 void tdb_unlockall(TDB_CONTEXT *tdb)
1540 {
1541         u32 i;
1542         for (i=0; i < tdb->header.hash_size; i++)
1543                 tdb_unlock(tdb, i, F_WRLCK);
1544 }
1545
1546 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1547 {
1548         u32 i, j, hash;
1549
1550         /* Can't lock more keys if already locked */
1551         if (tdb->lockedkeys)
1552                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1553         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1554                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1555         /* First number in array is # keys */
1556         tdb->lockedkeys[0] = number;
1557
1558         /* Insertion sort by bucket */
1559         for (i = 0; i < number; i++) {
1560                 hash = tdb_hash(&keys[i]);
1561                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1562                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1563                 tdb->lockedkeys[j+1] = hash;
1564         }
1565         /* Finally, lock in order */
1566         for (i = 0; i < number; i++)
1567                 if (tdb_lock(tdb, i, F_WRLCK))
1568                         break;
1569
1570         /* If error, release locks we have... */
1571         if (i < number) {
1572                 for ( j = 0; j < i; j++)
1573                         tdb_unlock(tdb, j, F_WRLCK);
1574                 free(tdb->lockedkeys);
1575                 tdb->lockedkeys = NULL;
1576                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1577         }
1578         return 0;
1579 }
1580
1581 /* Unlock the keys previously locked by tdb_lockkeys() */
1582 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1583 {
1584         u32 i;
1585         for (i = 0; i < tdb->lockedkeys[0]; i++)
1586                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1587         free(tdb->lockedkeys);
1588         tdb->lockedkeys = NULL;
1589 }
1590
1591 /* lock/unlock one hash chain. This is meant to be used to reduce
1592    contention - it cannot guarantee how many records will be locked */
1593 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1594 {
1595         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1596 }
1597 void tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1598 {
1599         tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1600 }
1601
1602
1603 /* register a loging function */
1604 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1605 {
1606         tdb->log_fn = fn;
1607 }