r23792: convert Samba4 to GPLv3
[kai/samba-autobuild/.git] / source4 / ntvfs / common / brlock_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - tdb backend
5
6    Copyright (C) Andrew Tridgell 1992-2006
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /* This module implements a tdb based byte range locking service,
24    replacing the fcntl() based byte range locking previously
25    used. This allows us to provide the same semantics as NT */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "lib/tdb/include/tdb.h"
30 #include "messaging/messaging.h"
31 #include "db_wrap.h"
32 #include "lib/messaging/irpc.h"
33 #include "libcli/libcli.h"
34 #include "cluster/cluster.h"
35 #include "ntvfs/common/brlock.h"
36 #include "ntvfs/ntvfs.h"
37
38 /*
39   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
40   a file. For a local posix filesystem this will usually be a combination
41   of the device and inode numbers of the file, but it can be anything 
42   that uniquely idetifies a file for locking purposes, as long
43   as it is applied consistently.
44 */
45
46 /* this struct is typicaly attached to tcon */
47 struct brl_context {
48         struct tdb_wrap *w;
49         struct server_id server;
50         struct messaging_context *messaging_ctx;
51 };
52
53 /*
54   the lock context contains the elements that define whether one
55   lock is the same as another lock
56 */
57 struct lock_context {
58         struct server_id server;
59         uint16_t smbpid;
60         struct brl_context *ctx;
61 };
62
63 /* The data in brlock records is an unsorted linear array of these
64    records.  It is unnecessary to store the count as tdb provides the
65    size of the record */
66 struct lock_struct {
67         struct lock_context context;
68         struct ntvfs_handle *ntvfs;
69         uint64_t start;
70         uint64_t size;
71         enum brl_type lock_type;
72         void *notify_ptr;
73 };
74
75 /* this struct is attached to on oprn file handle */
76 struct brl_handle {
77         DATA_BLOB key;
78         struct ntvfs_handle *ntvfs;
79         struct lock_struct last_lock;
80 };
81
82 /*
83   Open up the brlock.tdb database. Close it down using
84   talloc_free(). We need the messaging_ctx to allow for
85   pending lock notifications.
86 */
87 static struct brl_context *brl_tdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
88                                     struct messaging_context *messaging_ctx)
89 {
90         struct brl_context *brl;
91
92         brl = talloc(mem_ctx, struct brl_context);
93         if (brl == NULL) {
94                 return NULL;
95         }
96
97         brl->w = cluster_tdb_tmp_open(brl, "brlock.tdb", TDB_DEFAULT);
98         if (brl->w == NULL) {
99                 talloc_free(brl);
100                 return NULL;
101         }
102
103         brl->server = server;
104         brl->messaging_ctx = messaging_ctx;
105
106         return brl;
107 }
108
109 static struct brl_handle *brl_tdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
110                                                     DATA_BLOB *file_key)
111 {
112         struct brl_handle *brlh;
113
114         brlh = talloc(mem_ctx, struct brl_handle);
115         if (brlh == NULL) {
116                 return NULL;
117         }
118
119         brlh->key = *file_key;
120         brlh->ntvfs = ntvfs;
121         ZERO_STRUCT(brlh->last_lock);
122
123         return brlh;
124 }
125
126 /*
127   see if two locking contexts are equal
128 */
129 static BOOL brl_tdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
130 {
131         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
132                 ctx1->smbpid == ctx2->smbpid &&
133                 ctx1->ctx == ctx2->ctx);
134 }
135
136 /*
137   see if lck1 and lck2 overlap
138 */
139 static BOOL brl_tdb_overlap(struct lock_struct *lck1, 
140                         struct lock_struct *lck2)
141 {
142         /* this extra check is not redundent - it copes with locks
143            that go beyond the end of 64 bit file space */
144         if (lck1->size != 0 &&
145             lck1->start == lck2->start &&
146             lck1->size == lck2->size) {
147                 return True;
148         }
149             
150         if (lck1->start >= (lck2->start+lck2->size) ||
151             lck2->start >= (lck1->start+lck1->size)) {
152                 return False;
153         }
154         return True;
155
156
157 /*
158  See if lock2 can be added when lock1 is in place.
159 */
160 static BOOL brl_tdb_conflict(struct lock_struct *lck1, 
161                          struct lock_struct *lck2)
162 {
163         /* pending locks don't conflict with anything */
164         if (lck1->lock_type >= PENDING_READ_LOCK ||
165             lck2->lock_type >= PENDING_READ_LOCK) {
166                 return False;
167         }
168
169         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
170                 return False;
171         }
172
173         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
174             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
175                 return False;
176         }
177
178         return brl_tdb_overlap(lck1, lck2);
179
180
181
182 /*
183  Check to see if this lock conflicts, but ignore our own locks on the
184  same fnum only.
185 */
186 static BOOL brl_tdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
187 {
188         /* pending locks don't conflict with anything */
189         if (lck1->lock_type >= PENDING_READ_LOCK ||
190             lck2->lock_type >= PENDING_READ_LOCK) {
191                 return False;
192         }
193
194         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
195                 return False;
196
197         /*
198          * note that incoming write calls conflict with existing READ
199          * locks even if the context is the same. JRA. See LOCKTEST7
200          * in smbtorture.
201          */
202         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
203             lck1->ntvfs == lck2->ntvfs &&
204             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
205                 return False;
206         }
207
208         return brl_tdb_overlap(lck1, lck2);
209
210
211
212 /*
213   amazingly enough, w2k3 "remembers" whether the last lock failure
214   is the same as this one and changes its error code. I wonder if any
215   app depends on this?
216 */
217 static NTSTATUS brl_tdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
218 {
219         /*
220          * this function is only called for non pending lock!
221          */
222
223         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
224         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
225                 return NT_STATUS_LOCK_NOT_GRANTED;
226         }
227
228         /* 
229          * if the notify_ptr is non NULL,
230          * it means that we're at the end of a pending lock
231          * and the real lock is requested after the timout went by
232          * In this case we need to remember the last_lock and always
233          * give FILE_LOCK_CONFLICT
234          */
235         if (lock->notify_ptr) {
236                 brlh->last_lock = *lock;
237                 return NT_STATUS_FILE_LOCK_CONFLICT;
238         }
239
240         /* 
241          * amazing the little things you learn with a test
242          * suite. Locks beyond this offset (as a 64 bit
243          * number!) always generate the conflict error code,
244          * unless the top bit is set
245          */
246         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
247                 brlh->last_lock = *lock;
248                 return NT_STATUS_FILE_LOCK_CONFLICT;
249         }
250
251         /*
252          * if the current lock matches the last failed lock on the file handle
253          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
254          */
255         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
256             lock->context.ctx == brlh->last_lock.context.ctx &&
257             lock->ntvfs == brlh->last_lock.ntvfs &&
258             lock->start == brlh->last_lock.start) {
259                 return NT_STATUS_FILE_LOCK_CONFLICT;
260         }
261
262         brlh->last_lock = *lock;
263         return NT_STATUS_LOCK_NOT_GRANTED;
264 }
265
266 /*
267   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
268   which case a real lock is first tried, and if that fails then a
269   pending lock is created. When the pending lock is triggered (by
270   someone else closing an overlapping lock range) a messaging
271   notification is sent, identified by the notify_ptr
272 */
273 static NTSTATUS brl_tdb_lock(struct brl_context *brl,
274                          struct brl_handle *brlh,
275                          uint16_t smbpid,
276                          uint64_t start, uint64_t size, 
277                          enum brl_type lock_type,
278                          void *notify_ptr)
279 {
280         TDB_DATA kbuf, dbuf;
281         int count=0, i;
282         struct lock_struct lock, *locks=NULL;
283         NTSTATUS status;
284
285         kbuf.dptr = brlh->key.data;
286         kbuf.dsize = brlh->key.length;
287
288         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
289                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
290         }
291
292         /* if this is a pending lock, then with the chainlock held we
293            try to get the real lock. If we succeed then we don't need
294            to make it pending. This prevents a possible race condition
295            where the pending lock gets created after the lock that is
296            preventing the real lock gets removed */
297         if (lock_type >= PENDING_READ_LOCK) {
298                 enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
299
300                 /* here we need to force that the last_lock isn't overwritten */
301                 lock = brlh->last_lock;
302                 status = brl_tdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
303                 brlh->last_lock = lock;
304
305                 if (NT_STATUS_IS_OK(status)) {
306                         tdb_chainunlock(brl->w->tdb, kbuf);
307                         return NT_STATUS_OK;
308                 }
309         }
310
311         dbuf = tdb_fetch(brl->w->tdb, kbuf);
312
313         lock.context.smbpid = smbpid;
314         lock.context.server = brl->server;
315         lock.context.ctx = brl;
316         lock.ntvfs = brlh->ntvfs;
317         lock.context.ctx = brl;
318         lock.start = start;
319         lock.size = size;
320         lock.lock_type = lock_type;
321         lock.notify_ptr = notify_ptr;
322
323         if (dbuf.dptr) {
324                 /* there are existing locks - make sure they don't conflict */
325                 locks = (struct lock_struct *)dbuf.dptr;
326                 count = dbuf.dsize / sizeof(*locks);
327                 for (i=0; i<count; i++) {
328                         if (brl_tdb_conflict(&locks[i], &lock)) {
329                                 status = brl_tdb_lock_failed(brlh, &lock);
330                                 goto fail;
331                         }
332                 }
333         }
334
335         /* no conflicts - add it to the list of locks */
336         locks = realloc_p(locks, struct lock_struct, count+1);
337         if (!locks) {
338                 status = NT_STATUS_NO_MEMORY;
339                 goto fail;
340         } else {
341                 dbuf.dptr = (uint8_t *)locks;
342         }
343         locks[count] = lock;
344         dbuf.dsize += sizeof(lock);
345
346         if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
347                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
348                 goto fail;
349         }
350
351         free(dbuf.dptr);
352         tdb_chainunlock(brl->w->tdb, kbuf);
353
354         /* the caller needs to know if the real lock was granted. If
355            we have reached here then it must be a pending lock that
356            was granted, so tell them the lock failed */
357         if (lock_type >= PENDING_READ_LOCK) {
358                 return NT_STATUS_LOCK_NOT_GRANTED;
359         }
360
361         return NT_STATUS_OK;
362
363  fail:
364
365         free(dbuf.dptr);
366         tdb_chainunlock(brl->w->tdb, kbuf);
367         return status;
368 }
369
370
371 /*
372   we are removing a lock that might be holding up a pending lock. Scan for pending
373   locks that cover this range and if we find any then notify the server that it should
374   retry the lock
375 */
376 static void brl_tdb_notify_unlock(struct brl_context *brl,
377                               struct lock_struct *locks, int count, 
378                               struct lock_struct *removed_lock)
379 {
380         int i, last_notice;
381
382         /* the last_notice logic is to prevent stampeding on a lock
383            range. It prevents us sending hundreds of notifies on the
384            same range of bytes. It doesn't prevent all possible
385            stampedes, but it does prevent the most common problem */
386         last_notice = -1;
387
388         for (i=0;i<count;i++) {
389                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
390                     brl_tdb_overlap(&locks[i], removed_lock)) {
391                         if (last_notice != -1 && brl_tdb_overlap(&locks[i], &locks[last_notice])) {
392                                 continue;
393                         }
394                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
395                                 last_notice = i;
396                         }
397                         messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
398                                            MSG_BRL_RETRY, locks[i].notify_ptr);
399                 }
400         }
401 }
402
403
404 /*
405   send notifications for all pending locks - the file is being closed by this
406   user
407 */
408 static void brl_tdb_notify_all(struct brl_context *brl,
409                            struct lock_struct *locks, int count)
410 {
411         int i;
412         for (i=0;i<count;i++) {
413                 if (locks->lock_type >= PENDING_READ_LOCK) {
414                         brl_tdb_notify_unlock(brl, locks, count, &locks[i]);
415                 }
416         }
417 }
418
419
420
421 /*
422  Unlock a range of bytes.
423 */
424 static NTSTATUS brl_tdb_unlock(struct brl_context *brl,
425                            struct brl_handle *brlh, 
426                            uint16_t smbpid,
427                            uint64_t start, uint64_t size)
428 {
429         TDB_DATA kbuf, dbuf;
430         int count, i;
431         struct lock_struct *locks, *lock;
432         struct lock_context context;
433         NTSTATUS status;
434
435         kbuf.dptr = brlh->key.data;
436         kbuf.dsize = brlh->key.length;
437
438         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
439                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
440         }
441
442         dbuf = tdb_fetch(brl->w->tdb, kbuf);
443         if (!dbuf.dptr) {
444                 tdb_chainunlock(brl->w->tdb, kbuf);
445                 return NT_STATUS_RANGE_NOT_LOCKED;
446         }
447
448         context.smbpid = smbpid;
449         context.server = brl->server;
450         context.ctx = brl;
451
452         /* there are existing locks - find a match */
453         locks = (struct lock_struct *)dbuf.dptr;
454         count = dbuf.dsize / sizeof(*locks);
455
456         for (i=0; i<count; i++) {
457                 lock = &locks[i];
458                 if (brl_tdb_same_context(&lock->context, &context) &&
459                     lock->ntvfs == brlh->ntvfs &&
460                     lock->start == start &&
461                     lock->size == size &&
462                     lock->lock_type == WRITE_LOCK) {
463                         break;
464                 }
465         }
466         if (i < count) goto found;
467
468         for (i=0; i<count; i++) {
469                 lock = &locks[i];
470                 if (brl_tdb_same_context(&lock->context, &context) &&
471                     lock->ntvfs == brlh->ntvfs &&
472                     lock->start == start &&
473                     lock->size == size &&
474                     lock->lock_type < PENDING_READ_LOCK) {
475                         break;
476                 }
477         }
478
479 found:
480         if (i < count) {
481                 /* found it - delete it */
482                 if (count == 1) {
483                         if (tdb_delete(brl->w->tdb, kbuf) != 0) {
484                                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
485                                 goto fail;
486                         }
487                 } else {
488                         struct lock_struct removed_lock = *lock;
489                         if (i < count-1) {
490                                 memmove(&locks[i], &locks[i+1], 
491                                         sizeof(*locks)*((count-1) - i));
492                         }
493                         count--;
494                         
495                         /* send notifications for any relevant pending locks */
496                         brl_tdb_notify_unlock(brl, locks, count, &removed_lock);
497                         
498                         dbuf.dsize = count * sizeof(*locks);
499                         
500                         if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
501                                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
502                                 goto fail;
503                         }
504                 }
505                 
506                 free(dbuf.dptr);
507                 tdb_chainunlock(brl->w->tdb, kbuf);
508                 return NT_STATUS_OK;
509         }
510         
511         /* we didn't find it */
512         status = NT_STATUS_RANGE_NOT_LOCKED;
513
514  fail:
515         free(dbuf.dptr);
516         tdb_chainunlock(brl->w->tdb, kbuf);
517         return status;
518 }
519
520
521 /*
522   remove a pending lock. This is called when the caller has either
523   given up trying to establish a lock or when they have succeeded in
524   getting it. In either case they no longer need to be notified.
525 */
526 static NTSTATUS brl_tdb_remove_pending(struct brl_context *brl,
527                                    struct brl_handle *brlh, 
528                                    void *notify_ptr)
529 {
530         TDB_DATA kbuf, dbuf;
531         int count, i;
532         struct lock_struct *locks;
533         NTSTATUS status;
534
535         kbuf.dptr = brlh->key.data;
536         kbuf.dsize = brlh->key.length;
537
538         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
539                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
540         }
541
542         dbuf = tdb_fetch(brl->w->tdb, kbuf);
543         if (!dbuf.dptr) {
544                 tdb_chainunlock(brl->w->tdb, kbuf);
545                 return NT_STATUS_RANGE_NOT_LOCKED;
546         }
547
548         /* there are existing locks - find a match */
549         locks = (struct lock_struct *)dbuf.dptr;
550         count = dbuf.dsize / sizeof(*locks);
551
552         for (i=0; i<count; i++) {
553                 struct lock_struct *lock = &locks[i];
554                 
555                 if (lock->lock_type >= PENDING_READ_LOCK &&
556                     lock->notify_ptr == notify_ptr &&
557                     cluster_id_equal(&lock->context.server, &brl->server)) {
558                         /* found it - delete it */
559                         if (count == 1) {
560                                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
561                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
562                                         goto fail;
563                                 }
564                         } else {
565                                 if (i < count-1) {
566                                         memmove(&locks[i], &locks[i+1], 
567                                                 sizeof(*locks)*((count-1) - i));
568                                 }
569                                 count--;
570                                 dbuf.dsize = count * sizeof(*locks);
571                                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
572                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
573                                         goto fail;
574                                 }
575                         }
576                         
577                         free(dbuf.dptr);
578                         tdb_chainunlock(brl->w->tdb, kbuf);
579                         return NT_STATUS_OK;
580                 }
581         }
582         
583         /* we didn't find it */
584         status = NT_STATUS_RANGE_NOT_LOCKED;
585
586  fail:
587         free(dbuf.dptr);
588         tdb_chainunlock(brl->w->tdb, kbuf);
589         return status;
590 }
591
592
593 /*
594   Test if we are allowed to perform IO on a region of an open file
595 */
596 static NTSTATUS brl_tdb_locktest(struct brl_context *brl,
597                              struct brl_handle *brlh,
598                              uint16_t smbpid, 
599                              uint64_t start, uint64_t size, 
600                              enum brl_type lock_type)
601 {
602         TDB_DATA kbuf, dbuf;
603         int count, i;
604         struct lock_struct lock, *locks;
605
606         kbuf.dptr = brlh->key.data;
607         kbuf.dsize = brlh->key.length;
608
609         dbuf = tdb_fetch(brl->w->tdb, kbuf);
610         if (dbuf.dptr == NULL) {
611                 return NT_STATUS_OK;
612         }
613
614         lock.context.smbpid = smbpid;
615         lock.context.server = brl->server;
616         lock.context.ctx = brl;
617         lock.ntvfs = brlh->ntvfs;
618         lock.start = start;
619         lock.size = size;
620         lock.lock_type = lock_type;
621
622         /* there are existing locks - make sure they don't conflict */
623         locks = (struct lock_struct *)dbuf.dptr;
624         count = dbuf.dsize / sizeof(*locks);
625
626         for (i=0; i<count; i++) {
627                 if (brl_tdb_conflict_other(&locks[i], &lock)) {
628                         free(dbuf.dptr);
629                         return NT_STATUS_FILE_LOCK_CONFLICT;
630                 }
631         }
632
633         free(dbuf.dptr);
634         return NT_STATUS_OK;
635 }
636
637
638 /*
639  Remove any locks associated with a open file.
640 */
641 static NTSTATUS brl_tdb_close(struct brl_context *brl,
642                           struct brl_handle *brlh)
643 {
644         TDB_DATA kbuf, dbuf;
645         int count, i, dcount=0;
646         struct lock_struct *locks;
647         NTSTATUS status;
648
649         kbuf.dptr = brlh->key.data;
650         kbuf.dsize = brlh->key.length;
651
652         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
653                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
654         }
655
656         dbuf = tdb_fetch(brl->w->tdb, kbuf);
657         if (!dbuf.dptr) {
658                 tdb_chainunlock(brl->w->tdb, kbuf);
659                 return NT_STATUS_OK;
660         }
661
662         /* there are existing locks - remove any for this fnum */
663         locks = (struct lock_struct *)dbuf.dptr;
664         count = dbuf.dsize / sizeof(*locks);
665
666         for (i=0; i<count; i++) {
667                 struct lock_struct *lock = &locks[i];
668
669                 if (lock->context.ctx == brl &&
670                     cluster_id_equal(&lock->context.server, &brl->server) &&
671                     lock->ntvfs == brlh->ntvfs) {
672                         /* found it - delete it */
673                         if (count > 1 && i < count-1) {
674                                 memmove(&locks[i], &locks[i+1], 
675                                         sizeof(*locks)*((count-1) - i));
676                         }
677                         count--;
678                         i--;
679                         dcount++;
680                 }
681         }
682
683         status = NT_STATUS_OK;
684
685         if (count == 0) {
686                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
687                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
688                 }
689         } else if (dcount != 0) {
690                 /* tell all pending lock holders for this file that
691                    they have a chance now. This is a bit indiscriminant,
692                    but works OK */
693                 brl_tdb_notify_all(brl, locks, count);
694
695                 dbuf.dsize = count * sizeof(*locks);
696
697                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
698                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
699                 }
700         }
701
702         free(dbuf.dptr);
703         tdb_chainunlock(brl->w->tdb, kbuf);
704
705         return status;
706 }
707
708
709 static const struct brlock_ops brlock_tdb_ops = {
710         .brl_init           = brl_tdb_init,
711         .brl_create_handle  = brl_tdb_create_handle,
712         .brl_lock           = brl_tdb_lock,
713         .brl_unlock         = brl_tdb_unlock,
714         .brl_remove_pending = brl_tdb_remove_pending,
715         .brl_locktest       = brl_tdb_locktest,
716         .brl_close          = brl_tdb_close
717 };
718
719
720 void brl_tdb_init_ops(void)
721 {
722         brl_set_ops(&brlock_tdb_ops);
723 }