r23792: convert Samba4 to GPLv3
[bbaumbach/samba-autobuild/.git] / source4 / cluster / ctdb / brlock_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - ctdb backend
5
6    Copyright (C) Andrew Tridgell 2006
7    
8    This program is free software; you can redistribute it and/or modify
9    it under the terms of the GNU General Public License as published by
10    the Free Software Foundation; either version 3 of the License, or
11    (at your option) any later version.
12    
13    This program is distributed in the hope that it will be useful,
14    but WITHOUT ANY WARRANTY; without even the implied warranty of
15    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16    GNU General Public License for more details.
17    
18    You should have received a copy of the GNU General Public License
19    along with this program.  If not, see <http://www.gnu.org/licenses/>.
20 */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "messaging/messaging.h"
26 #include "db_wrap.h"
27 #include "lib/messaging/irpc.h"
28 #include "libcli/libcli.h"
29 #include "cluster/cluster.h"
30 #include "ntvfs/ntvfs.h"
31 #include "ntvfs/common/brlock.h"
32 #include "include/ctdb.h"
33
34 enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2, 
35                    FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
36                    FUNC_BRL_CLOSE=5};
37
38 /*
39   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
40   a file. For a local posix filesystem this will usually be a combination
41   of the device and inode numbers of the file, but it can be anything 
42   that uniquely idetifies a file for locking purposes, as long
43   as it is applied consistently.
44 */
45
46 /* this struct is typically attached to tcon */
47 struct brl_context {
48         struct ctdb_context *ctdb;
49         struct ctdb_db_context *ctdb_db;
50         struct server_id server;
51         struct messaging_context *messaging_ctx;
52 };
53
54 /*
55   the lock context contains the elements that define whether one
56   lock is the same as another lock
57 */
58 struct lock_context {
59         struct server_id server;
60         uint16_t smbpid;
61         struct brl_context *ctx;
62 };
63
64 /* The data in brlock records is an unsorted linear array of these
65    records.  It is unnecessary to store the count as tdb provides the
66    size of the record */
67 struct lock_struct {
68         struct lock_context context;
69         struct ntvfs_handle *ntvfs;
70         uint64_t start;
71         uint64_t size;
72         enum brl_type lock_type;
73         void *notify_ptr;
74 };
75
76 /* this struct is attached to on open file handle */
77 struct brl_handle {
78         DATA_BLOB key;
79         struct ntvfs_handle *ntvfs;
80         struct lock_struct last_lock;
81 };
82
83 #if 0
84 static void show_locks(const char *op, struct lock_struct *locks, int count)
85 {
86         int i;
87         DEBUG(0,("OP: %s\n", op));
88         if (locks == NULL) return;
89         for (i=0;i<count;i++) {
90                 DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
91                          i, (int)locks[i].start, (int)locks[i].size, 
92                          locks[i].context.server.node,
93                          locks[i].context.server.id,
94                          locks[i].context.smbpid,
95                          locks[i].context.ctx,
96                          locks[i].ntvfs));
97         }
98 }
99 #endif
100
101 /*
102   Open up the brlock.tdb database. Close it down using
103   talloc_free(). We need the messaging_ctx to allow for
104   pending lock notifications.
105 */
106 static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
107                                     struct messaging_context *messaging_ctx)
108 {
109         struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), 
110                                                     struct ctdb_context);
111         struct brl_context *brl;
112
113         brl = talloc(mem_ctx, struct brl_context);
114         if (brl == NULL) {
115                 return NULL;
116         }
117
118         brl->ctdb = ctdb;
119         brl->ctdb_db = ctdb_db_handle(ctdb, "brlock");
120         if (brl->ctdb_db == NULL) {
121                 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
122                 talloc_free(brl);
123                 return NULL;
124         }
125         brl->server = server;
126         brl->messaging_ctx = messaging_ctx;
127
128         return brl;
129 }
130
131 static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
132                                                     DATA_BLOB *file_key)
133 {
134         struct brl_handle *brlh;
135
136         brlh = talloc(mem_ctx, struct brl_handle);
137         if (brlh == NULL) {
138                 return NULL;
139         }
140
141         brlh->key = *file_key;
142         brlh->ntvfs = ntvfs;
143         ZERO_STRUCT(brlh->last_lock);
144
145         return brlh;
146 }
147
148 /*
149   see if two locking contexts are equal
150 */
151 static BOOL brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
152 {
153         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
154                 ctx1->smbpid == ctx2->smbpid &&
155                 ctx1->ctx == ctx2->ctx);
156 }
157
158 /*
159   see if lck1 and lck2 overlap
160 */
161 static BOOL brl_ctdb_overlap(struct lock_struct *lck1, 
162                         struct lock_struct *lck2)
163 {
164         /* this extra check is not redundent - it copes with locks
165            that go beyond the end of 64 bit file space */
166         if (lck1->size != 0 &&
167             lck1->start == lck2->start &&
168             lck1->size == lck2->size) {
169                 return True;
170         }
171             
172         if (lck1->start >= (lck2->start+lck2->size) ||
173             lck2->start >= (lck1->start+lck1->size)) {
174                 return False;
175         }
176         return True;
177
178
179 /*
180  See if lock2 can be added when lock1 is in place.
181 */
182 static BOOL brl_ctdb_conflict(struct lock_struct *lck1, 
183                          struct lock_struct *lck2)
184 {
185         /* pending locks don't conflict with anything */
186         if (lck1->lock_type >= PENDING_READ_LOCK ||
187             lck2->lock_type >= PENDING_READ_LOCK) {
188                 return False;
189         }
190
191         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
192                 return False;
193         }
194
195         if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
196             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
197                 return False;
198         }
199
200         return brl_ctdb_overlap(lck1, lck2);
201
202
203
204 /*
205  Check to see if this lock conflicts, but ignore our own locks on the
206  same fnum only.
207 */
208 static BOOL brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
209 {
210         /* pending locks don't conflict with anything */
211         if (lck1->lock_type >= PENDING_READ_LOCK ||
212             lck2->lock_type >= PENDING_READ_LOCK) {
213                 return False;
214         }
215
216         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
217                 return False;
218
219         /*
220          * note that incoming write calls conflict with existing READ
221          * locks even if the context is the same. JRA. See LOCKTEST7
222          * in smbtorture.
223          */
224         if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
225             lck1->ntvfs == lck2->ntvfs &&
226             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
227                 return False;
228         }
229
230         return brl_ctdb_overlap(lck1, lck2);
231
232
233
234 /*
235   amazingly enough, w2k3 "remembers" whether the last lock failure
236   is the same as this one and changes its error code. I wonder if any
237   app depends on this?
238 */
239 static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
240 {
241         /*
242          * this function is only called for non pending lock!
243          */
244
245         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
246         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
247                 return NT_STATUS_LOCK_NOT_GRANTED;
248         }
249
250         /* 
251          * if the notify_ptr is non NULL,
252          * it means that we're at the end of a pending lock
253          * and the real lock is requested after the timeout went by
254          * In this case we need to remember the last_lock and always
255          * give FILE_LOCK_CONFLICT
256          */
257         if (lock->notify_ptr) {
258                 brlh->last_lock = *lock;
259                 return NT_STATUS_FILE_LOCK_CONFLICT;
260         }
261
262         /* 
263          * amazing the little things you learn with a test
264          * suite. Locks beyond this offset (as a 64 bit
265          * number!) always generate the conflict error code,
266          * unless the top bit is set
267          */
268         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
269                 brlh->last_lock = *lock;
270                 return NT_STATUS_FILE_LOCK_CONFLICT;
271         }
272
273         /*
274          * if the current lock matches the last failed lock on the file handle
275          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
276          */
277         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
278             lock->context.ctx == brlh->last_lock.context.ctx &&
279             lock->ntvfs == brlh->last_lock.ntvfs &&
280             lock->start == brlh->last_lock.start) {
281                 return NT_STATUS_FILE_LOCK_CONFLICT;
282         }
283
284         brlh->last_lock = *lock;
285         return NT_STATUS_LOCK_NOT_GRANTED;
286 }
287
288 struct ctdb_lock_req {
289         uint16_t smbpid;
290         uint64_t start;
291         uint64_t size;
292         enum brl_type lock_type;
293         void *notify_ptr;
294         struct server_id server;
295         struct brl_context *brl;
296         struct ntvfs_handle *ntvfs;
297 };
298
299 /*
300   ctdb call handling brl_lock()
301 */
302 static int brl_ctdb_lock_func(struct ctdb_call_info *call)
303 {
304         struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
305         TDB_DATA dbuf;
306         int count=0, i;
307         struct lock_struct lock, *locks=NULL;
308         NTSTATUS status = NT_STATUS_OK;
309
310         /* if this is a pending lock, then with the chainlock held we
311            try to get the real lock. If we succeed then we don't need
312            to make it pending. This prevents a possible race condition
313            where the pending lock gets created after the lock that is
314            preventing the real lock gets removed */
315         if (req->lock_type >= PENDING_READ_LOCK) {
316                 enum brl_type lock_type = req->lock_type;
317                 req->lock_type = (req->lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
318                 if (brl_ctdb_lock_func(call) == 0 && call->status == NT_STATUS_V(NT_STATUS_OK)) {
319                         return 0;
320                 }
321                 req->lock_type = lock_type;
322         }
323
324         dbuf = call->record_data;
325
326         ZERO_STRUCT(lock);
327         lock.context.smbpid = req->smbpid;
328         lock.context.server = req->server;
329         lock.context.ctx = req->brl;
330         lock.ntvfs = req->ntvfs;
331         lock.start = req->start;
332         lock.size = req->size;
333         lock.lock_type = req->lock_type;
334         lock.notify_ptr = req->notify_ptr;
335
336         if (dbuf.dptr) {
337                 /* there are existing locks - make sure they don't conflict */
338                 locks = (struct lock_struct *)dbuf.dptr;
339                 count = dbuf.dsize / sizeof(*locks);
340
341                 for (i=0; i<count; i++) {
342                         if (brl_ctdb_conflict(&locks[i], &lock)) {
343                                 status = NT_STATUS_LOCK_NOT_GRANTED;
344                                 goto reply;
345                         }
346                 }
347         }
348
349         call->new_data = talloc(call, TDB_DATA);
350         if (call->new_data == NULL) {
351                 return CTDB_ERR_NOMEM;
352         }
353
354         call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
355         if (call->new_data->dptr == NULL) {
356                 return CTDB_ERR_NOMEM;
357         }
358         memcpy(call->new_data->dptr, locks, dbuf.dsize);
359         memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
360         call->new_data->dsize = dbuf.dsize + sizeof(lock);
361
362         if (req->lock_type >= PENDING_READ_LOCK) {
363                 status = NT_STATUS_LOCK_NOT_GRANTED;
364         }
365
366 reply:
367         call->status = NT_STATUS_V(status);
368
369         return 0;
370 }
371
372
373 /*
374   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
375   which case a real lock is first tried, and if that fails then a
376   pending lock is created. When the pending lock is triggered (by
377   someone else closing an overlapping lock range) a messaging
378   notification is sent, identified by the notify_ptr
379 */
380 static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
381                               struct brl_handle *brlh,
382                               uint16_t smbpid,
383                               uint64_t start, uint64_t size, 
384                               enum brl_type lock_type,
385                               void *notify_ptr)
386 {
387         struct ctdb_lock_req req;
388         struct ctdb_call call;
389         int ret;
390         NTSTATUS status;
391
392         call.call_id = FUNC_BRL_LOCK;
393         call.key.dptr = brlh->key.data;
394         call.key.dsize = brlh->key.length;
395         call.call_data.dptr = (uint8_t *)&req;
396         call.call_data.dsize = sizeof(req);
397         call.flags = 0;
398         call.status = 0;
399
400         ZERO_STRUCT(req);
401         req.smbpid = smbpid;
402         req.start  = start;
403         req.size   = size;
404         req.lock_type = lock_type;
405         req.notify_ptr = notify_ptr;
406         req.server = brl->server;
407         req.brl = brl;
408         req.ntvfs = brlh->ntvfs;
409
410         ret = ctdb_call(brl->ctdb_db, &call);
411         if (ret == -1) {
412                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
413         }
414
415         status = NT_STATUS(call.status);
416
417         if (NT_STATUS_EQUAL(status, NT_STATUS_LOCK_NOT_GRANTED)) {
418                 struct lock_struct lock;
419                 lock.context.smbpid = smbpid;
420                 lock.context.server = brl->server;
421                 lock.context.ctx = brl;
422                 lock.ntvfs = brlh->ntvfs;
423                 lock.start = start;
424                 lock.size = size;
425                 lock.lock_type = lock_type;
426                 lock.notify_ptr = notify_ptr;
427                 status = brl_ctdb_lock_failed(brlh, &lock);
428         }
429
430         return status;
431 }
432
433 /*
434   we are removing a lock that might be holding up a pending lock. Scan
435   for pending locks that cover this range and if we find any then
436   notify the server that it should retry the lock. In this backend, we
437   notify by sending the list of locks that need to be notified on back
438   in the reply_data of the ctdb call. The caller then does the
439   messaging for us. 
440 */
441 static int brl_ctdb_notify_unlock(struct ctdb_call_info *call,
442                                   struct lock_struct *locks, int count, 
443                                    struct lock_struct *removed_lock)
444 {
445         int i, last_notice;
446
447         /* the last_notice logic is to prevent stampeding on a lock
448            range. It prevents us sending hundreds of notifies on the
449            same range of bytes. It doesn't prevent all possible
450            stampedes, but it does prevent the most common problem */
451         last_notice = -1;
452
453         for (i=0;i<count;i++) {
454                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
455                     brl_ctdb_overlap(&locks[i], removed_lock)) {
456                         struct lock_struct *nlocks;
457                         int ncount;
458
459                         if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
460                                 continue;
461                         }
462                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
463                                 last_notice = i;
464                         }
465                         if (call->reply_data == NULL) {
466                                 call->reply_data = talloc_zero(call, TDB_DATA);
467                                 if (call->reply_data == NULL) {
468                                         return CTDB_ERR_NOMEM;
469                                 }
470                         }
471                         /* add to the list of pending locks to notify caller of */
472                         ncount = call->reply_data->dsize / sizeof(struct lock_struct);
473                         nlocks = talloc_realloc(call->reply_data, call->reply_data->dptr, 
474                                                 struct lock_struct, ncount + 1);
475                         if (nlocks == NULL) {
476                                 return CTDB_ERR_NOMEM;
477                         }
478                         call->reply_data->dptr = (uint8_t *)nlocks;
479                         nlocks[ncount] = locks[i];
480                         call->reply_data->dsize += sizeof(struct lock_struct);
481                 }
482         }
483
484         return 0;
485 }
486
487 /*
488   send notifications for all pending locks - the file is being closed by this
489   user
490 */
491 static int brl_ctdb_notify_all(struct ctdb_call_info *call,
492                                 struct lock_struct *locks, int count)
493 {
494         int i;
495         for (i=0;i<count;i++) {
496                 if (locks->lock_type >= PENDING_READ_LOCK) {
497                         int ret = brl_ctdb_notify_unlock(call, locks, count, &locks[i]);
498                         if (ret != 0) return ret;
499                 }
500         }
501         return 0;
502 }
503
504 /*
505   send off any messages needed to notify of pending locks that should now retry
506 */
507 static void brl_ctdb_notify_send(struct brl_context *brl, TDB_DATA *reply_data)
508 {
509         struct lock_struct *locks = (struct lock_struct *)reply_data->dptr;
510         int i, count = reply_data->dsize / sizeof(struct lock_struct);
511         for (i=0;i<count;i++) {
512                 messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
513                                    MSG_BRL_RETRY, locks[i].notify_ptr);
514         }
515 }
516
517
518 struct ctdb_unlock_req {
519         uint16_t smbpid;
520         uint64_t start;
521         uint64_t size;
522         struct server_id server;
523         struct brl_context *brl;
524         struct ntvfs_handle *ntvfs;
525 };
526
527 /*
528  Unlock a range of bytes.
529 */
530 static int brl_ctdb_unlock_func(struct ctdb_call_info *call)
531 {
532         struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
533         TDB_DATA dbuf;
534         int count, i;
535         struct lock_struct *locks, *lock;
536         struct lock_context context;
537         NTSTATUS status = NT_STATUS_OK;
538
539         dbuf = call->record_data;
540
541         context.smbpid = req->smbpid;
542         context.server = req->server;
543         context.ctx = req->brl;
544
545         /* there are existing locks - find a match */
546         locks = (struct lock_struct *)dbuf.dptr;
547         count = dbuf.dsize / sizeof(*locks);
548
549         for (i=0; i<count; i++) {
550                 lock = &locks[i];
551                 if (brl_ctdb_same_context(&lock->context, &context) &&
552                     lock->ntvfs == req->ntvfs &&
553                     lock->start == req->start &&
554                     lock->size == req->size &&
555                     lock->lock_type == WRITE_LOCK) {
556                         break;
557                 }
558         }
559         if (i < count) goto found;
560
561         for (i=0; i<count; i++) {
562                 lock = &locks[i];
563                 if (brl_ctdb_same_context(&lock->context, &context) &&
564                     lock->ntvfs == req->ntvfs &&
565                     lock->start == req->start &&
566                     lock->size == req->size &&
567                     lock->lock_type < PENDING_READ_LOCK) {
568                         break;
569                 }
570         }
571
572 found:
573         if (i < count) {
574                 struct lock_struct removed_lock = *lock;
575
576                 call->new_data = talloc(call, TDB_DATA);
577                 if (call->new_data == NULL) {
578                         return CTDB_ERR_NOMEM;
579                 }
580                 
581                 call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
582                 if (call->new_data->dptr == NULL) {
583                         return CTDB_ERR_NOMEM;
584                 }
585                 call->new_data->dsize = dbuf.dsize - sizeof(*lock);
586                 
587                 memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
588                 memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
589                        (count-(i+1))*sizeof(*lock));
590                 
591                 if (count > 1) {
592                         int ret = brl_ctdb_notify_unlock(call, locks, count, &removed_lock);
593                         if (ret != 0) return ret;
594                 }
595         }
596
597         if (i == count) {
598                 /* we didn't find it */
599                 status = NT_STATUS_RANGE_NOT_LOCKED;
600         }
601
602         call->status = NT_STATUS_V(status);
603
604         return 0;
605 }
606
607
608 /*
609  Unlock a range of bytes.
610 */
611 static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
612                                 struct brl_handle *brlh, 
613                                 uint16_t smbpid,
614                                 uint64_t start, uint64_t size)
615 {
616         struct ctdb_call call;
617         struct ctdb_unlock_req req;
618         int ret;
619
620         call.call_id = FUNC_BRL_UNLOCK;
621         call.key.dptr = brlh->key.data;
622         call.key.dsize = brlh->key.length;
623         call.call_data.dptr = (uint8_t *)&req;
624         call.call_data.dsize = sizeof(req);
625
626         ZERO_STRUCT(req);
627         req.smbpid = smbpid;
628         req.start  = start;
629         req.size   = size;
630         req.server = brl->server;
631         req.brl = brl;
632         req.ntvfs = brlh->ntvfs;
633                 
634         ret = ctdb_call(brl->ctdb_db, &call);
635         if (ret == -1) {
636                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
637                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
638         }
639
640         brl_ctdb_notify_send(brl, &call.reply_data);
641
642         return NT_STATUS(call.status);
643 }
644
645
646 struct ctdb_remove_pending_req {
647         struct server_id server;
648         void *notify_ptr;
649 };
650
651 /*
652   remove a pending lock. This is called when the caller has either
653   given up trying to establish a lock or when they have succeeded in
654   getting it. In either case they no longer need to be notified.
655 */
656 static int brl_ctdb_remove_pending_func(struct ctdb_call_info *call)
657 {
658         struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
659         TDB_DATA dbuf;
660         int count, i;
661         struct lock_struct *locks;
662         NTSTATUS status = NT_STATUS_OK;
663
664         dbuf = call->record_data;
665
666         /* there are existing locks - find a match */
667         locks = (struct lock_struct *)dbuf.dptr;
668         count = dbuf.dsize / sizeof(*locks);
669
670         for (i=0; i<count; i++) {
671                 struct lock_struct *lock = &locks[i];
672                 
673                 if (lock->lock_type >= PENDING_READ_LOCK &&
674                     lock->notify_ptr == req->notify_ptr &&
675                     cluster_id_equal(&lock->context.server, &req->server)) {
676                         call->new_data = talloc(call, TDB_DATA);
677                         if (call->new_data == NULL) {
678                                 return CTDB_ERR_NOMEM;
679                         }
680
681                         call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(*lock));
682                         if (call->new_data->dptr == NULL) {
683                                 return CTDB_ERR_NOMEM;
684                         }
685                         call->new_data->dsize = dbuf.dsize - sizeof(*lock);
686
687                         memcpy(call->new_data->dptr, locks, i*sizeof(*lock));
688                         memcpy(call->new_data->dptr+i*sizeof(*lock), locks+i+1,
689                                (count-(i+1))*sizeof(*lock));
690                         break;
691                 }
692         }
693         
694         if (i == count) {
695                 /* we didn't find it */
696                 status = NT_STATUS_RANGE_NOT_LOCKED;
697         }
698
699         call->status = NT_STATUS_V(status);
700
701         return 0;
702 }
703
704 static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
705                                         struct brl_handle *brlh, 
706                                         void *notify_ptr)
707 {
708         struct ctdb_call call;
709         struct ctdb_remove_pending_req req;
710         int ret;
711
712         call.call_id = FUNC_BRL_REMOVE_PENDING;
713         call.key.dptr = brlh->key.data;
714         call.key.dsize = brlh->key.length;
715         call.call_data.dptr = (uint8_t *)&req;
716         call.call_data.dsize = sizeof(req);
717
718         ZERO_STRUCT(req);
719         req.notify_ptr = notify_ptr;
720         req.server = brl->server;
721                 
722         ret = ctdb_call(brl->ctdb_db, &call);
723         if (ret == -1) {
724                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
725                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
726         }
727
728         return NT_STATUS(call.status);
729 }
730
731
732 struct ctdb_locktest_req {
733         uint16_t smbpid;
734         uint64_t start;
735         uint64_t size;
736         enum brl_type lock_type;
737         struct brl_context *brl;
738         struct server_id server;
739         struct ntvfs_handle *ntvfs;
740 };
741
742 /*
743   remove a pending lock. This is called when the caller has either
744   given up trying to establish a lock or when they have succeeded in
745   getting it. In either case they no longer need to be notified.
746 */
747 static int brl_ctdb_locktest_func(struct ctdb_call_info *call)
748 {
749         struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
750         TDB_DATA dbuf;
751         int count, i;
752         struct lock_struct *locks, lock;
753         NTSTATUS status = NT_STATUS_OK;
754
755         lock.context.smbpid = req->smbpid;
756         lock.context.server = req->server;
757         lock.context.ctx = req->brl;
758         lock.ntvfs = req->ntvfs;
759         lock.start = req->start;
760         lock.size = req->size;
761         lock.lock_type = req->lock_type;
762
763         dbuf = call->record_data;
764
765         /* there are existing locks - find a match */
766         locks = (struct lock_struct *)dbuf.dptr;
767         count = dbuf.dsize / sizeof(*locks);
768
769         for (i=0; i<count; i++) {
770                 if (brl_ctdb_conflict_other(&locks[i], &lock)) {
771                         status = NT_STATUS_FILE_LOCK_CONFLICT;
772                         break;
773                 }
774         }
775         
776         call->status = NT_STATUS_V(status);
777
778         return 0;
779 }
780
781 /*
782   Test if we are allowed to perform IO on a region of an open file
783 */
784 static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
785                                   struct brl_handle *brlh,
786                                   uint16_t smbpid, 
787                                   uint64_t start, uint64_t size, 
788                                   enum brl_type lock_type)
789 {
790         struct ctdb_call call;
791         struct ctdb_locktest_req req;
792         int ret;
793
794         call.call_id = FUNC_BRL_LOCKTEST;
795         call.key.dptr = brlh->key.data;
796         call.key.dsize = brlh->key.length;
797         call.call_data.dptr = (uint8_t *)&req;
798         call.call_data.dsize = sizeof(req);
799
800         ZERO_STRUCT(req);
801         req.smbpid = smbpid;
802         req.start  = start;
803         req.size   = size;
804         req.lock_type = lock_type;
805         req.server = brl->server;
806         req.brl = brl;
807         req.ntvfs = brlh->ntvfs;
808
809         ret = ctdb_call(brl->ctdb_db, &call);
810         if (ret == -1) {
811                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
812                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
813         }
814
815         return NT_STATUS(call.status);
816 }
817
818
819 struct ctdb_close_req {
820         struct brl_context *brl;
821         struct server_id server;
822         struct ntvfs_handle *ntvfs;
823 };
824
825 /*
826   remove a pending lock. This is called when the caller has either
827   given up trying to establish a lock or when they have succeeded in
828   getting it. In either case they no longer need to be notified.
829 */
830 static int brl_ctdb_close_func(struct ctdb_call_info *call)
831 {
832         struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
833         TDB_DATA dbuf;
834         int count, dcount=0, i;
835         struct lock_struct *locks;
836         NTSTATUS status = NT_STATUS_OK;
837
838         dbuf = call->record_data;
839
840         /* there are existing locks - find a match */
841         locks = (struct lock_struct *)dbuf.dptr;
842         count = dbuf.dsize / sizeof(*locks);
843
844         for (i=0; i<count; i++) {
845                 struct lock_struct *lock = &locks[i];
846
847                 if (lock->context.ctx == req->brl &&
848                     cluster_id_equal(&lock->context.server, &req->server) &&
849                     lock->ntvfs == req->ntvfs) {
850                         /* found it - delete it */
851                         if (count > 1 && i < count-1) {
852                                 memmove(&locks[i], &locks[i+1], 
853                                         sizeof(*locks)*((count-1) - i));
854                         }
855                         count--;
856                         i--;
857                         dcount++;
858                 }
859         }
860
861         if (dcount > 0) {
862                 call->new_data = talloc(call, TDB_DATA);
863                 if (call->new_data == NULL) {
864                         return CTDB_ERR_NOMEM;
865                 }
866
867                 brl_ctdb_notify_all(call, locks, count);
868                 
869                 call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
870                 if (call->new_data->dptr == NULL) {
871                         return CTDB_ERR_NOMEM;
872                 }
873                 call->new_data->dsize = count*sizeof(struct lock_struct);
874
875                 memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
876         }
877
878         call->status = NT_STATUS_V(status);
879
880         return 0;
881 }
882
883 /*
884   Test if we are allowed to perform IO on a region of an open file
885 */
886 static NTSTATUS brl_ctdb_close(struct brl_context *brl,
887                                struct brl_handle *brlh)
888 {
889         struct ctdb_call call;
890         struct ctdb_close_req req;
891         int ret;
892
893         call.call_id = FUNC_BRL_CLOSE;
894         call.key.dptr = brlh->key.data;
895         call.key.dsize = brlh->key.length;
896         call.call_data.dptr = (uint8_t *)&req;
897         call.call_data.dsize = sizeof(req);
898
899         ZERO_STRUCT(req);
900         req.brl = brl;
901         req.server = brl->server;
902         req.ntvfs = brlh->ntvfs;
903
904         ret = ctdb_call(brl->ctdb_db, &call);
905         if (ret == -1) {
906                 DEBUG(0,("ctdb_call failed - %s\n", __location__));
907                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
908         }
909
910         brl_ctdb_notify_send(brl, &call.reply_data);
911
912         return NT_STATUS(call.status);
913 }
914
915
916 static const struct brlock_ops brlock_tdb_ops = {
917         .brl_init           = brl_ctdb_init,
918         .brl_create_handle  = brl_ctdb_create_handle,
919         .brl_lock           = brl_ctdb_lock,
920         .brl_unlock         = brl_ctdb_unlock,
921         .brl_remove_pending = brl_ctdb_remove_pending,
922         .brl_locktest       = brl_ctdb_locktest,
923         .brl_close          = brl_ctdb_close
924 };
925
926
927 void brl_ctdb_init_ops(void)
928 {
929         struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), 
930                                                     struct ctdb_context);
931         struct ctdb_db_context *ctdb_db;
932
933         brl_set_ops(&brlock_tdb_ops);
934
935         ctdb_db = ctdb_db_handle(ctdb, "brlock");
936         if (ctdb_db == NULL) {
937                 DEBUG(0,("Failed to get attached ctdb db handle for brlock\n"));
938                 return;
939         }
940
941         ctdb_set_call(ctdb_db, brl_ctdb_lock_func,  FUNC_BRL_LOCK);
942         ctdb_set_call(ctdb_db, brl_ctdb_unlock_func,  FUNC_BRL_UNLOCK);
943         ctdb_set_call(ctdb_db, brl_ctdb_remove_pending_func,  FUNC_BRL_REMOVE_PENDING);
944         ctdb_set_call(ctdb_db, brl_ctdb_locktest_func,  FUNC_BRL_LOCKTEST);
945         ctdb_set_call(ctdb_db, brl_ctdb_close_func,  FUNC_BRL_CLOSE);
946 }