c38d666c3c60f7e1cf039a506faf6ba0b724b38f
[samba.git] / source4 / cluster / ctdb / brlock_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - ctdb backend
5
6    Copyright (C) Andrew Tridgell 2006
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "messaging/messaging.h"
26 #include "lib/messaging/irpc.h"
27 #include "libcli/libcli.h"
28 #include "cluster/cluster.h"
29 #include "ntvfs/ntvfs.h"
30 #include "ntvfs/common/brlock.h"
31 #include "include/ctdb.h"
32
33 enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2, 
34                    FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
35                    FUNC_BRL_CLOSE=5};
36
37 /*
38   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
39   a file. For a local posix filesystem this will usually be a combination
40   of the device and inode numbers of the file, but it can be anything 
41   that uniquely idetifies a file for locking purposes, as long
42   as it is applied consistently.
43 */
44
45 /* this struct is typically attached to tcon */
46 struct brl_context {
47         struct ctdb_context *ctdb;
48         struct ctdb_db_context *ctdb_db;
49         struct server_id server;
50         struct messaging_context *messaging_ctx;
51 };
52
53 /*
54   the lock context contains the elements that define whether one
55   lock is the same as another lock
56 */
57 struct lock_context {
58         struct server_id server;
59         uint16_t smbpid;
60         struct brl_context *ctx;
61 };
62
63 /* The data in brlock records is an unsorted linear array of these
64    records.  It is unnecessary to store the count as tdb provides the
65    size of the record */
66 struct lock_struct {
67         struct lock_context context;
68         struct ntvfs_handle *ntvfs;
69         uint64_t start;
70         uint64_t size;
71         enum brl_type lock_type;
72         void *notify_ptr;
73 };
74
75 /* this struct is attached to on open file handle */
76 struct brl_handle {
77         DATA_BLOB key;
78         struct ntvfs_handle *ntvfs;
79         struct lock_struct last_lock;
80 };
81
82 #if 0
83 static void show_locks(const char *op, struct lock_struct *locks, int count)
84 {
85         int i;
86         DEBUG(0,("OP: %s\n", op));
87         if (locks == NULL) return;
88         for (i=0;i<count;i++) {
89                 DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
90                          i, (int)locks[i].start, (int)locks[i].size, 
91                          locks[i].context.server.node,
92                          locks[i].context.server.id,
93                          locks[i].context.smbpid,
94                          locks[i].context.ctx,
95                          locks[i].ntvfs));
96         }
97 }
98 #endif
99
100 /*
101   Open up the brlock.tdb database. Close it down using
102   talloc_free(). We need the messaging_ctx to allow for
103   pending lock notifications.
104 */
105 static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
106                                     struct messaging_context *messaging_ctx)
107 {
108         struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), 
109                                                     struct ctdb_context);
110         struct brl_context *brl;
111
112         brl = talloc(mem_ctx, struct brl_context);
113         if (brl == NULL) {
114                 return NULL;
115         }
116
117         brl->ctdb = ctdb;
118         brl->ctdb_db = ctdb_db_handle(ctdb, "brlock");
119         if (brl->ctdb_db == NULL) {
120                 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
121                 talloc_free(brl);
122                 return NULL;
123         }
124         brl->server = server;
125         brl->messaging_ctx = messaging_ctx;
126
127         return brl;
128 }
129
130 static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
131                                                     DATA_BLOB *file_key)
132 {
133         struct brl_handle *brlh;
134
135         brlh = talloc(mem_ctx, struct brl_handle);
136         if (brlh == NULL) {
137                 return NULL;
138         }
139
140         brlh->key = *file_key;
141         brlh->ntvfs = ntvfs;
142         ZERO_STRUCT(brlh->last_lock);
143
144         return brlh;
145 }
146
147 /*
148   see if two locking contexts are equal
149 */
150 static bool brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
151 {
152         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
153                 ctx1->smbpid == ctx2->smbpid &&
154                 ctx1->ctx == ctx2->ctx);
155 }
156
157 /*
158   see if lck1 and lck2 overlap
159 */
160 static bool brl_ctdb_overlap(struct lock_struct *lck1, 
161                         struct lock_struct *lck2)
162 {
163         /* this extra check is not redundent - it copes with locks
164            that go beyond the end of 64 bit file space */
165         if (lck1->size != 0 &&
166             lck1->start == lck2->start &&
167             lck1->size == lck2->size) {
168                 return true;
169         }
170             
171         if (lck1->start >= (lck2->start+lck2->size) ||
172             lck2->start >= (lck1->start+lck1->size)) {
173                 return false;
174         }
175         return true;
176
177
178 /*
179  See if lock2 can be added when lock1 is in place.
180 */
181 static bool brl_ctdb_conflict(struct lock_struct *lck1, 
182                          struct lock_struct *lck2)
183 {
184         /* pending locks don't conflict with anything */
185         if (lck1->lock_type >= PENDING_READ_LOCK ||
186             lck2->lock_type >= PENDING_READ_LOCK) {
187                 return false;
188         }
189
190         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
191                 return false;
192         }
193
194         if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
195             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
196                 return false;
197         }
198
199         return brl_ctdb_overlap(lck1, lck2);
200
201
202
203 /*
204  Check to see if this lock conflicts, but ignore our own locks on the
205  same fnum only.
206 */
207 static bool brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
208 {
209         /* pending locks don't conflict with anything */
210         if (lck1->lock_type >= PENDING_READ_LOCK ||
211             lck2->lock_type >= PENDING_READ_LOCK) {
212                 return false;
213         }
214
215         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
216                 return false;
217
218         /*
219          * note that incoming write calls conflict with existing READ
220          * locks even if the context is the same. JRA. See LOCKTEST7
221          * in smbtorture.
222          */
223         if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
224             lck1->ntvfs == lck2->ntvfs &&
225             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
226                 return false;
227         }
228
229         return brl_ctdb_overlap(lck1, lck2);
230
231
232
233 /*
234   amazingly enough, w2k3 "remembers" whether the last lock failure
235   is the same as this one and changes its error code. I wonder if any
236   app depends on this?
237 */
238 static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
239 {
240         /*
241          * this function is only called for non pending lock!
242          */
243
244         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
245         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
246                 return NT_STATUS_LOCK_NOT_GRANTED;
247         }
248
249         /* 
250          * if the notify_ptr is non NULL,
251          * it means that we're at the end of a pending lock
252          * and the real lock is requested after the timeout went by
253          * In this case we need to remember the last_lock and always
254          * give FILE_LOCK_CONFLICT
255          */
256         if (lock->notify_ptr) {
257                 brlh->last_lock = *lock;
258                 return NT_STATUS_FILE_LOCK_CONFLICT;
259         }
260
261         /* 
262          * amazing the little things you learn with a test
263          * suite. Locks beyond this offset (as a 64 bit
264          * number!) always generate the conflict error code,
265          * unless the top bit is set
266          */
267         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
268                 brlh->last_lock = *lock;
269                 return NT_STATUS_FILE_LOCK_CONFLICT;
270         }
271
272         /*
273          * if the current lock matches the last failed lock on the file handle
274          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
275          */
276         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
277             lock->context.ctx == brlh->last_lock.context.ctx &&
278             lock->ntvfs == brlh->last_lock.ntvfs &&
279             lock->start == brlh->last_lock.start) {
280                 return NT_STATUS_FILE_LOCK_CONFLICT;
281         }
282
283         brlh->last_lock = *lock;
284         return NT_STATUS_LOCK_NOT_GRANTED;
285 }
286
287 struct ctdb_lock_req {
288         uint16_t smbpid;
289         uint64_t start;
290         uint64_t size;
291         enum brl_type lock_type;
292         void *notify_ptr;
293         struct server_id server;
294         struct brl_context *brl;
295         struct ntvfs_handle *ntvfs;
296 };
297
298 /*
299   ctdb call handling brl_lock()
300 */
301 static int brl_ctdb_lock_func(struct ctdb_call_info *call)
302 {
303         struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
304         TDB_DATA dbuf;
305         int count=0, i;
306         struct lock_struct lock, *locks=NULL;
307         NTSTATUS status = NT_STATUS_OK;
308
309         /* if this is a pending lock, then with the chainlock held we
310            try to get the real lock. If we succeed then we don't need
311            to make it pending. This prevents a possible race condition
312            where the pending lock gets created after the lock that is
313            preventing the real lock gets removed */
314         if (req->lock_type >= PENDING_READ_LOCK) {
315                 enum brl_type lock_type = req->lock_type;
316                 req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
317                 if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
318                         return 0;
319                 }
320                 req->lock_type = lock_type;
321         }
322
323         dbuf = call->record_data;
324
325         ZERO_STRUCT(lock);
326         lock.context.smbpid = req->smbpid;
327         lock.context.server = req->server;
328         lock.context.ctx = req->brl;
329         lock.ntvfs = req->ntvfs;
330         lock.start = req->start;
331         lock.size = req->size;
332         lock.lock_type = req->lock_type;
333         lock.notify_ptr = req->notify_ptr;
334
335         if (dbuf.dptr) {
336                 /* there are existing locks - make sure they don't conflict */
337                 locks = (struct lock_struct *)dbuf.dptr;
338                 count = dbuf.dsize / sizeof(*locks);
339
340                 for (i=0; i<count; i++) {
341                         if (brl_ctdb_conflict(&locks[i], &lock)) {
342                                 status = NT_STATUS_LOCK_NOT_GRANTED;
343                                 goto reply;
344                         }
345                 }
346         }
347
348         call->new_data = talloc(call, TDB_DATA);
349         if (call->new_data == NULL) {
350                 return CTDB_ERR_NOMEM;
351         }
352
353         call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
354         if (call->new_data->dptr == NULL) {
355                 return CTDB_ERR_NOMEM;
356         }
357         memcpy(call->new_data->dptr, locks, dbuf.dsize);
358         memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
359         call->new_data->dsize = dbuf.dsize + sizeof(lock);
360
361         if (req->lock_type >= PENDING_READ_LOCK) {
362                 status = NT_STATUS_LOCK_NOT_GRANTED;
363         }
364
365 reply:
366         call->status = NT_STATUS_V(status);
367
368         return 0;
369 }
370
371
372 /*
373   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
374   which case a real lock is first tried, and if that fails then a
375   pending lock is created. When the pending lock is triggered (by
376   someone else closing an overlapping lock range) a messaging
377   notification is sent, identified by the notify_ptr
378 */
379 static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
380                               struct brl_handle *brlh,
381                               uint16_t smbpid,
382                               uint64_t start, uint64_t size, 
383                               enum brl_type lock_type,
384                               void *notify_ptr)
385 {
386         struct ctdb_lock_req req;
387         struct ctdb_call call;
388         int ret;
389         NTSTATUS status;
390
391         call.call_id = FUNC_BRL_LOCK;
392         call.key.dptr = brlh->key.data;
393         call.key.dsize = brlh->key.length;
394         call.call_data.dptr = (uint8_t *)&req;
395         call.call_data.dsize = sizeof(req);
396         call.flags = 0;
397         call.status = 0;
398
399         ZERO_STRUCT(req);
400         req.smbpid = smbpid;
401         req.start  = start;
402         req.size   = size;
403         req.lock_type = lock_type;
404         req.notify_ptr = notify_ptr;
405         req.server = brl->server;
406         req.brl = brl;
407         req.ntvfs = brlh->ntvfs;
408
409         ret = ctdb_call(brl->ctdb_db, &call);
410         if (ret == -1) {
411                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
412         }
413
414         status = NT_STATUS(call.status);
415
416         if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
417                 struct lock_struct lock;
418                 lock.context.smbpid = smbpid;
419                 lock.context.server = brl->server;
420                 lock.context.ctx = brl;
421                 lock.ntvfs = brlh->ntvfs;
422                 lock.start = start;
423                 lock.size = size;
424                 lock.lock_type = lock_type;
425                 lock.notify_ptr = notify_ptr;
426                 status = brl_ctdb_lock_failed(brlh, &lock);
427         }
428
429         return status;
430 }
431
432 /*
433   we are removing a lock that might be holding up a pending lock. Scan
434   for pending locks that cover this range and if we find any then
435   notify the server that it should retry the lock. In this backend, we
436   notify by sending the list of locks that need to be notified on back
437   in the reply_data of the ctdb call. The caller then does the
438   messaging for us. 
439 */
440 static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
441                                   struct lock_struct *locks, int count, 
442                                    struct lock_struct *removed_lock)
443 {
444         int i, last_notice;
445
446         /* the last_notice logic is to prevent stampeding on a lock
447            range. It prevents us sending hundreds of notifies on the
448            same range of bytes. It doesn't prevent all possible
449            stampedes, but it does prevent the most common problem */
450         last_notice = -1;
451
452         for (i=0;i<count;i++) {
453                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
454                     brl_ctdb_overlap(&locks[i], removed_lock)) {
455                         struct lock_struct *nlocks;
456                         int ncount;
457
458                         if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
459                                 continue;
460                         }
461                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
462                                 last_notice = i;
463                         }
464                         if (call->reply_data == NULL) {
465                                 call->reply_data = talloc_zero(call, TDB_DATA);
466                                 if (call->reply_data == NULL) {
467                                         return CTDB_ERR_NOMEM;
468                                 }
469                         }
470                         /* add to the list of pending locks to notify caller of */
471                         ncount = call->reply_data->dsize / sizeof(struct lock_struct);
472                         nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr, 
473                                                 struct lock_struct, ncount + 1);
474                         if (nlocks == NULL) {
475                                 return CTDB_ERR_NOMEM;
476                         }
477                         call->reply_data->dptr = (uint8_t *)nlocks;
478                         nlocks[ncount] = locks[i];
479                         call->reply_data->dsize += sizeof(struct lock_struct);
480                 }
481         }
482
483         return 0;
484 }
485
486 /*
487   send notifications for all pending locks - the file is being closed by this
488   user
489 */
490 static int brl_ctdb_notify_all(struct ctdb_call_info *call,
491                                 struct lock_struct *locks, int count)
492 {
493         int i;
494         for (i=0;i<count;i++) {
495                 if (locks->lock_type >= PENDING_READ_LOCK) {
496                         int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
497                         if (ret != 0) return ret;
498                 }
499         }
500         return 0;
501 }
502
503 /*
504   send off any messages needed to notify of pending locks that should now retry
505 */
506 static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
507 {
508         struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
509         int i, count = reply_data->dsize / sizeof(struct lock_struct);
510         for (i=0;i<count;i++) {
511                 messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
512                                    MSG_BRL_RETRY, locks[i].notify_ptr);
513         }
514 }
515
516
517 struct ctdb_unlock_req {
518         uint16_t smbpid;
519         uint64_t start;
520         uint64_t size;
521         struct server_id server;
522         struct brl_context *brl;
523         struct ntvfs_handle *ntvfs;
524 };
525
526 /*
527  Unlock a range of bytes.
528 */
529 static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
530 {
531         struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
532         TDB_DATA dbuf;
533         int count, i;
534         struct lock_struct *locks, *lock;
535         struct lock_context context;
536         NTSTATUS status = NT_STATUS_OK;
537
538         dbuf = call->record_data;
539
540         context.smbpid = req->smbpid;
541         context.server = req->server;
542         context.ctx = req->brl;
543
544         /* there are existing locks - find a match */
545         locks = (struct lock_struct *)dbuf.dptr;
546         count = dbuf.dsize / sizeof(*locks);
547
548         for (i=0; i<count; i++) {
549                 lock = &locks[i];
550                 if (brl_ctdb_same_context(&lock->context, &context) &&
551                     lock->ntvfs == req->ntvfs &&
552                     lock->start == req->start &&
553                     lock->size == req->size &&
554                     lock->lock_type == WRITE_LOCK) {
555                         break;
556                 }
557         }
558         if (i < count) goto found;
559
560         for (i=0; i<count; i++) {
561                 lock = &locks[i];
562                 if (brl_ctdb_same_context(&lock->context, &context) &&
563                     lock->ntvfs == req->ntvfs &&
564                     lock->start == req->start &&
565                     lock->size == req->size &&
566                     lock->lock_type < PENDING_READ_LOCK) {
567                         break;
568                 }
569         }
570
571 found:
572         if (i < count) {
573                 struct lock_struct removed_lock = *lock;
574
575                 call->new_data = talloc(call, TDB_DATA);
576                 if (call->new_data == NULL) {
577                         return CTDB_ERR_NOMEM;
578                 }
579                 
580                 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
581                 if (call->new_data->dptr == NULL) {
582                         return CTDB_ERR_NOMEM;
583                 }
584                 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
585                 
586                 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
587                 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
588                        (count-(i+1))*sizeof(*lock));
589                 
590                 if (count > 1) {
591                         int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
592                         if (ret != 0) return ret;
593                 }
594         }
595
596         if (i == count) {
597                 /* we didn't find it */
598                 status = NT_STATUS_RANGE_NOT_LOCKED;
599         }
600
601         call->status = NT_STATUS_V(status);
602
603         return 0;
604 }
605
606
607 /*
608  Unlock a range of bytes.
609 */
610 static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
611                                 struct brl_handle *brlh, 
612                                 uint16_t smbpid,
613                                 uint64_t start, uint64_t size)
614 {
615         struct ctdb_call call;
616         struct ctdb_unlock_req req;
617         int ret;
618
619         call.call_id = FUNC_BRL_UNLOCK;
620         call.key.dptr = brlh->key.data;
621         call.key.dsize = brlh->key.length;
622         call.call_data.dptr = (uint8_t *)&req;
623         call.call_data.dsize = sizeof(req);
624
625         ZERO_STRUCT(req);
626         req.smbpid = smbpid;
627         req.start  = start;
628         req.size   = size;
629         req.server = brl->server;
630         req.brl = brl;
631         req.ntvfs = brlh->ntvfs;
632                 
633         ret = ctdb_call(brl->ctdb_db, &call);
634         if (ret == -1) {
635                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
636                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
637         }
638
639         brl_ctdb_notify_send(brl, &call.reply_data);
640
641         return NT_STATUS(call.status);
642 }
643
644
645 struct ctdb_remove_pending_req {
646         struct server_id server;
647         void *notify_ptr;
648 };
649
650 /*
651   remove a pending lock. This is called when the caller has either
652   given up trying to establish a lock or when they have succeeded in
653   getting it. In either case they no longer need to be notified.
654 */
655 static int brl_ctdb_remove_pending_func(struct ctdb_call_info *call)
656 {
657         struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
658         TDB_DATA dbuf;
659         int count, i;
660         struct lock_struct *locks;
661         NTSTATUS status = NT_STATUS_OK;
662
663         dbuf = call->record_data;
664
665         /* there are existing locks - find a match */
666         locks = (struct lock_struct *)dbuf.dptr;
667         count = dbuf.dsize / sizeof(*locks);
668
669         for (i=0; i<count; i++) {
670                 struct lock_struct *lock = &locks[i];
671                 
672                 if (lock->lock_type >= PENDING_READ_LOCK &&
673                     lock->notify_ptr == req->notify_ptr &&
674                     cluster_id_equal(&lock->context.server, &req->server)) {
675                         call->new_data = talloc(call, TDB_DATA);
676                         if (call->new_data == NULL) {
677                                 return CTDB_ERR_NOMEM;
678                         }
679
680                         call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
681                         if (call->new_data->dptr == NULL) {
682                                 return CTDB_ERR_NOMEM;
683                         }
684                         call->new_data->dsize = dbuf.dsize - sizeof(*lock);
685
686                         memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
687                         memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
688                                (count-(i+1))*sizeof(*lock));
689                         break;
690                 }
691         }
692         
693         if (i == count) {
694                 /* we didn't find it */
695                 status = NT_STATUS_RANGE_NOT_LOCKED;
696         }
697
698         call->status = NT_STATUS_V(status);
699
700         return 0;
701 }
702
703 static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
704                                         struct brl_handle *brlh, 
705                                         void *notify_ptr)
706 {
707         struct ctdb_call call;
708         struct ctdb_remove_pending_req req;
709         int ret;
710
711         call.call_id = FUNC_BRL_REMOVE_PENDING;
712         call.key.dptr = brlh->key.data;
713         call.key.dsize = brlh->key.length;
714         call.call_data.dptr = (uint8_t *)&req;
715         call.call_data.dsize = sizeof(req);
716
717         ZERO_STRUCT(req);
718         req.notify_ptr = notify_ptr;
719         req.server = brl->server;
720                 
721         ret = ctdb_call(brl->ctdb_db, &call);
722         if (ret == -1) {
723                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
724                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
725         }
726
727         return NT_STATUS(call.status);
728 }
729
730
731 struct ctdb_locktest_req {
732         uint16_t smbpid;
733         uint64_t start;
734         uint64_t size;
735         enum brl_type lock_type;
736         struct brl_context *brl;
737         struct server_id server;
738         struct ntvfs_handle *ntvfs;
739 };
740
741 /*
742   remove a pending lock. This is called when the caller has either
743   given up trying to establish a lock or when they have succeeded in
744   getting it. In either case they no longer need to be notified.
745 */
746 static int brl_ctdb_locktest_func(struct ctdb_call_info *call)
747 {
748         struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
749         TDB_DATA dbuf;
750         int count, i;
751         struct lock_struct *locks, lock;
752         NTSTATUS status = NT_STATUS_OK;
753
754         lock.context.smbpid = req->smbpid;
755         lock.context.server = req->server;
756         lock.context.ctx = req->brl;
757         lock.ntvfs = req->ntvfs;
758         lock.start = req->start;
759         lock.size = req->size;
760         lock.lock_type = req->lock_type;
761
762         dbuf = call->record_data;
763
764         /* there are existing locks - find a match */
765         locks = (struct lock_struct *)dbuf.dptr;
766         count = dbuf.dsize / sizeof(*locks);
767
768         for (i=0; i<count; i++) {
769                 if (brl_ctdb_conflict_other(&locks[i], &lock)) {
770                         status = NT_STATUS_FILE_LOCK_CONFLICT;
771                         break;
772                 }
773         }
774         
775         call->status = NT_STATUS_V(status);
776
777         return 0;
778 }
779
780 /*
781   Test if we are allowed to perform IO on a region of an open file
782 */
783 static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
784                                   struct brl_handle *brlh,
785                                   uint16_t smbpid, 
786                                   uint64_t start, uint64_t size, 
787                                   enum brl_type lock_type)
788 {
789         struct ctdb_call call;
790         struct ctdb_locktest_req req;
791         int ret;
792
793         call.call_id = FUNC_BRL_LOCKTEST;
794         call.key.dptr = brlh->key.data;
795         call.key.dsize = brlh->key.length;
796         call.call_data.dptr = (uint8_t *)&req;
797         call.call_data.dsize = sizeof(req);
798
799         ZERO_STRUCT(req);
800         req.smbpid = smbpid;
801         req.start  = start;
802         req.size   = size;
803         req.lock_type = lock_type;
804         req.server = brl->server;
805         req.brl = brl;
806         req.ntvfs = brlh->ntvfs;
807
808         ret = ctdb_call(brl->ctdb_db, &call);
809         if (ret == -1) {
810                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
811                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
812         }
813
814         return NT_STATUS(call.status);
815 }
816
817
818 struct ctdb_close_req {
819         struct brl_context *brl;
820         struct server_id server;
821         struct ntvfs_handle *ntvfs;
822 };
823
824 /*
825   remove a pending lock. This is called when the caller has either
826   given up trying to establish a lock or when they have succeeded in
827   getting it. In either case they no longer need to be notified.
828 */
829 static int brl_ctdb_close_func(struct ctdb_call_info *call)
830 {
831         struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
832         TDB_DATA dbuf;
833         int count, dcount=0, i;
834         struct lock_struct *locks;
835         NTSTATUS status = NT_STATUS_OK;
836
837         dbuf = call->record_data;
838
839         /* there are existing locks - find a match */
840         locks = (struct lock_struct *)dbuf.dptr;
841         count = dbuf.dsize / sizeof(*locks);
842
843         for (i=0; i<count; i++) {
844                 struct lock_struct *lock = &locks[i];
845
846                 if (lock->context.ctx == req->brl &&
847                     cluster_id_equal(&lock->context.server, &req->server) &&
848                     lock->ntvfs == req->ntvfs) {
849                         /* found it - delete it */
850                         if (count > 1 && i < count-1) {
851                                 memmove(&locks[i], &locks[i+1], 
852                                         sizeof(*locks)*((count-1) - i));
853                         }
854                         count--;
855                         i--;
856                         dcount++;
857                 }
858         }
859
860         if (dcount > 0) {
861                 call->new_data = talloc(call, TDB_DATA);
862                 if (call->new_data == NULL) {
863                         return CTDB_ERR_NOMEM;
864                 }
865
866                 brl_ctdb_notify_all(call, locks, count);
867                 
868                 call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
869                 if (call->new_data->dptr == NULL) {
870                         return CTDB_ERR_NOMEM;
871                 }
872                 call->new_data->dsize = count*sizeof(struct lock_struct);
873
874                 memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
875         }
876
877         call->status = NT_STATUS_V(status);
878
879         return 0;
880 }
881
882 /*
883   Test if we are allowed to perform IO on a region of an open file
884 */
885 static NTSTATUS brl_ctdb_close(struct brl_context *brl,
886                                struct brl_handle *brlh)
887 {
888         struct ctdb_call call;
889         struct ctdb_close_req req;
890         int ret;
891
892         call.call_id = FUNC_BRL_CLOSE;
893         call.key.dptr = brlh->key.data;
894         call.key.dsize = brlh->key.length;
895         call.call_data.dptr = (uint8_t *)&req;
896         call.call_data.dsize = sizeof(req);
897
898         ZERO_STRUCT(req);
899         req.brl = brl;
900         req.server = brl->server;
901         req.ntvfs = brlh->ntvfs;
902
903         ret = ctdb_call(brl->ctdb_db, &call);
904         if (ret == -1) {
905                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
906                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
907         }
908
909         brl_ctdb_notify_send(brl, &call.reply_data);
910
911         return NT_STATUS(call.status);
912 }
913
914
915 static const struct brlock_ops brlock_tdb_ops = {
916         .brl_init           = brl_ctdb_init,
917         .brl_create_handle  = brl_ctdb_create_handle,
918         .brl_lock           = brl_ctdb_lock,
919         .brl_unlock         = brl_ctdb_unlock,
920         .brl_remove_pending = brl_ctdb_remove_pending,
921         .brl_locktest       = brl_ctdb_locktest,
922         .brl_close          = brl_ctdb_close
923 };
924
925
926 void brl_ctdb_init_ops(void)
927 {
928         struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), 
929                                                     struct ctdb_context);
930         struct ctdb_db_context *ctdb_db;
931
932         brl_set_ops(&brlock_tdb_ops);
933
934         ctdb_db = ctdb_db_handle(ctdb, "brlock");
935         if (ctdb_db == NULL) {
936                 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
937                 return;
938         }
939
940         ctdb_set_call(ctdb_db, brl_ctdb_lock_func,  FUNC_BRL_LOCK);
941         ctdb_set_call(ctdb_db, brl_ctdb_unlock_func,  FUNC_BRL_UNLOCK);
942         ctdb_set_call(ctdb_db, brl_ctdb_remove_pending_func,  FUNC_BRL_REMOVE_PENDING);
943         ctdb_set_call(ctdb_db, brl_ctdb_locktest_func,  FUNC_BRL_LOCKTEST);
944         ctdb_set_call(ctdb_db, brl_ctdb_close_func,  FUNC_BRL_CLOSE);
945 }