Fix references to ntvfs share config
[jelmer/samba4-debian.git] / source / ntvfs / common / opendb_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the open files database, tdb backend. It implements shared
23   storage of what files are open between server instances, and
24   implements the rules of shared access to files.
25
26   The caller needs to provide a file_key, which specifies what file
27   they are talking about. This needs to be a unique key across all
28   filesystems, and is usually implemented in terms of a device/inode
29   pair.
30
31   Before any operations can be performed the caller needs to establish
32   a lock on the record associated with file_key. That is done by
33   calling odb_lock(). The caller releases this lock by calling
34   talloc_free() on the returned handle.
35
36   All other operations on a record are done by passing the odb_lock()
37   handle back to this module. The handle contains internal
38   information about what file_key is being operated on.
39 */
40
41 #include "includes.h"
42 #include "system/filesys.h"
43 #include "lib/tdb/include/tdb.h"
44 #include "messaging/messaging.h"
45 #include "tdb_wrap.h"
46 #include "lib/messaging/irpc.h"
47 #include "librpc/gen_ndr/ndr_opendb.h"
48 #include "ntvfs/ntvfs.h"
49 #include "ntvfs/common/ntvfs_common.h"
50 #include "cluster/cluster.h"
51 #include "param/param.h"
52 #include "ntvfs/sysdep/sys_lease.h"
53
/*
  state of the tdb backed open files database, created by odb_tdb_init()
*/
struct odb_context {
	/* wrapped handle on openfiles.tdb */
	struct tdb_wrap *w;
	/* ntvfs context; source of the messaging context, server id and lp_ctx */
	struct ntvfs_context *ntvfs_ctx;
	/* value of the SHARE_OPLOCKS share option; when false every open
	   is registered with OPLOCK_NONE */
	bool oplocks;
	/* result of sys_lease_context_create(); tested against NULL
	   before each use */
	struct sys_lease_context *lease_ctx;
};
60
61 /*
62   an odb lock handle. You must obtain one of these using odb_lock() before doing
63   any other operations. 
64 */
65 struct odb_lock {
66         struct odb_context *odb;
67         TDB_DATA key;
68
69         struct opendb_file file;
70
71         struct {
72                 struct opendb_entry *e;
73                 bool attrs_only;
74         } can_open;
75 };
76
77 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
78                                       struct opendb_entry *e,
79                                       uint8_t level);
80
81 /*
82   Open up the openfiles.tdb database. Close it down using
83   talloc_free(). We need the messaging_ctx to allow for pending open
84   notifications.
85 */
86 static struct odb_context *odb_tdb_init(TALLOC_CTX *mem_ctx, 
87                                         struct ntvfs_context *ntvfs_ctx)
88 {
89         struct odb_context *odb;
90
91         odb = talloc(mem_ctx, struct odb_context);
92         if (odb == NULL) {
93                 return NULL;
94         }
95
96         odb->w = cluster_tdb_tmp_open(odb, ntvfs_ctx->lp_ctx, "openfiles.tdb", TDB_DEFAULT);
97         if (odb->w == NULL) {
98                 talloc_free(odb);
99                 return NULL;
100         }
101
102         odb->ntvfs_ctx = ntvfs_ctx;
103
104         odb->oplocks = share_bool_option(ntvfs_ctx->config, SHARE_OPLOCKS, SHARE_OPLOCKS_DEFAULT);
105
106         odb->lease_ctx = sys_lease_context_create(ntvfs_ctx->config, odb,
107                                                   ntvfs_ctx->event_ctx,
108                                                   ntvfs_ctx->msg_ctx,
109                                                   odb_oplock_break_send);
110
111         return odb;
112 }
113
114 /*
115   destroy a lock on the database
116 */
117 static int odb_lock_destructor(struct odb_lock *lck)
118 {
119         tdb_chainunlock(lck->odb->w->tdb, lck->key);
120         return 0;
121 }
122
123 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file);
124
125 /*
126   get a lock on a entry in the odb. This call returns a lock handle,
127   which the caller should unlock using talloc_free().
128 */
129 static struct odb_lock *odb_tdb_lock(TALLOC_CTX *mem_ctx,
130                                      struct odb_context *odb, DATA_BLOB *file_key)
131 {
132         struct odb_lock *lck;
133         NTSTATUS status;
134
135         lck = talloc(mem_ctx, struct odb_lock);
136         if (lck == NULL) {
137                 return NULL;
138         }
139
140         lck->odb = talloc_reference(lck, odb);
141         lck->key.dptr = talloc_memdup(lck, file_key->data, file_key->length);
142         lck->key.dsize = file_key->length;
143         if (lck->key.dptr == NULL) {
144                 talloc_free(lck);
145                 return NULL;
146         }
147
148         if (tdb_chainlock(odb->w->tdb, lck->key) != 0) {
149                 talloc_free(lck);
150                 return NULL;
151         }
152
153         ZERO_STRUCT(lck->can_open);
154
155         talloc_set_destructor(lck, odb_lock_destructor);
156
157         status = odb_pull_record(lck, &lck->file);
158         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
159                 /* initialise a blank structure */
160                 ZERO_STRUCT(lck->file);
161         } else if (!NT_STATUS_IS_OK(status)) {
162                 talloc_free(lck);
163                 return NULL;
164         }
165         
166         return lck;
167 }
168
169 static DATA_BLOB odb_tdb_get_key(TALLOC_CTX *mem_ctx, struct odb_lock *lck)
170 {
171         return data_blob_talloc(mem_ctx, lck->key.dptr, lck->key.dsize);
172 }
173
174
175 /*
176   determine if two odb_entry structures conflict
177
178   return NT_STATUS_OK on no conflict
179 */
180 static NTSTATUS share_conflict(struct opendb_entry *e1,
181                                uint32_t stream_id,
182                                uint32_t share_access,
183                                uint32_t access_mask)
184 {
185         /* if either open involves no read.write or delete access then
186            it can't conflict */
187         if (!(e1->access_mask & (SEC_FILE_WRITE_DATA |
188                                  SEC_FILE_APPEND_DATA |
189                                  SEC_FILE_READ_DATA |
190                                  SEC_FILE_EXECUTE |
191                                  SEC_STD_DELETE))) {
192                 return NT_STATUS_OK;
193         }
194         if (!(access_mask & (SEC_FILE_WRITE_DATA |
195                              SEC_FILE_APPEND_DATA |
196                              SEC_FILE_READ_DATA |
197                              SEC_FILE_EXECUTE |
198                              SEC_STD_DELETE))) {
199                 return NT_STATUS_OK;
200         }
201
202         /* data IO access masks. This is skipped if the two open handles
203            are on different streams (as in that case the masks don't
204            interact) */
205         if (e1->stream_id != stream_id) {
206                 return NT_STATUS_OK;
207         }
208
209 #define CHECK_MASK(am, right, sa, share) \
210         if (((am) & (right)) && !((sa) & (share))) return NT_STATUS_SHARING_VIOLATION
211
212         CHECK_MASK(e1->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
213                    share_access, NTCREATEX_SHARE_ACCESS_WRITE);
214         CHECK_MASK(access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
215                    e1->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
216         
217         CHECK_MASK(e1->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
218                    share_access, NTCREATEX_SHARE_ACCESS_READ);
219         CHECK_MASK(access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
220                    e1->share_access, NTCREATEX_SHARE_ACCESS_READ);
221
222         CHECK_MASK(e1->access_mask, SEC_STD_DELETE,
223                    share_access, NTCREATEX_SHARE_ACCESS_DELETE);
224         CHECK_MASK(access_mask, SEC_STD_DELETE,
225                    e1->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
226 #undef CHECK_MASK
227         return NT_STATUS_OK;
228 }
229
230 /*
231   pull a record, translating from the db format to the opendb_file structure defined
232   in opendb.idl
233 */
234 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file)
235 {
236         struct odb_context *odb = lck->odb;
237         TDB_DATA dbuf;
238         DATA_BLOB blob;
239         enum ndr_err_code ndr_err;
240
241         dbuf = tdb_fetch(odb->w->tdb, lck->key);
242         if (dbuf.dptr == NULL) {
243                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
244         }
245
246         blob.data = dbuf.dptr;
247         blob.length = dbuf.dsize;
248
249         ndr_err = ndr_pull_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_pull_flags_fn_t)ndr_pull_opendb_file);
250         free(dbuf.dptr);
251         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
252                 return ndr_map_error2ntstatus(ndr_err);
253         }
254
255         return NT_STATUS_OK;
256 }
257
258 /*
259   push a record, translating from the opendb_file structure defined in opendb.idl
260 */
261 static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file)
262 {
263         struct odb_context *odb = lck->odb;
264         TDB_DATA dbuf;
265         DATA_BLOB blob;
266         enum ndr_err_code ndr_err;
267         int ret;
268
269         if (file->num_entries == 0) {
270                 ret = tdb_delete(odb->w->tdb, lck->key);
271                 if (ret != 0) {
272                         return NT_STATUS_INTERNAL_DB_CORRUPTION;
273                 }
274                 return NT_STATUS_OK;
275         }
276
277         ndr_err = ndr_push_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_push_flags_fn_t)ndr_push_opendb_file);
278         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
279                 return ndr_map_error2ntstatus(ndr_err);
280         }
281
282         dbuf.dptr = blob.data;
283         dbuf.dsize = blob.length;
284                 
285         ret = tdb_store(odb->w->tdb, lck->key, dbuf, TDB_REPLACE);
286         data_blob_free(&blob);
287         if (ret != 0) {
288                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
289         }
290
291         return NT_STATUS_OK;
292 }
293
294 /*
295   send an oplock break to a client
296 */
297 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
298                                       struct opendb_entry *e,
299                                       uint8_t level)
300 {
301         NTSTATUS status;
302         struct opendb_oplock_break op_break;
303         DATA_BLOB blob;
304
305         ZERO_STRUCT(op_break);
306
307         /* tell the server handling this open file about the need to send the client
308            a break */
309         op_break.file_handle    = e->file_handle;
310         op_break.level          = level;
311
312         blob = data_blob_const(&op_break, sizeof(op_break));
313
314         status = messaging_send(msg_ctx, e->server,
315                                 MSG_NTVFS_OPLOCK_BREAK, &blob);
316         NT_STATUS_NOT_OK_RETURN(status);
317
318         return NT_STATUS_OK;
319 }
320
321 static bool access_attributes_only(uint32_t access_mask,
322                                    uint32_t open_disposition,
323                                    bool break_to_none)
324 {
325         switch (open_disposition) {
326         case NTCREATEX_DISP_SUPERSEDE:
327         case NTCREATEX_DISP_OVERWRITE_IF:
328         case NTCREATEX_DISP_OVERWRITE:
329                 return false;
330         default:
331                 break;
332         }
333
334         if (break_to_none) {
335                 return false;
336         }
337
338 #define CHECK_MASK(m,g) ((m) && (((m) & ~(g))==0) && (((m) & (g)) != 0))
339         return CHECK_MASK(access_mask,
340                           SEC_STD_SYNCHRONIZE |
341                           SEC_FILE_READ_ATTRIBUTE |
342                           SEC_FILE_WRITE_ATTRIBUTE);
343 #undef CHECK_MASK
344 }
345
346 static NTSTATUS odb_tdb_open_can_internal(struct odb_context *odb,
347                                           const struct opendb_file *file,
348                                           uint32_t stream_id, uint32_t share_access,
349                                           uint32_t access_mask, bool delete_on_close,
350                                           uint32_t open_disposition, bool break_to_none,
351                                           bool *_attrs_only)
352 {
353         NTSTATUS status;
354         uint32_t i;
355         bool attrs_only = false;
356
357         /* see if anyone has an oplock, which we need to break */
358         for (i=0;i<file->num_entries;i++) {
359                 if (file->entries[i].oplock_level == OPLOCK_BATCH) {
360                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
361                         /* if this is an attribute only access
362                          * it doesn't conflict with a BACTCH oplock
363                          * but we'll not grant the oplock below
364                          */
365                         attrs_only = access_attributes_only(access_mask,
366                                                             open_disposition,
367                                                             break_to_none);
368                         if (attrs_only) {
369                                 break;
370                         }
371                         /* a batch oplock caches close calls, which
372                            means the client application might have
373                            already closed the file. We have to allow
374                            this close to propogate by sending a oplock
375                            break request and suspending this call
376                            until the break is acknowledged or the file
377                            is closed */
378                         if (break_to_none ||
379                             !file->entries[i].allow_level_II_oplock) {
380                                 oplock_return = OPLOCK_BREAK_TO_NONE;
381                         }
382                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
383                                               &file->entries[i],
384                                               oplock_return);
385                         return NT_STATUS_OPLOCK_NOT_GRANTED;
386                 }
387         }
388
389         if (file->delete_on_close) {
390                 /* while delete on close is set, no new opens are allowed */
391                 return NT_STATUS_DELETE_PENDING;
392         }
393
394         if (file->num_entries != 0 && delete_on_close) {
395                 return NT_STATUS_SHARING_VIOLATION;
396         }
397
398         /* check for sharing violations */
399         for (i=0;i<file->num_entries;i++) {
400                 status = share_conflict(&file->entries[i], stream_id,
401                                         share_access, access_mask);
402                 NT_STATUS_NOT_OK_RETURN(status);
403         }
404
405         /* we now know the open could succeed, but we need to check
406            for any exclusive oplocks. We can't grant a second open
407            till these are broken. Note that we check for batch oplocks
408            before checking for sharing violations, and check for
409            exclusive oplocks afterwards. */
410         for (i=0;i<file->num_entries;i++) {
411                 if (file->entries[i].oplock_level == OPLOCK_EXCLUSIVE) {
412                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
413                         /* if this is an attribute only access
414                          * it doesn't conflict with an EXCLUSIVE oplock
415                          * but we'll not grant the oplock below
416                          */
417                         attrs_only = access_attributes_only(access_mask,
418                                                             open_disposition,
419                                                             break_to_none);
420                         if (attrs_only) {
421                                 break;
422                         }
423                         /*
424                          * send an oplock break to the holder of the
425                          * oplock and tell caller to retry later
426                          */
427                         if (break_to_none ||
428                             !file->entries[i].allow_level_II_oplock) {
429                                 oplock_return = OPLOCK_BREAK_TO_NONE;
430                         }
431                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
432                                               &file->entries[i],
433                                               oplock_return);
434                         return NT_STATUS_OPLOCK_NOT_GRANTED;
435                 }
436         }
437
438         if (_attrs_only) {
439                 *_attrs_only = attrs_only;
440         }
441         return NT_STATUS_OK;
442 }
443
444 /*
445   register an open file in the open files database.
446   The share_access rules are implemented by odb_can_open()
447   and it's needed to call odb_can_open() before
448   odb_open_file() otherwise NT_STATUS_INTERNAL_ERROR is returned
449
450   Note that the path is only used by the delete on close logic, not
451   for comparing with other filenames
452 */
453 static NTSTATUS odb_tdb_open_file(struct odb_lock *lck,
454                                   void *file_handle, const char *path,
455                                   int *fd, bool allow_level_II_oplock,
456                                   uint32_t oplock_level, uint32_t *oplock_granted)
457 {
458         struct odb_context *odb = lck->odb;
459
460         if (!lck->can_open.e) {
461                 return NT_STATUS_INTERNAL_ERROR;
462         }
463
464         if (odb->oplocks == false) {
465                 oplock_level = OPLOCK_NONE;
466         }
467
468         if (!oplock_granted) {
469                 oplock_level = OPLOCK_NONE;
470         }
471
472         if (lck->file.path == NULL) {
473                 lck->file.path = talloc_strdup(lck, path);
474                 NT_STATUS_HAVE_NO_MEMORY(lck->file.path);
475         }
476
477         /*
478           possibly grant an exclusive, batch or level2 oplock
479         */
480         if (lck->can_open.attrs_only) {
481                 oplock_level    = OPLOCK_NONE;
482         } else if (oplock_level == OPLOCK_EXCLUSIVE) {
483                 if (lck->file.num_entries == 0) {
484                         oplock_level    = OPLOCK_EXCLUSIVE;
485                 } else if (allow_level_II_oplock) {
486                         oplock_level    = OPLOCK_LEVEL_II;
487                 } else {
488                         oplock_level    = OPLOCK_NONE;
489                 }
490         } else if (oplock_level == OPLOCK_BATCH) {
491                 if (lck->file.num_entries == 0) {
492                         oplock_level    = OPLOCK_BATCH;
493                 } else if (allow_level_II_oplock) {
494                         oplock_level    = OPLOCK_LEVEL_II;
495                 } else {
496                         oplock_level    = OPLOCK_NONE;
497                 }
498         } else if (oplock_level == OPLOCK_LEVEL_II) {
499                 oplock_level    = OPLOCK_LEVEL_II;
500         } else {
501                 oplock_level    = OPLOCK_NONE;
502         }
503
504         lck->can_open.e->file_handle            = file_handle;
505         lck->can_open.e->fd                     = fd;
506         lck->can_open.e->allow_level_II_oplock  = allow_level_II_oplock;
507         lck->can_open.e->oplock_level           = oplock_level;
508
509         if (odb->lease_ctx && fd) {
510                 NTSTATUS status;
511                 status = sys_lease_setup(odb->lease_ctx, lck->can_open.e);
512                 NT_STATUS_NOT_OK_RETURN(status);
513         }
514
515         if (oplock_granted) {
516                 if (lck->can_open.e->oplock_level == OPLOCK_EXCLUSIVE) {
517                         *oplock_granted = EXCLUSIVE_OPLOCK_RETURN;
518                 } else if (lck->can_open.e->oplock_level == OPLOCK_BATCH) {
519                         *oplock_granted = BATCH_OPLOCK_RETURN;
520                 } else if (lck->can_open.e->oplock_level == OPLOCK_LEVEL_II) {
521                         *oplock_granted = LEVEL_II_OPLOCK_RETURN;
522                 } else {
523                         *oplock_granted = NO_OPLOCK_RETURN;
524                 }
525         }
526
527         /* it doesn't conflict, so add it to the end */
528         lck->file.entries = talloc_realloc(lck, lck->file.entries,
529                                            struct opendb_entry,
530                                            lck->file.num_entries+1);
531         NT_STATUS_HAVE_NO_MEMORY(lck->file.entries);
532
533         lck->file.entries[lck->file.num_entries] = *lck->can_open.e;
534         lck->file.num_entries++;
535
536         talloc_free(lck->can_open.e);
537         lck->can_open.e = NULL;
538
539         return odb_push_record(lck, &lck->file);
540 }
541
542
543 /*
544   register a pending open file in the open files database
545 */
546 static NTSTATUS odb_tdb_open_file_pending(struct odb_lock *lck, void *private)
547 {
548         struct odb_context *odb = lck->odb;
549
550         if (lck->file.path == NULL) {
551                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
552         }
553
554         lck->file.pending = talloc_realloc(lck, lck->file.pending,
555                                            struct opendb_pending,
556                                            lck->file.num_pending+1);
557         NT_STATUS_HAVE_NO_MEMORY(lck->file.pending);
558
559         lck->file.pending[lck->file.num_pending].server = odb->ntvfs_ctx->server_id;
560         lck->file.pending[lck->file.num_pending].notify_ptr = private;
561
562         lck->file.num_pending++;
563
564         return odb_push_record(lck, &lck->file);
565 }
566
567
568 /*
569   remove a opendb entry
570 */
571 static NTSTATUS odb_tdb_close_file(struct odb_lock *lck, void *file_handle,
572                                    const char **_delete_path)
573 {
574         struct odb_context *odb = lck->odb;
575         const char *delete_path = NULL;
576         int i;
577
578         if (lck->file.path == NULL) {
579                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
580         }
581
582         /* find the entry, and delete it */
583         for (i=0;i<lck->file.num_entries;i++) {
584                 if (file_handle == lck->file.entries[i].file_handle &&
585                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.entries[i].server)) {
586                         if (lck->file.entries[i].delete_on_close) {
587                                 lck->file.delete_on_close = true;
588                         }
589                         if (odb->lease_ctx && lck->file.entries[i].fd) {
590                                 NTSTATUS status;
591                                 status = sys_lease_remove(odb->lease_ctx, &lck->file.entries[i]);
592                                 NT_STATUS_NOT_OK_RETURN(status);
593                         }
594                         if (i < lck->file.num_entries-1) {
595                                 memmove(lck->file.entries+i, lck->file.entries+i+1,
596                                         (lck->file.num_entries - (i+1)) *
597                                         sizeof(struct opendb_entry));
598                         }
599                         break;
600                 }
601         }
602
603         if (i == lck->file.num_entries) {
604                 return NT_STATUS_UNSUCCESSFUL;
605         }
606
607         /* send any pending notifications, removing them once sent */
608         for (i=0;i<lck->file.num_pending;i++) {
609                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
610                                    lck->file.pending[i].server,
611                                    MSG_PVFS_RETRY_OPEN,
612                                    lck->file.pending[i].notify_ptr);
613         }
614         lck->file.num_pending = 0;
615
616         lck->file.num_entries--;
617
618         if (lck->file.num_entries == 0 && lck->file.delete_on_close) {
619                 delete_path = lck->file.path;
620         }
621
622         if (_delete_path) {
623                 *_delete_path = delete_path;
624         }
625
626         return odb_push_record(lck, &lck->file);
627 }
628
629 /*
630   update the oplock level of the client
631 */
632 static NTSTATUS odb_tdb_update_oplock(struct odb_lock *lck, void *file_handle,
633                                       uint32_t oplock_level)
634 {
635         struct odb_context *odb = lck->odb;
636         int i;
637
638         if (lck->file.path == NULL) {
639                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
640         }
641
642         /* find the entry, and update it */
643         for (i=0;i<lck->file.num_entries;i++) {
644                 if (file_handle == lck->file.entries[i].file_handle &&
645                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.entries[i].server)) {
646                         lck->file.entries[i].oplock_level = oplock_level;
647
648                         if (odb->lease_ctx && lck->file.entries[i].fd) {
649                                 NTSTATUS status;
650                                 status = sys_lease_update(odb->lease_ctx, &lck->file.entries[i]);
651                                 NT_STATUS_NOT_OK_RETURN(status);
652                         }
653
654                         break;
655                 }
656         }
657
658         if (i == lck->file.num_entries) {
659                 return NT_STATUS_UNSUCCESSFUL;
660         }
661
662         /* send any pending notifications, removing them once sent */
663         for (i=0;i<lck->file.num_pending;i++) {
664                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
665                                    lck->file.pending[i].server,
666                                    MSG_PVFS_RETRY_OPEN,
667                                    lck->file.pending[i].notify_ptr);
668         }
669         lck->file.num_pending = 0;
670
671         return odb_push_record(lck, &lck->file);
672 }
673
674 /*
675   send oplocks breaks to none to all level2 holders
676 */
677 static NTSTATUS odb_tdb_break_oplocks(struct odb_lock *lck)
678 {
679         struct odb_context *odb = lck->odb;
680         int i;
681         bool modified = false;
682
683         /* see if anyone has an oplock, which we need to break */
684         for (i=0;i<lck->file.num_entries;i++) {
685                 if (lck->file.entries[i].oplock_level == OPLOCK_LEVEL_II) {
686                         /*
687                          * there could be multiple level2 oplocks
688                          * and we just send a break to none to all of them
689                          * without waiting for a release
690                          */
691                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
692                                               &lck->file.entries[i],
693                                               OPLOCK_BREAK_TO_NONE);
694                         lck->file.entries[i].oplock_level = OPLOCK_NONE;
695                         modified = true;
696                 }
697         }
698
699         if (modified) {
700                 return odb_push_record(lck, &lck->file);
701         }
702         return NT_STATUS_OK;
703 }
704
705 /*
706   remove a pending opendb entry
707 */
708 static NTSTATUS odb_tdb_remove_pending(struct odb_lock *lck, void *private)
709 {
710         struct odb_context *odb = lck->odb;
711         int i;
712
713         if (lck->file.path == NULL) {
714                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
715         }
716
717         /* find the entry, and delete it */
718         for (i=0;i<lck->file.num_pending;i++) {
719                 if (private == lck->file.pending[i].notify_ptr &&
720                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.pending[i].server)) {
721                         if (i < lck->file.num_pending-1) {
722                                 memmove(lck->file.pending+i, lck->file.pending+i+1,
723                                         (lck->file.num_pending - (i+1)) *
724                                         sizeof(struct opendb_pending));
725                         }
726                         break;
727                 }
728         }
729
730         if (i == lck->file.num_pending) {
731                 return NT_STATUS_UNSUCCESSFUL;
732         }
733
734         lck->file.num_pending--;
735         
736         return odb_push_record(lck, &lck->file);
737 }
738
739
740 /*
741   rename the path in a open file
742 */
743 static NTSTATUS odb_tdb_rename(struct odb_lock *lck, const char *path)
744 {
745         if (lck->file.path == NULL) {
746                 /* not having the record at all is OK */
747                 return NT_STATUS_OK;
748         }
749
750         lck->file.path = talloc_strdup(lck, path);
751         NT_STATUS_HAVE_NO_MEMORY(lck->file.path);
752
753         return odb_push_record(lck, &lck->file);
754 }
755
756 /*
757   get the path of an open file
758 */
759 static NTSTATUS odb_tdb_get_path(struct odb_lock *lck, const char **path)
760 {
761         *path = NULL;
762
763         /* we don't ignore NT_STATUS_OBJECT_NAME_NOT_FOUND here */
764         if (lck->file.path == NULL) {
765                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
766         }
767
768         *path = lck->file.path;
769
770         return NT_STATUS_OK;
771 }
772
773 /*
774   update delete on close flag on an open file
775 */
776 static NTSTATUS odb_tdb_set_delete_on_close(struct odb_lock *lck, bool del_on_close)
777 {
778         if (lck->file.path == NULL) {
779                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
780         }
781
782         lck->file.delete_on_close = del_on_close;
783
784         return odb_push_record(lck, &lck->file);
785 }
786
787 /*
788   return the current value of the delete_on_close bit, and how many
789   people still have the file open
790 */
791 static NTSTATUS odb_tdb_get_delete_on_close(struct odb_context *odb, 
792                                             DATA_BLOB *key, bool *del_on_close)
793 {
794         struct odb_lock *lck;
795
796         (*del_on_close) = false;
797
798         lck = odb_lock(odb, odb, key);
799         NT_STATUS_HAVE_NO_MEMORY(lck);
800
801         (*del_on_close) = lck->file.delete_on_close;
802
803         talloc_free(lck);
804
805         return NT_STATUS_OK;
806 }
807
808
809 /*
810   determine if a file can be opened with the given share_access,
811   create_options and access_mask
812 */
813 static NTSTATUS odb_tdb_can_open(struct odb_lock *lck,
814                                  uint32_t stream_id, uint32_t share_access,
815                                  uint32_t access_mask, bool delete_on_close,
816                                  uint32_t open_disposition, bool break_to_none)
817 {
818         struct odb_context *odb = lck->odb;
819         NTSTATUS status;
820
821         status = odb_tdb_open_can_internal(odb, &lck->file, stream_id,
822                                            share_access, access_mask,
823                                            delete_on_close, open_disposition,
824                                            break_to_none, &lck->can_open.attrs_only);
825         NT_STATUS_NOT_OK_RETURN(status);
826
827         lck->can_open.e = talloc(lck, struct opendb_entry);
828         NT_STATUS_HAVE_NO_MEMORY(lck->can_open.e);
829
830         lck->can_open.e->server                 = odb->ntvfs_ctx->server_id;
831         lck->can_open.e->file_handle            = NULL;
832         lck->can_open.e->fd                     = NULL;
833         lck->can_open.e->stream_id              = stream_id;
834         lck->can_open.e->share_access           = share_access;
835         lck->can_open.e->access_mask            = access_mask;
836         lck->can_open.e->delete_on_close        = delete_on_close;
837         lck->can_open.e->allow_level_II_oplock  = false;
838         lck->can_open.e->oplock_level           = OPLOCK_NONE;
839
840         return NT_STATUS_OK;
841 }
842
843
844 static const struct opendb_ops opendb_tdb_ops = {
845         .odb_init                = odb_tdb_init,
846         .odb_lock                = odb_tdb_lock,
847         .odb_get_key             = odb_tdb_get_key,
848         .odb_open_file           = odb_tdb_open_file,
849         .odb_open_file_pending   = odb_tdb_open_file_pending,
850         .odb_close_file          = odb_tdb_close_file,
851         .odb_remove_pending      = odb_tdb_remove_pending,
852         .odb_rename              = odb_tdb_rename,
853         .odb_get_path            = odb_tdb_get_path,
854         .odb_set_delete_on_close = odb_tdb_set_delete_on_close,
855         .odb_get_delete_on_close = odb_tdb_get_delete_on_close,
856         .odb_can_open            = odb_tdb_can_open,
857         .odb_update_oplock       = odb_tdb_update_oplock,
858         .odb_break_oplocks       = odb_tdb_break_oplocks
859 };
860
861
862 void odb_tdb_init_ops(void)
863 {
864         sys_lease_init();
865         odb_set_ops(&opendb_tdb_ops);
866 }