dns_server: Remove unused "dns_generate_options"
[vlendec/samba-autobuild/.git] / source4 / ntvfs / common / brlock_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    generic byte range locking code - tdb backend
5
6    Copyright (C) Andrew Tridgell 1992-2006
7    Copyright (C) Jeremy Allison 1992-2000
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /* This module implements a tdb based byte range locking service,
24    replacing the fcntl() based byte range locking previously
25    used. This allows us to provide the same semantics as NT */
26
27 #include "includes.h"
28 #include "system/filesys.h"
29 #include "messaging/messaging.h"
30 #include "lib/messaging/irpc.h"
31 #include "libcli/libcli.h"
32 #include "cluster/cluster.h"
33 #include "ntvfs/common/brlock.h"
34 #include "ntvfs/ntvfs.h"
35 #include "param/param.h"
36 #include "dbwrap/dbwrap.h"
37
38 /*
39   in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
40   a file. For a local posix filesystem this will usually be a combination
41   of the device and inode numbers of the file, but it can be anything 
42   that uniquely idetifies a file for locking purposes, as long
43   as it is applied consistently.
44 */
45
46 /* this struct is typicaly attached to tcon */
47 struct brl_context {
48         struct db_context *db;
49         struct server_id server;
50         struct imessaging_context *imessaging_ctx;
51 };
52
53 /*
54   the lock context contains the elements that define whether one
55   lock is the same as another lock
56 */
57 struct lock_context {
58         struct server_id server;
59         uint32_t smbpid;
60         struct brl_context *ctx;
61 };
62
63 /* The data in brlock records is an unsorted linear array of these
64    records.  It is unnecessary to store the count as tdb provides the
65    size of the record */
66 struct lock_struct {
67         struct lock_context context;
68         struct ntvfs_handle *ntvfs;
69         uint64_t start;
70         uint64_t size;
71         enum brl_type lock_type;
72         void *notify_ptr;
73 };
74
75 /* this struct is attached to on oprn file handle */
76 struct brl_handle {
77         DATA_BLOB key;
78         struct ntvfs_handle *ntvfs;
79         struct lock_struct last_lock;
80 };
81
82 /* see if we have wrapped locks, which are no longer allowed (windows
83  * changed this in win7 */
84 static bool brl_invalid_lock_range(uint64_t start, uint64_t size)
85 {
86         return (size > 1 && (start + size < start));
87 }
88
89 /*
90   Open up the brlock.tdb database. Close it down using
91   talloc_free(). We need the imessaging_ctx to allow for
92   pending lock notifications.
93 */
94 static struct brl_context *brl_tdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
95                                         struct loadparm_context *lp_ctx,
96                                     struct imessaging_context *imessaging_ctx)
97 {
98         struct brl_context *brl;
99
100         brl = talloc(mem_ctx, struct brl_context);
101         if (brl == NULL) {
102                 return NULL;
103         }
104
105         brl->db = cluster_db_tmp_open(brl, lp_ctx, "brlock", TDB_DEFAULT);
106         if (brl->db == NULL) {
107                 talloc_free(brl);
108                 return NULL;
109         }
110
111         brl->server = server;
112         brl->imessaging_ctx = imessaging_ctx;
113
114         return brl;
115 }
116
117 static struct brl_handle *brl_tdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
118                                                     DATA_BLOB *file_key)
119 {
120         struct brl_handle *brlh;
121
122         brlh = talloc(mem_ctx, struct brl_handle);
123         if (brlh == NULL) {
124                 return NULL;
125         }
126
127         brlh->key = *file_key;
128         brlh->ntvfs = ntvfs;
129         ZERO_STRUCT(brlh->last_lock);
130
131         return brlh;
132 }
133
134 /*
135   see if two locking contexts are equal
136 */
137 static bool brl_tdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
138 {
139         return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
140                 ctx1->smbpid == ctx2->smbpid &&
141                 ctx1->ctx == ctx2->ctx);
142 }
143
144 /*
145   see if lck1 and lck2 overlap
146
147   lck1 is the existing lock. lck2 is the new lock we are 
148   looking at adding
149 */
150 static bool brl_tdb_overlap(struct lock_struct *lck1, 
151                             struct lock_struct *lck2)
152 {
153         /* this extra check is not redundant - it copes with locks
154            that go beyond the end of 64 bit file space */
155         if (lck1->size != 0 &&
156             lck1->start == lck2->start &&
157             lck1->size == lck2->size) {
158                 return true;
159         }
160             
161         if (lck1->start >= (lck2->start+lck2->size) ||
162             lck2->start >= (lck1->start+lck1->size)) {
163                 return false;
164         }
165
166         /* we have a conflict. Now check to see if lck1 really still
167          * exists, which involves checking if the process still
168          * exists. We leave this test to last as its the most
169          * expensive test, especially when we are clustered */
170         /* TODO: need to do this via a server_id_exists() call, which
171          * hasn't been written yet. When clustered this will need to
172          * call into ctdb */
173
174         return true;
175
176
177 /*
178  See if lock2 can be added when lock1 is in place.
179 */
180 static bool brl_tdb_conflict(struct lock_struct *lck1, 
181                          struct lock_struct *lck2)
182 {
183         /* pending locks don't conflict with anything */
184         if (lck1->lock_type >= PENDING_READ_LOCK ||
185             lck2->lock_type >= PENDING_READ_LOCK) {
186                 return false;
187         }
188
189         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
190                 return false;
191         }
192
193         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
194             lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
195                 return false;
196         }
197
198         return brl_tdb_overlap(lck1, lck2);
199
200
201
202 /*
203  Check to see if this lock conflicts, but ignore our own locks on the
204  same fnum only.
205 */
206 static bool brl_tdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
207 {
208         /* pending locks don't conflict with anything */
209         if (lck1->lock_type >= PENDING_READ_LOCK ||
210             lck2->lock_type >= PENDING_READ_LOCK) {
211                 return false;
212         }
213
214         if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
215                 return false;
216
217         /*
218          * note that incoming write calls conflict with existing READ
219          * locks even if the context is the same. JRA. See LOCKTEST7
220          * in smbtorture.
221          */
222         if (brl_tdb_same_context(&lck1->context, &lck2->context) &&
223             lck1->ntvfs == lck2->ntvfs &&
224             (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
225                 return false;
226         }
227
228         return brl_tdb_overlap(lck1, lck2);
229
230
231
232 /*
233   amazingly enough, w2k3 "remembers" whether the last lock failure
234   is the same as this one and changes its error code. I wonder if any
235   app depends on this?
236 */
237 static NTSTATUS brl_tdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
238 {
239         /*
240          * this function is only called for non pending lock!
241          */
242
243         /* in SMB2 mode always return NT_STATUS_LOCK_NOT_GRANTED! */
244         if (lock->ntvfs->ctx->protocol >= PROTOCOL_SMB2_02) {
245                 return NT_STATUS_LOCK_NOT_GRANTED;
246         }
247
248         /* 
249          * if the notify_ptr is non NULL,
250          * it means that we're at the end of a pending lock
251          * and the real lock is requested after the timout went by
252          * In this case we need to remember the last_lock and always
253          * give FILE_LOCK_CONFLICT
254          */
255         if (lock->notify_ptr) {
256                 brlh->last_lock = *lock;
257                 return NT_STATUS_FILE_LOCK_CONFLICT;
258         }
259
260         /* 
261          * amazing the little things you learn with a test
262          * suite. Locks beyond this offset (as a 64 bit
263          * number!) always generate the conflict error code,
264          * unless the top bit is set
265          */
266         if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
267                 brlh->last_lock = *lock;
268                 return NT_STATUS_FILE_LOCK_CONFLICT;
269         }
270
271         /*
272          * if the current lock matches the last failed lock on the file handle
273          * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
274          */
275         if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
276             lock->context.ctx == brlh->last_lock.context.ctx &&
277             lock->ntvfs == brlh->last_lock.ntvfs &&
278             lock->start == brlh->last_lock.start) {
279                 return NT_STATUS_FILE_LOCK_CONFLICT;
280         }
281
282         brlh->last_lock = *lock;
283         return NT_STATUS_LOCK_NOT_GRANTED;
284 }
285
286 /*
287   Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
288   which case a real lock is first tried, and if that fails then a
289   pending lock is created. When the pending lock is triggered (by
290   someone else closing an overlapping lock range) a messaging
291   notification is sent, identified by the notify_ptr
292 */
293 static NTSTATUS brl_tdb_lock(struct brl_context *brl,
294                          struct brl_handle *brlh,
295                          uint32_t smbpid,
296                          uint64_t start, uint64_t size, 
297                          enum brl_type lock_type,
298                          void *notify_ptr)
299 {
300         TDB_DATA kbuf, dbuf;
301         int count=0, i;
302         struct lock_struct lock, *locks=NULL;
303         NTSTATUS status;
304         struct db_record *locked;
305
306         kbuf.dptr = brlh->key.data;
307         kbuf.dsize = brlh->key.length;
308
309         if (brl_invalid_lock_range(start, size)) {
310                 return NT_STATUS_INVALID_LOCK_RANGE;
311         }
312
313         locked = dbwrap_fetch_locked(brl->db, brl, kbuf);
314         if (!locked) {
315                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
316         }
317
318         /* if this is a pending lock, then with the chainlock held we
319            try to get the real lock. If we succeed then we don't need
320            to make it pending. This prevents a possible race condition
321            where the pending lock gets created after the lock that is
322            preventing the real lock gets removed */
323         if (lock_type >= PENDING_READ_LOCK) {
324                 enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
325
326                 /* here we need to force that the last_lock isn't overwritten */
327                 lock = brlh->last_lock;
328                 status = brl_tdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
329                 brlh->last_lock = lock;
330
331                 if (NT_STATUS_IS_OK(status)) {
332                         talloc_free(locked);
333                         return NT_STATUS_OK;
334                 }
335         }
336
337         dbuf = dbwrap_record_get_value(locked);
338
339         lock.context.smbpid = smbpid;
340         lock.context.server = brl->server;
341         lock.context.ctx = brl;
342         lock.ntvfs = brlh->ntvfs;
343         lock.context.ctx = brl;
344         lock.start = start;
345         lock.size = size;
346         lock.lock_type = lock_type;
347         lock.notify_ptr = notify_ptr;
348
349         if (dbuf.dptr) {
350                 /* there are existing locks - make sure they don't conflict */
351                 locks = (struct lock_struct *)dbuf.dptr;
352                 count = dbuf.dsize / sizeof(*locks);
353                 for (i=0; i<count; i++) {
354                         if (brl_tdb_conflict(&locks[i], &lock)) {
355                                 status = brl_tdb_lock_failed(brlh, &lock);
356                                 goto fail;
357                         }
358                 }
359         }
360
361         /* no conflicts - add it to the list of locks */
362         /* FIXME: a dbwrap_record_append() would help here! */
363         locks = talloc_array(locked, struct lock_struct, count+1);
364         if (!locks) {
365                 status = NT_STATUS_NO_MEMORY;
366                 goto fail;
367         }
368         memcpy(locks, dbuf.dptr, dbuf.dsize);
369         locks[count] = lock;
370
371         dbuf.dptr = (unsigned char *)locks;
372         dbuf.dsize += sizeof(lock);
373
374         status = dbwrap_record_store(locked, dbuf, TDB_REPLACE);
375         if (!NT_STATUS_IS_OK(status)) {
376                 goto fail;
377         }
378
379         talloc_free(locked);
380
381         /* the caller needs to know if the real lock was granted. If
382            we have reached here then it must be a pending lock that
383            was granted, so tell them the lock failed */
384         if (lock_type >= PENDING_READ_LOCK) {
385                 return NT_STATUS_LOCK_NOT_GRANTED;
386         }
387
388         return NT_STATUS_OK;
389
390  fail:
391         talloc_free(locked);
392         return status;
393 }
394
395
396 /*
397   we are removing a lock that might be holding up a pending lock. Scan for pending
398   locks that cover this range and if we find any then notify the server that it should
399   retry the lock
400 */
401 static void brl_tdb_notify_unlock(struct brl_context *brl,
402                               struct lock_struct *locks, int count, 
403                               struct lock_struct *removed_lock)
404 {
405         int i, last_notice;
406
407         /* the last_notice logic is to prevent stampeding on a lock
408            range. It prevents us sending hundreds of notifies on the
409            same range of bytes. It doesn't prevent all possible
410            stampedes, but it does prevent the most common problem */
411         last_notice = -1;
412
413         for (i=0;i<count;i++) {
414                 if (locks[i].lock_type >= PENDING_READ_LOCK &&
415                     brl_tdb_overlap(&locks[i], removed_lock)) {
416                         if (last_notice != -1 && brl_tdb_overlap(&locks[i], &locks[last_notice])) {
417                                 continue;
418                         }
419                         if (locks[i].lock_type == PENDING_WRITE_LOCK) {
420                                 last_notice = i;
421                         }
422                         imessaging_send_ptr(brl->imessaging_ctx, locks[i].context.server,
423                                            MSG_BRL_RETRY, locks[i].notify_ptr);
424                 }
425         }
426 }
427
428
429 /*
430   send notifications for all pending locks - the file is being closed by this
431   user
432 */
433 static void brl_tdb_notify_all(struct brl_context *brl,
434                            struct lock_struct *locks, int count)
435 {
436         int i;
437         for (i=0;i<count;i++) {
438                 if (locks->lock_type >= PENDING_READ_LOCK) {
439                         brl_tdb_notify_unlock(brl, locks, count, &locks[i]);
440                 }
441         }
442 }
443
444
445
446 /*
447  Unlock a range of bytes.
448 */
449 static NTSTATUS brl_tdb_unlock(struct brl_context *brl,
450                            struct brl_handle *brlh, 
451                            uint32_t smbpid,
452                            uint64_t start, uint64_t size)
453 {
454         TDB_DATA kbuf, dbuf;
455         int count, i;
456         struct lock_struct *locks, *lock;
457         struct lock_context context;
458         struct db_record *locked;
459         NTSTATUS status;
460
461         kbuf.dptr = brlh->key.data;
462         kbuf.dsize = brlh->key.length;
463
464         if (brl_invalid_lock_range(start, size)) {
465                 return NT_STATUS_INVALID_LOCK_RANGE;
466         }
467
468         locked = dbwrap_fetch_locked(brl->db, brl, kbuf);
469         if (!locked) {
470                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
471         }
472         dbuf = dbwrap_record_get_value(locked);
473
474         context.smbpid = smbpid;
475         context.server = brl->server;
476         context.ctx = brl;
477
478         /* there are existing locks - find a match */
479         locks = (struct lock_struct *)dbuf.dptr;
480         count = dbuf.dsize / sizeof(*locks);
481
482         for (i=0; i<count; i++) {
483                 lock = &locks[i];
484                 if (brl_tdb_same_context(&lock->context, &context) &&
485                     lock->ntvfs == brlh->ntvfs &&
486                     lock->start == start &&
487                     lock->size == size &&
488                     lock->lock_type == WRITE_LOCK) {
489                         break;
490                 }
491         }
492         if (i < count) goto found;
493
494         for (i=0; i<count; i++) {
495                 lock = &locks[i];
496                 if (brl_tdb_same_context(&lock->context, &context) &&
497                     lock->ntvfs == brlh->ntvfs &&
498                     lock->start == start &&
499                     lock->size == size &&
500                     lock->lock_type < PENDING_READ_LOCK) {
501                         break;
502                 }
503         }
504
505 found:
506         if (i < count) {
507                 /* found it - delete it */
508                 if (count == 1) {
509                         status = dbwrap_record_delete(locked);
510                         if (!NT_STATUS_IS_OK(status)) {
511                                 goto fail;
512                         }
513                 } else {
514                         struct lock_struct removed_lock = *lock;
515                         if (i < count-1) {
516                                 memmove(&locks[i], &locks[i+1], 
517                                         sizeof(*locks)*((count-1) - i));
518                         }
519                         count--;
520                         
521                         /* send notifications for any relevant pending locks */
522                         brl_tdb_notify_unlock(brl, locks, count, &removed_lock);
523                         
524                         dbuf.dsize = count * sizeof(*locks);
525
526                         status = dbwrap_record_store(locked, dbuf, TDB_REPLACE);
527                         if (!NT_STATUS_IS_OK(status)) {
528                                 goto fail;
529                         }
530                 }
531
532                 talloc_free(locked);
533                 return NT_STATUS_OK;
534         }
535         
536         /* we didn't find it */
537         status = NT_STATUS_RANGE_NOT_LOCKED;
538
539  fail:
540         talloc_free(locked);
541         return status;
542 }
543
544
545 /*
546   remove a pending lock. This is called when the caller has either
547   given up trying to establish a lock or when they have succeeded in
548   getting it. In either case they no longer need to be notified.
549 */
550 static NTSTATUS brl_tdb_remove_pending(struct brl_context *brl,
551                                    struct brl_handle *brlh, 
552                                    void *notify_ptr)
553 {
554         TDB_DATA kbuf, dbuf;
555         int count, i;
556         struct lock_struct *locks;
557         NTSTATUS status;
558         struct db_record *locked;
559
560         kbuf.dptr = brlh->key.data;
561         kbuf.dsize = brlh->key.length;
562
563         locked = dbwrap_fetch_locked(brl->db, brl, kbuf);
564         if (!locked) {
565                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
566         }
567
568         dbuf = dbwrap_record_get_value(locked);
569         if (!dbuf.dptr) {
570                 talloc_free(locked);
571                 return NT_STATUS_RANGE_NOT_LOCKED;
572         }
573
574         /* there are existing locks - find a match */
575         locks = (struct lock_struct *)dbuf.dptr;
576         count = dbuf.dsize / sizeof(*locks);
577
578         for (i=0; i<count; i++) {
579                 struct lock_struct *lock = &locks[i];
580                 
581                 if (lock->lock_type >= PENDING_READ_LOCK &&
582                     lock->notify_ptr == notify_ptr &&
583                     cluster_id_equal(&lock->context.server, &brl->server)) {
584                         /* found it - delete it */
585                         if (count == 1) {
586                                 status = dbwrap_record_delete(locked);
587                                 if (!NT_STATUS_IS_OK(status)) {
588                                         goto fail;
589                                 }
590                         } else {
591                                 if (i < count-1) {
592                                         memmove(&locks[i], &locks[i+1], 
593                                                 sizeof(*locks)*((count-1) - i));
594                                 }
595                                 count--;
596                                 dbuf.dsize = count * sizeof(*locks);
597                                 status = dbwrap_record_store(locked, dbuf,
598                                                              TDB_REPLACE);
599                                 if (!NT_STATUS_IS_OK(status)) {
600                                         goto fail;
601                                 }
602                         }
603                         
604                         talloc_free(locked);
605                         return NT_STATUS_OK;
606                 }
607         }
608         
609         /* we didn't find it */
610         status = NT_STATUS_RANGE_NOT_LOCKED;
611
612  fail:
613         talloc_free(locked);
614         return status;
615 }
616
617
618 /*
619   Test if we are allowed to perform IO on a region of an open file
620 */
621 static NTSTATUS brl_tdb_locktest(struct brl_context *brl,
622                              struct brl_handle *brlh,
623                              uint32_t smbpid, 
624                              uint64_t start, uint64_t size, 
625                              enum brl_type lock_type)
626 {
627         TDB_DATA kbuf, dbuf;
628         int count, i;
629         struct lock_struct lock, *locks;
630         NTSTATUS status;
631
632         kbuf.dptr = brlh->key.data;
633         kbuf.dsize = brlh->key.length;
634
635         if (brl_invalid_lock_range(start, size)) {
636                 return NT_STATUS_INVALID_LOCK_RANGE;
637         }
638
639         status = dbwrap_fetch(brl->db, brl, kbuf, &dbuf);
640         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
641                 return NT_STATUS_OK;
642         } else if (!NT_STATUS_IS_OK(status)) {
643                 return status;
644         }
645
646         lock.context.smbpid = smbpid;
647         lock.context.server = brl->server;
648         lock.context.ctx = brl;
649         lock.ntvfs = brlh->ntvfs;
650         lock.start = start;
651         lock.size = size;
652         lock.lock_type = lock_type;
653
654         /* there are existing locks - make sure they don't conflict */
655         locks = (struct lock_struct *)dbuf.dptr;
656         count = dbuf.dsize / sizeof(*locks);
657
658         for (i=0; i<count; i++) {
659                 if (brl_tdb_conflict_other(&locks[i], &lock)) {
660                         talloc_free(dbuf.dptr);
661                         return NT_STATUS_FILE_LOCK_CONFLICT;
662                 }
663         }
664
665         talloc_free(dbuf.dptr);
666         return NT_STATUS_OK;
667 }
668
669
670 /*
671  Remove any locks associated with a open file.
672 */
673 static NTSTATUS brl_tdb_close(struct brl_context *brl,
674                           struct brl_handle *brlh)
675 {
676         TDB_DATA kbuf, dbuf;
677         int count, i, dcount=0;
678         struct lock_struct *locks;
679         struct db_record *locked;
680         NTSTATUS status;
681
682         kbuf.dptr = brlh->key.data;
683         kbuf.dsize = brlh->key.length;
684
685         locked = dbwrap_fetch_locked(brl->db, brl, kbuf);
686         if (!locked) {
687                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
688         }
689         dbuf = dbwrap_record_get_value(locked);
690         if (!dbuf.dptr) {
691                 talloc_free(locked);
692                 return NT_STATUS_OK;
693         }
694
695         /* there are existing locks - remove any for this fnum */
696         locks = (struct lock_struct *)dbuf.dptr;
697         count = dbuf.dsize / sizeof(*locks);
698
699         for (i=0; i<count; i++) {
700                 struct lock_struct *lock = &locks[i];
701
702                 if (lock->context.ctx == brl &&
703                     cluster_id_equal(&lock->context.server, &brl->server) &&
704                     lock->ntvfs == brlh->ntvfs) {
705                         /* found it - delete it */
706                         if (count > 1 && i < count-1) {
707                                 memmove(&locks[i], &locks[i+1], 
708                                         sizeof(*locks)*((count-1) - i));
709                         }
710                         count--;
711                         i--;
712                         dcount++;
713                 }
714         }
715
716         status = NT_STATUS_OK;
717
718         if (count == 0) {
719                 status = dbwrap_record_delete(locked);
720         } else if (dcount != 0) {
721                 /* tell all pending lock holders for this file that
722                    they have a chance now. This is a bit indiscriminant,
723                    but works OK */
724                 brl_tdb_notify_all(brl, locks, count);
725
726                 dbuf.dsize = count * sizeof(*locks);
727
728                 status = dbwrap_record_store(locked, dbuf, TDB_REPLACE);
729         }
730         talloc_free(locked);
731
732         return status;
733 }
734
735 static NTSTATUS brl_tdb_count(struct brl_context *brl, struct brl_handle *brlh,
736                               int *count)
737 {
738         TDB_DATA kbuf, dbuf;
739         NTSTATUS status;
740
741         kbuf.dptr = brlh->key.data;
742         kbuf.dsize = brlh->key.length;
743         *count = 0;
744
745         status = dbwrap_fetch(brl->db, brl, kbuf, &dbuf);
746         if (NT_STATUS_EQUAL(status, NT_STATUS_NOT_FOUND)) {
747                 return NT_STATUS_OK;
748         } else if (!NT_STATUS_IS_OK(status)) {
749                 return status;
750         }
751         *count = dbuf.dsize / sizeof(struct lock_struct);
752
753         talloc_free(dbuf.dptr);
754
755         return NT_STATUS_OK;
756 }
757
758 static const struct brlock_ops brlock_tdb_ops = {
759         .brl_init           = brl_tdb_init,
760         .brl_create_handle  = brl_tdb_create_handle,
761         .brl_lock           = brl_tdb_lock,
762         .brl_unlock         = brl_tdb_unlock,
763         .brl_remove_pending = brl_tdb_remove_pending,
764         .brl_locktest       = brl_tdb_locktest,
765         .brl_close          = brl_tdb_close,
766         .brl_count          = brl_tdb_count
767 };
768
769
770 void brl_tdb_init_ops(void)
771 {
772         brlock_set_ops(&brlock_tdb_ops);
773 }