update lib/tdb from samba4
[kai/samba-autobuild/.git] / ctdb / lib / tdb / common / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3
4    trivial database library
5
6    Copyright (C) Andrew Tridgell              1999-2005
7    Copyright (C) Paul `Rusty' Russell              2000
8    Copyright (C) Jeremy Allison                    2000-2003
9    
10      ** NOTE! The following LGPL license applies to the tdb
11      ** library. This does NOT imply that all of Samba is released
12      ** under the LGPL
13    
14    This library is free software; you can redistribute it and/or
15    modify it under the terms of the GNU Lesser General Public
16    License as published by the Free Software Foundation; either
17    version 3 of the License, or (at your option) any later version.
18
19    This library is distributed in the hope that it will be useful,
20    but WITHOUT ANY WARRANTY; without even the implied warranty of
21    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22    Lesser General Public License for more details.
23
24    You should have received a copy of the GNU Lesser General Public
25    License along with this library; if not, see <http://www.gnu.org/licenses/>.
26 */
27
28 #include "tdb_private.h"
29
30 TDB_DATA tdb_null;
31
32 /*
33   non-blocking increment of the tdb sequence number if the tdb has been opened using
34   the TDB_SEQNUM flag
35 */
36 void tdb_increment_seqnum_nonblock(struct tdb_context *tdb)
37 {
38         tdb_off_t seqnum=0;
39         
40         if (!(tdb->flags & TDB_SEQNUM)) {
41                 return;
42         }
43
44         /* we ignore errors from this, as we have no sane way of
45            dealing with them.
46         */
47         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
48         seqnum++;
49         tdb_ofs_write(tdb, TDB_SEQNUM_OFS, &seqnum);
50 }
51
52 /*
53   increment the tdb sequence number if the tdb has been opened using
54   the TDB_SEQNUM flag
55 */
56 static void tdb_increment_seqnum(struct tdb_context *tdb)
57 {
58         if (!(tdb->flags & TDB_SEQNUM)) {
59                 return;
60         }
61
62         if (tdb_brlock(tdb, TDB_SEQNUM_OFS, F_WRLCK, F_SETLKW, 1, 1) != 0) {
63                 return;
64         }
65
66         tdb_increment_seqnum_nonblock(tdb);
67
68         tdb_brlock(tdb, TDB_SEQNUM_OFS, F_UNLCK, F_SETLKW, 1, 1);
69 }
70
71 static int tdb_key_compare(TDB_DATA key, TDB_DATA data, void *private_data)
72 {
73         return memcmp(data.dptr, key.dptr, data.dsize);
74 }
75
76 /* Returns 0 on fail.  On success, return offset of record, and fills
77    in rec */
78 static tdb_off_t tdb_find(struct tdb_context *tdb, TDB_DATA key, u32 hash,
79                         struct list_struct *r)
80 {
81         tdb_off_t rec_ptr;
82         
83         /* read in the hash top */
84         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
85                 return 0;
86
87         /* keep looking until we find the right record */
88         while (rec_ptr) {
89                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
90                         return 0;
91
92                 if (!TDB_DEAD(r) && hash==r->full_hash
93                     && key.dsize==r->key_len
94                     && tdb_parse_data(tdb, key, rec_ptr + sizeof(*r),
95                                       r->key_len, tdb_key_compare,
96                                       NULL) == 0) {
97                         return rec_ptr;
98                 }
99                 rec_ptr = r->next;
100         }
101         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
102 }
103
104 /* As tdb_find, but if you succeed, keep the lock */
105 tdb_off_t tdb_find_lock_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, int locktype,
106                            struct list_struct *rec)
107 {
108         u32 rec_ptr;
109
110         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
111                 return 0;
112         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
113                 tdb_unlock(tdb, BUCKET(hash), locktype);
114         return rec_ptr;
115 }
116
117
118 /* update an entry in place - this only works if the new data size
119    is <= the old data size and the key exists.
120    on failure return -1.
121 */
122 static int tdb_update_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash, TDB_DATA dbuf)
123 {
124         struct list_struct rec;
125         tdb_off_t rec_ptr;
126
127         /* find entry */
128         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec)))
129                 return -1;
130
131         /* must be long enough key, data and tailer */
132         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off_t)) {
133                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
134                 return -1;
135         }
136
137         if (tdb->methods->tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
138                       dbuf.dptr, dbuf.dsize) == -1)
139                 return -1;
140
141         if (dbuf.dsize != rec.data_len) {
142                 /* update size */
143                 rec.data_len = dbuf.dsize;
144                 return tdb_rec_write(tdb, rec_ptr, &rec);
145         }
146  
147         return 0;
148 }
149
150 /* find an entry in the database given a key */
151 /* If an entry doesn't exist tdb_err will be set to
152  * TDB_ERR_NOEXIST. If a key has no data attached
153  * then the TDB_DATA will have zero length but
154  * a non-zero pointer
155  */
156 TDB_DATA tdb_fetch(struct tdb_context *tdb, TDB_DATA key)
157 {
158         tdb_off_t rec_ptr;
159         struct list_struct rec;
160         TDB_DATA ret;
161         u32 hash;
162
163         /* find which hash bucket it is in */
164         hash = tdb->hash_fn(&key);
165         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec)))
166                 return tdb_null;
167
168         ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
169                                   rec.data_len);
170         ret.dsize = rec.data_len;
171         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
172         return ret;
173 }
174
175 /*
176  * Find an entry in the database and hand the record's data to a parsing
177  * function. The parsing function is executed under the chain read lock, so it
178  * should be fast and should not block on other syscalls.
179  *
180  * DONT CALL OTHER TDB CALLS FROM THE PARSER, THIS MIGHT LEAD TO SEGFAULTS.
181  *
182  * For mmapped tdb's that do not have a transaction open it points the parsing
183  * function directly at the mmap area, it avoids the malloc/memcpy in this
184  * case. If a transaction is open or no mmap is available, it has to do
185  * malloc/read/parse/free.
186  *
187  * This is interesting for all readers of potentially large data structures in
188  * the tdb records, ldb indexes being one example.
189  */
190
191 int tdb_parse_record(struct tdb_context *tdb, TDB_DATA key,
192                      int (*parser)(TDB_DATA key, TDB_DATA data,
193                                    void *private_data),
194                      void *private_data)
195 {
196         tdb_off_t rec_ptr;
197         struct list_struct rec;
198         int ret;
199         u32 hash;
200
201         /* find which hash bucket it is in */
202         hash = tdb->hash_fn(&key);
203
204         if (!(rec_ptr = tdb_find_lock_hash(tdb,key,hash,F_RDLCK,&rec))) {
205                 return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
206         }
207
208         ret = tdb_parse_data(tdb, key, rec_ptr + sizeof(rec) + rec.key_len,
209                              rec.data_len, parser, private_data);
210
211         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
212
213         return ret;
214 }
215
216 /* check if an entry in the database exists 
217
218    note that 1 is returned if the key is found and 0 is returned if not found
219    this doesn't match the conventions in the rest of this module, but is
220    compatible with gdbm
221 */
222 static int tdb_exists_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
223 {
224         struct list_struct rec;
225         
226         if (tdb_find_lock_hash(tdb, key, hash, F_RDLCK, &rec) == 0)
227                 return 0;
228         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
229         return 1;
230 }
231
232 int tdb_exists(struct tdb_context *tdb, TDB_DATA key)
233 {
234         u32 hash = tdb->hash_fn(&key);
235         return tdb_exists_hash(tdb, key, hash);
236 }
237
238 /* actually delete an entry in the database given the offset */
239 int tdb_do_delete(struct tdb_context *tdb, tdb_off_t rec_ptr, struct list_struct*rec)
240 {
241         tdb_off_t last_ptr, i;
242         struct list_struct lastrec;
243
244         if (tdb->read_only || tdb->traverse_read) return -1;
245
246         if (tdb_write_lock_record(tdb, rec_ptr) == -1) {
247                 /* Someone traversing here: mark it as dead */
248                 rec->magic = TDB_DEAD_MAGIC;
249                 return tdb_rec_write(tdb, rec_ptr, rec);
250         }
251         if (tdb_write_unlock_record(tdb, rec_ptr) != 0)
252                 return -1;
253
254         /* find previous record in hash chain */
255         if (tdb_ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
256                 return -1;
257         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
258                 if (tdb_rec_read(tdb, i, &lastrec) == -1)
259                         return -1;
260
261         /* unlink it: next ptr is at start of record. */
262         if (last_ptr == 0)
263                 last_ptr = TDB_HASH_TOP(rec->full_hash);
264         if (tdb_ofs_write(tdb, last_ptr, &rec->next) == -1)
265                 return -1;
266
267         /* recover the space */
268         if (tdb_free(tdb, rec_ptr, rec) == -1)
269                 return -1;
270         return 0;
271 }
272
273 static int tdb_count_dead(struct tdb_context *tdb, u32 hash)
274 {
275         int res = 0;
276         tdb_off_t rec_ptr;
277         struct list_struct rec;
278         
279         /* read in the hash top */
280         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
281                 return 0;
282
283         while (rec_ptr) {
284                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1)
285                         return 0;
286
287                 if (rec.magic == TDB_DEAD_MAGIC) {
288                         res += 1;
289                 }
290                 rec_ptr = rec.next;
291         }
292         return res;
293 }
294
295 /*
296  * Purge all DEAD records from a hash chain
297  */
298 static int tdb_purge_dead(struct tdb_context *tdb, u32 hash)
299 {
300         int res = -1;
301         struct list_struct rec;
302         tdb_off_t rec_ptr;
303
304         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
305                 return -1;
306         }
307         
308         /* read in the hash top */
309         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
310                 goto fail;
311
312         while (rec_ptr) {
313                 tdb_off_t next;
314
315                 if (tdb_rec_read(tdb, rec_ptr, &rec) == -1) {
316                         goto fail;
317                 }
318
319                 next = rec.next;
320
321                 if (rec.magic == TDB_DEAD_MAGIC
322                     && tdb_do_delete(tdb, rec_ptr, &rec) == -1) {
323                         goto fail;
324                 }
325                 rec_ptr = next;
326         }
327         res = 0;
328  fail:
329         tdb_unlock(tdb, -1, F_WRLCK);
330         return res;
331 }
332
333 /* delete an entry in the database given a key */
334 static int tdb_delete_hash(struct tdb_context *tdb, TDB_DATA key, u32 hash)
335 {
336         tdb_off_t rec_ptr;
337         struct list_struct rec;
338         int ret;
339
340         if (tdb->max_dead_records != 0) {
341
342                 /*
343                  * Allow for some dead records per hash chain, mainly for
344                  * tdb's with a very high create/delete rate like locking.tdb.
345                  */
346
347                 if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
348                         return -1;
349
350                 if (tdb_count_dead(tdb, hash) >= tdb->max_dead_records) {
351                         /*
352                          * Don't let the per-chain freelist grow too large,
353                          * delete all existing dead records
354                          */
355                         tdb_purge_dead(tdb, hash);
356                 }
357
358                 if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
359                         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
360                         return -1;
361                 }
362
363                 /*
364                  * Just mark the record as dead.
365                  */
366                 rec.magic = TDB_DEAD_MAGIC;
367                 ret = tdb_rec_write(tdb, rec_ptr, &rec);
368         }
369         else {
370                 if (!(rec_ptr = tdb_find_lock_hash(tdb, key, hash, F_WRLCK,
371                                                    &rec)))
372                         return -1;
373
374                 ret = tdb_do_delete(tdb, rec_ptr, &rec);
375         }
376
377         if (ret == 0) {
378                 tdb_increment_seqnum(tdb);
379         }
380
381         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
382                 TDB_LOG((tdb, TDB_DEBUG_WARNING, "tdb_delete: WARNING tdb_unlock failed!\n"));
383         return ret;
384 }
385
386 int tdb_delete(struct tdb_context *tdb, TDB_DATA key)
387 {
388         u32 hash = tdb->hash_fn(&key);
389         return tdb_delete_hash(tdb, key, hash);
390 }
391
392 /*
393  * See if we have a dead record around with enough space
394  */
395 static tdb_off_t tdb_find_dead(struct tdb_context *tdb, u32 hash,
396                                struct list_struct *r, tdb_len_t length)
397 {
398         tdb_off_t rec_ptr;
399         
400         /* read in the hash top */
401         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
402                 return 0;
403
404         /* keep looking until we find the right record */
405         while (rec_ptr) {
406                 if (tdb_rec_read(tdb, rec_ptr, r) == -1)
407                         return 0;
408
409                 if (TDB_DEAD(r) && r->rec_len >= length) {
410                         /*
411                          * First fit for simple coding, TODO: change to best
412                          * fit
413                          */
414                         return rec_ptr;
415                 }
416                 rec_ptr = r->next;
417         }
418         return 0;
419 }
420
421 /* store an element in the database, replacing any existing element
422    with the same key 
423
424    return 0 on success, -1 on failure
425 */
426 int tdb_store(struct tdb_context *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
427 {
428         struct list_struct rec;
429         u32 hash;
430         tdb_off_t rec_ptr;
431         char *p = NULL;
432         int ret = -1;
433
434         if (tdb->read_only || tdb->traverse_read) {
435                 tdb->ecode = TDB_ERR_RDONLY;
436                 return -1;
437         }
438
439         /* find which hash bucket it is in */
440         hash = tdb->hash_fn(&key);
441         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
442                 return -1;
443
444         /* check for it existing, on insert. */
445         if (flag == TDB_INSERT) {
446                 if (tdb_exists_hash(tdb, key, hash)) {
447                         tdb->ecode = TDB_ERR_EXISTS;
448                         goto fail;
449                 }
450         } else {
451                 /* first try in-place update, on modify or replace. */
452                 if (tdb_update_hash(tdb, key, hash, dbuf) == 0) {
453                         goto done;
454                 }
455                 if (tdb->ecode == TDB_ERR_NOEXIST &&
456                     flag == TDB_MODIFY) {
457                         /* if the record doesn't exist and we are in TDB_MODIFY mode then
458                          we should fail the store */
459                         goto fail;
460                 }
461         }
462         /* reset the error code potentially set by the tdb_update() */
463         tdb->ecode = TDB_SUCCESS;
464
465         /* delete any existing record - if it doesn't exist we don't
466            care.  Doing this first reduces fragmentation, and avoids
467            coalescing with `allocated' block before it's updated. */
468         if (flag != TDB_INSERT)
469                 tdb_delete_hash(tdb, key, hash);
470
471         /* Copy key+value *before* allocating free space in case malloc
472            fails and we are left with a dead spot in the tdb. */
473
474         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
475                 tdb->ecode = TDB_ERR_OOM;
476                 goto fail;
477         }
478
479         memcpy(p, key.dptr, key.dsize);
480         if (dbuf.dsize)
481                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
482
483         if (tdb->max_dead_records != 0) {
484                 /*
485                  * Allow for some dead records per hash chain, look if we can
486                  * find one that can hold the new record. We need enough space
487                  * for key, data and tailer. If we find one, we don't have to
488                  * consult the central freelist.
489                  */
490                 rec_ptr = tdb_find_dead(
491                         tdb, hash, &rec,
492                         key.dsize + dbuf.dsize + sizeof(tdb_off_t));
493
494                 if (rec_ptr != 0) {
495                         rec.key_len = key.dsize;
496                         rec.data_len = dbuf.dsize;
497                         rec.full_hash = hash;
498                         rec.magic = TDB_MAGIC;
499                         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
500                             || tdb->methods->tdb_write(
501                                     tdb, rec_ptr + sizeof(rec),
502                                     p, key.dsize + dbuf.dsize) == -1) {
503                                 goto fail;
504                         }
505                         goto done;
506                 }
507         }
508
509         /*
510          * We have to allocate some space from the freelist, so this means we
511          * have to lock it. Use the chance to purge all the DEAD records from
512          * the hash chain under the freelist lock.
513          */
514
515         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
516                 goto fail;
517         }
518
519         if ((tdb->max_dead_records != 0)
520             && (tdb_purge_dead(tdb, hash) == -1)) {
521                 tdb_unlock(tdb, -1, F_WRLCK);
522                 goto fail;
523         }
524
525         /* we have to allocate some space */
526         rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec);
527
528         tdb_unlock(tdb, -1, F_WRLCK);
529
530         if (rec_ptr == 0) {
531                 goto fail;
532         }
533
534         /* Read hash top into next ptr */
535         if (tdb_ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
536                 goto fail;
537
538         rec.key_len = key.dsize;
539         rec.data_len = dbuf.dsize;
540         rec.full_hash = hash;
541         rec.magic = TDB_MAGIC;
542
543         /* write out and point the top of the hash chain at it */
544         if (tdb_rec_write(tdb, rec_ptr, &rec) == -1
545             || tdb->methods->tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
546             || tdb_ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
547                 /* Need to tdb_unallocate() here */
548                 goto fail;
549         }
550
551  done:
552         ret = 0;
553  fail:
554         if (ret == 0) {
555                 tdb_increment_seqnum(tdb);
556         }
557
558         SAFE_FREE(p); 
559         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
560         return ret;
561 }
562
563
564 /* Append to an entry. Create if not exist. */
565 int tdb_append(struct tdb_context *tdb, TDB_DATA key, TDB_DATA new_dbuf)
566 {
567         u32 hash;
568         TDB_DATA dbuf;
569         int ret = -1;
570
571         /* find which hash bucket it is in */
572         hash = tdb->hash_fn(&key);
573         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
574                 return -1;
575
576         dbuf = tdb_fetch(tdb, key);
577
578         if (dbuf.dptr == NULL) {
579                 dbuf.dptr = (unsigned char *)malloc(new_dbuf.dsize);
580         } else {
581                 dbuf.dptr = (unsigned char *)realloc(dbuf.dptr,
582                                                      dbuf.dsize + new_dbuf.dsize);
583         }
584
585         if (dbuf.dptr == NULL) {
586                 tdb->ecode = TDB_ERR_OOM;
587                 goto failed;
588         }
589
590         memcpy(dbuf.dptr + dbuf.dsize, new_dbuf.dptr, new_dbuf.dsize);
591         dbuf.dsize += new_dbuf.dsize;
592
593         ret = tdb_store(tdb, key, dbuf, 0);
594         
595 failed:
596         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
597         SAFE_FREE(dbuf.dptr);
598         return ret;
599 }
600
601
602 /*
603   return the name of the current tdb file
604   useful for external logging functions
605 */
606 const char *tdb_name(struct tdb_context *tdb)
607 {
608         return tdb->name;
609 }
610
611 /*
612   return the underlying file descriptor being used by tdb, or -1
613   useful for external routines that want to check the device/inode
614   of the fd
615 */
616 int tdb_fd(struct tdb_context *tdb)
617 {
618         return tdb->fd;
619 }
620
621 /*
622   return the current logging function
623   useful for external tdb routines that wish to log tdb errors
624 */
625 tdb_log_func tdb_log_fn(struct tdb_context *tdb)
626 {
627         return tdb->log.log_fn;
628 }
629
630
631 /*
632   get the tdb sequence number. Only makes sense if the writers opened
633   with TDB_SEQNUM set. Note that this sequence number will wrap quite
634   quickly, so it should only be used for a 'has something changed'
635   test, not for code that relies on the count of the number of changes
636   made. If you want a counter then use a tdb record.
637
638   The aim of this sequence number is to allow for a very lightweight
639   test of a possible tdb change.
640 */
641 int tdb_get_seqnum(struct tdb_context *tdb)
642 {
643         tdb_off_t seqnum=0;
644
645         tdb_ofs_read(tdb, TDB_SEQNUM_OFS, &seqnum);
646         return seqnum;
647 }
648
649 int tdb_hash_size(struct tdb_context *tdb)
650 {
651         return tdb->header.hash_size;
652 }
653
654 size_t tdb_map_size(struct tdb_context *tdb)
655 {
656         return tdb->map_size;
657 }
658
659 int tdb_get_flags(struct tdb_context *tdb)
660 {
661         return tdb->flags;
662 }
663
664
665 /*
666   enable sequence number handling on an open tdb
667 */
668 void tdb_enable_seqnum(struct tdb_context *tdb)
669 {
670         tdb->flags |= TDB_SEQNUM;
671 }