Merge branch 'master' of ssh://git.samba.org/data/git/samba into noejs
[sfrench/samba-autobuild/.git] / source4 / ntvfs / common / brlock_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - tdb backend
5
6    Copyright (C) Andrew Tridgell 1992-2006
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /* This module implements a tdb based byte range locking service,
24    replacing the fcntl() based byte range locking previously
25    used. This allows us to provide the same semantics as NT */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "../tdb/include/tdb.h"
30 #include "messaging/messaging.h"
31 #include "lib/dbwrap/dbwrap.h"
32 #include "lib/messaging/irpc.h"
33 #include "libcli/libcli.h"
34 #include "cluster/cluster.h"
35 #include "ntvfs/common/brlock.h"
36 #include "ntvfs/ntvfs.h"
37 #include "param/param.h"
38
39 /*
40   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
41   a file. For a local posix filesystem this will usually be a combination
42   of the device and inode numbers of the file, but it can be anything 
43   that uniquely idetifies a file for locking purposes, as long
44   as it is applied consistently.
45 */
46
47 /* this struct is typicaly attached to tcon */
48 struct brl_context {
49         struct db_context *db;
50         struct server_id server;
51         struct messaging_context *messaging_ctx;
52 };
53
54 /*
55   the lock context contains the elements that define whether one
56   lock is the same as another lock
57 */
58 struct lock_context {
59         struct server_id server;
60         uint32_t smbpid;
61         struct brl_context *ctx;
62 };
63
64 /* The data in brlock records is an unsorted linear array of these
65    records.  It is unnecessary to store the count as tdb provides the
66    size of the record */
67 struct lock_struct {
68         struct lock_context context;
69         struct ntvfs_handle *ntvfs;
70         uint64_t start;
71         uint64_t size;
72         enum brl_type lock_type;
73         void *notify_ptr;
74 };
75
76 /* this struct is attached to on oprn file handle */
77 struct brl_handle {
78         DATA_BLOB key;
79         struct ntvfs_handle *ntvfs;
80         struct lock_struct last_lock;
81 };
82
83 /*
84   Open up the brlock.tdb database. Close it down using
85   talloc_free(). We need the messaging_ctx to allow for
86   pending lock notifications.
87 */
88 static struct brl_context *brl_tdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
89                                         struct loadparm_context *lp_ctx,
90                                     struct messaging_context *messaging_ctx)
91 {
92         struct brl_context *brl;
93
94         brl = talloc(mem_ctx, struct brl_context);
95         if (brl == NULL) {
96                 return NULL;
97         }
98
99         brl->db = db_tmp_open(brl, lp_ctx, "brlock.tdb", TDB_DEFAULT);
100         if (brl->db == NULL) {
101                 talloc_free(brl);
102                 return NULL;
103         }
104
105         brl->server = server;
106         brl->messaging_ctx = messaging_ctx;
107
108         return brl;
109 }
110
111 static struct brl_handle *brl_tdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
112                                                     DATA_BLOB *file_key)
113 {
114         struct brl_handle *brlh;
115
116         brlh = talloc(mem_ctx, struct brl_handle);
117         if (brlh == NULL) {
118                 return NULL;
119         }
120
121         brlh->key = *file_key;
122         brlh->ntvfs = ntvfs;
123         ZERO_STRUCT(brlh->last_lock);
124
125         return brlh;
126 }
127
128 /*
129   see if two locking contexts are equal
130 */
131 static bool brl_tdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
132 {
133         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
134                 ctx1->smbpid == ctx2->smbpid &&
135                 ctx1->ctx == ctx2->ctx);
136 }
137
138 /*
139   see if lck1 and lck2 overlap
140
141   lck1 is the existing lock. lck2 is the new lock we are 
142   looking at adding
143 */
144 static bool brl_tdb_overlap(struct lock_struct *lck1, 
145                             struct lock_struct *lck2)
146 {
147         /* this extra check is not redundent - it copes with locks
148            that go beyond the end of 64 bit file space */
149         if (lck1->size != 0 &&
150             lck1->start == lck2->start &&
151             lck1->size == lck2->size) {
152                 return true;
153         }
154             
155         if (lck1->start >= (lck2->start+lck2->size) ||
156             lck2->start >= (lck1->start+lck1->size)) {
157                 return false;
158         }
159
160         /* we have a conflict. Now check to see if lck1 really still
161          * exists, which involves checking if the process still
162          * exists. We leave this test to last as its the most
163          * expensive test, especially when we are clustered */
164         /* TODO: need to do this via a server_id_exists() call, which
165          * hasn't been written yet. When clustered this will need to
166          * call into ctdb */
167
168         return true;
169
170
171 /*
172  See if lock2 can be added when lock1 is in place.
173 */
174 static bool brl_tdb_conflict(struct lock_struct *lck1, 
175                          struct lock_struct *lck2)
176 {
177         /* pending locks don't conflict with anything */
178         if (lck1->lock_type >= PENDING_READ_LOCK ||
179             lck2->lock_type >= PENDING_READ_LOCK) {
180                 return false;
181         }
182
183         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
184                 return false;
185         }
186
187         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
188             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
189                 return false;
190         }
191
192         return brl_tdb_overlap(lck1, lck2);
193
194
195
196 /*
197  Check to see if this lock conflicts, but ignore our own locks on the
198  same fnum only.
199 */
200 static bool brl_tdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
201 {
202         /* pending locks don't conflict with anything */
203         if (lck1->lock_type >= PENDING_READ_LOCK ||
204             lck2->lock_type >= PENDING_READ_LOCK) {
205                 return false;
206         }
207
208         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
209                 return false;
210
211         /*
212          * note that incoming write calls conflict with existing READ
213          * locks even if the context is the same. JRA. See LOCKTEST7
214          * in smbtorture.
215          */
216         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
217             lck1->ntvfs == lck2->ntvfs &&
218             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
219                 return false;
220         }
221
222         return brl_tdb_overlap(lck1, lck2);
223
224
225
226 /*
227   amazingly enough, w2k3 "remembers" whether the last lock failure
228   is the same as this one and changes its error code. I wonder if any
229   app depends on this?
230 */
231 static NTSTATUS brl_tdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
232 {
233         /*
234          * this function is only called for non pending lock!
235          */
236
237         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
238         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
239                 return NT_STATUS_LOCK_NOT_GRANTED;
240         }
241
242         /* 
243          * if the notify_ptr is non NULL,
244          * it means that we're at the end of a pending lock
245          * and the real lock is requested after the timout went by
246          * In this case we need to remember the last_lock and always
247          * give FILE_LOCK_CONFLICT
248          */
249         if (lock->notify_ptr) {
250                 brlh->last_lock = *lock;
251                 return NT_STATUS_FILE_LOCK_CONFLICT;
252         }
253
254         /* 
255          * amazing the little things you learn with a test
256          * suite. Locks beyond this offset (as a 64 bit
257          * number!) always generate the conflict error code,
258          * unless the top bit is set
259          */
260         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
261                 brlh->last_lock = *lock;
262                 return NT_STATUS_FILE_LOCK_CONFLICT;
263         }
264
265         /*
266          * if the current lock matches the last failed lock on the file handle
267          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
268          */
269         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
270             lock->context.ctx == brlh->last_lock.context.ctx &&
271             lock->ntvfs == brlh->last_lock.ntvfs &&
272             lock->start == brlh->last_lock.start) {
273                 return NT_STATUS_FILE_LOCK_CONFLICT;
274         }
275
276         brlh->last_lock = *lock;
277         return NT_STATUS_LOCK_NOT_GRANTED;
278 }
279
280 /*
281   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
282   which case a real lock is first tried, and if that fails then a
283   pending lock is created. When the pending lock is triggered (by
284   someone else closing an overlapping lock range) a messaging
285   notification is sent, identified by the notify_ptr
286 */
287 static NTSTATUS brl_tdb_lock(struct brl_context *brl,
288                          struct brl_handle *brlh,
289                          uint32_t smbpid,
290                          uint64_t start, uint64_t size, 
291                          enum brl_type lock_type,
292                          void *notify_ptr)
293 {
294         TDB_DATA kbuf, dbuf;
295         int count=0, i;
296         struct lock_struct lock, *locks=NULL;
297         NTSTATUS status;
298         struct db_record *rec = NULL;
299
300         /* if this is a pending lock, then with the chainlock held we
301            try to get the real lock. If we succeed then we don't need
302            to make it pending. This prevents a possible race condition
303            where the pending lock gets created after the lock that is
304            preventing the real lock gets removed */
305         if (lock_type >= PENDING_READ_LOCK) {
306                 enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
307
308                 /* here we need to force that the last_lock isn't overwritten */
309                 lock = brlh->last_lock;
310                 status = brl_tdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
311                 brlh->last_lock = lock;
312
313                 if (NT_STATUS_IS_OK(status)) {
314                         return NT_STATUS_OK;
315                 }
316         }
317
318         kbuf.dptr = brlh->key.data;
319         kbuf.dsize = brlh->key.length;
320
321         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
322         if (rec == NULL) {
323                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
324         }
325
326
327         dbuf = rec->value;
328
329         lock.context.smbpid = smbpid;
330         lock.context.server = brl->server;
331         lock.context.ctx = brl;
332         lock.ntvfs = brlh->ntvfs;
333         lock.context.ctx = brl;
334         lock.start = start;
335         lock.size = size;
336         lock.lock_type = lock_type;
337         lock.notify_ptr = notify_ptr;
338
339         if (dbuf.dptr) {
340                 /* there are existing locks - make sure they don't conflict */
341                 locks = (struct lock_struct *)dbuf.dptr;
342                 count = dbuf.dsize / sizeof(*locks);
343                 for (i=0; i<count; i++) {
344                         if (brl_tdb_conflict(&locks[i], &lock)) {
345                                 status = brl_tdb_lock_failed(brlh, &lock);
346                                 goto fail;
347                         }
348                 }
349         }
350
351         /* no conflicts - add it to the list of locks */
352         locks = talloc_realloc(rec, locks, struct lock_struct, count+1);
353         if (!locks) {
354                 status = NT_STATUS_NO_MEMORY;
355                 goto fail;
356         } else {
357                 dbuf.dptr = (uint8_t *)locks;
358         }
359         locks[count] = lock;
360         dbuf.dsize += sizeof(lock);
361
362         status = rec->store(rec, dbuf, TDB_REPLACE);
363         if (!NT_STATUS_IS_OK(status)) {
364                 goto fail;
365         }
366
367         talloc_free(rec);
368
369         /* the caller needs to know if the real lock was granted. If
370            we have reached here then it must be a pending lock that
371            was granted, so tell them the lock failed */
372         if (lock_type >= PENDING_READ_LOCK) {
373                 return NT_STATUS_LOCK_NOT_GRANTED;
374         }
375
376         return NT_STATUS_OK;
377
378  fail:
379         talloc_free(rec);
380         return status;
381 }
382
383
384 /*
385   we are removing a lock that might be holding up a pending lock. Scan for pending
386   locks that cover this range and if we find any then notify the server that it should
387   retry the lock
388 */
389 static void brl_tdb_notify_unlock(struct brl_context *brl,
390                               struct lock_struct *locks, int count, 
391                               struct lock_struct *removed_lock)
392 {
393         int i, last_notice;
394
395         /* the last_notice logic is to prevent stampeding on a lock
396            range. It prevents us sending hundreds of notifies on the
397            same range of bytes. It doesn't prevent all possible
398            stampedes, but it does prevent the most common problem */
399         last_notice = -1;
400
401         for (i=0;i<count;i++) {
402                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
403                     brl_tdb_overlap(&locks[i], removed_lock)) {
404                         if (last_notice != -1 && brl_tdb_overlap(&locks[i], &locks[last_notice])) {
405                                 continue;
406                         }
407                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
408                                 last_notice = i;
409                         }
410                         messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
411                                            MSG_BRL_RETRY, locks[i].notify_ptr);
412                 }
413         }
414 }
415
416
417 /*
418   send notifications for all pending locks - the file is being closed by this
419   user
420 */
421 static void brl_tdb_notify_all(struct brl_context *brl,
422                            struct lock_struct *locks, int count)
423 {
424         int i;
425         for (i=0;i<count;i++) {
426                 if (locks->lock_type >= PENDING_READ_LOCK) {
427                         brl_tdb_notify_unlock(brl, locks, count, &locks[i]);
428                 }
429         }
430 }
431
432
433
434 /*
435  Unlock a range of bytes.
436 */
437 static NTSTATUS brl_tdb_unlock(struct brl_context *brl,
438                            struct brl_handle *brlh, 
439                            uint32_t smbpid,
440                            uint64_t start, uint64_t size)
441 {
442         TDB_DATA kbuf, dbuf;
443         int count, i;
444         struct lock_struct *locks, *lock;
445         struct lock_context context;
446         NTSTATUS status;
447         struct db_record *rec = NULL;
448
449         kbuf.dptr = brlh->key.data;
450         kbuf.dsize = brlh->key.length;
451
452         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
453         if (rec == NULL) {
454                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
455         }
456
457         if (!rec->value.dptr) {
458                 talloc_free(rec);
459                 return NT_STATUS_RANGE_NOT_LOCKED;
460         }
461
462         dbuf = rec->value;
463
464         context.smbpid = smbpid;
465         context.server = brl->server;
466         context.ctx = brl;
467
468         /* there are existing locks - find a match */
469         locks = (struct lock_struct *)dbuf.dptr;
470         count = dbuf.dsize / sizeof(*locks);
471
472         for (i=0; i<count; i++) {
473                 lock = &locks[i];
474                 if (brl_tdb_same_context(&lock->context, &context) &&
475                     lock->ntvfs == brlh->ntvfs &&
476                     lock->start == start &&
477                     lock->size == size &&
478                     lock->lock_type == WRITE_LOCK) {
479                         break;
480                 }
481         }
482         if (i < count) goto found;
483
484         for (i=0; i<count; i++) {
485                 lock = &locks[i];
486                 if (brl_tdb_same_context(&lock->context, &context) &&
487                     lock->ntvfs == brlh->ntvfs &&
488                     lock->start == start &&
489                     lock->size == size &&
490                     lock->lock_type < PENDING_READ_LOCK) {
491                         break;
492                 }
493         }
494
495 found:
496         if (i == count) {
497                 status = NT_STATUS_RANGE_NOT_LOCKED;
498         } else if (count == 1) {
499                 status = rec->delete_rec(rec);
500         } else {
501                 struct lock_struct removed_lock = *lock;
502                 if (i < count-1) {
503                         memmove(&locks[i], &locks[i+1], 
504                                 sizeof(*locks)*((count-1) - i));
505                 }
506                 count--;
507                 
508                 /* send notifications for any relevant pending locks */
509                 brl_tdb_notify_unlock(brl, locks, count, &removed_lock);
510                 
511                 dbuf.dsize = count * sizeof(*locks);
512                 
513                 status = rec->store(rec, dbuf, TDB_REPLACE);
514         }
515
516         talloc_free(rec);
517         return status;
518 }
519
520
521 /*
522   remove a pending lock. This is called when the caller has either
523   given up trying to establish a lock or when they have succeeded in
524   getting it. In either case they no longer need to be notified.
525 */
526 static NTSTATUS brl_tdb_remove_pending(struct brl_context *brl,
527                                    struct brl_handle *brlh, 
528                                    void *notify_ptr)
529 {
530         TDB_DATA kbuf, dbuf;
531         int count, i;
532         struct lock_struct *locks;
533         NTSTATUS status;
534         struct db_record *rec = NULL;
535
536         kbuf.dptr = brlh->key.data;
537         kbuf.dsize = brlh->key.length;
538
539         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
540         if (rec == NULL) {
541                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
542         }
543
544         dbuf = rec->value;
545
546         /* there are existing locks - find a match */
547         locks = (struct lock_struct *)dbuf.dptr;
548         count = dbuf.dsize / sizeof(*locks);
549
550         status = NT_STATUS_RANGE_NOT_LOCKED;
551
552         for (i=0; i<count; i++) {
553                 struct lock_struct *lock = &locks[i];
554                 
555                 if (lock->lock_type >= PENDING_READ_LOCK &&
556                     lock->notify_ptr == notify_ptr &&
557                     cluster_id_equal(&lock->context.server, &brl->server)) {
558                         /* found it - delete it */
559                         if (count == 1) {
560                                 status = rec->delete_rec(rec);
561                         } else {
562                                 if (i < count-1) {
563                                         memmove(&locks[i], &locks[i+1], 
564                                                 sizeof(*locks)*((count-1) - i));
565                                 }
566                                 count--;
567                                 dbuf.dsize = count * sizeof(*locks);
568                                 status = rec->store(rec, dbuf, TDB_REPLACE);
569                         }                       
570                         break;
571                 }
572         }
573
574         talloc_free(rec);
575         return status;
576 }
577
578
579 /*
580   Test if we are allowed to perform IO on a region of an open file
581 */
582 static NTSTATUS brl_tdb_locktest(struct brl_context *brl,
583                              struct brl_handle *brlh,
584                              uint32_t smbpid, 
585                              uint64_t start, uint64_t size, 
586                              enum brl_type lock_type)
587 {
588         TDB_DATA kbuf, dbuf;
589         int count, i;
590         struct lock_struct lock, *locks;
591         NTSTATUS status;
592
593         kbuf.dptr = brlh->key.data;
594         kbuf.dsize = brlh->key.length;
595
596         if (brl->db->fetch(brl->db, brl, kbuf, &dbuf) != 0) {
597                 return NT_STATUS_OK;
598         }
599
600         lock.context.smbpid = smbpid;
601         lock.context.server = brl->server;
602         lock.context.ctx = brl;
603         lock.ntvfs = brlh->ntvfs;
604         lock.start = start;
605         lock.size = size;
606         lock.lock_type = lock_type;
607
608         /* there are existing locks - make sure they don't conflict */
609         locks = (struct lock_struct *)dbuf.dptr;
610         count = dbuf.dsize / sizeof(*locks);
611
612         status = NT_STATUS_OK;
613
614         for (i=0; i<count; i++) {
615                 if (brl_tdb_conflict_other(&locks[i], &lock)) {
616                         status = NT_STATUS_FILE_LOCK_CONFLICT;
617                         break;
618                 }
619         }
620
621         talloc_free(dbuf.dptr);
622         return status;
623 }
624
625
626 /*
627  Remove any locks associated with a open file.
628 */
629 static NTSTATUS brl_tdb_close(struct brl_context *brl,
630                           struct brl_handle *brlh)
631 {
632         TDB_DATA kbuf, dbuf;
633         int count, i, dcount=0;
634         struct lock_struct *locks;
635         NTSTATUS status;
636         struct db_record *rec = NULL;
637
638         kbuf.dptr = brlh->key.data;
639         kbuf.dsize = brlh->key.length;
640
641         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
642         if (rec == NULL) {
643                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
644         }
645
646         dbuf = rec->value;
647         if (!dbuf.dptr) {
648                 talloc_free(rec);
649                 return NT_STATUS_OK;
650         }
651
652         /* there are existing locks - remove any for this fnum */
653         locks = (struct lock_struct *)dbuf.dptr;
654         count = dbuf.dsize / sizeof(*locks);
655
656         for (i=0; i<count; i++) {
657                 struct lock_struct *lock = &locks[i];
658
659                 if (lock->context.ctx == brl &&
660                     cluster_id_equal(&lock->context.server, &brl->server) &&
661                     lock->ntvfs == brlh->ntvfs) {
662                         /* found it - delete it */
663                         if (count > 1 && i < count-1) {
664                                 memmove(&locks[i], &locks[i+1], 
665                                         sizeof(*locks)*((count-1) - i));
666                         }
667                         count--;
668                         i--;
669                         dcount++;
670                 }
671         }
672
673         status = NT_STATUS_OK;
674
675         if (count == 0) {
676                 status = rec->delete_rec(rec);
677         } else if (dcount != 0) {
678                 /* tell all pending lock holders for this file that
679                    they have a chance now. This is a bit indiscriminant,
680                    but works OK */
681                 brl_tdb_notify_all(brl, locks, count);
682
683                 dbuf.dsize = count * sizeof(*locks);
684
685                 status = rec->store(rec, dbuf, TDB_REPLACE);
686         }
687
688         talloc_free(rec);
689
690         return status;
691 }
692
693
694 static const struct brlock_ops brlock_tdb_ops = {
695         .brl_init           = brl_tdb_init,
696         .brl_create_handle  = brl_tdb_create_handle,
697         .brl_lock           = brl_tdb_lock,
698         .brl_unlock         = brl_tdb_unlock,
699         .brl_remove_pending = brl_tdb_remove_pending,
700         .brl_locktest       = brl_tdb_locktest,
701         .brl_close          = brl_tdb_close
702 };
703
704
705 void brl_tdb_init_ops(void)
706 {
707         brl_set_ops(&brlock_tdb_ops);
708 }