r10656: BIG merge from trunk. Features not copied over
[kai/samba.git] / source3 / locking / brlock.c
1 /* 
2    Unix SMB/CIFS implementation.
3    byte range locking code
4    Updated to handle range splits/merges.
5
6    Copyright (C) Andrew Tridgell 1992-2000
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 /* This module implements a tdb based byte range locking service,
25    replacing the fcntl() based byte range locking previously
26    used. This allows us to provide the same semantics as NT */
27
28 #include "includes.h"
29
30 #undef DBGC_CLASS
31 #define DBGC_CLASS DBGC_LOCKING
32
33 #define ZERO_ZERO 0
34
35 /* This contains elements that differentiate locks. The smbpid is a
36    client supplied pid, and is essentially the locking context for
37    this client */
38
struct lock_context {
	uint16 smbpid;		/* client-supplied pid; the SMB-level locking context */
	uint16 tid;		/* tree connect id */
	struct process_id pid;	/* server process that owns the lock */
};
44
45 /* The data in brlock records is an unsorted linear array of these
46    records.  It is unnecessary to store the count as tdb provides the
47    size of the record */
48
struct lock_struct {
	struct lock_context context;	/* who owns this lock */
	br_off start;			/* first byte of the locked range */
	br_off size;			/* length of the range in bytes */
	int fnum;			/* file handle the lock was taken on */
	enum brl_type lock_type;	/* READ_LOCK, WRITE_LOCK or PENDING_LOCK */
};
56
57 /* The key used in the brlock database. */
58
struct lock_key {
	SMB_DEV_T device;	/* device number of the file */
	SMB_INO_T inode;	/* inode number of the file */
};
63
64 /* The open brlock.tdb database. */
65
static TDB_CONTEXT *tdb;	/* opened by brl_init(), closed by brl_shutdown() */
67
68 /****************************************************************************
69  Create a locking key - ensuring zero filled for pad purposes.
70 ****************************************************************************/
71
72 static TDB_DATA locking_key(SMB_DEV_T dev, SMB_INO_T inode)
73 {
74         static struct lock_key key;
75         TDB_DATA kbuf;
76
77         memset(&key, '\0', sizeof(key));
78         key.device = dev;
79         key.inode = inode;
80         kbuf.dptr = (char *)&key;
81         kbuf.dsize = sizeof(key);
82         return kbuf;
83 }
84
85 /****************************************************************************
86  See if two locking contexts are equal.
87 ****************************************************************************/
88
89 static BOOL brl_same_context(struct lock_context *ctx1, 
90                              struct lock_context *ctx2)
91 {
92         return (procid_equal(&ctx1->pid, &ctx2->pid) &&
93                 (ctx1->smbpid == ctx2->smbpid) &&
94                 (ctx1->tid == ctx2->tid));
95 }
96
97 /****************************************************************************
98  See if lck1 and lck2 overlap.
99 ****************************************************************************/
100
101 static BOOL brl_overlap(struct lock_struct *lck1,
102                         struct lock_struct *lck2)
103 {
104         /* this extra check is not redundent - it copes with locks
105            that go beyond the end of 64 bit file space */
106         if (lck1->size != 0 &&
107             lck1->start == lck2->start &&
108             lck1->size == lck2->size) {
109                 return True;
110         }
111
112         if (lck1->start >= (lck2->start+lck2->size) ||
113             lck2->start >= (lck1->start+lck1->size)) {
114                 return False;
115         }
116         return True;
117 }
118
119 /****************************************************************************
120  See if lock2 can be added when lock1 is in place.
121 ****************************************************************************/
122
123 static BOOL brl_conflict(struct lock_struct *lck1, 
124                          struct lock_struct *lck2)
125 {
126         if (lck1->lock_type == PENDING_LOCK || lck2->lock_type == PENDING_LOCK )
127                 return False;
128
129         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
130                 return False;
131         }
132
133         if (brl_same_context(&lck1->context, &lck2->context) &&
134             lck2->lock_type == READ_LOCK && lck1->fnum == lck2->fnum) {
135                 return False;
136         }
137
138         return brl_overlap(lck1, lck2);
139
140
#if ZERO_ZERO
/****************************************************************************
 brl_conflict() variant with special handling for 0/0 (whole-file) lock
 requests - only compiled when ZERO_ZERO is enabled.
****************************************************************************/

static BOOL brl_conflict1(struct lock_struct *lck1, 
			 struct lock_struct *lck2)
{
	/* Pending locks never conflict. */
	if (lck1->lock_type == PENDING_LOCK || lck2->lock_type == PENDING_LOCK) {
		return False;
	}

	/* Read locks never conflict with each other. */
	if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
		return False;
	}

	/* Our own read locks on the same handle never conflict. */
	if (brl_same_context(&lck1->context, &lck2->context) &&
	    lck2->lock_type == READ_LOCK && lck1->fnum == lck2->fnum) {
		return False;
	}

	/* A 0/0 request conflicts with any existing non-zero-length lock. */
	if (lck2->start == 0 && lck2->size == 0 && lck1->size != 0) {
		return True;
	}

	/* No conflict when one range ends at or before the other starts. */
	if (lck1->start >= (lck2->start + lck2->size) ||
	    lck2->start >= (lck1->start + lck1->size)) {
		return False;
	}

	return True;
}
#endif
169
170 /****************************************************************************
171  Check to see if this lock conflicts, but ignore our own locks on the
172  same fnum only.
173 ****************************************************************************/
174
175 static BOOL brl_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
176 {
177         if (lck1->lock_type == PENDING_LOCK || lck2->lock_type == PENDING_LOCK )
178                 return False;
179
180         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
181                 return False;
182
183         /*
184          * Incoming WRITE locks conflict with existing READ locks even
185          * if the context is the same. JRA. See LOCKTEST7 in smbtorture.
186          */
187
188         if (!(lck2->lock_type == WRITE_LOCK && lck1->lock_type == READ_LOCK)) {
189                 if (brl_same_context(&lck1->context, &lck2->context) &&
190                                         lck1->fnum == lck2->fnum)
191                         return False;
192         }
193
194         return brl_overlap(lck1, lck2);
195
196
197 /****************************************************************************
198  Amazingly enough, w2k3 "remembers" whether the last lock failure
199  is the same as this one and changes its error code. I wonder if any
200  app depends on this ?
201 ****************************************************************************/
202
static NTSTATUS brl_lock_failed(struct lock_struct *lock)
{
	/* Remember the last failed request so an exact repeat (same
	   context, handle and range) gets a different error code -
	   this mimics observed w2k3 behaviour. */
	static struct lock_struct last_lock_failure;

	if (brl_same_context(&lock->context, &last_lock_failure.context) &&
			lock->fnum == last_lock_failure.fnum &&
			lock->start == last_lock_failure.start &&
			lock->size == last_lock_failure.size) {
		return NT_STATUS_FILE_LOCK_CONFLICT;
	}
	last_lock_failure = *lock;
	if (lock->start >= 0xEF000000 &&
			(lock->start >> 63) == 0) {
		/* amazing the little things you learn with a test
		   suite. Locks beyond this offset (as a 64 bit
		   number!) always generate the conflict error code,
		   unless the top bit is set */
		return NT_STATUS_FILE_LOCK_CONFLICT;
	}
	return NT_STATUS_LOCK_NOT_GRANTED;
}
224
#if DONT_DO_THIS
	/* doing this traversal could kill solaris machines under high load (tridge) */
	/* delete any dead locks */

/****************************************************************************
 Delete a record if it is for a dead process, if check_self is true, then
 delete any records belonging to this pid also (there shouldn't be any).

 NOTE(review): this code is compiled out (DONT_DO_THIS).  The comparison
 "mypid == lock->context.pid" mixes pid_t with struct process_id, and the
 DEBUG casts treat context.pid as an unsigned int - confirm both before
 ever re-enabling this block.
****************************************************************************/

static int delete_fn(TDB_CONTEXT *ttdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
{
	struct lock_struct *locks;
	int count, i;
	BOOL check_self = *(BOOL *)state;
	pid_t mypid = sys_getpid();

	tdb_chainlock(tdb, kbuf);

	locks = (struct lock_struct *)dbuf.dptr;

	count = dbuf.dsize / sizeof(*locks);
	for (i=0; i<count; i++) {
		struct lock_struct *lock = &locks[i];

		/* If check_self is true we want to remove our own records. */
		if (check_self && (mypid == lock->context.pid)) {

			DEBUG(0,("brlock : delete_fn. LOGIC ERROR ! Shutting down and a record for my pid (%u) exists !\n",
					(unsigned int)lock->context.pid ));

		} else if (process_exists(&lock->context.pid)) {

			DEBUG(10,("brlock : delete_fn. pid %u exists.\n", (unsigned int)lock->context.pid ));
			continue;
		}

		DEBUG(10,("brlock : delete_fn. Deleting record for process %u\n",
				(unsigned int)lock->context.pid ));

		/* Remove the stale entry by shuffling the tail of the
		   array down over it, then re-examine this index. */
		if (count > 1 && i < count-1) {
			memmove(&locks[i], &locks[i+1], 
				sizeof(*locks)*((count-1) - i));
		}
		count--;
		i--;
	}

	/* Delete the record outright if empty, otherwise store the
	   shrunken array back if anything was removed. */
	if (count == 0) {
		tdb_delete(tdb, kbuf);
	} else if (count < (dbuf.dsize / sizeof(*locks))) {
		dbuf.dsize = count * sizeof(*locks);
		tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
	}

	tdb_chainunlock(tdb, kbuf);
	return 0;
}
#endif
283
284 /****************************************************************************
285  Open up the brlock.tdb database.
286 ****************************************************************************/
287
void brl_init(int read_only)
{
	/* Idempotent - a second call while the database is open is a no-op. */
	if (tdb)
		return;
	/* TDB_CLEAR_IF_FIRST wipes stale records on first open, but is
	   suppressed when opening read-only. */
	tdb = tdb_open_log(lock_path("brlock.tdb"), 0,  TDB_DEFAULT|(read_only?0x0:TDB_CLEAR_IF_FIRST),
		       read_only?O_RDONLY:(O_RDWR|O_CREAT), 0644 );
	if (!tdb) {
		DEBUG(0,("Failed to open byte range locking database\n"));
		return;
	}

#if DONT_DO_THIS
	/* doing this traversal could kill solaris machines under high load (tridge) */
	/* delete any dead locks */
	if (!read_only) {
		BOOL check_self = False;
		tdb_traverse(tdb, delete_fn, &check_self);
	}
#endif
}
308
309 /****************************************************************************
310  Close down the brlock.tdb database.
311 ****************************************************************************/
312
313 void brl_shutdown(int read_only)
314 {
315         if (!tdb)
316                 return;
317
318 #if DONT_DO_THIS
319         /* doing this traversal could kill solaris machines under high load (tridge) */
320         /* delete any dead locks */
321         if (!read_only) {
322                 BOOL check_self = True;
323                 tdb_traverse(tdb, delete_fn, &check_self);
324         }
325 #endif
326
327         tdb_close(tdb);
328 }
329
#if ZERO_ZERO
/****************************************************************************
 Compare two locks for sorting - qsort-style comparator ordering by start
 offset first, then by size.
****************************************************************************/
static int lock_compare(struct lock_struct *lck1, 
			 struct lock_struct *lck2)
{
	/* NOTE(review): the br_off differences are narrowed to int here,
	   which can truncate for offsets more than 2^31 apart - confirm
	   before enabling ZERO_ZERO. */
	if (lck1->start != lck2->start) return (lck1->start - lck2->start);
	if (lck2->size != lck1->size) {
		return ((int)lck1->size - (int)lck2->size);
	}
	return 0;
}
#endif
344
345 /****************************************************************************
346  Lock a range of bytes.
347 ****************************************************************************/
348
349 NTSTATUS brl_lock(SMB_DEV_T dev, SMB_INO_T ino, int fnum,
350                   uint16 smbpid, struct process_id pid, uint16 tid,
351                   br_off start, br_off size, 
352                   enum brl_type lock_type, BOOL *my_lock_ctx)
353 {
354         TDB_DATA kbuf, dbuf;
355         int count, i;
356         struct lock_struct lock, *locks;
357         char *tp;
358         NTSTATUS status = NT_STATUS_OK;
359
360         *my_lock_ctx = False;
361         kbuf = locking_key(dev,ino);
362
363         dbuf.dptr = NULL;
364
365 #if !ZERO_ZERO
366         if (start == 0 && size == 0) {
367                 DEBUG(0,("client sent 0/0 lock - please report this\n"));
368         }
369 #endif
370
371         tdb_chainlock(tdb, kbuf);
372         dbuf = tdb_fetch(tdb, kbuf);
373
374         lock.context.smbpid = smbpid;
375         lock.context.pid = pid;
376         lock.context.tid = tid;
377         lock.start = start;
378         lock.size = size;
379         lock.fnum = fnum;
380         lock.lock_type = lock_type;
381
382         if (dbuf.dptr) {
383                 /* there are existing locks - make sure they don't conflict */
384                 locks = (struct lock_struct *)dbuf.dptr;
385                 count = dbuf.dsize / sizeof(*locks);
386                 for (i=0; i<count; i++) {
387                         if (brl_conflict(&locks[i], &lock)) {
388                                 status = brl_lock_failed(&lock);;
389                                 /* Did we block ourselves ? */
390                                 if (brl_same_context(&locks[i].context, &lock.context))
391                                         *my_lock_ctx = True;
392                                 goto fail;
393                         }
394 #if ZERO_ZERO
395                         if (lock.start == 0 && lock.size == 0 && 
396                             locks[i].size == 0) {
397                                 break;
398                         }
399 #endif
400                 }
401         }
402
403         /* no conflicts - add it to the list of locks */
404         tp = SMB_REALLOC(dbuf.dptr, dbuf.dsize + sizeof(*locks));
405         if (!tp) {
406                 status = NT_STATUS_NO_MEMORY;
407                 goto fail;
408         } else {
409                 dbuf.dptr = tp;
410         }
411         memcpy(dbuf.dptr + dbuf.dsize, &lock, sizeof(lock));
412         dbuf.dsize += sizeof(lock);
413
414 #if ZERO_ZERO
415         /* sort the lock list */
416         qsort(dbuf.dptr, dbuf.dsize/sizeof(lock), sizeof(lock), lock_compare);
417 #endif
418
419         if (tdb_store(tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
420                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
421                 goto fail;
422         }
423
424         SAFE_FREE(dbuf.dptr);
425         tdb_chainunlock(tdb, kbuf);
426         return NT_STATUS_OK;
427
428  fail:
429
430         SAFE_FREE(dbuf.dptr);
431         tdb_chainunlock(tdb, kbuf);
432         return status;
433 }
434
435 /****************************************************************************
436  Check if an unlock overlaps a pending lock.
437 ****************************************************************************/
438
439 static BOOL brl_pending_overlap(struct lock_struct *lock, struct lock_struct *pend_lock)
440 {
441         if ((lock->start <= pend_lock->start) && (lock->start + lock->size > pend_lock->start))
442                 return True;
443         if ((lock->start >= pend_lock->start) && (lock->start <= pend_lock->start + pend_lock->size))
444                 return True;
445         return False;
446 }
447
448 /****************************************************************************
449  Unlock a range of bytes.
450 ****************************************************************************/
451
/*
 * Remove the lock that exactly matches (context, fnum, start, size).
 *
 * If remove_pending_locks_only is set, only a PENDING_LOCK record is
 * removed.  pre_unlock_fn (may be NULL) is invoked before a real lock
 * is released, e.g. to perform POSIX unlocks.  Any pending waiters
 * whose ranges overlap the released lock are sent MSG_SMB_UNLOCK so
 * they can retry.  Returns True if a matching record was removed.
 */
BOOL brl_unlock(SMB_DEV_T dev, SMB_INO_T ino, int fnum,
		uint16 smbpid, struct process_id pid, uint16 tid,
		br_off start, br_off size,
		BOOL remove_pending_locks_only,
		void (*pre_unlock_fn)(void *),
		void *pre_unlock_data)
{
	TDB_DATA kbuf, dbuf;
	int count, i, j;
	struct lock_struct *locks;
	struct lock_context context;

	kbuf = locking_key(dev,ino);

	dbuf.dptr = NULL;

	/* Serialise against all other access to this file's record. */
	tdb_chainlock(tdb, kbuf);
	dbuf = tdb_fetch(tdb, kbuf);

	if (!dbuf.dptr) {
		DEBUG(10,("brl_unlock: tdb_fetch failed !\n"));
		goto fail;
	}

	/* The context we must match to remove a lock. */
	context.smbpid = smbpid;
	context.pid = pid;
	context.tid = tid;

	/* there are existing locks - find a match */
	locks = (struct lock_struct *)dbuf.dptr;
	count = dbuf.dsize / sizeof(*locks);

#if ZERO_ZERO
	/* Fast path when ZERO_ZERO is enabled: remove an exactly
	   matching write lock without the pending-waiter processing. */
	for (i=0; i<count; i++) {
		struct lock_struct *lock = &locks[i];

		if (lock->lock_type == WRITE_LOCK &&
		    brl_same_context(&lock->context, &context) &&
		    lock->fnum == fnum &&
		    lock->start == start &&
		    lock->size == size) {

			if (pre_unlock_fn)
				(*pre_unlock_fn)(pre_unlock_data);

			/* found it - delete it */
			if (count == 1) {
				tdb_delete(tdb, kbuf);
			} else {
				if (i < count-1) {
					memmove(&locks[i], &locks[i+1], 
						sizeof(*locks)*((count-1) - i));
				}
				dbuf.dsize -= sizeof(*locks);
				tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
			}

			SAFE_FREE(dbuf.dptr);
			tdb_chainunlock(tdb, kbuf);
			return True;
		}
	}
#endif

	locks = (struct lock_struct *)dbuf.dptr;
	count = dbuf.dsize / sizeof(*locks);
	for (i=0; i<count; i++) {
		struct lock_struct *lock = &locks[i];

		/* Only an exact match (context, handle and range) qualifies. */
		if (brl_same_context(&lock->context, &context) &&
				lock->fnum == fnum &&
				lock->start == start &&
				lock->size == size) {

			if (remove_pending_locks_only && lock->lock_type != PENDING_LOCK)
				continue;

			if (lock->lock_type != PENDING_LOCK) {

				/* Do any POSIX unlocks needed. */
				if (pre_unlock_fn)
					(*pre_unlock_fn)(pre_unlock_data);

				/* Send unlock messages to any pending waiters that overlap. */
				for (j=0; j<count; j++) {
					struct lock_struct *pend_lock = &locks[j];

					/* Ignore non-pending locks. */
					if (pend_lock->lock_type != PENDING_LOCK)
						continue;

					/* We could send specific lock info here... */
					if (brl_pending_overlap(lock, pend_lock)) {
						DEBUG(10,("brl_unlock: sending unlock message to pid %s\n",
									procid_str_static(&pend_lock->context.pid )));

						message_send_pid(pend_lock->context.pid,
								MSG_SMB_UNLOCK,
								NULL, 0, True);
					}
				}
			}

			/* found it - delete it */
			if (count == 1) {
				tdb_delete(tdb, kbuf);
			} else {
				if (i < count-1) {
					memmove(&locks[i], &locks[i+1], 
						sizeof(*locks)*((count-1) - i));
				}
				dbuf.dsize -= sizeof(*locks);
				tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
			}

			SAFE_FREE(dbuf.dptr);
			tdb_chainunlock(tdb, kbuf);
			return True;
		}
	}

	/* we didn't find it */

 fail:
	SAFE_FREE(dbuf.dptr);
	tdb_chainunlock(tdb, kbuf);
	return False;
}
580
581
582 /****************************************************************************
583  Test if we could add a lock if we wanted to.
584 ****************************************************************************/
585
/*
 * Return True if the described lock could be granted, False if it would
 * conflict with an existing lock from another context/handle.  Uses
 * brl_conflict_other(), so our own locks on the same fnum are ignored.
 */
BOOL brl_locktest(SMB_DEV_T dev, SMB_INO_T ino, int fnum,
		  uint16 smbpid, struct process_id pid, uint16 tid,
		  br_off start, br_off size, 
		  enum brl_type lock_type)
{
	TDB_DATA kbuf, dbuf;
	int count, i;
	struct lock_struct lock, *locks;

	kbuf = locking_key(dev,ino);

	dbuf.dptr = NULL;

	/* Read-only probe - note no tdb_chainlock is taken here, so the
	   answer is only advisory and may race with concurrent changes. */
	dbuf = tdb_fetch(tdb, kbuf);

	/* Build the candidate lock to test against existing records. */
	lock.context.smbpid = smbpid;
	lock.context.pid = pid;
	lock.context.tid = tid;
	lock.start = start;
	lock.size = size;
	lock.fnum = fnum;
	lock.lock_type = lock_type;

	if (dbuf.dptr) {
		/* there are existing locks - make sure they don't conflict */
		locks = (struct lock_struct *)dbuf.dptr;
		count = dbuf.dsize / sizeof(*locks);
		for (i=0; i<count; i++) {
			/*
			 * Our own locks don't conflict.
			 */
			if (brl_conflict_other(&locks[i], &lock))
				goto fail;
		}
	}

	/* no conflicts - we could have added it */
	SAFE_FREE(dbuf.dptr);
	return True;

 fail:
	SAFE_FREE(dbuf.dptr);
	return False;
}
630
631 /****************************************************************************
632  Remove any locks associated with a open file.
633 ****************************************************************************/
634
/*
 * Remove every lock owned by (pid, tid, fnum) on the file (dev, ino),
 * waking any other context's pending waiters whose ranges overlap a
 * removed lock.  Called when a file handle is closed.
 */
void brl_close(SMB_DEV_T dev, SMB_INO_T ino, struct process_id pid, int tid, int fnum)
{
	TDB_DATA kbuf, dbuf;
	int count, i, j, dcount=0;
	struct lock_struct *locks;

	kbuf = locking_key(dev,ino);

	dbuf.dptr = NULL;

	/* Serialise against all other access to this file's record. */
	tdb_chainlock(tdb, kbuf);
	dbuf = tdb_fetch(tdb, kbuf);

	if (!dbuf.dptr) goto fail;

	/* there are existing locks - remove any for this fnum */
	locks = (struct lock_struct *)dbuf.dptr;
	count = dbuf.dsize / sizeof(*locks);

	for (i=0; i<count; i++) {
		struct lock_struct *lock = &locks[i];

		if (lock->context.tid == tid &&
		    procid_equal(&lock->context.pid, &pid) &&
		    lock->fnum == fnum) {

			/* Send unlock messages to any pending waiters that overlap. */
			for (j=0; j<count; j++) {
				struct lock_struct *pend_lock = &locks[j];

				/* Ignore our own or non-pending locks. */
				if (pend_lock->lock_type != PENDING_LOCK)
					continue;

				/* Skip pending locks belonging to the handle
				   being closed - they are going away too. */
				if (pend_lock->context.tid == tid &&
				    procid_equal(&pend_lock->context.pid, &pid) &&
				    pend_lock->fnum == fnum)
					continue;

				/* We could send specific lock info here... */
				if (brl_pending_overlap(lock, pend_lock))
					message_send_pid(pend_lock->context.pid,
							MSG_SMB_UNLOCK,
							NULL, 0, True);
			}

			/* found it - delete it by shuffling the tail down,
			   then re-examine this index */
			if (count > 1 && i < count-1) {
				memmove(&locks[i], &locks[i+1], 
					sizeof(*locks)*((count-1) - i));
			}
			count--;
			i--;
			dcount++;
		}
	}

	/* Delete the record outright if empty, otherwise store back the
	   shrunken array if anything was removed. */
	if (count == 0) {
		tdb_delete(tdb, kbuf);
	} else if (count < (dbuf.dsize / sizeof(*locks))) {
		dbuf.dsize -= dcount * sizeof(*locks);
		tdb_store(tdb, kbuf, dbuf, TDB_REPLACE);
	}

	/* Shared cleanup - also reached when no record exists for this file. */
 fail:
	SAFE_FREE(dbuf.dptr);
	tdb_chainunlock(tdb, kbuf);
}
704
705 /****************************************************************************
706  Traverse the whole database with this function, calling traverse_callback
707  on each lock.
708 ****************************************************************************/
709
static int traverse_fn(TDB_CONTEXT *ttdb, TDB_DATA kbuf, TDB_DATA dbuf, void *state)
{
	struct lock_struct *locks;
	struct lock_key *key;
	int i;

	/* The caller's callback arrives through tdb_traverse's opaque
	   state pointer. */
	BRLOCK_FN(traverse_callback) = (BRLOCK_FN_CAST())state;

	locks = (struct lock_struct *)dbuf.dptr;
	key = (struct lock_key *)kbuf.dptr;

	/* Invoke the callback once per lock record in this entry. */
	for (i=0;i<dbuf.dsize/sizeof(*locks);i++) {
		traverse_callback(key->device, key->inode,
				  locks[i].context.pid,
				  locks[i].lock_type,
				  locks[i].start,
				  locks[i].size);
	}
	return 0;
}
730
731 /*******************************************************************
732  Call the specified function on each lock in the database.
733 ********************************************************************/
734
735 int brl_forall(BRLOCK_FN(fn))
736 {
737         if (!tdb) return 0;
738         return tdb_traverse(tdb, traverse_fn, (void *)fn);
739 }