2 Unix SMB/CIFS implementation.
4 generic byte range locking code - ctdb backend
6 Copyright (C) Andrew Tridgell 2006
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "system/filesys.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "messaging/messaging.h"
26 #include "lib/messaging/irpc.h"
27 #include "libcli/libcli.h"
28 #include "cluster/cluster.h"
29 #include "ntvfs/ntvfs.h"
30 #include "ntvfs/common/brlock.h"
31 #include "include/ctdb.h"
/* the ctdb call ids we register for this database - one per brlock
   operation. FUNC_BRL_CLOSE is restored here: it is referenced by
   brl_ctdb_close() and brl_ctdb_init_ops() below */
enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2,
		   FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
		   FUNC_BRL_CLOSE=5};
/*
  in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
  a file. For a local posix filesystem this will usually be a combination
  of the device and inode numbers of the file, but it can be anything
  that uniquely identifies a file for locking purposes, as long
  as it is applied consistently.
*/
45 /* this struct is typically attached to tcon */
47 struct ctdb_context *ctdb;
48 struct ctdb_db_context *ctdb_db;
49 struct server_id server;
50 struct messaging_context *messaging_ctx;
54 the lock context contains the elements that define whether one
55 lock is the same as another lock
58 struct server_id server;
60 struct brl_context *ctx;
63 /* The data in brlock records is an unsorted linear array of these
64 records. It is unnecessary to store the count as tdb provides the
67 struct lock_context context;
68 struct ntvfs_handle *ntvfs;
71 enum brl_type lock_type;
75 /* this struct is attached to on open file handle */
78 struct ntvfs_handle *ntvfs;
79 struct lock_struct last_lock;
83 static void show_locks(const char *op, struct lock_struct *locks, int count)
86 DEBUG(0,("OP: %s\n", op));
87 if (locks == NULL) return;
88 for (i=0;i<count;i++) {
89 DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
90 i, (int)locks[i].start, (int)locks[i].size,
91 locks[i].context.server.node,
92 locks[i].context.server.id,
93 locks[i].context.smbpid,
101 Open up the brlock.tdb database. Close it down using
102 talloc_free(). We need the messaging_ctx to allow for
103 pending lock notifications.
105 static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server,
106 struct messaging_context *messaging_ctx)
108 struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
109 struct ctdb_context);
110 struct brl_context *brl;
112 brl = talloc(mem_ctx, struct brl_context);
118 brl->ctdb_db = ctdb_db_handle(ctdb, "brlock");
119 if (brl->ctdb_db == NULL) {
120 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
124 brl->server = server;
125 brl->messaging_ctx = messaging_ctx;
130 static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs,
133 struct brl_handle *brlh;
135 brlh = talloc(mem_ctx, struct brl_handle);
140 brlh->key = *file_key;
142 ZERO_STRUCT(brlh->last_lock);
148 see if two locking contexts are equal
150 static bool brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
152 return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
153 ctx1->smbpid == ctx2->smbpid &&
154 ctx1->ctx == ctx2->ctx);
158 see if lck1 and lck2 overlap
160 static bool brl_ctdb_overlap(struct lock_struct *lck1,
161 struct lock_struct *lck2)
163 /* this extra check is not redundent - it copes with locks
164 that go beyond the end of 64 bit file space */
165 if (lck1->size != 0 &&
166 lck1->start == lck2->start &&
167 lck1->size == lck2->size) {
171 if (lck1->start >= (lck2->start+lck2->size) ||
172 lck2->start >= (lck1->start+lck1->size)) {
179 See if lock2 can be added when lock1 is in place.
181 static bool brl_ctdb_conflict(struct lock_struct *lck1,
182 struct lock_struct *lck2)
184 /* pending locks don't conflict with anything */
185 if (lck1->lock_type >= PENDING_READ_LOCK ||
186 lck2->lock_type >= PENDING_READ_LOCK) {
190 if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
194 if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
195 lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
199 return brl_ctdb_overlap(lck1, lck2);
204 Check to see if this lock conflicts, but ignore our own locks on the
207 static bool brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
209 /* pending locks don't conflict with anything */
210 if (lck1->lock_type >= PENDING_READ_LOCK ||
211 lck2->lock_type >= PENDING_READ_LOCK) {
215 if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK)
219 * note that incoming write calls conflict with existing READ
220 * locks even if the context is the same. JRA. See LOCKTEST7
223 if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
224 lck1->ntvfs == lck2->ntvfs &&
225 (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
229 return brl_ctdb_overlap(lck1, lck2);
234 amazingly enough, w2k3 "remembers" whether the last lock failure
235 is the same as this one and changes its error code. I wonder if any
238 static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
241 * this function is only called for non pending lock!
244 /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
245 if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
246 return NT_STATUS_LOCK_NOT_GRANTED;
250 * if the notify_ptr is non NULL,
251 * it means that we're at the end of a pending lock
252 * and the real lock is requested after the timeout went by
253 * In this case we need to remember the last_lock and always
254 * give FILE_LOCK_CONFLICT
256 if (lock->notify_ptr) {
257 brlh->last_lock = *lock;
258 return NT_STATUS_FILE_LOCK_CONFLICT;
262 * amazing the little things you learn with a test
263 * suite. Locks beyond this offset (as a 64 bit
264 * number!) always generate the conflict error code,
265 * unless the top bit is set
267 if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
268 brlh->last_lock = *lock;
269 return NT_STATUS_FILE_LOCK_CONFLICT;
273 * if the current lock matches the last failed lock on the file handle
274 * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
276 if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
277 lock->context.ctx == brlh->last_lock.context.ctx &&
278 lock->ntvfs == brlh->last_lock.ntvfs &&
279 lock->start == brlh->last_lock.start) {
280 return NT_STATUS_FILE_LOCK_CONFLICT;
283 brlh->last_lock = *lock;
284 return NT_STATUS_LOCK_NOT_GRANTED;
287 struct ctdb_lock_req {
291 enum brl_type lock_type;
293 struct server_id server;
294 struct brl_context *brl;
295 struct ntvfs_handle *ntvfs;
299 ctdb call handling brl_lock()
301 static int brl_ctdb_lock_func(struct ctdb_call_info *call)
303 struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
306 struct lock_struct lock, *locks=NULL;
307 NTSTATUS status = NT_STATUS_OK;
309 /* if this is a pending lock, then with the chainlock held we
310 try to get the real lock. If we succeed then we don't need
311 to make it pending. This prevents a possible race condition
312 where the pending lock gets created after the lock that is
313 preventing the real lock gets removed */
314 if (req->lock_type >= PENDING_READ_LOCK) {
315 enum brl_type lock_type = req->lock_type;
316 req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
317 if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
320 req->lock_type = lock_type;
323 dbuf = call->record_data;
326 lock.context.smbpid = req->smbpid;
327 lock.context.server = req->server;
328 lock.context.ctx = req->brl;
329 lock.ntvfs = req->ntvfs;
330 lock.start = req->start;
331 lock.size = req->size;
332 lock.lock_type = req->lock_type;
333 lock.notify_ptr = req->notify_ptr;
336 /* there are existing locks - make sure they don't conflict */
337 locks = (struct lock_struct *)dbuf.dptr;
338 count = dbuf.dsize / sizeof(*locks);
340 for (i=0; i<count; i++) {
341 if (brl_ctdb_conflict(&locks[i], &lock)) {
342 status = NT_STATUS_LOCK_NOT_GRANTED;
348 call->new_data = talloc(call, TDB_DATA);
349 if (call->new_data == NULL) {
350 return CTDB_ERR_NOMEM;
353 call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
354 if (call->new_data->dptr == NULL) {
355 return CTDB_ERR_NOMEM;
357 memcpy(call->new_data->dptr, locks, dbuf.dsize);
358 memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
359 call->new_data->dsize = dbuf.dsize + sizeof(lock);
361 if (req->lock_type >= PENDING_READ_LOCK) {
362 status = NT_STATUS_LOCK_NOT_GRANTED;
366 call->status = NT_STATUS_V(status);
373 Lock a range of bytes. The lock_type can be a PENDING_*_LOCK, in
374 which case a real lock is first tried, and if that fails then a
375 pending lock is created. When the pending lock is triggered (by
376 someone else closing an overlapping lock range) a messaging
377 notification is sent, identified by the notify_ptr
379 static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
380 struct brl_handle *brlh,
382 uint64_t start, uint64_t size,
383 enum brl_type lock_type,
386 struct ctdb_lock_req req;
387 struct ctdb_call call;
391 call.call_id = FUNC_BRL_LOCK;
392 call.key.dptr = brlh->key.data;
393 call.key.dsize = brlh->key.length;
394 call.call_data.dptr = (uint8_t *)&req;
395 call.call_data.dsize = sizeof(req);
403 req.lock_type = lock_type;
404 req.notify_ptr = notify_ptr;
405 req.server = brl->server;
407 req.ntvfs = brlh->ntvfs;
409 ret = ctdb_call(brl->ctdb_db, &call);
411 return NT_STATUS_INTERNAL_DB_CORRUPTION;
414 status = NT_STATUS(call.status);
416 if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
417 struct lock_struct lock;
418 lock.context.smbpid = smbpid;
419 lock.context.server = brl->server;
420 lock.context.ctx = brl;
421 lock.ntvfs = brlh->ntvfs;
424 lock.lock_type = lock_type;
425 lock.notify_ptr = notify_ptr;
426 status = brl_ctdb_lock_failed(brlh, &lock);
433 we are removing a lock that might be holding up a pending lock. Scan
434 for pending locks that cover this range and if we find any then
435 notify the server that it should retry the lock. In this backend, we
436 notify by sending the list of locks that need to be notified on back
437 in the reply_data of the ctdb call. The caller then does the
440 static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
441 struct lock_struct *locks, int count,
442 struct lock_struct *removed_lock)
446 /* the last_notice logic is to prevent stampeding on a lock
447 range. It prevents us sending hundreds of notifies on the
448 same range of bytes. It doesn't prevent all possible
449 stampedes, but it does prevent the most common problem */
452 for (i=0;i<count;i++) {
453 if (locks[i].lock_type >= PENDING_READ_LOCK &&
454 brl_ctdb_overlap(&locks[i], removed_lock)) {
455 struct lock_struct *nlocks;
458 if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
461 if (locks[i].lock_type == PENDING_WRITE_LOCK) {
464 if (call->reply_data == NULL) {
465 call->reply_data = talloc_zero(call, TDB_DATA);
466 if (call->reply_data == NULL) {
467 return CTDB_ERR_NOMEM;
470 /* add to the list of pending locks to notify caller of */
471 ncount = call->reply_data->dsize / sizeof(struct lock_struct);
472 nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr,
473 struct lock_struct, ncount + 1);
474 if (nlocks == NULL) {
475 return CTDB_ERR_NOMEM;
477 call->reply_data->dptr = (uint8_t *)nlocks;
478 nlocks[ncount] = locks[i];
479 call->reply_data->dsize += sizeof(struct lock_struct);
487 send notifications for all pending locks - the file is being closed by this
490 static int brl_ctdb_notify_all(struct ctdb_call_info *call,
491 struct lock_struct *locks, int count)
494 for (i=0;i<count;i++) {
495 if (locks->lock_type >= PENDING_READ_LOCK) {
496 int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
497 if (ret != 0) return ret;
504 send off any messages needed to notify of pending locks that should now retry
506 static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
508 struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
509 int i, count = reply_data->dsize / sizeof(struct lock_struct);
510 for (i=0;i<count;i++) {
511 messaging_send_ptr(brl->messaging_ctx, locks[i].context.server,
512 MSG_BRL_RETRY, locks[i].notify_ptr);
517 struct ctdb_unlock_req {
521 struct server_id server;
522 struct brl_context *brl;
523 struct ntvfs_handle *ntvfs;
527 Unlock a range of bytes.
529 static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
531 struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
534 struct lock_struct *locks, *lock;
535 struct lock_context context;
536 NTSTATUS status = NT_STATUS_OK;
538 dbuf = call->record_data;
540 context.smbpid = req->smbpid;
541 context.server = req->server;
542 context.ctx = req->brl;
544 /* there are existing locks - find a match */
545 locks = (struct lock_struct *)dbuf.dptr;
546 count = dbuf.dsize / sizeof(*locks);
548 for (i=0; i<count; i++) {
550 if (brl_ctdb_same_context(&lock->context, &context) &&
551 lock->ntvfs == req->ntvfs &&
552 lock->start == req->start &&
553 lock->size == req->size &&
554 lock->lock_type == WRITE_LOCK) {
558 if (i < count) goto found;
560 for (i=0; i<count; i++) {
562 if (brl_ctdb_same_context(&lock->context, &context) &&
563 lock->ntvfs == req->ntvfs &&
564 lock->start == req->start &&
565 lock->size == req->size &&
566 lock->lock_type < PENDING_READ_LOCK) {
573 struct lock_struct removed_lock = *lock;
575 call->new_data = talloc(call, TDB_DATA);
576 if (call->new_data == NULL) {
577 return CTDB_ERR_NOMEM;
580 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
581 if (call->new_data->dptr == NULL) {
582 return CTDB_ERR_NOMEM;
584 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
586 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
587 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
588 (count-(i+1))*sizeof(*lock));
591 int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
592 if (ret != 0) return ret;
597 /* we didn't find it */
598 status = NT_STATUS_RANGE_NOT_LOCKED;
601 call->status = NT_STATUS_V(status);
608 Unlock a range of bytes.
610 static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
611 struct brl_handle *brlh,
613 uint64_t start, uint64_t size)
615 struct ctdb_call call;
616 struct ctdb_unlock_req req;
619 call.call_id = FUNC_BRL_UNLOCK;
620 call.key.dptr = brlh->key.data;
621 call.key.dsize = brlh->key.length;
622 call.call_data.dptr = (uint8_t *)&req;
623 call.call_data.dsize = sizeof(req);
629 req.server = brl->server;
631 req.ntvfs = brlh->ntvfs;
633 ret = ctdb_call(brl->ctdb_db, &call);
635 DEBUG(0,("ctdb_call failed - %s\n", __location__));
636 return NT_STATUS_INTERNAL_DB_CORRUPTION;
639 brl_ctdb_notify_send(brl, &call.reply_data);
641 return NT_STATUS(call.status);
645 struct ctdb_remove_pending_req {
646 struct server_id server;
651 remove a pending lock. This is called when the caller has either
652 given up trying to establish a lock or when they have succeeded in
653 getting it. In either case they no longer need to be notified.
655 static int brl_ctdb_remove_pending_func(struct ctdb_call_info *call)
657 struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
660 struct lock_struct *locks;
661 NTSTATUS status = NT_STATUS_OK;
663 dbuf = call->record_data;
665 /* there are existing locks - find a match */
666 locks = (struct lock_struct *)dbuf.dptr;
667 count = dbuf.dsize / sizeof(*locks);
669 for (i=0; i<count; i++) {
670 struct lock_struct *lock = &locks[i];
672 if (lock->lock_type >= PENDING_READ_LOCK &&
673 lock->notify_ptr == req->notify_ptr &&
674 cluster_id_equal(&lock->context.server, &req->server)) {
675 call->new_data = talloc(call, TDB_DATA);
676 if (call->new_data == NULL) {
677 return CTDB_ERR_NOMEM;
680 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
681 if (call->new_data->dptr == NULL) {
682 return CTDB_ERR_NOMEM;
684 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
686 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
687 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
688 (count-(i+1))*sizeof(*lock));
694 /* we didn't find it */
695 status = NT_STATUS_RANGE_NOT_LOCKED;
698 call->status = NT_STATUS_V(status);
703 static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
704 struct brl_handle *brlh,
707 struct ctdb_call call;
708 struct ctdb_remove_pending_req req;
711 call.call_id = FUNC_BRL_REMOVE_PENDING;
712 call.key.dptr = brlh->key.data;
713 call.key.dsize = brlh->key.length;
714 call.call_data.dptr = (uint8_t *)&req;
715 call.call_data.dsize = sizeof(req);
718 req.notify_ptr = notify_ptr;
719 req.server = brl->server;
721 ret = ctdb_call(brl->ctdb_db, &call);
723 DEBUG(0,("ctdb_call failed - %s\n", __location__));
724 return NT_STATUS_INTERNAL_DB_CORRUPTION;
727 return NT_STATUS(call.status);
731 struct ctdb_locktest_req {
735 enum brl_type lock_type;
736 struct brl_context *brl;
737 struct server_id server;
738 struct ntvfs_handle *ntvfs;
742 remove a pending lock. This is called when the caller has either
743 given up trying to establish a lock or when they have succeeded in
744 getting it. In either case they no longer need to be notified.
746 static int brl_ctdb_locktest_func(struct ctdb_call_info *call)
748 struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
751 struct lock_struct *locks, lock;
752 NTSTATUS status = NT_STATUS_OK;
754 lock.context.smbpid = req->smbpid;
755 lock.context.server = req->server;
756 lock.context.ctx = req->brl;
757 lock.ntvfs = req->ntvfs;
758 lock.start = req->start;
759 lock.size = req->size;
760 lock.lock_type = req->lock_type;
762 dbuf = call->record_data;
764 /* there are existing locks - find a match */
765 locks = (struct lock_struct *)dbuf.dptr;
766 count = dbuf.dsize / sizeof(*locks);
768 for (i=0; i<count; i++) {
769 if (brl_ctdb_conflict_other(&locks[i], &lock)) {
770 status = NT_STATUS_FILE_LOCK_CONFLICT;
775 call->status = NT_STATUS_V(status);
781 Test if we are allowed to perform IO on a region of an open file
783 static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
784 struct brl_handle *brlh,
786 uint64_t start, uint64_t size,
787 enum brl_type lock_type)
789 struct ctdb_call call;
790 struct ctdb_locktest_req req;
793 call.call_id = FUNC_BRL_LOCKTEST;
794 call.key.dptr = brlh->key.data;
795 call.key.dsize = brlh->key.length;
796 call.call_data.dptr = (uint8_t *)&req;
797 call.call_data.dsize = sizeof(req);
803 req.lock_type = lock_type;
804 req.server = brl->server;
806 req.ntvfs = brlh->ntvfs;
808 ret = ctdb_call(brl->ctdb_db, &call);
810 DEBUG(0,("ctdb_call failed - %s\n", __location__));
811 return NT_STATUS_INTERNAL_DB_CORRUPTION;
814 return NT_STATUS(call.status);
818 struct ctdb_close_req {
819 struct brl_context *brl;
820 struct server_id server;
821 struct ntvfs_handle *ntvfs;
825 remove a pending lock. This is called when the caller has either
826 given up trying to establish a lock or when they have succeeded in
827 getting it. In either case they no longer need to be notified.
829 static int brl_ctdb_close_func(struct ctdb_call_info *call)
831 struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
833 int count, dcount=0, i;
834 struct lock_struct *locks;
835 NTSTATUS status = NT_STATUS_OK;
837 dbuf = call->record_data;
839 /* there are existing locks - find a match */
840 locks = (struct lock_struct *)dbuf.dptr;
841 count = dbuf.dsize / sizeof(*locks);
843 for (i=0; i<count; i++) {
844 struct lock_struct *lock = &locks[i];
846 if (lock->context.ctx == req->brl &&
847 cluster_id_equal(&lock->context.server, &req->server) &&
848 lock->ntvfs == req->ntvfs) {
849 /* found it - delete it */
850 if (count > 1 && i < count-1) {
851 memmove(&locks[i], &locks[i+1],
852 sizeof(*locks)*((count-1) - i));
861 call->new_data = talloc(call, TDB_DATA);
862 if (call->new_data == NULL) {
863 return CTDB_ERR_NOMEM;
866 brl_ctdb_notify_all(call, locks, count);
868 call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
869 if (call->new_data->dptr == NULL) {
870 return CTDB_ERR_NOMEM;
872 call->new_data->dsize = count*sizeof(struct lock_struct);
874 memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
877 call->status = NT_STATUS_V(status);
883 Test if we are allowed to perform IO on a region of an open file
885 static NTSTATUS brl_ctdb_close(struct brl_context *brl,
886 struct brl_handle *brlh)
888 struct ctdb_call call;
889 struct ctdb_close_req req;
892 call.call_id = FUNC_BRL_CLOSE;
893 call.key.dptr = brlh->key.data;
894 call.key.dsize = brlh->key.length;
895 call.call_data.dptr = (uint8_t *)&req;
896 call.call_data.dsize = sizeof(req);
900 req.server = brl->server;
901 req.ntvfs = brlh->ntvfs;
903 ret = ctdb_call(brl->ctdb_db, &call);
905 DEBUG(0,("ctdb_call failed - %s\n", __location__));
906 return NT_STATUS_INTERNAL_DB_CORRUPTION;
909 brl_ctdb_notify_send(brl, &call.reply_data);
911 return NT_STATUS(call.status);
915 static const struct brlock_ops brlock_tdb_ops = {
916 .brl_init = brl_ctdb_init,
917 .brl_create_handle = brl_ctdb_create_handle,
918 .brl_lock = brl_ctdb_lock,
919 .brl_unlock = brl_ctdb_unlock,
920 .brl_remove_pending = brl_ctdb_remove_pending,
921 .brl_locktest = brl_ctdb_locktest,
922 .brl_close = brl_ctdb_close
926 void brl_ctdb_init_ops(void)
928 struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
929 struct ctdb_context);
930 struct ctdb_db_context *ctdb_db;
932 brl_set_ops(&brlock_tdb_ops);
934 ctdb_db = ctdb_db_handle(ctdb, "brlock");
935 if (ctdb_db == NULL) {
936 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
940 ctdb_set_call(ctdb_db, brl_ctdb_lock_func, FUNC_BRL_LOCK);
941 ctdb_set_call(ctdb_db, brl_ctdb_unlock_func, FUNC_BRL_UNLOCK);
942 ctdb_set_call(ctdb_db, brl_ctdb_remove_pending_func, FUNC_BRL_REMOVE_PENDING);
943 ctdb_set_call(ctdb_db, brl_ctdb_locktest_func, FUNC_BRL_LOCKTEST);
944 ctdb_set_call(ctdb_db, brl_ctdb_close_func, FUNC_BRL_CLOSE);