d5eb7d82fb0fbe0e513d50e47db78c15f9d81e73
[sfrench/samba-autobuild/.git] / source4 / ntvfs / common / brlock_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - tdb backend
5
6    Copyright (C) Andrew Tridgell 1992-2006
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 /* This module implements a tdb based byte range locking service,
25    replacing the fcntl() based byte range locking previously
26    used. This allows us to provide the same semantics as NT */
27
28 #include "includes.h"
29 #include "system/filesys.h"
30 #include "lib/tdb/include/tdb.h"
31 #include "messaging/messaging.h"
32 #include "db_wrap.h"
33 #include "lib/messaging/irpc.h"
34 #include "libcli/libcli.h"
35 #include "cluster/cluster.h"
36 #include "ntvfs/common/brlock.h"
37 #include "ntvfs/ntvfs.h"
38
39 /*
40   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
41   a file. For a local posix filesystem this will usually be a combination
42   of the device and inode numbers of the file, but it can be anything 
43   that uniquely idetifies a file for locking purposes, as long
44   as it is applied consistently.
45 */
46
47 /* this struct is typicaly attached to tcon */
48 struct brl_context {
49         struct tdb_wrap *w;
50         struct server_id server;
51         struct messaging_context *messaging_ctx;
52 };
53
54 /*
55   the lock context contains the elements that define whether one
56   lock is the same as another lock
57 */
58 struct lock_context {
59         struct server_id server;
60         uint16_t smbpid;
61         struct brl_context *ctx;
62 };
63
64 /* The data in brlock records is an unsorted linear array of these
65    records.  It is unnecessary to store the count as tdb provides the
66    size of the record */
67 struct lock_struct {
68         struct lock_context context;
69         struct ntvfs_handle *ntvfs;
70         uint64_t start;
71         uint64_t size;
72         enum brl_type lock_type;
73         void *notify_ptr;
74 };
75
76 /* this struct is attached to on oprn file handle */
77 struct brl_handle {
78         DATA_BLOB key;
79         struct ntvfs_handle *ntvfs;
80         struct lock_struct last_lock;
81 };
82
83 /*
84   Open up the brlock.tdb database. Close it down using
85   talloc_free(). We need the messaging_ctx to allow for
86   pending lock notifications.
87 */
88 static struct brl_context *brl_tdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
89                                     struct messaging_context *messaging_ctx)
90 {
91         struct brl_context *brl;
92
93         brl = talloc(mem_ctx, struct brl_context);
94         if (brl == NULL) {
95                 return NULL;
96         }
97
98         brl->w = cluster_tdb_tmp_open(brl, "brlock.tdb", TDB_DEFAULT);
99         if (brl->w == NULL) {
100                 talloc_free(brl);
101                 return NULL;
102         }
103
104         brl->server = server;
105         brl->messaging_ctx = messaging_ctx;
106
107         return brl;
108 }
109
110 static struct brl_handle *brl_tdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
111                                                     DATA_BLOB *file_key)
112 {
113         struct brl_handle *brlh;
114
115         brlh = talloc(mem_ctx, struct brl_handle);
116         if (brlh == NULL) {
117                 return NULL;
118         }
119
120         brlh->key = *file_key;
121         brlh->ntvfs = ntvfs;
122         ZERO_STRUCT(brlh->last_lock);
123
124         return brlh;
125 }
126
127 /*
128   see if two locking contexts are equal
129 */
130 static BOOL brl_tdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
131 {
132         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
133                 ctx1->smbpid == ctx2->smbpid &&
134                 ctx1->ctx == ctx2->ctx);
135 }
136
137 /*
138   see if lck1 and lck2 overlap
139 */
140 static BOOL brl_tdb_overlap(struct lock_struct *lck1, 
141                         struct lock_struct *lck2)
142 {
143         /* this extra check is not redundent - it copes with locks
144            that go beyond the end of 64 bit file space */
145         if (lck1->size != 0 &&
146             lck1->start == lck2->start &&
147             lck1->size == lck2->size) {
148                 return True;
149         }
150             
151         if (lck1->start >= (lck2->start+lck2->size) ||
152             lck2->start >= (lck1->start+lck1->size)) {
153                 return False;
154         }
155         return True;
156
157
158 /*
159  See if lock2 can be added when lock1 is in place.
160 */
161 static BOOL brl_tdb_conflict(struct lock_struct *lck1, 
162                          struct lock_struct *lck2)
163 {
164         /* pending locks don't conflict with anything */
165         if (lck1->lock_type >= PENDING_READ_LOCK ||
166             lck2->lock_type >= PENDING_READ_LOCK) {
167                 return False;
168         }
169
170         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
171                 return False;
172         }
173
174         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
175             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
176                 return False;
177         }
178
179         return brl_tdb_overlap(lck1, lck2);
180
181
182
183 /*
184  Check to see if this lock conflicts, but ignore our own locks on the
185  same fnum only.
186 */
187 static BOOL brl_tdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
188 {
189         /* pending locks don't conflict with anything */
190         if (lck1->lock_type >= PENDING_READ_LOCK ||
191             lck2->lock_type >= PENDING_READ_LOCK) {
192                 return False;
193         }
194
195         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
196                 return False;
197
198         /*
199          * note that incoming write calls conflict with existing READ
200          * locks even if the context is the same. JRA. See LOCKTEST7
201          * in smbtorture.
202          */
203         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
204             lck1->ntvfs == lck2->ntvfs &&
205             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
206                 return False;
207         }
208
209         return brl_tdb_overlap(lck1, lck2);
210
211
212
213 /*
214   amazingly enough, w2k3 "remembers" whether the last lock failure
215   is the same as this one and changes its error code. I wonder if any
216   app depends on this?
217 */
218 static NTSTATUS brl_tdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
219 {
220         /*
221          * this function is only called for non pending lock!
222          */
223
224         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
225         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
226                 return NT_STATUS_LOCK_NOT_GRANTED;
227         }
228
229         /* 
230          * if the notify_ptr is non NULL,
231          * it means that we're at the end of a pending lock
232          * and the real lock is requested after the timout went by
233          * In this case we need to remember the last_lock and always
234          * give FILE_LOCK_CONFLICT
235          */
236         if (lock->notify_ptr) {
237                 brlh->last_lock = *lock;
238                 return NT_STATUS_FILE_LOCK_CONFLICT;
239         }
240
241         /* 
242          * amazing the little things you learn with a test
243          * suite. Locks beyond this offset (as a 64 bit
244          * number!) always generate the conflict error code,
245          * unless the top bit is set
246          */
247         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
248                 brlh->last_lock = *lock;
249                 return NT_STATUS_FILE_LOCK_CONFLICT;
250         }
251
252         /*
253          * if the current lock matches the last failed lock on the file handle
254          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
255          */
256         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
257             lock->context.ctx == brlh->last_lock.context.ctx &&
258             lock->ntvfs == brlh->last_lock.ntvfs &&
259             lock->start == brlh->last_lock.start) {
260                 return NT_STATUS_FILE_LOCK_CONFLICT;
261         }
262
263         brlh->last_lock = *lock;
264         return NT_STATUS_LOCK_NOT_GRANTED;
265 }
266
267 /*
268   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
269   which case a real lock is first tried, and if that fails then a
270   pending lock is created. When the pending lock is triggered (by
271   someone else closing an overlapping lock range) a messaging
272   notification is sent, identified by the notify_ptr
273 */
274 static NTSTATUS brl_tdb_lock(struct brl_context *brl,
275                          struct brl_handle *brlh,
276                          uint16_t smbpid,
277                          uint64_t start, uint64_t size, 
278                          enum brl_type lock_type,
279                          void *notify_ptr)
280 {
281         TDB_DATA kbuf, dbuf;
282         int count=0, i;
283         struct lock_struct lock, *locks=NULL;
284         NTSTATUS status;
285
286         kbuf.dptr = brlh->key.data;
287         kbuf.dsize = brlh->key.length;
288
289         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
290                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
291         }
292
293         /* if this is a pending lock, then with the chainlock held we
294            try to get the real lock. If we succeed then we don't need
295            to make it pending. This prevents a possible race condition
296            where the pending lock gets created after the lock that is
297            preventing the real lock gets removed */
298         if (lock_type >= PENDING_READ_LOCK) {
299                 enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
300
301                 /* here we need to force that the last_lock isn't overwritten */
302                 lock = brlh->last_lock;
303                 status = brl_tdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
304                 brlh->last_lock = lock;
305
306                 if (NT_STATUS_IS_OK(status)) {
307                         tdb_chainunlock(brl->w->tdb, kbuf);
308                         return NT_STATUS_OK;
309                 }
310         }
311
312         dbuf = tdb_fetch(brl->w->tdb, kbuf);
313
314         lock.context.smbpid = smbpid;
315         lock.context.server = brl->server;
316         lock.context.ctx = brl;
317         lock.ntvfs = brlh->ntvfs;
318         lock.context.ctx = brl;
319         lock.start = start;
320         lock.size = size;
321         lock.lock_type = lock_type;
322         lock.notify_ptr = notify_ptr;
323
324         if (dbuf.dptr) {
325                 /* there are existing locks - make sure they don't conflict */
326                 locks = (struct lock_struct *)dbuf.dptr;
327                 count = dbuf.dsize / sizeof(*locks);
328                 for (i=0; i<count; i++) {
329                         if (brl_tdb_conflict(&locks[i], &lock)) {
330                                 status = brl_tdb_lock_failed(brlh, &lock);
331                                 goto fail;
332                         }
333                 }
334         }
335
336         /* no conflicts - add it to the list of locks */
337         locks = realloc_p(locks, struct lock_struct, count+1);
338         if (!locks) {
339                 status = NT_STATUS_NO_MEMORY;
340                 goto fail;
341         } else {
342                 dbuf.dptr = (uint8_t *)locks;
343         }
344         locks[count] = lock;
345         dbuf.dsize += sizeof(lock);
346
347         if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
348                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
349                 goto fail;
350         }
351
352         free(dbuf.dptr);
353         tdb_chainunlock(brl->w->tdb, kbuf);
354
355         /* the caller needs to know if the real lock was granted. If
356            we have reached here then it must be a pending lock that
357            was granted, so tell them the lock failed */
358         if (lock_type >= PENDING_READ_LOCK) {
359                 return NT_STATUS_LOCK_NOT_GRANTED;
360         }
361
362         return NT_STATUS_OK;
363
364  fail:
365
366         free(dbuf.dptr);
367         tdb_chainunlock(brl->w->tdb, kbuf);
368         return status;
369 }
370
371
372 /*
373   we are removing a lock that might be holding up a pending lock. Scan for pending
374   locks that cover this range and if we find any then notify the server that it should
375   retry the lock
376 */
377 static void brl_tdb_notify_unlock(struct brl_context *brl,
378                               struct lock_struct *locks, int count, 
379                               struct lock_struct *removed_lock)
380 {
381         int i, last_notice;
382
383         /* the last_notice logic is to prevent stampeding on a lock
384            range. It prevents us sending hundreds of notifies on the
385            same range of bytes. It doesn't prevent all possible
386            stampedes, but it does prevent the most common problem */
387         last_notice = -1;
388
389         for (i=0;i<count;i++) {
390                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
391                     brl_tdb_overlap(&locks[i], removed_lock)) {
392                         if (last_notice != -1 && brl_tdb_overlap(&locks[i], &locks[last_notice])) {
393                                 continue;
394                         }
395                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
396                                 last_notice = i;
397                         }
398                         messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
399                                            MSG_BRL_RETRY, locks[i].notify_ptr);
400                 }
401         }
402 }
403
404
405 /*
406   send notifications for all pending locks - the file is being closed by this
407   user
408 */
409 static void brl_tdb_notify_all(struct brl_context *brl,
410                            struct lock_struct *locks, int count)
411 {
412         int i;
413         for (i=0;i<count;i++) {
414                 if (locks->lock_type >= PENDING_READ_LOCK) {
415                         brl_tdb_notify_unlock(brl, locks, count, &locks[i]);
416                 }
417         }
418 }
419
420
421
422 /*
423  Unlock a range of bytes.
424 */
425 static NTSTATUS brl_tdb_unlock(struct brl_context *brl,
426                            struct brl_handle *brlh, 
427                            uint16_t smbpid,
428                            uint64_t start, uint64_t size)
429 {
430         TDB_DATA kbuf, dbuf;
431         int count, i;
432         struct lock_struct *locks, *lock;
433         struct lock_context context;
434         NTSTATUS status;
435
436         kbuf.dptr = brlh->key.data;
437         kbuf.dsize = brlh->key.length;
438
439         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
440                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
441         }
442
443         dbuf = tdb_fetch(brl->w->tdb, kbuf);
444         if (!dbuf.dptr) {
445                 tdb_chainunlock(brl->w->tdb, kbuf);
446                 return NT_STATUS_RANGE_NOT_LOCKED;
447         }
448
449         context.smbpid = smbpid;
450         context.server = brl->server;
451         context.ctx = brl;
452
453         /* there are existing locks - find a match */
454         locks = (struct lock_struct *)dbuf.dptr;
455         count = dbuf.dsize / sizeof(*locks);
456
457         for (i=0; i<count; i++) {
458                 lock = &locks[i];
459                 if (brl_tdb_same_context(&lock->context, &context) &&
460                     lock->ntvfs == brlh->ntvfs &&
461                     lock->start == start &&
462                     lock->size == size &&
463                     lock->lock_type == WRITE_LOCK) {
464                         break;
465                 }
466         }
467         if (i < count) goto found;
468
469         for (i=0; i<count; i++) {
470                 lock = &locks[i];
471                 if (brl_tdb_same_context(&lock->context, &context) &&
472                     lock->ntvfs == brlh->ntvfs &&
473                     lock->start == start &&
474                     lock->size == size &&
475                     lock->lock_type < PENDING_READ_LOCK) {
476                         break;
477                 }
478         }
479
480 found:
481         if (i < count) {
482                 /* found it - delete it */
483                 if (count == 1) {
484                         if (tdb_delete(brl->w->tdb, kbuf) != 0) {
485                                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
486                                 goto fail;
487                         }
488                 } else {
489                         struct lock_struct removed_lock = *lock;
490                         if (i < count-1) {
491                                 memmove(&locks[i], &locks[i+1], 
492                                         sizeof(*locks)*((count-1) - i));
493                         }
494                         count--;
495                         
496                         /* send notifications for any relevant pending locks */
497                         brl_tdb_notify_unlock(brl, locks, count, &removed_lock);
498                         
499                         dbuf.dsize = count * sizeof(*locks);
500                         
501                         if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
502                                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
503                                 goto fail;
504                         }
505                 }
506                 
507                 free(dbuf.dptr);
508                 tdb_chainunlock(brl->w->tdb, kbuf);
509                 return NT_STATUS_OK;
510         }
511         
512         /* we didn't find it */
513         status = NT_STATUS_RANGE_NOT_LOCKED;
514
515  fail:
516         free(dbuf.dptr);
517         tdb_chainunlock(brl->w->tdb, kbuf);
518         return status;
519 }
520
521
522 /*
523   remove a pending lock. This is called when the caller has either
524   given up trying to establish a lock or when they have succeeded in
525   getting it. In either case they no longer need to be notified.
526 */
527 static NTSTATUS brl_tdb_remove_pending(struct brl_context *brl,
528                                    struct brl_handle *brlh, 
529                                    void *notify_ptr)
530 {
531         TDB_DATA kbuf, dbuf;
532         int count, i;
533         struct lock_struct *locks;
534         NTSTATUS status;
535
536         kbuf.dptr = brlh->key.data;
537         kbuf.dsize = brlh->key.length;
538
539         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
540                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
541         }
542
543         dbuf = tdb_fetch(brl->w->tdb, kbuf);
544         if (!dbuf.dptr) {
545                 tdb_chainunlock(brl->w->tdb, kbuf);
546                 return NT_STATUS_RANGE_NOT_LOCKED;
547         }
548
549         /* there are existing locks - find a match */
550         locks = (struct lock_struct *)dbuf.dptr;
551         count = dbuf.dsize / sizeof(*locks);
552
553         for (i=0; i<count; i++) {
554                 struct lock_struct *lock = &locks[i];
555                 
556                 if (lock->lock_type >= PENDING_READ_LOCK &&
557                     lock->notify_ptr == notify_ptr &&
558                     cluster_id_equal(&lock->context.server, &brl->server)) {
559                         /* found it - delete it */
560                         if (count == 1) {
561                                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
562                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
563                                         goto fail;
564                                 }
565                         } else {
566                                 if (i < count-1) {
567                                         memmove(&locks[i], &locks[i+1], 
568                                                 sizeof(*locks)*((count-1) - i));
569                                 }
570                                 count--;
571                                 dbuf.dsize = count * sizeof(*locks);
572                                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
573                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
574                                         goto fail;
575                                 }
576                         }
577                         
578                         free(dbuf.dptr);
579                         tdb_chainunlock(brl->w->tdb, kbuf);
580                         return NT_STATUS_OK;
581                 }
582         }
583         
584         /* we didn't find it */
585         status = NT_STATUS_RANGE_NOT_LOCKED;
586
587  fail:
588         free(dbuf.dptr);
589         tdb_chainunlock(brl->w->tdb, kbuf);
590         return status;
591 }
592
593
594 /*
595   Test if we are allowed to perform IO on a region of an open file
596 */
597 static NTSTATUS brl_tdb_locktest(struct brl_context *brl,
598                              struct brl_handle *brlh,
599                              uint16_t smbpid, 
600                              uint64_t start, uint64_t size, 
601                              enum brl_type lock_type)
602 {
603         TDB_DATA kbuf, dbuf;
604         int count, i;
605         struct lock_struct lock, *locks;
606
607         kbuf.dptr = brlh->key.data;
608         kbuf.dsize = brlh->key.length;
609
610         dbuf = tdb_fetch(brl->w->tdb, kbuf);
611         if (dbuf.dptr == NULL) {
612                 return NT_STATUS_OK;
613         }
614
615         lock.context.smbpid = smbpid;
616         lock.context.server = brl->server;
617         lock.context.ctx = brl;
618         lock.ntvfs = brlh->ntvfs;
619         lock.start = start;
620         lock.size = size;
621         lock.lock_type = lock_type;
622
623         /* there are existing locks - make sure they don't conflict */
624         locks = (struct lock_struct *)dbuf.dptr;
625         count = dbuf.dsize / sizeof(*locks);
626
627         for (i=0; i<count; i++) {
628                 if (brl_tdb_conflict_other(&locks[i], &lock)) {
629                         free(dbuf.dptr);
630                         return NT_STATUS_FILE_LOCK_CONFLICT;
631                 }
632         }
633
634         free(dbuf.dptr);
635         return NT_STATUS_OK;
636 }
637
638
639 /*
640  Remove any locks associated with a open file.
641 */
642 static NTSTATUS brl_tdb_close(struct brl_context *brl,
643                           struct brl_handle *brlh)
644 {
645         TDB_DATA kbuf, dbuf;
646         int count, i, dcount=0;
647         struct lock_struct *locks;
648         NTSTATUS status;
649
650         kbuf.dptr = brlh->key.data;
651         kbuf.dsize = brlh->key.length;
652
653         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
654                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
655         }
656
657         dbuf = tdb_fetch(brl->w->tdb, kbuf);
658         if (!dbuf.dptr) {
659                 tdb_chainunlock(brl->w->tdb, kbuf);
660                 return NT_STATUS_OK;
661         }
662
663         /* there are existing locks - remove any for this fnum */
664         locks = (struct lock_struct *)dbuf.dptr;
665         count = dbuf.dsize / sizeof(*locks);
666
667         for (i=0; i<count; i++) {
668                 struct lock_struct *lock = &locks[i];
669
670                 if (lock->context.ctx == brl &&
671                     cluster_id_equal(&lock->context.server, &brl->server) &&
672                     lock->ntvfs == brlh->ntvfs) {
673                         /* found it - delete it */
674                         if (count > 1 && i < count-1) {
675                                 memmove(&locks[i], &locks[i+1], 
676                                         sizeof(*locks)*((count-1) - i));
677                         }
678                         count--;
679                         i--;
680                         dcount++;
681                 }
682         }
683
684         status = NT_STATUS_OK;
685
686         if (count == 0) {
687                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
688                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
689                 }
690         } else if (dcount != 0) {
691                 /* tell all pending lock holders for this file that
692                    they have a chance now. This is a bit indiscriminant,
693                    but works OK */
694                 brl_tdb_notify_all(brl, locks, count);
695
696                 dbuf.dsize = count * sizeof(*locks);
697
698                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
699                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
700                 }
701         }
702
703         free(dbuf.dptr);
704         tdb_chainunlock(brl->w->tdb, kbuf);
705
706         return status;
707 }
708
709
710 static const struct brlock_ops brlock_tdb_ops = {
711         .brl_init           = brl_tdb_init,
712         .brl_create_handle  = brl_tdb_create_handle,
713         .brl_lock           = brl_tdb_lock,
714         .brl_unlock         = brl_tdb_unlock,
715         .brl_remove_pending = brl_tdb_remove_pending,
716         .brl_locktest       = brl_tdb_locktest,
717         .brl_close          = brl_tdb_close
718 };
719
720
721 void brl_tdb_init_ops(void)
722 {
723         brl_set_ops(&brlock_tdb_ops);
724 }