Merge commit 'release-4-0-0alpha1' into v4-0-test
[jelmer/samba4-debian.git] / source / ntvfs / common / brlock_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - tdb backend
5
6    Copyright (C) Andrew Tridgell 1992-2006
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /* This module implements a tdb based byte range locking service,
24    replacing the fcntl() based byte range locking previously
25    used. This allows us to provide the same semantics as NT */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "lib/tdb/include/tdb.h"
30 #include "messaging/messaging.h"
31 #include "lib/dbwrap/dbwrap.h"
32 #include "lib/messaging/irpc.h"
33 #include "libcli/libcli.h"
34 #include "cluster/cluster.h"
35 #include "ntvfs/common/brlock.h"
36 #include "ntvfs/ntvfs.h"
37
38 /*
39   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
40   a file. For a local posix filesystem this will usually be a combination
41   of the device and inode numbers of the file, but it can be anything 
42   that uniquely idetifies a file for locking purposes, as long
43   as it is applied consistently.
44 */
45
46 /* this struct is typicaly attached to tcon */
47 struct brl_context {
48         struct db_context *db;
49         struct server_id server;
50         struct messaging_context *messaging_ctx;
51 };
52
53 /*
54   the lock context contains the elements that define whether one
55   lock is the same as another lock
56 */
57 struct lock_context {
58         struct server_id server;
59         uint16_t smbpid;
60         struct brl_context *ctx;
61 };
62
63 /* The data in brlock records is an unsorted linear array of these
64    records.  It is unnecessary to store the count as tdb provides the
65    size of the record */
66 struct lock_struct {
67         struct lock_context context;
68         struct ntvfs_handle *ntvfs;
69         uint64_t start;
70         uint64_t size;
71         enum brl_type lock_type;
72         void *notify_ptr;
73 };
74
75 /* this struct is attached to on oprn file handle */
76 struct brl_handle {
77         DATA_BLOB key;
78         struct ntvfs_handle *ntvfs;
79         struct lock_struct last_lock;
80 };
81
82 /*
83   Open up the brlock.tdb database. Close it down using
84   talloc_free(). We need the messaging_ctx to allow for
85   pending lock notifications.
86 */
87 static struct brl_context *brl_tdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
88                                     struct messaging_context *messaging_ctx)
89 {
90         struct brl_context *brl;
91
92         brl = talloc(mem_ctx, struct brl_context);
93         if (brl == NULL) {
94                 return NULL;
95         }
96
97         brl->db = db_tmp_open(brl, "brlock.tdb", TDB_DEFAULT);
98         if (brl->db == NULL) {
99                 talloc_free(brl);
100                 return NULL;
101         }
102
103         brl->server = server;
104         brl->messaging_ctx = messaging_ctx;
105
106         return brl;
107 }
108
109 static struct brl_handle *brl_tdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
110                                                     DATA_BLOB *file_key)
111 {
112         struct brl_handle *brlh;
113
114         brlh = talloc(mem_ctx, struct brl_handle);
115         if (brlh == NULL) {
116                 return NULL;
117         }
118
119         brlh->key = *file_key;
120         brlh->ntvfs = ntvfs;
121         ZERO_STRUCT(brlh->last_lock);
122
123         return brlh;
124 }
125
126 /*
127   see if two locking contexts are equal
128 */
129 static bool brl_tdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
130 {
131         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
132                 ctx1->smbpid == ctx2->smbpid &&
133                 ctx1->ctx == ctx2->ctx);
134 }
135
136 /*
137   see if lck1 and lck2 overlap
138
139   lck1 is the existing lock. lck2 is the new lock we are 
140   looking at adding
141 */
142 static bool brl_tdb_overlap(struct lock_struct *lck1, 
143                             struct lock_struct *lck2)
144 {
145         /* this extra check is not redundent - it copes with locks
146            that go beyond the end of 64 bit file space */
147         if (lck1->size != 0 &&
148             lck1->start == lck2->start &&
149             lck1->size == lck2->size) {
150                 return true;
151         }
152             
153         if (lck1->start >= (lck2->start+lck2->size) ||
154             lck2->start >= (lck1->start+lck1->size)) {
155                 return false;
156         }
157
158         /* we have a conflict. Now check to see if lck1 really still
159          * exists, which involves checking if the process still
160          * exists. We leave this test to last as its the most
161          * expensive test, especially when we are clustered */
162         /* TODO: need to do this via a server_id_exists() call, which
163          * hasn't been written yet. When clustered this will need to
164          * call into ctdb */
165
166         return true;
167
168
169 /*
170  See if lock2 can be added when lock1 is in place.
171 */
172 static bool brl_tdb_conflict(struct lock_struct *lck1, 
173                          struct lock_struct *lck2)
174 {
175         /* pending locks don't conflict with anything */
176         if (lck1->lock_type >= PENDING_READ_LOCK ||
177             lck2->lock_type >= PENDING_READ_LOCK) {
178                 return false;
179         }
180
181         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
182                 return false;
183         }
184
185         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
186             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
187                 return false;
188         }
189
190         return brl_tdb_overlap(lck1, lck2);
191
192
193
194 /*
195  Check to see if this lock conflicts, but ignore our own locks on the
196  same fnum only.
197 */
198 static bool brl_tdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
199 {
200         /* pending locks don't conflict with anything */
201         if (lck1->lock_type >= PENDING_READ_LOCK ||
202             lck2->lock_type >= PENDING_READ_LOCK) {
203                 return false;
204         }
205
206         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
207                 return false;
208
209         /*
210          * note that incoming write calls conflict with existing READ
211          * locks even if the context is the same. JRA. See LOCKTEST7
212          * in smbtorture.
213          */
214         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
215             lck1->ntvfs == lck2->ntvfs &&
216             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
217                 return false;
218         }
219
220         return brl_tdb_overlap(lck1, lck2);
221
222
223
224 /*
225   amazingly enough, w2k3 "remembers" whether the last lock failure
226   is the same as this one and changes its error code. I wonder if any
227   app depends on this?
228 */
229 static NTSTATUS brl_tdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
230 {
231         /*
232          * this function is only called for non pending lock!
233          */
234
235         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
236         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
237                 return NT_STATUS_LOCK_NOT_GRANTED;
238         }
239
240         /* 
241          * if the notify_ptr is non NULL,
242          * it means that we're at the end of a pending lock
243          * and the real lock is requested after the timout went by
244          * In this case we need to remember the last_lock and always
245          * give FILE_LOCK_CONFLICT
246          */
247         if (lock->notify_ptr) {
248                 brlh->last_lock = *lock;
249                 return NT_STATUS_FILE_LOCK_CONFLICT;
250         }
251
252         /* 
253          * amazing the little things you learn with a test
254          * suite. Locks beyond this offset (as a 64 bit
255          * number!) always generate the conflict error code,
256          * unless the top bit is set
257          */
258         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
259                 brlh->last_lock = *lock;
260                 return NT_STATUS_FILE_LOCK_CONFLICT;
261         }
262
263         /*
264          * if the current lock matches the last failed lock on the file handle
265          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
266          */
267         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
268             lock->context.ctx == brlh->last_lock.context.ctx &&
269             lock->ntvfs == brlh->last_lock.ntvfs &&
270             lock->start == brlh->last_lock.start) {
271                 return NT_STATUS_FILE_LOCK_CONFLICT;
272         }
273
274         brlh->last_lock = *lock;
275         return NT_STATUS_LOCK_NOT_GRANTED;
276 }
277
278 /*
279   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
280   which case a real lock is first tried, and if that fails then a
281   pending lock is created. When the pending lock is triggered (by
282   someone else closing an overlapping lock range) a messaging
283   notification is sent, identified by the notify_ptr
284 */
285 static NTSTATUS brl_tdb_lock(struct brl_context *brl,
286                          struct brl_handle *brlh,
287                          uint16_t smbpid,
288                          uint64_t start, uint64_t size, 
289                          enum brl_type lock_type,
290                          void *notify_ptr)
291 {
292         TDB_DATA kbuf, dbuf;
293         int count=0, i;
294         struct lock_struct lock, *locks=NULL;
295         NTSTATUS status;
296         struct db_record *rec = NULL;
297
298         /* if this is a pending lock, then with the chainlock held we
299            try to get the real lock. If we succeed then we don't need
300            to make it pending. This prevents a possible race condition
301            where the pending lock gets created after the lock that is
302            preventing the real lock gets removed */
303         if (lock_type >= PENDING_READ_LOCK) {
304                 enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
305
306                 /* here we need to force that the last_lock isn't overwritten */
307                 lock = brlh->last_lock;
308                 status = brl_tdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
309                 brlh->last_lock = lock;
310
311                 if (NT_STATUS_IS_OK(status)) {
312                         return NT_STATUS_OK;
313                 }
314         }
315
316         kbuf.dptr = brlh->key.data;
317         kbuf.dsize = brlh->key.length;
318
319         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
320         if (rec == NULL) {
321                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
322         }
323
324
325         dbuf = rec->value;
326
327         lock.context.smbpid = smbpid;
328         lock.context.server = brl->server;
329         lock.context.ctx = brl;
330         lock.ntvfs = brlh->ntvfs;
331         lock.context.ctx = brl;
332         lock.start = start;
333         lock.size = size;
334         lock.lock_type = lock_type;
335         lock.notify_ptr = notify_ptr;
336
337         if (dbuf.dptr) {
338                 /* there are existing locks - make sure they don't conflict */
339                 locks = (struct lock_struct *)dbuf.dptr;
340                 count = dbuf.dsize / sizeof(*locks);
341                 for (i=0; i<count; i++) {
342                         if (brl_tdb_conflict(&locks[i], &lock)) {
343                                 status = brl_tdb_lock_failed(brlh, &lock);
344                                 goto fail;
345                         }
346                 }
347         }
348
349         /* no conflicts - add it to the list of locks */
350         locks = talloc_realloc(rec, locks, struct lock_struct, count+1);
351         if (!locks) {
352                 status = NT_STATUS_NO_MEMORY;
353                 goto fail;
354         } else {
355                 dbuf.dptr = (uint8_t *)locks;
356         }
357         locks[count] = lock;
358         dbuf.dsize += sizeof(lock);
359
360         status = rec->store(rec, dbuf, TDB_REPLACE);
361         if (!NT_STATUS_IS_OK(status)) {
362                 goto fail;
363         }
364
365         talloc_free(rec);
366
367         /* the caller needs to know if the real lock was granted. If
368            we have reached here then it must be a pending lock that
369            was granted, so tell them the lock failed */
370         if (lock_type >= PENDING_READ_LOCK) {
371                 return NT_STATUS_LOCK_NOT_GRANTED;
372         }
373
374         return NT_STATUS_OK;
375
376  fail:
377         talloc_free(rec);
378         return status;
379 }
380
381
382 /*
383   we are removing a lock that might be holding up a pending lock. Scan for pending
384   locks that cover this range and if we find any then notify the server that it should
385   retry the lock
386 */
387 static void brl_tdb_notify_unlock(struct brl_context *brl,
388                               struct lock_struct *locks, int count, 
389                               struct lock_struct *removed_lock)
390 {
391         int i, last_notice;
392
393         /* the last_notice logic is to prevent stampeding on a lock
394            range. It prevents us sending hundreds of notifies on the
395            same range of bytes. It doesn't prevent all possible
396            stampedes, but it does prevent the most common problem */
397         last_notice = -1;
398
399         for (i=0;i<count;i++) {
400                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
401                     brl_tdb_overlap(&locks[i], removed_lock)) {
402                         if (last_notice != -1 && brl_tdb_overlap(&locks[i], &locks[last_notice])) {
403                                 continue;
404                         }
405                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
406                                 last_notice = i;
407                         }
408                         messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
409                                            MSG_BRL_RETRY, locks[i].notify_ptr);
410                 }
411         }
412 }
413
414
415 /*
416   send notifications for all pending locks - the file is being closed by this
417   user
418 */
419 static void brl_tdb_notify_all(struct brl_context *brl,
420                            struct lock_struct *locks, int count)
421 {
422         int i;
423         for (i=0;i<count;i++) {
424                 if (locks->lock_type >= PENDING_READ_LOCK) {
425                         brl_tdb_notify_unlock(brl, locks, count, &locks[i]);
426                 }
427         }
428 }
429
430
431
432 /*
433  Unlock a range of bytes.
434 */
435 static NTSTATUS brl_tdb_unlock(struct brl_context *brl,
436                            struct brl_handle *brlh, 
437                            uint16_t smbpid,
438                            uint64_t start, uint64_t size)
439 {
440         TDB_DATA kbuf, dbuf;
441         int count, i;
442         struct lock_struct *locks, *lock;
443         struct lock_context context;
444         NTSTATUS status;
445         struct db_record *rec = NULL;
446
447         kbuf.dptr = brlh->key.data;
448         kbuf.dsize = brlh->key.length;
449
450         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
451         if (rec == NULL) {
452                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
453         }
454
455         if (!rec->value.dptr) {
456                 talloc_free(rec);
457                 return NT_STATUS_RANGE_NOT_LOCKED;
458         }
459
460         dbuf = rec->value;
461
462         context.smbpid = smbpid;
463         context.server = brl->server;
464         context.ctx = brl;
465
466         /* there are existing locks - find a match */
467         locks = (struct lock_struct *)dbuf.dptr;
468         count = dbuf.dsize / sizeof(*locks);
469
470         for (i=0; i<count; i++) {
471                 lock = &locks[i];
472                 if (brl_tdb_same_context(&lock->context, &context) &&
473                     lock->ntvfs == brlh->ntvfs &&
474                     lock->start == start &&
475                     lock->size == size &&
476                     lock->lock_type == WRITE_LOCK) {
477                         break;
478                 }
479         }
480         if (i < count) goto found;
481
482         for (i=0; i<count; i++) {
483                 lock = &locks[i];
484                 if (brl_tdb_same_context(&lock->context, &context) &&
485                     lock->ntvfs == brlh->ntvfs &&
486                     lock->start == start &&
487                     lock->size == size &&
488                     lock->lock_type < PENDING_READ_LOCK) {
489                         break;
490                 }
491         }
492
493 found:
494         if (i == count) {
495                 status = NT_STATUS_RANGE_NOT_LOCKED;
496         } else if (count == 1) {
497                 status = rec->delete_rec(rec);
498         } else {
499                 struct lock_struct removed_lock = *lock;
500                 if (i < count-1) {
501                         memmove(&locks[i], &locks[i+1], 
502                                 sizeof(*locks)*((count-1) - i));
503                 }
504                 count--;
505                 
506                 /* send notifications for any relevant pending locks */
507                 brl_tdb_notify_unlock(brl, locks, count, &removed_lock);
508                 
509                 dbuf.dsize = count * sizeof(*locks);
510                 
511                 status = rec->store(rec, dbuf, TDB_REPLACE);
512         }
513
514         talloc_free(rec);
515         return status;
516 }
517
518
519 /*
520   remove a pending lock. This is called when the caller has either
521   given up trying to establish a lock or when they have succeeded in
522   getting it. In either case they no longer need to be notified.
523 */
524 static NTSTATUS brl_tdb_remove_pending(struct brl_context *brl,
525                                    struct brl_handle *brlh, 
526                                    void *notify_ptr)
527 {
528         TDB_DATA kbuf, dbuf;
529         int count, i;
530         struct lock_struct *locks;
531         NTSTATUS status;
532         struct db_record *rec = NULL;
533
534         kbuf.dptr = brlh->key.data;
535         kbuf.dsize = brlh->key.length;
536
537         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
538         if (rec == NULL) {
539                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
540         }
541
542         dbuf = rec->value;
543
544         /* there are existing locks - find a match */
545         locks = (struct lock_struct *)dbuf.dptr;
546         count = dbuf.dsize / sizeof(*locks);
547
548         status = NT_STATUS_RANGE_NOT_LOCKED;
549
550         for (i=0; i<count; i++) {
551                 struct lock_struct *lock = &locks[i];
552                 
553                 if (lock->lock_type >= PENDING_READ_LOCK &&
554                     lock->notify_ptr == notify_ptr &&
555                     cluster_id_equal(&lock->context.server, &brl->server)) {
556                         /* found it - delete it */
557                         if (count == 1) {
558                                 status = rec->delete_rec(rec);
559                         } else {
560                                 if (i < count-1) {
561                                         memmove(&locks[i], &locks[i+1], 
562                                                 sizeof(*locks)*((count-1) - i));
563                                 }
564                                 count--;
565                                 dbuf.dsize = count * sizeof(*locks);
566                                 status = rec->store(rec, dbuf, TDB_REPLACE);
567                         }                       
568                         break;
569                 }
570         }
571
572         talloc_free(rec);
573         return status;
574 }
575
576
577 /*
578   Test if we are allowed to perform IO on a region of an open file
579 */
580 static NTSTATUS brl_tdb_locktest(struct brl_context *brl,
581                              struct brl_handle *brlh,
582                              uint16_t smbpid, 
583                              uint64_t start, uint64_t size, 
584                              enum brl_type lock_type)
585 {
586         TDB_DATA kbuf, dbuf;
587         int count, i;
588         struct lock_struct lock, *locks;
589         NTSTATUS status;
590
591         kbuf.dptr = brlh->key.data;
592         kbuf.dsize = brlh->key.length;
593
594         if (brl->db->fetch(brl->db, brl, kbuf, &dbuf) != 0) {
595                 return NT_STATUS_OK;
596         }
597
598         lock.context.smbpid = smbpid;
599         lock.context.server = brl->server;
600         lock.context.ctx = brl;
601         lock.ntvfs = brlh->ntvfs;
602         lock.start = start;
603         lock.size = size;
604         lock.lock_type = lock_type;
605
606         /* there are existing locks - make sure they don't conflict */
607         locks = (struct lock_struct *)dbuf.dptr;
608         count = dbuf.dsize / sizeof(*locks);
609
610         status = NT_STATUS_OK;
611
612         for (i=0; i<count; i++) {
613                 if (brl_tdb_conflict_other(&locks[i], &lock)) {
614                         status = NT_STATUS_FILE_LOCK_CONFLICT;
615                         break;
616                 }
617         }
618
619         talloc_free(dbuf.dptr);
620         return status;
621 }
622
623
624 /*
625  Remove any locks associated with a open file.
626 */
627 static NTSTATUS brl_tdb_close(struct brl_context *brl,
628                           struct brl_handle *brlh)
629 {
630         TDB_DATA kbuf, dbuf;
631         int count, i, dcount=0;
632         struct lock_struct *locks;
633         NTSTATUS status;
634         struct db_record *rec = NULL;
635
636         kbuf.dptr = brlh->key.data;
637         kbuf.dsize = brlh->key.length;
638
639         rec = brl->db->fetch_locked(brl->db, brl, kbuf);
640         if (rec == NULL) {
641                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
642         }
643
644         dbuf = rec->value;
645         if (!dbuf.dptr) {
646                 talloc_free(rec);
647                 return NT_STATUS_OK;
648         }
649
650         /* there are existing locks - remove any for this fnum */
651         locks = (struct lock_struct *)dbuf.dptr;
652         count = dbuf.dsize / sizeof(*locks);
653
654         for (i=0; i<count; i++) {
655                 struct lock_struct *lock = &locks[i];
656
657                 if (lock->context.ctx == brl &&
658                     cluster_id_equal(&lock->context.server, &brl->server) &&
659                     lock->ntvfs == brlh->ntvfs) {
660                         /* found it - delete it */
661                         if (count > 1 && i < count-1) {
662                                 memmove(&locks[i], &locks[i+1], 
663                                         sizeof(*locks)*((count-1) - i));
664                         }
665                         count--;
666                         i--;
667                         dcount++;
668                 }
669         }
670
671         status = NT_STATUS_OK;
672
673         if (count == 0) {
674                 status = rec->delete_rec(rec);
675         } else if (dcount != 0) {
676                 /* tell all pending lock holders for this file that
677                    they have a chance now. This is a bit indiscriminant,
678                    but works OK */
679                 brl_tdb_notify_all(brl, locks, count);
680
681                 dbuf.dsize = count * sizeof(*locks);
682
683                 status = rec->store(rec, dbuf, TDB_REPLACE);
684         }
685
686         talloc_free(rec);
687
688         return status;
689 }
690
691
692 static const struct brlock_ops brlock_tdb_ops = {
693         .brl_init           = brl_tdb_init,
694         .brl_create_handle  = brl_tdb_create_handle,
695         .brl_lock           = brl_tdb_lock,
696         .brl_unlock         = brl_tdb_unlock,
697         .brl_remove_pending = brl_tdb_remove_pending,
698         .brl_locktest       = brl_tdb_locktest,
699         .brl_close          = brl_tdb_close
700 };
701
702
703 void brl_tdb_init_ops(void)
704 {
705         brl_set_ops(&brlock_tdb_ops);
706 }