be78e0995810497e8fad6da7e75caaff1760d6b8
[samba.git] / source4 / ntvfs / common / opendb_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the open files database, tdb backend. It implements shared
23   storage of what files are open between server instances, and
24   implements the rules of shared access to files.
25
26   The caller needs to provide a file_key, which specifies what file
27   they are talking about. This needs to be a unique key across all
28   filesystems, and is usually implemented in terms of a device/inode
29   pair.
30
31   Before any operations can be performed the caller needs to establish
32   a lock on the record associated with file_key. That is done by
33   calling odb_lock(). The caller releases this lock by calling
34   talloc_free() on the returned handle.
35
36   All other operations on a record are done by passing the odb_lock()
37   handle back to this module. The handle contains internal
38   information about what file_key is being operated on.
39 */
40
41 #include "includes.h"
42 #include "system/filesys.h"
43 #include "lib/tdb/include/tdb.h"
44 #include "messaging/messaging.h"
45 #include "tdb_wrap.h"
46 #include "lib/messaging/irpc.h"
47 #include "librpc/gen_ndr/ndr_opendb.h"
48 #include "ntvfs/ntvfs.h"
49 #include "ntvfs/common/ntvfs_common.h"
50 #include "cluster/cluster.h"
51 #include "param/param.h"
52 #include "ntvfs/sysdep/sys_lease.h"
53
/*
  state for one opened open-files database. Created by odb_tdb_init()
  and referenced by every odb_lock taken on it.
*/
struct odb_context {
	/* wrapper around the openfiles.tdb handle */
	struct tdb_wrap *w;

	/* the ntvfs context passed to odb_tdb_init(); supplies the
	   loadparm, messaging and server id information used here */
	struct ntvfs_context *ntvfs_ctx;

	/* value of the parametric "opendb" / "oplocks" option; when
	   false odb_tdb_open_file() never grants an oplock */
	bool oplocks;

	/* result of sys_lease_context_create(); the lease calls are
	   skipped whenever this is NULL */
	struct sys_lease_context *lease_ctx;
};
60
61 /*
62   an odb lock handle. You must obtain one of these using odb_lock() before doing
63   any other operations. 
64 */
65 struct odb_lock {
66         struct odb_context *odb;
67         TDB_DATA key;
68
69         struct opendb_file file;
70
71         struct {
72                 struct opendb_entry *e;
73                 bool attrs_only;
74         } can_open;
75 };
76
77 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
78                                       struct opendb_entry *e,
79                                       uint8_t level);
80
81 /*
82   Open up the openfiles.tdb database. Close it down using
83   talloc_free(). We need the messaging_ctx to allow for pending open
84   notifications.
85 */
86 static struct odb_context *odb_tdb_init(TALLOC_CTX *mem_ctx, 
87                                         struct ntvfs_context *ntvfs_ctx)
88 {
89         struct odb_context *odb;
90
91         odb = talloc(mem_ctx, struct odb_context);
92         if (odb == NULL) {
93                 return NULL;
94         }
95
96         odb->w = cluster_tdb_tmp_open(odb, ntvfs_ctx->lp_ctx, "openfiles.tdb", TDB_DEFAULT);
97         if (odb->w == NULL) {
98                 talloc_free(odb);
99                 return NULL;
100         }
101
102         odb->ntvfs_ctx = ntvfs_ctx;
103
104         /* leave oplocks disabled by default until the code is working */
105         odb->oplocks = lp_parm_bool(ntvfs_ctx->lp_ctx, NULL, "opendb", "oplocks", false);
106
107         odb->lease_ctx = sys_lease_context_create(ntvfs_ctx->config, odb,
108                                                   ntvfs_ctx->event_ctx,
109                                                   ntvfs_ctx->msg_ctx,
110                                                   odb_oplock_break_send);
111
112         return odb;
113 }
114
115 /*
116   destroy a lock on the database
117 */
118 static int odb_lock_destructor(struct odb_lock *lck)
119 {
120         tdb_chainunlock(lck->odb->w->tdb, lck->key);
121         return 0;
122 }
123
124 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file);
125
126 /*
127   get a lock on a entry in the odb. This call returns a lock handle,
128   which the caller should unlock using talloc_free().
129 */
130 static struct odb_lock *odb_tdb_lock(TALLOC_CTX *mem_ctx,
131                                      struct odb_context *odb, DATA_BLOB *file_key)
132 {
133         struct odb_lock *lck;
134         NTSTATUS status;
135
136         lck = talloc(mem_ctx, struct odb_lock);
137         if (lck == NULL) {
138                 return NULL;
139         }
140
141         lck->odb = talloc_reference(lck, odb);
142         lck->key.dptr = talloc_memdup(lck, file_key->data, file_key->length);
143         lck->key.dsize = file_key->length;
144         if (lck->key.dptr == NULL) {
145                 talloc_free(lck);
146                 return NULL;
147         }
148
149         if (tdb_chainlock(odb->w->tdb, lck->key) != 0) {
150                 talloc_free(lck);
151                 return NULL;
152         }
153
154         ZERO_STRUCT(lck->can_open);
155
156         talloc_set_destructor(lck, odb_lock_destructor);
157
158         status = odb_pull_record(lck, &lck->file);
159         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
160                 /* initialise a blank structure */
161                 ZERO_STRUCT(lck->file);
162         } else if (!NT_STATUS_IS_OK(status)) {
163                 talloc_free(lck);
164                 return NULL;
165         }
166         
167         return lck;
168 }
169
170 static DATA_BLOB odb_tdb_get_key(TALLOC_CTX *mem_ctx, struct odb_lock *lck)
171 {
172         return data_blob_talloc(mem_ctx, lck->key.dptr, lck->key.dsize);
173 }
174
175
176 /*
177   determine if two odb_entry structures conflict
178
179   return NT_STATUS_OK on no conflict
180 */
181 static NTSTATUS share_conflict(struct opendb_entry *e1,
182                                uint32_t stream_id,
183                                uint32_t share_access,
184                                uint32_t access_mask)
185 {
186         /* if either open involves no read.write or delete access then
187            it can't conflict */
188         if (!(e1->access_mask & (SEC_FILE_WRITE_DATA |
189                                  SEC_FILE_APPEND_DATA |
190                                  SEC_FILE_READ_DATA |
191                                  SEC_FILE_EXECUTE |
192                                  SEC_STD_DELETE))) {
193                 return NT_STATUS_OK;
194         }
195         if (!(access_mask & (SEC_FILE_WRITE_DATA |
196                              SEC_FILE_APPEND_DATA |
197                              SEC_FILE_READ_DATA |
198                              SEC_FILE_EXECUTE |
199                              SEC_STD_DELETE))) {
200                 return NT_STATUS_OK;
201         }
202
203         /* data IO access masks. This is skipped if the two open handles
204            are on different streams (as in that case the masks don't
205            interact) */
206         if (e1->stream_id != stream_id) {
207                 return NT_STATUS_OK;
208         }
209
210 #define CHECK_MASK(am, right, sa, share) \
211         if (((am) & (right)) && !((sa) & (share))) return NT_STATUS_SHARING_VIOLATION
212
213         CHECK_MASK(e1->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
214                    share_access, NTCREATEX_SHARE_ACCESS_WRITE);
215         CHECK_MASK(access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
216                    e1->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
217         
218         CHECK_MASK(e1->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
219                    share_access, NTCREATEX_SHARE_ACCESS_READ);
220         CHECK_MASK(access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
221                    e1->share_access, NTCREATEX_SHARE_ACCESS_READ);
222
223         CHECK_MASK(e1->access_mask, SEC_STD_DELETE,
224                    share_access, NTCREATEX_SHARE_ACCESS_DELETE);
225         CHECK_MASK(access_mask, SEC_STD_DELETE,
226                    e1->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
227 #undef CHECK_MASK
228         return NT_STATUS_OK;
229 }
230
231 /*
232   pull a record, translating from the db format to the opendb_file structure defined
233   in opendb.idl
234 */
235 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file)
236 {
237         struct odb_context *odb = lck->odb;
238         TDB_DATA dbuf;
239         DATA_BLOB blob;
240         enum ndr_err_code ndr_err;
241
242         dbuf = tdb_fetch(odb->w->tdb, lck->key);
243         if (dbuf.dptr == NULL) {
244                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
245         }
246
247         blob.data = dbuf.dptr;
248         blob.length = dbuf.dsize;
249
250         ndr_err = ndr_pull_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_pull_flags_fn_t)ndr_pull_opendb_file);
251         free(dbuf.dptr);
252         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
253                 return ndr_map_error2ntstatus(ndr_err);
254         }
255
256         return NT_STATUS_OK;
257 }
258
259 /*
260   push a record, translating from the opendb_file structure defined in opendb.idl
261 */
262 static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file)
263 {
264         struct odb_context *odb = lck->odb;
265         TDB_DATA dbuf;
266         DATA_BLOB blob;
267         enum ndr_err_code ndr_err;
268         int ret;
269
270         if (file->num_entries == 0) {
271                 ret = tdb_delete(odb->w->tdb, lck->key);
272                 if (ret != 0) {
273                         return NT_STATUS_INTERNAL_DB_CORRUPTION;
274                 }
275                 return NT_STATUS_OK;
276         }
277
278         ndr_err = ndr_push_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_push_flags_fn_t)ndr_push_opendb_file);
279         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
280                 return ndr_map_error2ntstatus(ndr_err);
281         }
282
283         dbuf.dptr = blob.data;
284         dbuf.dsize = blob.length;
285                 
286         ret = tdb_store(odb->w->tdb, lck->key, dbuf, TDB_REPLACE);
287         data_blob_free(&blob);
288         if (ret != 0) {
289                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
290         }
291
292         return NT_STATUS_OK;
293 }
294
295 /*
296   send an oplock break to a client
297 */
298 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
299                                       struct opendb_entry *e,
300                                       uint8_t level)
301 {
302         NTSTATUS status;
303         struct opendb_oplock_break op_break;
304         DATA_BLOB blob;
305
306         ZERO_STRUCT(op_break);
307
308         /* tell the server handling this open file about the need to send the client
309            a break */
310         op_break.file_handle    = e->file_handle;
311         op_break.level          = level;
312
313         blob = data_blob_const(&op_break, sizeof(op_break));
314
315         status = messaging_send(msg_ctx, e->server,
316                                 MSG_NTVFS_OPLOCK_BREAK, &blob);
317         NT_STATUS_NOT_OK_RETURN(status);
318
319         return NT_STATUS_OK;
320 }
321
322 static bool access_attributes_only(uint32_t access_mask,
323                                    uint32_t open_disposition,
324                                    bool break_to_none)
325 {
326         switch (open_disposition) {
327         case NTCREATEX_DISP_SUPERSEDE:
328         case NTCREATEX_DISP_OVERWRITE_IF:
329         case NTCREATEX_DISP_OVERWRITE:
330                 return false;
331         default:
332                 break;
333         }
334
335         if (break_to_none) {
336                 return false;
337         }
338
339 #define CHECK_MASK(m,g) ((m) && (((m) & ~(g))==0) && (((m) & (g)) != 0))
340         return CHECK_MASK(access_mask,
341                           SEC_STD_SYNCHRONIZE |
342                           SEC_FILE_READ_ATTRIBUTE |
343                           SEC_FILE_WRITE_ATTRIBUTE);
344 #undef CHECK_MASK
345 }
346
347 static NTSTATUS odb_tdb_open_can_internal(struct odb_context *odb,
348                                           const struct opendb_file *file,
349                                           uint32_t stream_id, uint32_t share_access,
350                                           uint32_t access_mask, bool delete_on_close,
351                                           uint32_t open_disposition, bool break_to_none,
352                                           bool *_attrs_only)
353 {
354         NTSTATUS status;
355         uint32_t i;
356         bool attrs_only = false;
357
358         /* see if anyone has an oplock, which we need to break */
359         for (i=0;i<file->num_entries;i++) {
360                 if (file->entries[i].oplock_level == OPLOCK_BATCH) {
361                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
362                         /* if this is an attribute only access
363                          * it doesn't conflict with a BACTCH oplock
364                          * but we'll not grant the oplock below
365                          */
366                         attrs_only = access_attributes_only(access_mask,
367                                                             open_disposition,
368                                                             break_to_none);
369                         if (attrs_only) {
370                                 break;
371                         }
372                         /* a batch oplock caches close calls, which
373                            means the client application might have
374                            already closed the file. We have to allow
375                            this close to propogate by sending a oplock
376                            break request and suspending this call
377                            until the break is acknowledged or the file
378                            is closed */
379                         if (break_to_none ||
380                             !file->entries[i].allow_level_II_oplock) {
381                                 oplock_return = OPLOCK_BREAK_TO_NONE;
382                         }
383                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
384                                               &file->entries[i],
385                                               oplock_return);
386                         return NT_STATUS_OPLOCK_NOT_GRANTED;
387                 }
388         }
389
390         if (file->delete_on_close) {
391                 /* while delete on close is set, no new opens are allowed */
392                 return NT_STATUS_DELETE_PENDING;
393         }
394
395         if (file->num_entries != 0 && delete_on_close) {
396                 return NT_STATUS_SHARING_VIOLATION;
397         }
398
399         /* check for sharing violations */
400         for (i=0;i<file->num_entries;i++) {
401                 status = share_conflict(&file->entries[i], stream_id,
402                                         share_access, access_mask);
403                 NT_STATUS_NOT_OK_RETURN(status);
404         }
405
406         /* we now know the open could succeed, but we need to check
407            for any exclusive oplocks. We can't grant a second open
408            till these are broken. Note that we check for batch oplocks
409            before checking for sharing violations, and check for
410            exclusive oplocks afterwards. */
411         for (i=0;i<file->num_entries;i++) {
412                 if (file->entries[i].oplock_level == OPLOCK_EXCLUSIVE) {
413                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
414                         /* if this is an attribute only access
415                          * it doesn't conflict with an EXCLUSIVE oplock
416                          * but we'll not grant the oplock below
417                          */
418                         attrs_only = access_attributes_only(access_mask,
419                                                             open_disposition,
420                                                             break_to_none);
421                         if (attrs_only) {
422                                 break;
423                         }
424                         /*
425                          * send an oplock break to the holder of the
426                          * oplock and tell caller to retry later
427                          */
428                         if (break_to_none ||
429                             !file->entries[i].allow_level_II_oplock) {
430                                 oplock_return = OPLOCK_BREAK_TO_NONE;
431                         }
432                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
433                                               &file->entries[i],
434                                               oplock_return);
435                         return NT_STATUS_OPLOCK_NOT_GRANTED;
436                 }
437         }
438
439         if (_attrs_only) {
440                 *_attrs_only = attrs_only;
441         }
442         return NT_STATUS_OK;
443 }
444
445 /*
446   register an open file in the open files database.
447   The share_access rules are implemented by odb_can_open()
448   and it's needed to call odb_can_open() before
449   odb_open_file() otherwise NT_STATUS_INTERNAL_ERROR is returned
450
451   Note that the path is only used by the delete on close logic, not
452   for comparing with other filenames
453 */
454 static NTSTATUS odb_tdb_open_file(struct odb_lock *lck,
455                                   void *file_handle, const char *path,
456                                   int *fd, bool allow_level_II_oplock,
457                                   uint32_t oplock_level, uint32_t *oplock_granted)
458 {
459         struct odb_context *odb = lck->odb;
460
461         if (!lck->can_open.e) {
462                 return NT_STATUS_INTERNAL_ERROR;
463         }
464
465         if (odb->oplocks == false) {
466                 oplock_level = OPLOCK_NONE;
467         }
468
469         if (!oplock_granted) {
470                 oplock_level = OPLOCK_NONE;
471         }
472
473         if (lck->file.path == NULL) {
474                 lck->file.path = talloc_strdup(lck, path);
475                 NT_STATUS_HAVE_NO_MEMORY(lck->file.path);
476         }
477
478         /*
479           possibly grant an exclusive, batch or level2 oplock
480         */
481         if (lck->can_open.attrs_only) {
482                 oplock_level    = OPLOCK_NONE;
483         } else if (oplock_level == OPLOCK_EXCLUSIVE) {
484                 if (lck->file.num_entries == 0) {
485                         oplock_level    = OPLOCK_EXCLUSIVE;
486                 } else if (allow_level_II_oplock) {
487                         oplock_level    = OPLOCK_LEVEL_II;
488                 } else {
489                         oplock_level    = OPLOCK_NONE;
490                 }
491         } else if (oplock_level == OPLOCK_BATCH) {
492                 if (lck->file.num_entries == 0) {
493                         oplock_level    = OPLOCK_BATCH;
494                 } else if (allow_level_II_oplock) {
495                         oplock_level    = OPLOCK_LEVEL_II;
496                 } else {
497                         oplock_level    = OPLOCK_NONE;
498                 }
499         } else if (oplock_level == OPLOCK_LEVEL_II) {
500                 oplock_level    = OPLOCK_LEVEL_II;
501         } else {
502                 oplock_level    = OPLOCK_NONE;
503         }
504
505         lck->can_open.e->file_handle            = file_handle;
506         lck->can_open.e->fd                     = fd;
507         lck->can_open.e->allow_level_II_oplock  = allow_level_II_oplock;
508         lck->can_open.e->oplock_level           = oplock_level;
509
510         if (odb->lease_ctx && fd) {
511                 NTSTATUS status;
512                 status = sys_lease_setup(odb->lease_ctx, lck->can_open.e);
513                 NT_STATUS_NOT_OK_RETURN(status);
514         }
515
516         if (oplock_granted) {
517                 if (lck->can_open.e->oplock_level == OPLOCK_EXCLUSIVE) {
518                         *oplock_granted = EXCLUSIVE_OPLOCK_RETURN;
519                 } else if (lck->can_open.e->oplock_level == OPLOCK_BATCH) {
520                         *oplock_granted = BATCH_OPLOCK_RETURN;
521                 } else if (lck->can_open.e->oplock_level == OPLOCK_LEVEL_II) {
522                         *oplock_granted = LEVEL_II_OPLOCK_RETURN;
523                 } else {
524                         *oplock_granted = NO_OPLOCK_RETURN;
525                 }
526         }
527
528         /* it doesn't conflict, so add it to the end */
529         lck->file.entries = talloc_realloc(lck, lck->file.entries,
530                                            struct opendb_entry,
531                                            lck->file.num_entries+1);
532         NT_STATUS_HAVE_NO_MEMORY(lck->file.entries);
533
534         lck->file.entries[lck->file.num_entries] = *lck->can_open.e;
535         lck->file.num_entries++;
536
537         talloc_free(lck->can_open.e);
538         lck->can_open.e = NULL;
539
540         return odb_push_record(lck, &lck->file);
541 }
542
543
544 /*
545   register a pending open file in the open files database
546 */
547 static NTSTATUS odb_tdb_open_file_pending(struct odb_lock *lck, void *private)
548 {
549         struct odb_context *odb = lck->odb;
550
551         if (lck->file.path == NULL) {
552                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
553         }
554
555         lck->file.pending = talloc_realloc(lck, lck->file.pending,
556                                            struct opendb_pending,
557                                            lck->file.num_pending+1);
558         NT_STATUS_HAVE_NO_MEMORY(lck->file.pending);
559
560         lck->file.pending[lck->file.num_pending].server = odb->ntvfs_ctx->server_id;
561         lck->file.pending[lck->file.num_pending].notify_ptr = private;
562
563         lck->file.num_pending++;
564
565         return odb_push_record(lck, &lck->file);
566 }
567
568
569 /*
570   remove a opendb entry
571 */
572 static NTSTATUS odb_tdb_close_file(struct odb_lock *lck, void *file_handle,
573                                    const char **_delete_path)
574 {
575         struct odb_context *odb = lck->odb;
576         const char *delete_path = NULL;
577         int i;
578
579         if (lck->file.path == NULL) {
580                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
581         }
582
583         /* find the entry, and delete it */
584         for (i=0;i<lck->file.num_entries;i++) {
585                 if (file_handle == lck->file.entries[i].file_handle &&
586                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.entries[i].server)) {
587                         if (lck->file.entries[i].delete_on_close) {
588                                 lck->file.delete_on_close = true;
589                         }
590                         if (odb->lease_ctx && lck->file.entries[i].fd) {
591                                 NTSTATUS status;
592                                 status = sys_lease_remove(odb->lease_ctx, &lck->file.entries[i]);
593                                 NT_STATUS_NOT_OK_RETURN(status);
594                         }
595                         if (i < lck->file.num_entries-1) {
596                                 memmove(lck->file.entries+i, lck->file.entries+i+1,
597                                         (lck->file.num_entries - (i+1)) *
598                                         sizeof(struct opendb_entry));
599                         }
600                         break;
601                 }
602         }
603
604         if (i == lck->file.num_entries) {
605                 return NT_STATUS_UNSUCCESSFUL;
606         }
607
608         /* send any pending notifications, removing them once sent */
609         for (i=0;i<lck->file.num_pending;i++) {
610                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
611                                    lck->file.pending[i].server,
612                                    MSG_PVFS_RETRY_OPEN,
613                                    lck->file.pending[i].notify_ptr);
614         }
615         lck->file.num_pending = 0;
616
617         lck->file.num_entries--;
618
619         if (lck->file.num_entries == 0 && lck->file.delete_on_close) {
620                 delete_path = lck->file.path;
621         }
622
623         if (_delete_path) {
624                 *_delete_path = delete_path;
625         }
626
627         return odb_push_record(lck, &lck->file);
628 }
629
630 /*
631   update the oplock level of the client
632 */
633 static NTSTATUS odb_tdb_update_oplock(struct odb_lock *lck, void *file_handle,
634                                       uint32_t oplock_level)
635 {
636         struct odb_context *odb = lck->odb;
637         int i;
638
639         if (lck->file.path == NULL) {
640                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
641         }
642
643         /* find the entry, and update it */
644         for (i=0;i<lck->file.num_entries;i++) {
645                 if (file_handle == lck->file.entries[i].file_handle &&
646                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.entries[i].server)) {
647                         lck->file.entries[i].oplock_level = oplock_level;
648
649                         if (odb->lease_ctx && lck->file.entries[i].fd) {
650                                 NTSTATUS status;
651                                 status = sys_lease_update(odb->lease_ctx, &lck->file.entries[i]);
652                                 NT_STATUS_NOT_OK_RETURN(status);
653                         }
654
655                         break;
656                 }
657         }
658
659         if (i == lck->file.num_entries) {
660                 return NT_STATUS_UNSUCCESSFUL;
661         }
662
663         /* send any pending notifications, removing them once sent */
664         for (i=0;i<lck->file.num_pending;i++) {
665                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
666                                    lck->file.pending[i].server,
667                                    MSG_PVFS_RETRY_OPEN,
668                                    lck->file.pending[i].notify_ptr);
669         }
670         lck->file.num_pending = 0;
671
672         return odb_push_record(lck, &lck->file);
673 }
674
675 /*
676   send oplocks breaks to none to all level2 holders
677 */
678 static NTSTATUS odb_tdb_break_oplocks(struct odb_lock *lck)
679 {
680         struct odb_context *odb = lck->odb;
681         int i;
682         bool modified = false;
683
684         /* see if anyone has an oplock, which we need to break */
685         for (i=0;i<lck->file.num_entries;i++) {
686                 if (lck->file.entries[i].oplock_level == OPLOCK_LEVEL_II) {
687                         /*
688                          * there could be multiple level2 oplocks
689                          * and we just send a break to none to all of them
690                          * without waiting for a release
691                          */
692                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
693                                               &lck->file.entries[i],
694                                               OPLOCK_BREAK_TO_NONE);
695                         lck->file.entries[i].oplock_level = OPLOCK_NONE;
696                         modified = true;
697                 }
698         }
699
700         if (modified) {
701                 return odb_push_record(lck, &lck->file);
702         }
703         return NT_STATUS_OK;
704 }
705
706 /*
707   remove a pending opendb entry
708 */
709 static NTSTATUS odb_tdb_remove_pending(struct odb_lock *lck, void *private)
710 {
711         struct odb_context *odb = lck->odb;
712         int i;
713
714         if (lck->file.path == NULL) {
715                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
716         }
717
718         /* find the entry, and delete it */
719         for (i=0;i<lck->file.num_pending;i++) {
720                 if (private == lck->file.pending[i].notify_ptr &&
721                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.pending[i].server)) {
722                         if (i < lck->file.num_pending-1) {
723                                 memmove(lck->file.pending+i, lck->file.pending+i+1,
724                                         (lck->file.num_pending - (i+1)) *
725                                         sizeof(struct opendb_pending));
726                         }
727                         break;
728                 }
729         }
730
731         if (i == lck->file.num_pending) {
732                 return NT_STATUS_UNSUCCESSFUL;
733         }
734
735         lck->file.num_pending--;
736         
737         return odb_push_record(lck, &lck->file);
738 }
739
740
741 /*
742   rename the path in a open file
743 */
744 static NTSTATUS odb_tdb_rename(struct odb_lock *lck, const char *path)
745 {
746         if (lck->file.path == NULL) {
747                 /* not having the record at all is OK */
748                 return NT_STATUS_OK;
749         }
750
751         lck->file.path = talloc_strdup(lck, path);
752         NT_STATUS_HAVE_NO_MEMORY(lck->file.path);
753
754         return odb_push_record(lck, &lck->file);
755 }
756
757 /*
758   get the path of an open file
759 */
760 static NTSTATUS odb_tdb_get_path(struct odb_lock *lck, const char **path)
761 {
762         *path = NULL;
763
764         /* we don't ignore NT_STATUS_OBJECT_NAME_NOT_FOUND here */
765         if (lck->file.path == NULL) {
766                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
767         }
768
769         *path = lck->file.path;
770
771         return NT_STATUS_OK;
772 }
773
774 /*
775   update delete on close flag on an open file
776 */
777 static NTSTATUS odb_tdb_set_delete_on_close(struct odb_lock *lck, bool del_on_close)
778 {
779         if (lck->file.path == NULL) {
780                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
781         }
782
783         lck->file.delete_on_close = del_on_close;
784
785         return odb_push_record(lck, &lck->file);
786 }
787
788 /*
789   return the current value of the delete_on_close bit, and how many
790   people still have the file open
791 */
792 static NTSTATUS odb_tdb_get_delete_on_close(struct odb_context *odb, 
793                                             DATA_BLOB *key, bool *del_on_close)
794 {
795         struct odb_lock *lck;
796
797         (*del_on_close) = false;
798
799         lck = odb_lock(odb, odb, key);
800         NT_STATUS_HAVE_NO_MEMORY(lck);
801
802         (*del_on_close) = lck->file.delete_on_close;
803
804         talloc_free(lck);
805
806         return NT_STATUS_OK;
807 }
808
809
810 /*
811   determine if a file can be opened with the given share_access,
812   create_options and access_mask
813 */
814 static NTSTATUS odb_tdb_can_open(struct odb_lock *lck,
815                                  uint32_t stream_id, uint32_t share_access,
816                                  uint32_t access_mask, bool delete_on_close,
817                                  uint32_t open_disposition, bool break_to_none)
818 {
819         struct odb_context *odb = lck->odb;
820         NTSTATUS status;
821
822         status = odb_tdb_open_can_internal(odb, &lck->file, stream_id,
823                                            share_access, access_mask,
824                                            delete_on_close, open_disposition,
825                                            break_to_none, &lck->can_open.attrs_only);
826         NT_STATUS_NOT_OK_RETURN(status);
827
828         lck->can_open.e = talloc(lck, struct opendb_entry);
829         NT_STATUS_HAVE_NO_MEMORY(lck->can_open.e);
830
831         lck->can_open.e->server                 = odb->ntvfs_ctx->server_id;
832         lck->can_open.e->file_handle            = NULL;
833         lck->can_open.e->fd                     = NULL;
834         lck->can_open.e->stream_id              = stream_id;
835         lck->can_open.e->share_access           = share_access;
836         lck->can_open.e->access_mask            = access_mask;
837         lck->can_open.e->delete_on_close        = delete_on_close;
838         lck->can_open.e->allow_level_II_oplock  = false;
839         lck->can_open.e->oplock_level           = OPLOCK_NONE;
840
841         return NT_STATUS_OK;
842 }
843
844
845 static const struct opendb_ops opendb_tdb_ops = {
846         .odb_init                = odb_tdb_init,
847         .odb_lock                = odb_tdb_lock,
848         .odb_get_key             = odb_tdb_get_key,
849         .odb_open_file           = odb_tdb_open_file,
850         .odb_open_file_pending   = odb_tdb_open_file_pending,
851         .odb_close_file          = odb_tdb_close_file,
852         .odb_remove_pending      = odb_tdb_remove_pending,
853         .odb_rename              = odb_tdb_rename,
854         .odb_get_path            = odb_tdb_get_path,
855         .odb_set_delete_on_close = odb_tdb_set_delete_on_close,
856         .odb_get_delete_on_close = odb_tdb_get_delete_on_close,
857         .odb_can_open            = odb_tdb_can_open,
858         .odb_update_oplock       = odb_tdb_update_oplock,
859         .odb_break_oplocks       = odb_tdb_break_oplocks
860 };
861
862
863 void odb_tdb_init_ops(void)
864 {
865         sys_lease_init();
866         odb_set_ops(&opendb_tdb_ops);
867 }