r15614: the byte range locking error handling caches the last failed lock
[kai/samba.git] / source4 / ntvfs / common / brlock.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code
5
6    Copyright (C) Andrew Tridgell 1992-2004
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 /* This module implements a tdb based byte range locking service,
25    replacing the fcntl() based byte range locking previously
26    used. This allows us to provide the same semantics as NT */
27
28 #include "includes.h"
29 #include "system/filesys.h"
30 #include "lib/tdb/include/tdb.h"
31 #include "messaging/messaging.h"
32 #include "db_wrap.h"
33 #include "lib/messaging/irpc.h"
34 #include "libcli/libcli.h"
35
36 /*
37   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
38   a file. For a local posix filesystem this will usually be a combination
39   of the device and inode numbers of the file, but it can be anything 
40   that uniquely idetifies a file for locking purposes, as long
41   as it is applied consistently.
42 */
43
44 struct brl_context;
45 /*
46   the lock context contains the elements that define whether one
47   lock is the same as another lock
48 */
49 struct lock_context {
50         uint32_t server;
51         uint16_t smbpid;
52         struct brl_context *ctx;
53 };
54
55 /* The data in brlock records is an unsorted linear array of these
56    records.  It is unnecessary to store the count as tdb provides the
57    size of the record */
58 struct lock_struct {
59         struct lock_context context;
60         uint16_t fnum;
61         uint64_t start;
62         uint64_t size;
63         enum brl_type lock_type;
64         void *notify_ptr;
65 };
66
67 /* this struct is attached to on oprn file handle */
68 struct brl_handle {
69         DATA_BLOB key;
70         uint16_t fnum;
71         struct lock_struct last_lock;
72 };
73
74 /* this struct is typicaly attached to tcon */
75 struct brl_context {
76         struct tdb_wrap *w;
77         uint32_t server;
78         struct messaging_context *messaging_ctx;
79 };
80
81 /*
82   Open up the brlock.tdb database. Close it down using
83   talloc_free(). We need the messaging_ctx to allow for
84   pending lock notifications.
85 */
86 struct brl_context *brl_init(TALLOC_CTX *mem_ctx, uint32_t server, 
87                              struct messaging_context *messaging_ctx)
88 {
89         char *path;
90         struct brl_context *brl;
91
92         brl = talloc(mem_ctx, struct brl_context);
93         if (brl == NULL) {
94                 return NULL;
95         }
96
97         path = smbd_tmp_path(brl, "brlock.tdb");
98         brl->w = tdb_wrap_open(brl, path, 0,
99                                TDB_DEFAULT, O_RDWR|O_CREAT, 0600);
100         talloc_free(path);
101         if (brl->w == NULL) {
102                 talloc_free(brl);
103                 return NULL;
104         }
105
106         brl->server = server;
107         brl->messaging_ctx = messaging_ctx;
108
109         return brl;
110 }
111
112 struct brl_handle *brl_create_handle(TALLOC_CTX *mem_ctx, DATA_BLOB *file_key, uint16_t fnum)
113 {
114         struct brl_handle *brlh;
115
116         brlh = talloc(mem_ctx, struct brl_handle);
117         if (brlh == NULL) {
118                 return NULL;
119         }
120
121         brlh->key = *file_key;
122         brlh->fnum = fnum;
123         ZERO_STRUCT(brlh->last_lock);
124
125         return brlh;
126 }
127
128 /*
129   see if two locking contexts are equal
130 */
131 static BOOL brl_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
132 {
133         return (ctx1->server == ctx2->server &&
134                 ctx1->smbpid == ctx2->smbpid &&
135                 ctx1->ctx == ctx2->ctx);
136 }
137
138 /*
139   see if lck1 and lck2 overlap
140 */
141 static BOOL brl_overlap(struct lock_struct *lck1, 
142                         struct lock_struct *lck2)
143 {
144         /* this extra check is not redundent - it copes with locks
145            that go beyond the end of 64 bit file space */
146         if (lck1->size != 0 &&
147             lck1->start == lck2->start &&
148             lck1->size == lck2->size) {
149                 return True;
150         }
151             
152         if (lck1->start >= (lck2->start+lck2->size) ||
153             lck2->start >= (lck1->start+lck1->size)) {
154                 return False;
155         }
156         return True;
157
158
159 /*
160  See if lock2 can be added when lock1 is in place.
161 */
162 static BOOL brl_conflict(struct lock_struct *lck1, 
163                          struct lock_struct *lck2)
164 {
165         /* pending locks don't conflict with anything */
166         if (lck1->lock_type >= PENDING_READ_LOCK ||
167             lck2->lock_type >= PENDING_READ_LOCK) {
168                 return False;
169         }
170
171         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
172                 return False;
173         }
174
175         if (brl_same_context(&lck1->context, &lck2->context) &&
176             lck2->lock_type == READ_LOCK && lck1->fnum == lck2->fnum) {
177                 return False;
178         }
179
180         return brl_overlap(lck1, lck2);
181
182
183
184 /*
185  Check to see if this lock conflicts, but ignore our own locks on the
186  same fnum only.
187 */
188 static BOOL brl_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
189 {
190         /* pending locks don't conflict with anything */
191         if (lck1->lock_type >= PENDING_READ_LOCK ||
192             lck2->lock_type >= PENDING_READ_LOCK) {
193                 return False;
194         }
195
196         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
197                 return False;
198
199         /*
200          * note that incoming write calls conflict with existing READ
201          * locks even if the context is the same. JRA. See LOCKTEST7
202          * in smbtorture.
203          */
204         if (brl_same_context(&lck1->context, &lck2->context) &&
205             lck1->fnum == lck2->fnum &&
206             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
207                 return False;
208         }
209
210         return brl_overlap(lck1, lck2);
211
212
213
214 /*
215   amazingly enough, w2k3 "remembers" whether the last lock failure
216   is the same as this one and changes its error code. I wonder if any
217   app depends on this?
218 */
219 static NTSTATUS brl_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
220 {
221         /*
222          * this function is only called for non pending lock!
223          */
224
225         /* 
226          * if the notify_ptr is non NULL,
227          * it means that we're at the end of a pending lock
228          * and the real lock is requested after the timout went by
229          * In this case we need to remember the last_lock and always
230          * give FILE_LOCK_CONFLICT
231          */
232         if (lock->notify_ptr) {
233                 brlh->last_lock = *lock;
234                 return NT_STATUS_FILE_LOCK_CONFLICT;
235         }
236
237         /* 
238          * amazing the little things you learn with a test
239          * suite. Locks beyond this offset (as a 64 bit
240          * number!) always generate the conflict error code,
241          * unless the top bit is set
242          */
243         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
244                 brlh->last_lock = *lock;
245                 return NT_STATUS_FILE_LOCK_CONFLICT;
246         }
247
248         /*
249          * if the current lock matches the last failed lock on the file handle
250          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
251          */
252         if (lock->context.server == brlh->last_lock.context.server &&
253             lock->context.ctx == brlh->last_lock.context.ctx &&
254             lock->fnum == brlh->last_lock.fnum &&
255             lock->start == brlh->last_lock.start) {
256                 return NT_STATUS_FILE_LOCK_CONFLICT;
257         }
258
259         brlh->last_lock = *lock;
260         return NT_STATUS_LOCK_NOT_GRANTED;
261 }
262
263 /*
264   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
265   which case a real lock is first tried, and if that fails then a
266   pending lock is created. When the pending lock is triggered (by
267   someone else closing an overlapping lock range) a messaging
268   notification is sent, identified by the notify_ptr
269 */
270 NTSTATUS brl_lock(struct brl_context *brl,
271                   struct brl_handle *brlh,
272                   uint16_t smbpid,
273                   uint64_t start, uint64_t size, 
274                   enum brl_type lock_type,
275                   void *notify_ptr)
276 {
277         TDB_DATA kbuf, dbuf;
278         int count=0, i;
279         struct lock_struct lock, *locks=NULL;
280         NTSTATUS status;
281
282         kbuf.dptr = brlh->key.data;
283         kbuf.dsize = brlh->key.length;
284
285         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
286                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
287         }
288
289         /* if this is a pending lock, then with the chainlock held we
290            try to get the real lock. If we succeed then we don't need
291            to make it pending. This prevents a possible race condition
292            where the pending lock gets created after the lock that is
293            preventing the real lock gets removed */
294         if (lock_type >= PENDING_READ_LOCK) {
295                 enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
296
297                 /* here we need to force that the last_lock isn't overwritten */
298                 lock = brlh->last_lock;
299                 status = brl_lock(brl, brlh, smbpid, start, size, rw, NULL);
300                 brlh->last_lock = lock;
301
302                 if (NT_STATUS_IS_OK(status)) {
303                         tdb_chainunlock(brl->w->tdb, kbuf);
304                         return NT_STATUS_OK;
305                 }
306         }
307
308         dbuf = tdb_fetch(brl->w->tdb, kbuf);
309
310         lock.context.smbpid = smbpid;
311         lock.context.server = brl->server;
312         lock.context.ctx = brl;
313         lock.fnum = brlh->fnum;
314         lock.context.ctx = brl;
315         lock.start = start;
316         lock.size = size;
317         lock.lock_type = lock_type;
318         lock.notify_ptr = notify_ptr;
319
320         if (dbuf.dptr) {
321                 /* there are existing locks - make sure they don't conflict */
322                 locks = (struct lock_struct *)dbuf.dptr;
323                 count = dbuf.dsize / sizeof(*locks);
324                 for (i=0; i<count; i++) {
325                         if (brl_conflict(&locks[i], &lock)) {
326                                 status = brl_lock_failed(brlh, &lock);
327                                 goto fail;
328                         }
329                 }
330         }
331
332         /* no conflicts - add it to the list of locks */
333         locks = realloc_p(locks, struct lock_struct, count+1);
334         if (!locks) {
335                 status = NT_STATUS_NO_MEMORY;
336                 goto fail;
337         } else {
338                 dbuf.dptr = (uint8_t *)locks;
339         }
340         locks[count] = lock;
341         dbuf.dsize += sizeof(lock);
342
343         if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
344                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
345                 goto fail;
346         }
347
348         free(dbuf.dptr);
349         tdb_chainunlock(brl->w->tdb, kbuf);
350
351         /* the caller needs to know if the real lock was granted. If
352            we have reached here then it must be a pending lock that
353            was granted, so tell them the lock failed */
354         if (lock_type >= PENDING_READ_LOCK) {
355                 return NT_STATUS_LOCK_NOT_GRANTED;
356         }
357
358         return NT_STATUS_OK;
359
360  fail:
361
362         free(dbuf.dptr);
363         tdb_chainunlock(brl->w->tdb, kbuf);
364         return status;
365 }
366
367
368 /*
369   we are removing a lock that might be holding up a pending lock. Scan for pending
370   locks that cover this range and if we find any then notify the server that it should
371   retry the lock
372 */
373 static void brl_notify_unlock(struct brl_context *brl,
374                               struct lock_struct *locks, int count, 
375                               struct lock_struct *removed_lock)
376 {
377         int i, last_notice;
378
379         /* the last_notice logic is to prevent stampeding on a lock
380            range. It prevents us sending hundreds of notifies on the
381            same range of bytes. It doesn't prevent all possible
382            stampedes, but it does prevent the most common problem */
383         last_notice = -1;
384
385         for (i=0;i<count;i++) {
386                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
387                     brl_overlap(&locks[i], removed_lock)) {
388                         if (last_notice != -1 && brl_overlap(&locks[i], &locks[last_notice])) {
389                                 continue;
390                         }
391                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
392                                 last_notice = i;
393                         }
394                         messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
395                                            MSG_BRL_RETRY, locks[i].notify_ptr);
396                 }
397         }
398 }
399
400
401 /*
402   send notifications for all pending locks - the file is being closed by this
403   user
404 */
405 static void brl_notify_all(struct brl_context *brl,
406                            struct lock_struct *locks, int count)
407 {
408         int i;
409         for (i=0;i<count;i++) {
410                 if (locks->lock_type >= PENDING_READ_LOCK) {
411                         brl_notify_unlock(brl, locks, count, &locks[i]);
412                 }
413         }
414 }
415
416
417
418 /*
419  Unlock a range of bytes.
420 */
421 NTSTATUS brl_unlock(struct brl_context *brl,
422                     struct brl_handle *brlh, 
423                     uint16_t smbpid,
424                     uint64_t start, uint64_t size)
425 {
426         TDB_DATA kbuf, dbuf;
427         int count, i;
428         struct lock_struct *locks;
429         struct lock_context context;
430         NTSTATUS status;
431
432         kbuf.dptr = brlh->key.data;
433         kbuf.dsize = brlh->key.length;
434
435         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
436                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
437         }
438
439         dbuf = tdb_fetch(brl->w->tdb, kbuf);
440         if (!dbuf.dptr) {
441                 tdb_chainunlock(brl->w->tdb, kbuf);
442                 return NT_STATUS_RANGE_NOT_LOCKED;
443         }
444
445         context.smbpid = smbpid;
446         context.server = brl->server;
447         context.ctx = brl;
448
449         /* there are existing locks - find a match */
450         locks = (struct lock_struct *)dbuf.dptr;
451         count = dbuf.dsize / sizeof(*locks);
452
453         for (i=0; i<count; i++) {
454                 struct lock_struct *lock = &locks[i];
455                 
456                 if (brl_same_context(&lock->context, &context) &&
457                     lock->fnum == brlh->fnum &&
458                     lock->start == start &&
459                     lock->size == size &&
460                     lock->lock_type < PENDING_READ_LOCK) {
461                         /* found it - delete it */
462                         if (count == 1) {
463                                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
464                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
465                                         goto fail;
466                                 }
467                         } else {
468                                 struct lock_struct removed_lock = *lock;
469                                 if (i < count-1) {
470                                         memmove(&locks[i], &locks[i+1], 
471                                                 sizeof(*locks)*((count-1) - i));
472                                 }
473                                 count--;
474
475                                 /* send notifications for any relevant pending locks */
476                                 brl_notify_unlock(brl, locks, count, &removed_lock);
477
478                                 dbuf.dsize = count * sizeof(*locks);
479
480                                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
481                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
482                                         goto fail;
483                                 }
484                         }
485                         
486                         free(dbuf.dptr);
487                         tdb_chainunlock(brl->w->tdb, kbuf);
488                         return NT_STATUS_OK;
489                 }
490         }
491         
492         /* we didn't find it */
493         status = NT_STATUS_RANGE_NOT_LOCKED;
494
495  fail:
496         free(dbuf.dptr);
497         tdb_chainunlock(brl->w->tdb, kbuf);
498         return status;
499 }
500
501
502 /*
503   remove a pending lock. This is called when the caller has either
504   given up trying to establish a lock or when they have succeeded in
505   getting it. In either case they no longer need to be notified.
506 */
507 NTSTATUS brl_remove_pending(struct brl_context *brl,
508                             struct brl_handle *brlh, 
509                             void *notify_ptr)
510 {
511         TDB_DATA kbuf, dbuf;
512         int count, i;
513         struct lock_struct *locks;
514         NTSTATUS status;
515
516         kbuf.dptr = brlh->key.data;
517         kbuf.dsize = brlh->key.length;
518
519         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
520                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
521         }
522
523         dbuf = tdb_fetch(brl->w->tdb, kbuf);
524         if (!dbuf.dptr) {
525                 tdb_chainunlock(brl->w->tdb, kbuf);
526                 return NT_STATUS_RANGE_NOT_LOCKED;
527         }
528
529         /* there are existing locks - find a match */
530         locks = (struct lock_struct *)dbuf.dptr;
531         count = dbuf.dsize / sizeof(*locks);
532
533         for (i=0; i<count; i++) {
534                 struct lock_struct *lock = &locks[i];
535                 
536                 if (lock->lock_type >= PENDING_READ_LOCK &&
537                     lock->notify_ptr == notify_ptr &&
538                     lock->context.server == brl->server) {
539                         /* found it - delete it */
540                         if (count == 1) {
541                                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
542                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
543                                         goto fail;
544                                 }
545                         } else {
546                                 if (i < count-1) {
547                                         memmove(&locks[i], &locks[i+1], 
548                                                 sizeof(*locks)*((count-1) - i));
549                                 }
550                                 count--;
551                                 dbuf.dsize = count * sizeof(*locks);
552                                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
553                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
554                                         goto fail;
555                                 }
556                         }
557                         
558                         free(dbuf.dptr);
559                         tdb_chainunlock(brl->w->tdb, kbuf);
560                         return NT_STATUS_OK;
561                 }
562         }
563         
564         /* we didn't find it */
565         status = NT_STATUS_RANGE_NOT_LOCKED;
566
567  fail:
568         free(dbuf.dptr);
569         tdb_chainunlock(brl->w->tdb, kbuf);
570         return status;
571 }
572
573
574 /*
575   Test if we are allowed to perform IO on a region of an open file
576 */
577 NTSTATUS brl_locktest(struct brl_context *brl,
578                       struct brl_handle *brlh,
579                       uint16_t smbpid, 
580                       uint64_t start, uint64_t size, 
581                       enum brl_type lock_type)
582 {
583         TDB_DATA kbuf, dbuf;
584         int count, i;
585         struct lock_struct lock, *locks;
586
587         kbuf.dptr = brlh->key.data;
588         kbuf.dsize = brlh->key.length;
589
590         dbuf = tdb_fetch(brl->w->tdb, kbuf);
591         if (dbuf.dptr == NULL) {
592                 return NT_STATUS_OK;
593         }
594
595         lock.context.smbpid = smbpid;
596         lock.context.server = brl->server;
597         lock.context.ctx = brl;
598         lock.fnum = brlh->fnum;
599         lock.start = start;
600         lock.size = size;
601         lock.lock_type = lock_type;
602
603         /* there are existing locks - make sure they don't conflict */
604         locks = (struct lock_struct *)dbuf.dptr;
605         count = dbuf.dsize / sizeof(*locks);
606
607         for (i=0; i<count; i++) {
608                 if (brl_conflict_other(&locks[i], &lock)) {
609                         free(dbuf.dptr);
610                         return NT_STATUS_FILE_LOCK_CONFLICT;
611                 }
612         }
613
614         free(dbuf.dptr);
615         return NT_STATUS_OK;
616 }
617
618
619 /*
620  Remove any locks associated with a open file.
621 */
622 NTSTATUS brl_close(struct brl_context *brl,
623                    struct brl_handle *brlh)
624 {
625         TDB_DATA kbuf, dbuf;
626         int count, i, dcount=0;
627         struct lock_struct *locks;
628         NTSTATUS status;
629
630         kbuf.dptr = brlh->key.data;
631         kbuf.dsize = brlh->key.length;
632
633         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
634                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
635         }
636
637         dbuf = tdb_fetch(brl->w->tdb, kbuf);
638         if (!dbuf.dptr) {
639                 tdb_chainunlock(brl->w->tdb, kbuf);
640                 return NT_STATUS_OK;
641         }
642
643         /* there are existing locks - remove any for this fnum */
644         locks = (struct lock_struct *)dbuf.dptr;
645         count = dbuf.dsize / sizeof(*locks);
646
647         for (i=0; i<count; i++) {
648                 struct lock_struct *lock = &locks[i];
649
650                 if (lock->context.ctx == brl &&
651                     lock->context.server == brl->server &&
652                     lock->fnum == brlh->fnum) {
653                         /* found it - delete it */
654                         if (count > 1 && i < count-1) {
655                                 memmove(&locks[i], &locks[i+1], 
656                                         sizeof(*locks)*((count-1) - i));
657                         }
658                         count--;
659                         i--;
660                         dcount++;
661                 }
662         }
663
664         status = NT_STATUS_OK;
665
666         if (count == 0) {
667                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
668                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
669                 }
670         } else if (dcount != 0) {
671                 /* tell all pending lock holders for this file that
672                    they have a chance now. This is a bit indiscriminant,
673                    but works OK */
674                 brl_notify_all(brl, locks, count);
675
676                 dbuf.dsize = count * sizeof(*locks);
677
678                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
679                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
680                 }
681         }
682
683         free(dbuf.dptr);
684         tdb_chainunlock(brl->w->tdb, kbuf);
685
686         return status;
687 }
688