2 Unix SMB/CIFS implementation.
4 generic byte range locking code - ctdb backend
6 Copyright (C) Andrew Tridgell 2006
8 This program is free software; you can redistribute it and/or modify
9 it under the terms of the GNU General Public License as published by
10 the Free Software Foundation; either version 3 of the License, or
11 (at your option) any later version.
13 This program is distributed in the hope that it will be useful,
14 but WITHOUT ANY WARRANTY; without even the implied warranty of
15 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 GNU General Public License for more details.
18 You should have received a copy of the GNU General Public License
19 along with this program. If not, see <http://www.gnu.org/licenses/>.
23 #include "system/filesys.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "messaging/messaging.h"
27 #include "lib/messaging/irpc.h"
28 #include "libcli/libcli.h"
29 #include "cluster/cluster.h"
30 #include "ntvfs/ntvfs.h"
31 #include "ntvfs/common/brlock.h"
32 #include "include/ctdb.h"
/* ctdb call ids registered for the brlock database in
   brl_ctdb_init_ops() below; FUNC_BRL_CLOSE (used there) is also an
   enumerator, but its line falls outside this fragment */
34 enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2,
35 FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
39 in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
40 a file. For a local posix filesystem this will usually be a combination
41 of the device and inode numbers of the file, but it can be anything
42 that uniquely identifies a file for locking purposes, as long
43 as it is applied consistently.
46 /* this struct is typically attached to tcon */
/* NOTE(review): the 'struct brl_context {' opener is missing from
   this fragment */
48 struct ctdb_context *ctdb;  /* cluster backend connection */
49 struct ctdb_db_context *ctdb_db;  /* the attached "brlock" database */
50 struct server_id server;  /* this server's cluster-wide id */
51 struct messaging_context *messaging_ctx;  /* used for MSG_BRL_RETRY notifies */
55 the lock context contains the elements that define whether one
56 lock is the same as another lock
/* NOTE(review): the struct opener and the smbpid member (compared in
   brl_ctdb_same_context) are outside this fragment */
59 struct server_id server;
61 struct brl_context *ctx;
64 /* The data in brlock records is an unsorted linear array of these
65 records. It is unnecessary to store the count as tdb provides the
/* NOTE(review): the struct opener and the start/size/notify_ptr
   members (used throughout this file) are outside this fragment */
68 struct lock_context context;
69 struct ntvfs_handle *ntvfs;
72 enum brl_type lock_type;
76 /* this struct is attached to an open file handle */
/* NOTE(review): the struct opener and the DATA_BLOB 'key' member
   (assigned in brl_ctdb_create_handle) are outside this fragment */
79 struct ntvfs_handle *ntvfs;
80 struct lock_struct last_lock;  /* remembered by brl_ctdb_lock_failed() */
/* debug helper: dump every lock in a brlock record array to the
   debug log */
84 static void show_locks(const char *op, struct lock_struct *locks, int count)
/* NOTE(review): the opening brace, the declaration of 'i' and the
   tail of the DEBUG() argument list/loop are outside this fragment */
87 DEBUG(0,("OP: %s\n", op));
88 if (locks == NULL) return;
89 for (i=0;i<count;i++) {
90 DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
91 i, (int)locks[i].start, (int)locks[i].size,
92 locks[i].context.server.node,
93 locks[i].context.server.id,
94 locks[i].context.smbpid,
102 Open up the brlock.tdb database. Close it down using
103 talloc_free(). We need the messaging_ctx to allow for
104 pending lock notifications.
106 static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server,
107 struct messaging_context *messaging_ctx)
/* the cluster backend handle was registered elsewhere; cast it back
   to a ctdb context */
109 struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
110 struct ctdb_context);
111 struct brl_context *brl;
113 brl = talloc(mem_ctx, struct brl_context);
/* NOTE(review): the NULL check for the talloc above is outside this
   fragment */
119 brl->ctdb_db = ctdb_db_handle(ctdb, "brlock");
120 if (brl->ctdb_db == NULL) {
121 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
/* NOTE(review): the error-path cleanup/return lines are outside this
   fragment */
125 brl->server = server;
126 brl->messaging_ctx = messaging_ctx;
/* create a brl_handle for one open file; the file_key blob uniquely
   identifies the file (see the comment near the top of this file) */
131 static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs,
134 struct brl_handle *brlh;
136 brlh = talloc(mem_ctx, struct brl_handle);
/* NOTE(review): the file_key parameter line and the talloc NULL
   check are outside this fragment */
141 brlh->key = *file_key;
/* struct copy only - the key's data buffer is not duplicated here;
   presumably ownership of the buffer is taken by code outside this
   view - TODO confirm */
143 ZERO_STRUCT(brlh->last_lock);
149 see if two locking contexts are equal
151 static bool brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
/* two lock contexts are equal only when server id, smbpid and the
   backend context pointer all match */
153 return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
154 ctx1->smbpid == ctx2->smbpid &&
155 ctx1->ctx == ctx2->ctx);
159 see if lck1 and lck2 overlap
161 static bool brl_ctdb_overlap(struct lock_struct *lck1,
162 struct lock_struct *lck2)
164 /* this extra check is not redundant - it copes with locks
165 that go beyond the end of 64 bit file space */
166 if (lck1->size != 0 &&
167 lck1->start == lck2->start &&
168 lck1->size == lck2->size) {
/* NOTE(review): the return statements of both branches (and the
   final return) are outside this fragment */
172 if (lck1->start >= (lck2->start+lck2->size) ||
173 lck2->start >= (lck1->start+lck1->size)) {
180 See if lock2 can be added when lock1 is in place.
182 static bool brl_ctdb_conflict(struct lock_struct *lck1,
183 struct lock_struct *lck2)
185 /* pending locks don't conflict with anything */
186 if (lck1->lock_type >= PENDING_READ_LOCK ||
187 lck2->lock_type >= PENDING_READ_LOCK) {
/* two read locks never conflict with each other */
191 if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
/* a handle may take an overlapping read lock on top of its own
   existing lock */
195 if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
196 lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
/* otherwise they conflict iff the byte ranges overlap */
200 return brl_ctdb_overlap(lck1, lck2);
205 Check to see if this lock conflicts, but ignore our own locks on the
208 static bool brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
210 /* pending locks don't conflict with anything */
211 if (lck1->lock_type >= PENDING_READ_LOCK ||
212 lck2->lock_type >= PENDING_READ_LOCK) {
/* two read locks never conflict */
216 if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK)
220 * note that incoming write calls conflict with existing READ
221 * locks even if the context is the same. JRA. See LOCKTEST7
224 if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
225 lck1->ntvfs == lck2->ntvfs &&
226 (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
/* otherwise conflict iff the byte ranges overlap */
230 return brl_ctdb_overlap(lck1, lck2);
235 amazingly enough, w2k3 "remembers" whether the last lock failure
236 is the same as this one and changes its error code. I wonder if any
/* choose between NT_STATUS_LOCK_NOT_GRANTED and
   NT_STATUS_FILE_LOCK_CONFLICT for a failed lock attempt, matching
   the (stateful) w2k3 behaviour described in the comment above */
239 static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
242 * this function is only called for non pending lock!
245 /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
246 if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
247 return NT_STATUS_LOCK_NOT_GRANTED;
251 * if the notify_ptr is non NULL,
252 * it means that we're at the end of a pending lock
253 * and the real lock is requested after the timeout went by
254 * In this case we need to remember the last_lock and always
255 * give FILE_LOCK_CONFLICT
257 if (lock->notify_ptr) {
258 brlh->last_lock = *lock;
259 return NT_STATUS_FILE_LOCK_CONFLICT;
263 * amazing the little things you learn with a test
264 * suite. Locks beyond this offset (as a 64 bit
265 * number!) always generate the conflict error code,
266 * unless the top bit is set
268 if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
269 brlh->last_lock = *lock;
270 return NT_STATUS_FILE_LOCK_CONFLICT;
274 * if the current lock matches the last failed lock on the file handle
275 * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
277 if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
278 lock->context.ctx == brlh->last_lock.context.ctx &&
279 lock->ntvfs == brlh->last_lock.ntvfs &&
280 lock->start == brlh->last_lock.start) {
281 return NT_STATUS_FILE_LOCK_CONFLICT;
/* first failure at this offset: remember it and report NOT_GRANTED */
284 brlh->last_lock = *lock;
285 return NT_STATUS_LOCK_NOT_GRANTED;
/* request marshalled into call_data for FUNC_BRL_LOCK.
   NOTE(review): the smbpid/start/size/notify_ptr members (read by
   brl_ctdb_lock_func) are outside this fragment */
288 struct ctdb_lock_req {
292 enum brl_type lock_type;
294 struct server_id server;
295 struct brl_context *brl;
296 struct ntvfs_handle *ntvfs;
300 ctdb call handling brl_lock()
/* runs on the node holding the record, with the chainlock held;
   returns 0 on success and sets call->status to the NT status */
302 static int brl_ctdb_lock_func(struct ctdb_call_info *call)
304 struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
307 struct lock_struct lock, *locks=NULL;
308 NTSTATUS status = NT_STATUS_OK;
310 /* if this is a pending lock, then with the chainlock held we
311 try to get the real lock. If we succeed then we don't need
312 to make it pending. This prevents a possible race condition
313 where the pending lock gets created after the lock that is
314 preventing the real lock gets removed */
315 if (req->lock_type >= PENDING_READ_LOCK) {
316 enum brl_type lock_type = req->lock_type;
/* recurse with the corresponding non-pending type; if that grants
   the lock we are done */
317 req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
318 if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
321 req->lock_type = lock_type;
/* the record is the current (possibly empty) array of lock_structs */
324 dbuf = call->record_data;
/* build the candidate lock from the marshalled request */
327 lock.context.smbpid = req->smbpid;
328 lock.context.server = req->server;
329 lock.context.ctx = req->brl;
330 lock.ntvfs = req->ntvfs;
331 lock.start = req->start;
332 lock.size = req->size;
333 lock.lock_type = req->lock_type;
334 lock.notify_ptr = req->notify_ptr;
337 /* there are existing locks - make sure they don't conflict */
338 locks = (struct lock_struct *)dbuf.dptr;
339 count = dbuf.dsize / sizeof(*locks);
341 for (i=0; i<count; i++) {
342 if (brl_ctdb_conflict(&locks[i], &lock)) {
343 status = NT_STATUS_LOCK_NOT_GRANTED;
/* no conflict - write back a copy of the record with the new lock
   appended */
349 call->new_data = talloc(call, TDB_DATA);
350 if (call->new_data == NULL) {
351 return CTDB_ERR_NOMEM;
354 call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
355 if (call->new_data->dptr == NULL) {
356 return CTDB_ERR_NOMEM;
358 memcpy(call->new_data->dptr, locks, dbuf.dsize);
359 memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
360 call->new_data->dsize = dbuf.dsize + sizeof(lock);
/* a pending lock is stored but the caller still sees NOT_GRANTED so
   it waits for the MSG_BRL_RETRY notification */
362 if (req->lock_type >= PENDING_READ_LOCK) {
363 status = NT_STATUS_LOCK_NOT_GRANTED;
367 call->status = NT_STATUS_V(status);
374 Lock a range of bytes. The lock_type can be a PENDING_*_LOCK, in
375 which case a real lock is first tried, and if that fails then a
376 pending lock is created. When the pending lock is triggered (by
377 someone else closing an overlapping lock range) a messaging
378 notification is sent, identified by the notify_ptr
380 static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
381 struct brl_handle *brlh,
383 uint64_t start, uint64_t size,
384 enum brl_type lock_type,
/* NOTE(review): the smbpid and notify_ptr parameter lines, the
   zeroing of call/req and the req.smbpid/start/size assignments are
   outside this fragment */
387 struct ctdb_lock_req req;
388 struct ctdb_call call;
/* the record key is the file's unique key blob */
392 call.call_id = FUNC_BRL_LOCK;
393 call.key.dptr = brlh->key.data;
394 call.key.dsize = brlh->key.length;
395 call.call_data.dptr = (uint8_t *)&req;
396 call.call_data.dsize = sizeof(req);
404 req.lock_type = lock_type;
405 req.notify_ptr = notify_ptr;
406 req.server = brl->server;
408 req.ntvfs = brlh->ntvfs;
/* dispatch FUNC_BRL_LOCK to the node owning the record */
410 ret = ctdb_call(brl->ctdb_db, &call);
412 return NT_STATUS_INTERNAL_DB_CORRUPTION;
415 status = NT_STATUS(call.status);
/* on failure work out the precise error code (and update
   brlh->last_lock) to match Windows semantics */
417 if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
418 struct lock_struct lock;
419 lock.context.smbpid = smbpid;
420 lock.context.server = brl->server;
421 lock.context.ctx = brl;
422 lock.ntvfs = brlh->ntvfs;
425 lock.lock_type = lock_type;
426 lock.notify_ptr = notify_ptr;
427 status = brl_ctdb_lock_failed(brlh, &lock);
434 we are removing a lock that might be holding up a pending lock. Scan
435 for pending locks that cover this range and if we find any then
436 notify the server that it should retry the lock. In this backend, we
437 notify by sending the list of locks that need to be notified on back
438 in the reply_data of the ctdb call. The caller then does the
/* queue retry notifications (in call->reply_data) for pending locks
   that overlap the range just removed; the caller on the client node
   sends the actual messages via brl_ctdb_notify_send() */
441 static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
442 struct lock_struct *locks, int count,
443 struct lock_struct *removed_lock)
447 /* the last_notice logic is to prevent stampeding on a lock
448 range. It prevents us sending hundreds of notifies on the
449 same range of bytes. It doesn't prevent all possible
450 stampedes, but it does prevent the most common problem */
/* NOTE(review): the declarations of i/last_notice and their
   initialisation are outside this fragment */
453 for (i=0;i<count;i++) {
454 if (locks[i].lock_type >= PENDING_READ_LOCK &&
455 brl_ctdb_overlap(&locks[i], removed_lock)) {
456 struct lock_struct *nlocks;
/* skip if we already notified an overlapping range */
459 if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
/* NOTE(review): the body updating last_notice for pending write
   locks is outside this fragment */
462 if (locks[i].lock_type == PENDING_WRITE_LOCK) {
465 if (call->reply_data == NULL) {
466 call->reply_data = talloc_zero(call, TDB_DATA);
467 if (call->reply_data == NULL) {
468 return CTDB_ERR_NOMEM;
471 /* add to the list of pending locks to notify caller of */
472 ncount = call->reply_data->dsize / sizeof(struct lock_struct);
473 nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr,
474 struct lock_struct, ncount + 1);
475 if (nlocks == NULL) {
476 return CTDB_ERR_NOMEM;
478 call->reply_data->dptr = (uint8_t *)nlocks;
479 nlocks[ncount] = locks[i];
480 call->reply_data->dsize += sizeof(struct lock_struct);
488 send notifications for all pending locks - the file is being closed by this
491 static int brl_ctdb_notify_all(struct ctdb_call_info *call,
492 struct lock_struct *locks, int count)
495 for (i=0;i<count;i++) {
496 if (locks->lock_type >= PENDING_READ_LOCK) {
497 int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
498 if (ret != 0) return ret;
505 send off any messages needed to notify of pending locks that should now retry
/* send MSG_BRL_RETRY for each pending lock the ctdb call returned in
   reply_data; each waiter is identified by its notify_ptr */
507 static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
509 struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
510 int i, count = reply_data->dsize / sizeof(struct lock_struct);
511 for (i=0;i<count;i++) {
512 messaging_send_ptr(brl->messaging_ctx, locks[i].context.server,
513 MSG_BRL_RETRY, locks[i].notify_ptr);
/* request marshalled into call_data for FUNC_BRL_UNLOCK.
   NOTE(review): the smbpid/start/size members (read by
   brl_ctdb_unlock_func) are outside this fragment */
518 struct ctdb_unlock_req {
522 struct server_id server;
523 struct brl_context *brl;
524 struct ntvfs_handle *ntvfs;
528 Unlock a range of bytes.
/* ctdb call handling brl_unlock(): remove one matching lock from the
   record and queue retry notifications for overlapping pending locks */
530 static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
532 struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
535 struct lock_struct *locks, *lock;
536 struct lock_context context;
537 NTSTATUS status = NT_STATUS_OK;
539 dbuf = call->record_data;
/* rebuild the lock context of the requester for matching */
541 context.smbpid = req->smbpid;
542 context.server = req->server;
543 context.ctx = req->brl;
545 /* there are existing locks - find a match */
546 locks = (struct lock_struct *)dbuf.dptr;
547 count = dbuf.dsize / sizeof(*locks);
/* first pass: prefer an exact WRITE_LOCK match ... */
549 for (i=0; i<count; i++) {
551 if (brl_ctdb_same_context(&lock->context, &context) &&
552 lock->ntvfs == req->ntvfs &&
553 lock->start == req->start &&
554 lock->size == req->size &&
555 lock->lock_type == WRITE_LOCK) {
559 if (i < count) goto found;
/* ... second pass: accept any matching non-pending lock */
561 for (i=0; i<count; i++) {
563 if (brl_ctdb_same_context(&lock->context, &context) &&
564 lock->ntvfs == req->ntvfs &&
565 lock->start == req->start &&
566 lock->size == req->size &&
567 lock->lock_type < PENDING_READ_LOCK) {
/* NOTE(review): the 'found:' label and the 'lock = &locks[i]'
   assignments in the loops above are outside this fragment */
574 struct lock_struct removed_lock = *lock;
/* write back a copy of the record with element i removed */
576 call->new_data = talloc(call, TDB_DATA);
577 if (call->new_data == NULL) {
578 return CTDB_ERR_NOMEM;
581 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
582 if (call->new_data->dptr == NULL) {
583 return CTDB_ERR_NOMEM;
585 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
587 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
588 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
589 (count-(i+1))*sizeof(*lock));
/* let any pending lock waiting on this range know it can retry */
592 int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
593 if (ret != 0) return ret;
598 /* we didn't find it */
599 status = NT_STATUS_RANGE_NOT_LOCKED;
602 call->status = NT_STATUS_V(status);
609 Unlock a range of bytes.
611 static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
612 struct brl_handle *brlh,
614 uint64_t start, uint64_t size)
/* NOTE(review): the smbpid parameter line, the zeroing of call/req
   and the req.smbpid/start/size assignments are outside this
   fragment */
616 struct ctdb_call call;
617 struct ctdb_unlock_req req;
620 call.call_id = FUNC_BRL_UNLOCK;
621 call.key.dptr = brlh->key.data;
622 call.key.dsize = brlh->key.length;
623 call.call_data.dptr = (uint8_t *)&req;
624 call.call_data.dsize = sizeof(req);
630 req.server = brl->server;
632 req.ntvfs = brlh->ntvfs;
634 ret = ctdb_call(brl->ctdb_db, &call);
636 DEBUG(0,("ctdb_call failed - %s\n", __location__));
637 return NT_STATUS_INTERNAL_DB_CORRUPTION;
/* send retry messages for any pending locks the unlock released */
640 brl_ctdb_notify_send(brl, &call.reply_data);
642 return NT_STATUS(call.status);
/* request for FUNC_BRL_REMOVE_PENDING.
   NOTE(review): the notify_ptr member (matched in
   brl_ctdb_remove_pending_func) is outside this fragment */
646 struct ctdb_remove_pending_req {
647 struct server_id server;
652 remove a pending lock. This is called when the caller has either
653 given up trying to establish a lock or when they have succeeded in
654 getting it. In either case they no longer need to be notified.
/* ctdb call: remove the pending lock identified by (server,
   notify_ptr) from the record */
656 static int brl_ctdb_remove_pending_func(struct ctdb_call_info *call)
658 struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
661 struct lock_struct *locks;
662 NTSTATUS status = NT_STATUS_OK;
664 dbuf = call->record_data;
666 /* there are existing locks - find a match */
667 locks = (struct lock_struct *)dbuf.dptr;
668 count = dbuf.dsize / sizeof(*locks);
670 for (i=0; i<count; i++) {
671 struct lock_struct *lock = &locks[i];
/* a pending lock is uniquely identified by owner + notify_ptr */
673 if (lock->lock_type >= PENDING_READ_LOCK &&
674 lock->notify_ptr == req->notify_ptr &&
675 cluster_id_equal(&lock->context.server, &req->server)) {
/* write back a copy of the record with element i removed */
676 call->new_data = talloc(call, TDB_DATA);
677 if (call->new_data == NULL) {
678 return CTDB_ERR_NOMEM;
681 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
682 if (call->new_data->dptr == NULL) {
683 return CTDB_ERR_NOMEM;
685 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
687 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
688 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
689 (count-(i+1))*sizeof(*lock));
695 /* we didn't find it */
696 status = NT_STATUS_RANGE_NOT_LOCKED;
699 call->status = NT_STATUS_V(status);
/* client side of FUNC_BRL_REMOVE_PENDING: drop a pending lock the
   caller no longer wants to be notified about */
704 static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
705 struct brl_handle *brlh,
/* NOTE(review): the notify_ptr parameter line and the zeroing of
   call/req are outside this fragment */
708 struct ctdb_call call;
709 struct ctdb_remove_pending_req req;
712 call.call_id = FUNC_BRL_REMOVE_PENDING;
713 call.key.dptr = brlh->key.data;
714 call.key.dsize = brlh->key.length;
715 call.call_data.dptr = (uint8_t *)&req;
716 call.call_data.dsize = sizeof(req);
719 req.notify_ptr = notify_ptr;
720 req.server = brl->server;
722 ret = ctdb_call(brl->ctdb_db, &call);
724 DEBUG(0,("ctdb_call failed - %s\n", __location__));
725 return NT_STATUS_INTERNAL_DB_CORRUPTION;
728 return NT_STATUS(call.status);
/* request for FUNC_BRL_LOCKTEST.
   NOTE(review): the smbpid/start/size members (read by
   brl_ctdb_locktest_func) are outside this fragment */
732 struct ctdb_locktest_req {
736 enum brl_type lock_type;
737 struct brl_context *brl;
738 struct server_id server;
739 struct ntvfs_handle *ntvfs;
743 ctdb call handling brl_locktest() - test whether the requested
744 lock would conflict with the locks already recorded for this file,
745 ignoring our own locks on the same handle, without taking the lock
747 static int brl_ctdb_locktest_func(struct ctdb_call_info *call)
749 struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
752 struct lock_struct *locks, lock;
753 NTSTATUS status = NT_STATUS_OK;
/* build the candidate lock from the marshalled request */
755 lock.context.smbpid = req->smbpid;
756 lock.context.server = req->server;
757 lock.context.ctx = req->brl;
758 lock.ntvfs = req->ntvfs;
759 lock.start = req->start;
760 lock.size = req->size;
761 lock.lock_type = req->lock_type;
763 dbuf = call->record_data;
765 /* there are existing locks - find a match */
766 locks = (struct lock_struct *)dbuf.dptr;
767 count = dbuf.dsize / sizeof(*locks);
/* _conflict_other ignores our own locks on the same handle, so a
   handle may do IO inside ranges it has locked itself */
769 for (i=0; i<count; i++) {
770 if (brl_ctdb_conflict_other(&locks[i], &lock)) {
771 status = NT_STATUS_FILE_LOCK_CONFLICT;
776 call->status = NT_STATUS_V(status);
782 Test if we are allowed to perform IO on a region of an open file
784 static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
785 struct brl_handle *brlh,
787 uint64_t start, uint64_t size,
788 enum brl_type lock_type)
/* NOTE(review): the smbpid parameter line, the zeroing of call/req
   and the req.smbpid/start/size assignments are outside this
   fragment */
790 struct ctdb_call call;
791 struct ctdb_locktest_req req;
794 call.call_id = FUNC_BRL_LOCKTEST;
795 call.key.dptr = brlh->key.data;
796 call.key.dsize = brlh->key.length;
797 call.call_data.dptr = (uint8_t *)&req;
798 call.call_data.dsize = sizeof(req);
804 req.lock_type = lock_type;
805 req.server = brl->server;
807 req.ntvfs = brlh->ntvfs;
809 ret = ctdb_call(brl->ctdb_db, &call);
811 DEBUG(0,("ctdb_call failed - %s\n", __location__));
812 return NT_STATUS_INTERNAL_DB_CORRUPTION;
815 return NT_STATUS(call.status);
/* request for FUNC_BRL_CLOSE: identifies all locks owned by one
   file handle */
819 struct ctdb_close_req {
820 struct brl_context *brl;
821 struct server_id server;
822 struct ntvfs_handle *ntvfs;
826 ctdb call handling brl_close() - remove every lock owned by the
827 closing file handle from the record and queue retry notifications
828 for any pending locks that might now be grantable
830 static int brl_ctdb_close_func(struct ctdb_call_info *call)
832 struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
/* dcount counts deleted entries */
834 int count, dcount=0, i;
835 struct lock_struct *locks;
836 NTSTATUS status = NT_STATUS_OK;
838 dbuf = call->record_data;
840 /* there are existing locks - find a match */
841 locks = (struct lock_struct *)dbuf.dptr;
842 count = dbuf.dsize / sizeof(*locks);
/* delete every lock belonging to this handle, compacting the array
   in place */
844 for (i=0; i<count; i++) {
845 struct lock_struct *lock = &locks[i];
847 if (lock->context.ctx == req->brl &&
848 cluster_id_equal(&lock->context.server, &req->server) &&
849 lock->ntvfs == req->ntvfs) {
850 /* found it - delete it */
851 if (count > 1 && i < count-1) {
852 memmove(&locks[i], &locks[i+1],
853 sizeof(*locks)*((count-1) - i));
/* NOTE(review): the lines adjusting count/i/dcount after the delete
   are outside this fragment */
862 call->new_data = talloc(call, TDB_DATA);
863 if (call->new_data == NULL) {
864 return CTDB_ERR_NOMEM;
/* wake anyone whose pending lock may now be grantable; 'count' here
   is presumably the post-delete count - confirm against the missing
   lines above */
867 brl_ctdb_notify_all(call, locks, count);
/* write back the compacted record */
869 call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
870 if (call->new_data->dptr == NULL) {
871 return CTDB_ERR_NOMEM;
873 call->new_data->dsize = count*sizeof(struct lock_struct);
875 memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
878 call->status = NT_STATUS_V(status);
884 Remove all locks on a file handle when the file is closed
886 static NTSTATUS brl_ctdb_close(struct brl_context *brl,
887 struct brl_handle *brlh)
/* NOTE(review): the zeroing of call/req and the req.brl assignment
   are outside this fragment */
889 struct ctdb_call call;
890 struct ctdb_close_req req;
893 call.call_id = FUNC_BRL_CLOSE;
894 call.key.dptr = brlh->key.data;
895 call.key.dsize = brlh->key.length;
896 call.call_data.dptr = (uint8_t *)&req;
897 call.call_data.dsize = sizeof(req);
901 req.server = brl->server;
902 req.ntvfs = brlh->ntvfs;
904 ret = ctdb_call(brl->ctdb_db, &call);
906 DEBUG(0,("ctdb_call failed - %s\n", __location__));
907 return NT_STATUS_INTERNAL_DB_CORRUPTION;
/* send retry messages for pending locks released by the close */
910 brl_ctdb_notify_send(brl, &call.reply_data);
912 return NT_STATUS(call.status);
/* backend ops table registered by brl_ctdb_init_ops();
   NOTE(review): named _tdb_ but this is the ctdb backend */
916 static const struct brlock_ops brlock_tdb_ops = {
917 .brl_init = brl_ctdb_init,
918 .brl_create_handle = brl_ctdb_create_handle,
919 .brl_lock = brl_ctdb_lock,
920 .brl_unlock = brl_ctdb_unlock,
921 .brl_remove_pending = brl_ctdb_remove_pending,
922 .brl_locktest = brl_ctdb_locktest,
923 .brl_close = brl_ctdb_close
/* install the ctdb brlock backend: register the ops table and the
   per-record ctdb call handlers on the "brlock" database */
927 void brl_ctdb_init_ops(void)
929 struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(),
930 struct ctdb_context);
931 struct ctdb_db_context *ctdb_db;
933 brl_set_ops(&brlock_tdb_ops);
935 ctdb_db = ctdb_db_handle(ctdb, "brlock");
936 if (ctdb_db == NULL) {
937 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
/* each function id maps to the handler run on the record's node */
941 ctdb_set_call(ctdb_db, brl_ctdb_lock_func, FUNC_BRL_LOCK);
942 ctdb_set_call(ctdb_db, brl_ctdb_unlock_func, FUNC_BRL_UNLOCK);
943 ctdb_set_call(ctdb_db, brl_ctdb_remove_pending_func, FUNC_BRL_REMOVE_PENDING);
944 ctdb_set_call(ctdb_db, brl_ctdb_locktest_func, FUNC_BRL_LOCKTEST);
945 ctdb_set_call(ctdb_db, brl_ctdb_close_func, FUNC_BRL_CLOSE);