Merge commit 'release-4-0-0alpha15' into master4-tmp
[kai/samba-autobuild/.git] / source4 / ntvfs / common / brlock_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - tdb backend
5
6    Copyright (C) Andrew Tridgell 1992-2006
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /* This module implements a tdb based byte range locking service,
24    replacing the fcntl() based byte range locking previously
25    used. This allows us to provide the same semantics as NT */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "tdb_compat.h"
30 #include "messaging/messaging.h"
31 #include "lib/util/tdb_wrap.h"
32 #include "lib/messaging/irpc.h"
33 #include "libcli/libcli.h"
34 #include "cluster/cluster.h"
35 #include "ntvfs/common/brlock.h"
36 #include "ntvfs/ntvfs.h"
37 #include "param/param.h"
38
39 /*
40   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
41   a file. For a local posix filesystem this will usually be a combination
42   of the device and inode numbers of the file, but it can be anything 
43   that uniquely idetifies a file for locking purposes, as long
44   as it is applied consistently.
45 */
46
47 /* this struct is typicaly attached to tcon */
48 struct brl_context {
49         struct tdb_wrap *w;
50         struct server_id server;
51         struct imessaging_context *imessaging_ctx;
52 };
53
54 /*
55   the lock context contains the elements that define whether one
56   lock is the same as another lock
57 */
58 struct lock_context {
59         struct server_id server;
60         uint32_t smbpid;
61         struct brl_context *ctx;
62 };
63
64 /* The data in brlock records is an unsorted linear array of these
65    records.  It is unnecessary to store the count as tdb provides the
66    size of the record */
67 struct lock_struct {
68         struct lock_context context;
69         struct ntvfs_handle *ntvfs;
70         uint64_t start;
71         uint64_t size;
72         enum brl_type lock_type;
73         void *notify_ptr;
74 };
75
76 /* this struct is attached to on oprn file handle */
77 struct brl_handle {
78         DATA_BLOB key;
79         struct ntvfs_handle *ntvfs;
80         struct lock_struct last_lock;
81 };
82
83 /* see if we have wrapped locks, which are no longer allowed (windows
84  * changed this in win7 */
85 static bool brl_invalid_lock_range(uint64_t start, uint64_t size)
86 {
87         return (size > 1 && (start + size < start));
88 }
89
90 /*
91   Open up the brlock.tdb database. Close it down using
92   talloc_free(). We need the imessaging_ctx to allow for
93   pending lock notifications.
94 */
95 static struct brl_context *brl_tdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
96                                         struct loadparm_context *lp_ctx,
97                                     struct imessaging_context *imessaging_ctx)
98 {
99         struct brl_context *brl;
100
101         brl = talloc(mem_ctx, struct brl_context);
102         if (brl == NULL) {
103                 return NULL;
104         }
105
106         brl->w = cluster_tdb_tmp_open(brl, lp_ctx, "brlock.tdb", TDB_DEFAULT);
107         if (brl->w == NULL) {
108                 talloc_free(brl);
109                 return NULL;
110         }
111
112         brl->server = server;
113         brl->imessaging_ctx = imessaging_ctx;
114
115         return brl;
116 }
117
118 static struct brl_handle *brl_tdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
119                                                     DATA_BLOB *file_key)
120 {
121         struct brl_handle *brlh;
122
123         brlh = talloc(mem_ctx, struct brl_handle);
124         if (brlh == NULL) {
125                 return NULL;
126         }
127
128         brlh->key = *file_key;
129         brlh->ntvfs = ntvfs;
130         ZERO_STRUCT(brlh->last_lock);
131
132         return brlh;
133 }
134
135 /*
136   see if two locking contexts are equal
137 */
138 static bool brl_tdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
139 {
140         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
141                 ctx1->smbpid == ctx2->smbpid &&
142                 ctx1->ctx == ctx2->ctx);
143 }
144
145 /*
146   see if lck1 and lck2 overlap
147
148   lck1 is the existing lock. lck2 is the new lock we are 
149   looking at adding
150 */
151 static bool brl_tdb_overlap(struct lock_struct *lck1, 
152                             struct lock_struct *lck2)
153 {
154         /* this extra check is not redundent - it copes with locks
155            that go beyond the end of 64 bit file space */
156         if (lck1->size != 0 &&
157             lck1->start == lck2->start &&
158             lck1->size == lck2->size) {
159                 return true;
160         }
161             
162         if (lck1->start >= (lck2->start+lck2->size) ||
163             lck2->start >= (lck1->start+lck1->size)) {
164                 return false;
165         }
166
167         /* we have a conflict. Now check to see if lck1 really still
168          * exists, which involves checking if the process still
169          * exists. We leave this test to last as its the most
170          * expensive test, especially when we are clustered */
171         /* TODO: need to do this via a server_id_exists() call, which
172          * hasn't been written yet. When clustered this will need to
173          * call into ctdb */
174
175         return true;
176
177
178 /*
179  See if lock2 can be added when lock1 is in place.
180 */
181 static bool brl_tdb_conflict(struct lock_struct *lck1, 
182                          struct lock_struct *lck2)
183 {
184         /* pending locks don't conflict with anything */
185         if (lck1->lock_type >= PENDING_READ_LOCK ||
186             lck2->lock_type >= PENDING_READ_LOCK) {
187                 return false;
188         }
189
190         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
191                 return false;
192         }
193
194         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
195             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
196                 return false;
197         }
198
199         return brl_tdb_overlap(lck1, lck2);
200
201
202
203 /*
204  Check to see if this lock conflicts, but ignore our own locks on the
205  same fnum only.
206 */
207 static bool brl_tdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
208 {
209         /* pending locks don't conflict with anything */
210         if (lck1->lock_type >= PENDING_READ_LOCK ||
211             lck2->lock_type >= PENDING_READ_LOCK) {
212                 return false;
213         }
214
215         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
216                 return false;
217
218         /*
219          * note that incoming write calls conflict with existing READ
220          * locks even if the context is the same. JRA. See LOCKTEST7
221          * in smbtorture.
222          */
223         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
224             lck1->ntvfs == lck2->ntvfs &&
225             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
226                 return false;
227         }
228
229         return brl_tdb_overlap(lck1, lck2);
230
231
232
233 /*
234   amazingly enough, w2k3 "remembers" whether the last lock failure
235   is the same as this one and changes its error code. I wonder if any
236   app depends on this?
237 */
238 static NTSTATUS brl_tdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
239 {
240         /*
241          * this function is only called for non pending lock!
242          */
243
244         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
245         if (lock->ntvfs->ctx->protocol == PROTOCOL_SMB2) {
246                 return NT_STATUS_LOCK_NOT_GRANTED;
247         }
248
249         /* 
250          * if the notify_ptr is non NULL,
251          * it means that we're at the end of a pending lock
252          * and the real lock is requested after the timout went by
253          * In this case we need to remember the last_lock and always
254          * give FILE_LOCK_CONFLICT
255          */
256         if (lock->notify_ptr) {
257                 brlh->last_lock = *lock;
258                 return NT_STATUS_FILE_LOCK_CONFLICT;
259         }
260
261         /* 
262          * amazing the little things you learn with a test
263          * suite. Locks beyond this offset (as a 64 bit
264          * number!) always generate the conflict error code,
265          * unless the top bit is set
266          */
267         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
268                 brlh->last_lock = *lock;
269                 return NT_STATUS_FILE_LOCK_CONFLICT;
270         }
271
272         /*
273          * if the current lock matches the last failed lock on the file handle
274          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
275          */
276         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
277             lock->context.ctx == brlh->last_lock.context.ctx &&
278             lock->ntvfs == brlh->last_lock.ntvfs &&
279             lock->start == brlh->last_lock.start) {
280                 return NT_STATUS_FILE_LOCK_CONFLICT;
281         }
282
283         brlh->last_lock = *lock;
284         return NT_STATUS_LOCK_NOT_GRANTED;
285 }
286
287 /*
288   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
289   which case a real lock is first tried, and if that fails then a
290   pending lock is created. When the pending lock is triggered (by
291   someone else closing an overlapping lock range) a messaging
292   notification is sent, identified by the notify_ptr
293 */
294 static NTSTATUS brl_tdb_lock(struct brl_context *brl,
295                          struct brl_handle *brlh,
296                          uint32_t smbpid,
297                          uint64_t start, uint64_t size, 
298                          enum brl_type lock_type,
299                          void *notify_ptr)
300 {
301         TDB_DATA kbuf, dbuf;
302         int count=0, i;
303         struct lock_struct lock, *locks=NULL;
304         NTSTATUS status;
305
306         kbuf.dptr = brlh->key.data;
307         kbuf.dsize = brlh->key.length;
308
309         if (brl_invalid_lock_range(start, size)) {
310                 return NT_STATUS_INVALID_LOCK_RANGE;
311         }
312
313         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
314                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
315         }
316
317         /* if this is a pending lock, then with the chainlock held we
318            try to get the real lock. If we succeed then we don't need
319            to make it pending. This prevents a possible race condition
320            where the pending lock gets created after the lock that is
321            preventing the real lock gets removed */
322         if (lock_type >= PENDING_READ_LOCK) {
323                 enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
324
325                 /* here we need to force that the last_lock isn't overwritten */
326                 lock = brlh->last_lock;
327                 status = brl_tdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
328                 brlh->last_lock = lock;
329
330                 if (NT_STATUS_IS_OK(status)) {
331                         tdb_chainunlock(brl->w->tdb, kbuf);
332                         return NT_STATUS_OK;
333                 }
334         }
335
336         dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
337
338         lock.context.smbpid = smbpid;
339         lock.context.server = brl->server;
340         lock.context.ctx = brl;
341         lock.ntvfs = brlh->ntvfs;
342         lock.context.ctx = brl;
343         lock.start = start;
344         lock.size = size;
345         lock.lock_type = lock_type;
346         lock.notify_ptr = notify_ptr;
347
348         if (dbuf.dptr) {
349                 /* there are existing locks - make sure they don't conflict */
350                 locks = (struct lock_struct *)dbuf.dptr;
351                 count = dbuf.dsize / sizeof(*locks);
352                 for (i=0; i<count; i++) {
353                         if (brl_tdb_conflict(&locks[i], &lock)) {
354                                 status = brl_tdb_lock_failed(brlh, &lock);
355                                 goto fail;
356                         }
357                 }
358         }
359
360         /* no conflicts - add it to the list of locks */
361         locks = realloc_p(locks, struct lock_struct, count+1);
362         if (!locks) {
363                 status = NT_STATUS_NO_MEMORY;
364                 goto fail;
365         } else {
366                 dbuf.dptr = (uint8_t *)locks;
367         }
368         locks[count] = lock;
369         dbuf.dsize += sizeof(lock);
370
371         if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
372                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
373                 goto fail;
374         }
375
376         free(dbuf.dptr);
377         tdb_chainunlock(brl->w->tdb, kbuf);
378
379         /* the caller needs to know if the real lock was granted. If
380            we have reached here then it must be a pending lock that
381            was granted, so tell them the lock failed */
382         if (lock_type >= PENDING_READ_LOCK) {
383                 return NT_STATUS_LOCK_NOT_GRANTED;
384         }
385
386         return NT_STATUS_OK;
387
388  fail:
389
390         free(dbuf.dptr);
391         tdb_chainunlock(brl->w->tdb, kbuf);
392         return status;
393 }
394
395
396 /*
397   we are removing a lock that might be holding up a pending lock. Scan for pending
398   locks that cover this range and if we find any then notify the server that it should
399   retry the lock
400 */
401 static void brl_tdb_notify_unlock(struct brl_context *brl,
402                               struct lock_struct *locks, int count, 
403                               struct lock_struct *removed_lock)
404 {
405         int i, last_notice;
406
407         /* the last_notice logic is to prevent stampeding on a lock
408            range. It prevents us sending hundreds of notifies on the
409            same range of bytes. It doesn't prevent all possible
410            stampedes, but it does prevent the most common problem */
411         last_notice = -1;
412
413         for (i=0;i<count;i++) {
414                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
415                     brl_tdb_overlap(&locks[i], removed_lock)) {
416                         if (last_notice != -1 && brl_tdb_overlap(&locks[i], &locks[last_notice])) {
417                                 continue;
418                         }
419                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
420                                 last_notice = i;
421                         }
422                         imessaging_send_ptr(brl->imessaging_ctx, locks[i].context.server,
423                                            MSG_BRL_RETRY, locks[i].notify_ptr);
424                 }
425         }
426 }
427
428
429 /*
430   send notifications for all pending locks - the file is being closed by this
431   user
432 */
433 static void brl_tdb_notify_all(struct brl_context *brl,
434                            struct lock_struct *locks, int count)
435 {
436         int i;
437         for (i=0;i<count;i++) {
438                 if (locks->lock_type >= PENDING_READ_LOCK) {
439                         brl_tdb_notify_unlock(brl, locks, count, &locks[i]);
440                 }
441         }
442 }
443
444
445
446 /*
447  Unlock a range of bytes.
448 */
449 static NTSTATUS brl_tdb_unlock(struct brl_context *brl,
450                            struct brl_handle *brlh, 
451                            uint32_t smbpid,
452                            uint64_t start, uint64_t size)
453 {
454         TDB_DATA kbuf, dbuf;
455         int count, i;
456         struct lock_struct *locks, *lock;
457         struct lock_context context;
458         NTSTATUS status;
459
460         kbuf.dptr = brlh->key.data;
461         kbuf.dsize = brlh->key.length;
462
463         if (brl_invalid_lock_range(start, size)) {
464                 return NT_STATUS_INVALID_LOCK_RANGE;
465         }
466
467         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
468                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
469         }
470
471         dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
472         if (!dbuf.dptr) {
473                 tdb_chainunlock(brl->w->tdb, kbuf);
474                 return NT_STATUS_RANGE_NOT_LOCKED;
475         }
476
477         context.smbpid = smbpid;
478         context.server = brl->server;
479         context.ctx = brl;
480
481         /* there are existing locks - find a match */
482         locks = (struct lock_struct *)dbuf.dptr;
483         count = dbuf.dsize / sizeof(*locks);
484
485         for (i=0; i<count; i++) {
486                 lock = &locks[i];
487                 if (brl_tdb_same_context(&lock->context, &context) &&
488                     lock->ntvfs == brlh->ntvfs &&
489                     lock->start == start &&
490                     lock->size == size &&
491                     lock->lock_type == WRITE_LOCK) {
492                         break;
493                 }
494         }
495         if (i < count) goto found;
496
497         for (i=0; i<count; i++) {
498                 lock = &locks[i];
499                 if (brl_tdb_same_context(&lock->context, &context) &&
500                     lock->ntvfs == brlh->ntvfs &&
501                     lock->start == start &&
502                     lock->size == size &&
503                     lock->lock_type < PENDING_READ_LOCK) {
504                         break;
505                 }
506         }
507
508 found:
509         if (i < count) {
510                 /* found it - delete it */
511                 if (count == 1) {
512                         if (tdb_delete(brl->w->tdb, kbuf) != 0) {
513                                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
514                                 goto fail;
515                         }
516                 } else {
517                         struct lock_struct removed_lock = *lock;
518                         if (i < count-1) {
519                                 memmove(&locks[i], &locks[i+1], 
520                                         sizeof(*locks)*((count-1) - i));
521                         }
522                         count--;
523                         
524                         /* send notifications for any relevant pending locks */
525                         brl_tdb_notify_unlock(brl, locks, count, &removed_lock);
526                         
527                         dbuf.dsize = count * sizeof(*locks);
528                         
529                         if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
530                                 status = NT_STATUS_INTERNAL_DB_CORRUPTION;
531                                 goto fail;
532                         }
533                 }
534                 
535                 free(dbuf.dptr);
536                 tdb_chainunlock(brl->w->tdb, kbuf);
537                 return NT_STATUS_OK;
538         }
539         
540         /* we didn't find it */
541         status = NT_STATUS_RANGE_NOT_LOCKED;
542
543  fail:
544         free(dbuf.dptr);
545         tdb_chainunlock(brl->w->tdb, kbuf);
546         return status;
547 }
548
549
550 /*
551   remove a pending lock. This is called when the caller has either
552   given up trying to establish a lock or when they have succeeded in
553   getting it. In either case they no longer need to be notified.
554 */
555 static NTSTATUS brl_tdb_remove_pending(struct brl_context *brl,
556                                    struct brl_handle *brlh, 
557                                    void *notify_ptr)
558 {
559         TDB_DATA kbuf, dbuf;
560         int count, i;
561         struct lock_struct *locks;
562         NTSTATUS status;
563
564         kbuf.dptr = brlh->key.data;
565         kbuf.dsize = brlh->key.length;
566
567         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
568                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
569         }
570
571         dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
572         if (!dbuf.dptr) {
573                 tdb_chainunlock(brl->w->tdb, kbuf);
574                 return NT_STATUS_RANGE_NOT_LOCKED;
575         }
576
577         /* there are existing locks - find a match */
578         locks = (struct lock_struct *)dbuf.dptr;
579         count = dbuf.dsize / sizeof(*locks);
580
581         for (i=0; i<count; i++) {
582                 struct lock_struct *lock = &locks[i];
583                 
584                 if (lock->lock_type >= PENDING_READ_LOCK &&
585                     lock->notify_ptr == notify_ptr &&
586                     cluster_id_equal(&lock->context.server, &brl->server)) {
587                         /* found it - delete it */
588                         if (count == 1) {
589                                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
590                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
591                                         goto fail;
592                                 }
593                         } else {
594                                 if (i < count-1) {
595                                         memmove(&locks[i], &locks[i+1], 
596                                                 sizeof(*locks)*((count-1) - i));
597                                 }
598                                 count--;
599                                 dbuf.dsize = count * sizeof(*locks);
600                                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
601                                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
602                                         goto fail;
603                                 }
604                         }
605                         
606                         free(dbuf.dptr);
607                         tdb_chainunlock(brl->w->tdb, kbuf);
608                         return NT_STATUS_OK;
609                 }
610         }
611         
612         /* we didn't find it */
613         status = NT_STATUS_RANGE_NOT_LOCKED;
614
615  fail:
616         free(dbuf.dptr);
617         tdb_chainunlock(brl->w->tdb, kbuf);
618         return status;
619 }
620
621
622 /*
623   Test if we are allowed to perform IO on a region of an open file
624 */
625 static NTSTATUS brl_tdb_locktest(struct brl_context *brl,
626                              struct brl_handle *brlh,
627                              uint32_t smbpid, 
628                              uint64_t start, uint64_t size, 
629                              enum brl_type lock_type)
630 {
631         TDB_DATA kbuf, dbuf;
632         int count, i;
633         struct lock_struct lock, *locks;
634
635         kbuf.dptr = brlh->key.data;
636         kbuf.dsize = brlh->key.length;
637
638         if (brl_invalid_lock_range(start, size)) {
639                 return NT_STATUS_INVALID_LOCK_RANGE;
640         }
641
642         dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
643         if (dbuf.dptr == NULL) {
644                 return NT_STATUS_OK;
645         }
646
647         lock.context.smbpid = smbpid;
648         lock.context.server = brl->server;
649         lock.context.ctx = brl;
650         lock.ntvfs = brlh->ntvfs;
651         lock.start = start;
652         lock.size = size;
653         lock.lock_type = lock_type;
654
655         /* there are existing locks - make sure they don't conflict */
656         locks = (struct lock_struct *)dbuf.dptr;
657         count = dbuf.dsize / sizeof(*locks);
658
659         for (i=0; i<count; i++) {
660                 if (brl_tdb_conflict_other(&locks[i], &lock)) {
661                         free(dbuf.dptr);
662                         return NT_STATUS_FILE_LOCK_CONFLICT;
663                 }
664         }
665
666         free(dbuf.dptr);
667         return NT_STATUS_OK;
668 }
669
670
671 /*
672  Remove any locks associated with a open file.
673 */
674 static NTSTATUS brl_tdb_close(struct brl_context *brl,
675                           struct brl_handle *brlh)
676 {
677         TDB_DATA kbuf, dbuf;
678         int count, i, dcount=0;
679         struct lock_struct *locks;
680         NTSTATUS status;
681
682         kbuf.dptr = brlh->key.data;
683         kbuf.dsize = brlh->key.length;
684
685         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
686                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
687         }
688
689         dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
690         if (!dbuf.dptr) {
691                 tdb_chainunlock(brl->w->tdb, kbuf);
692                 return NT_STATUS_OK;
693         }
694
695         /* there are existing locks - remove any for this fnum */
696         locks = (struct lock_struct *)dbuf.dptr;
697         count = dbuf.dsize / sizeof(*locks);
698
699         for (i=0; i<count; i++) {
700                 struct lock_struct *lock = &locks[i];
701
702                 if (lock->context.ctx == brl &&
703                     cluster_id_equal(&lock->context.server, &brl->server) &&
704                     lock->ntvfs == brlh->ntvfs) {
705                         /* found it - delete it */
706                         if (count > 1 && i < count-1) {
707                                 memmove(&locks[i], &locks[i+1], 
708                                         sizeof(*locks)*((count-1) - i));
709                         }
710                         count--;
711                         i--;
712                         dcount++;
713                 }
714         }
715
716         status = NT_STATUS_OK;
717
718         if (count == 0) {
719                 if (tdb_delete(brl->w->tdb, kbuf) != 0) {
720                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
721                 }
722         } else if (dcount != 0) {
723                 /* tell all pending lock holders for this file that
724                    they have a chance now. This is a bit indiscriminant,
725                    but works OK */
726                 brl_tdb_notify_all(brl, locks, count);
727
728                 dbuf.dsize = count * sizeof(*locks);
729
730                 if (tdb_store(brl->w->tdb, kbuf, dbuf, TDB_REPLACE) != 0) {
731                         status = NT_STATUS_INTERNAL_DB_CORRUPTION;
732                 }
733         }
734
735         free(dbuf.dptr);
736         tdb_chainunlock(brl->w->tdb, kbuf);
737
738         return status;
739 }
740
741 static NTSTATUS brl_tdb_count(struct brl_context *brl, struct brl_handle *brlh,
742                               int *count)
743 {
744         TDB_DATA kbuf, dbuf;
745
746         kbuf.dptr = brlh->key.data;
747         kbuf.dsize = brlh->key.length;
748         *count = 0;
749
750         if (tdb_chainlock(brl->w->tdb, kbuf) != 0) {
751                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
752         }
753
754         dbuf = tdb_fetch_compat(brl->w->tdb, kbuf);
755         if (!dbuf.dptr) {
756                 tdb_chainunlock(brl->w->tdb, kbuf);
757                 return NT_STATUS_OK;
758         }
759
760         *count = dbuf.dsize / sizeof(struct lock_struct);
761
762         free(dbuf.dptr);
763         tdb_chainunlock(brl->w->tdb, kbuf);
764
765         return NT_STATUS_OK;
766 }
767
768 static const struct brlock_ops brlock_tdb_ops = {
769         .brl_init           = brl_tdb_init,
770         .brl_create_handle  = brl_tdb_create_handle,
771         .brl_lock           = brl_tdb_lock,
772         .brl_unlock         = brl_tdb_unlock,
773         .brl_remove_pending = brl_tdb_remove_pending,
774         .brl_locktest       = brl_tdb_locktest,
775         .brl_close          = brl_tdb_close,
776         .brl_count          = brl_tdb_count
777 };
778
779
780 void brl_tdb_init_ops(void)
781 {
782         brlock_set_ops(&brlock_tdb_ops);
783 }