944ec866314fb0ea4c6d7eb6315b489053d90cb2
[bbaumbach/samba-autobuild/.git] / source4 / ntvfs / common / opendb_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the open files database, tdb backend. It implements shared
23   storage of what files are open between server instances, and
24   implements the rules of shared access to files.
25
26   The caller needs to provide a file_key, which specifies what file
27   they are talking about. This needs to be a unique key across all
28   filesystems, and is usually implemented in terms of a device/inode
29   pair.
30
31   Before any operations can be performed the caller needs to establish
32   a lock on the record associated with file_key. That is done by
33   calling odb_lock(). The caller releases this lock by calling
34   talloc_free() on the returned handle.
35
36   All other operations on a record are done by passing the odb_lock()
37   handle back to this module. The handle contains internal
38   information about what file_key is being operated on.
39 */
40
41 #include "includes.h"
42 #include "system/filesys.h"
43 #include "../tdb/include/tdb.h"
44 #include "messaging/messaging.h"
45 #include "tdb_wrap.h"
46 #include "lib/messaging/irpc.h"
47 #include "librpc/gen_ndr/ndr_opendb.h"
48 #include "ntvfs/ntvfs.h"
49 #include "ntvfs/common/ntvfs_common.h"
50 #include "cluster/cluster.h"
51 #include "param/param.h"
52 #include "ntvfs/sysdep/sys_lease.h"
53
54 struct odb_context {
55         struct tdb_wrap *w;
56         struct ntvfs_context *ntvfs_ctx;
57         bool oplocks;
58         struct sys_lease_context *lease_ctx;
59 };
60
61 /*
62   an odb lock handle. You must obtain one of these using odb_lock() before doing
63   any other operations. 
64 */
65 struct odb_lock {
66         struct odb_context *odb;
67         TDB_DATA key;
68
69         struct opendb_file file;
70
71         struct {
72                 struct opendb_entry *e;
73                 bool attrs_only;
74         } can_open;
75 };
76
77 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
78                                       struct opendb_entry *e,
79                                       uint8_t level);
80
81 /*
82   Open up the openfiles.tdb database. Close it down using
83   talloc_free(). We need the messaging_ctx to allow for pending open
84   notifications.
85 */
86 static struct odb_context *odb_tdb_init(TALLOC_CTX *mem_ctx, 
87                                         struct ntvfs_context *ntvfs_ctx)
88 {
89         struct odb_context *odb;
90
91         odb = talloc(mem_ctx, struct odb_context);
92         if (odb == NULL) {
93                 return NULL;
94         }
95
96         odb->w = cluster_tdb_tmp_open(odb, ntvfs_ctx->lp_ctx, "openfiles.tdb", TDB_DEFAULT);
97         if (odb->w == NULL) {
98                 talloc_free(odb);
99                 return NULL;
100         }
101
102         odb->ntvfs_ctx = ntvfs_ctx;
103
104         odb->oplocks = share_bool_option(ntvfs_ctx->config, SHARE_OPLOCKS, SHARE_OPLOCKS_DEFAULT);
105
106         odb->lease_ctx = sys_lease_context_create(ntvfs_ctx->config, odb,
107                                                   ntvfs_ctx->event_ctx,
108                                                   ntvfs_ctx->msg_ctx,
109                                                   odb_oplock_break_send);
110
111         return odb;
112 }
113
114 /*
115   destroy a lock on the database
116 */
117 static int odb_lock_destructor(struct odb_lock *lck)
118 {
119         tdb_chainunlock(lck->odb->w->tdb, lck->key);
120         return 0;
121 }
122
123 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file);
124
125 /*
126   get a lock on a entry in the odb. This call returns a lock handle,
127   which the caller should unlock using talloc_free().
128 */
129 static struct odb_lock *odb_tdb_lock(TALLOC_CTX *mem_ctx,
130                                      struct odb_context *odb, DATA_BLOB *file_key)
131 {
132         struct odb_lock *lck;
133         NTSTATUS status;
134
135         lck = talloc(mem_ctx, struct odb_lock);
136         if (lck == NULL) {
137                 return NULL;
138         }
139
140         lck->odb = talloc_reference(lck, odb);
141         lck->key.dptr = talloc_memdup(lck, file_key->data, file_key->length);
142         lck->key.dsize = file_key->length;
143         if (lck->key.dptr == NULL) {
144                 talloc_free(lck);
145                 return NULL;
146         }
147
148         if (tdb_chainlock(odb->w->tdb, lck->key) != 0) {
149                 talloc_free(lck);
150                 return NULL;
151         }
152
153         ZERO_STRUCT(lck->can_open);
154
155         talloc_set_destructor(lck, odb_lock_destructor);
156
157         status = odb_pull_record(lck, &lck->file);
158         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
159                 /* initialise a blank structure */
160                 ZERO_STRUCT(lck->file);
161         } else if (!NT_STATUS_IS_OK(status)) {
162                 talloc_free(lck);
163                 return NULL;
164         }
165         
166         return lck;
167 }
168
169 static DATA_BLOB odb_tdb_get_key(TALLOC_CTX *mem_ctx, struct odb_lock *lck)
170 {
171         return data_blob_talloc(mem_ctx, lck->key.dptr, lck->key.dsize);
172 }
173
174
175 /*
176   determine if two odb_entry structures conflict
177
178   return NT_STATUS_OK on no conflict
179 */
180 static NTSTATUS share_conflict(struct opendb_entry *e1,
181                                uint32_t stream_id,
182                                uint32_t share_access,
183                                uint32_t access_mask)
184 {
185         /* if either open involves no read.write or delete access then
186            it can't conflict */
187         if (!(e1->access_mask & (SEC_FILE_WRITE_DATA |
188                                  SEC_FILE_APPEND_DATA |
189                                  SEC_FILE_READ_DATA |
190                                  SEC_FILE_EXECUTE |
191                                  SEC_STD_DELETE))) {
192                 return NT_STATUS_OK;
193         }
194         if (!(access_mask & (SEC_FILE_WRITE_DATA |
195                              SEC_FILE_APPEND_DATA |
196                              SEC_FILE_READ_DATA |
197                              SEC_FILE_EXECUTE |
198                              SEC_STD_DELETE))) {
199                 return NT_STATUS_OK;
200         }
201
202         /* data IO access masks. This is skipped if the two open handles
203            are on different streams (as in that case the masks don't
204            interact) */
205         if (e1->stream_id != stream_id) {
206                 return NT_STATUS_OK;
207         }
208
209 #define CHECK_MASK(am, right, sa, share) \
210         if (((am) & (right)) && !((sa) & (share))) return NT_STATUS_SHARING_VIOLATION
211
212         CHECK_MASK(e1->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
213                    share_access, NTCREATEX_SHARE_ACCESS_WRITE);
214         CHECK_MASK(access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
215                    e1->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
216         
217         CHECK_MASK(e1->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
218                    share_access, NTCREATEX_SHARE_ACCESS_READ);
219         CHECK_MASK(access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
220                    e1->share_access, NTCREATEX_SHARE_ACCESS_READ);
221
222         CHECK_MASK(e1->access_mask, SEC_STD_DELETE,
223                    share_access, NTCREATEX_SHARE_ACCESS_DELETE);
224         CHECK_MASK(access_mask, SEC_STD_DELETE,
225                    e1->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
226 #undef CHECK_MASK
227         return NT_STATUS_OK;
228 }
229
230 /*
231   pull a record, translating from the db format to the opendb_file structure defined
232   in opendb.idl
233 */
234 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file)
235 {
236         struct odb_context *odb = lck->odb;
237         TDB_DATA dbuf;
238         DATA_BLOB blob;
239         enum ndr_err_code ndr_err;
240
241         dbuf = tdb_fetch(odb->w->tdb, lck->key);
242         if (dbuf.dptr == NULL) {
243                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
244         }
245
246         blob.data = dbuf.dptr;
247         blob.length = dbuf.dsize;
248
249         ndr_err = ndr_pull_struct_blob(&blob, lck, file, (ndr_pull_flags_fn_t)ndr_pull_opendb_file);
250         free(dbuf.dptr);
251         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
252                 return ndr_map_error2ntstatus(ndr_err);
253         }
254
255         return NT_STATUS_OK;
256 }
257
258 /*
259   push a record, translating from the opendb_file structure defined in opendb.idl
260 */
261 static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file)
262 {
263         struct odb_context *odb = lck->odb;
264         TDB_DATA dbuf;
265         DATA_BLOB blob;
266         enum ndr_err_code ndr_err;
267         int ret;
268
269         if (file->num_entries == 0) {
270                 ret = tdb_delete(odb->w->tdb, lck->key);
271                 if (ret != 0) {
272                         return NT_STATUS_INTERNAL_DB_CORRUPTION;
273                 }
274                 return NT_STATUS_OK;
275         }
276
277         ndr_err = ndr_push_struct_blob(&blob, lck, file, (ndr_push_flags_fn_t)ndr_push_opendb_file);
278         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
279                 return ndr_map_error2ntstatus(ndr_err);
280         }
281
282         dbuf.dptr = blob.data;
283         dbuf.dsize = blob.length;
284                 
285         ret = tdb_store(odb->w->tdb, lck->key, dbuf, TDB_REPLACE);
286         data_blob_free(&blob);
287         if (ret != 0) {
288                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
289         }
290
291         return NT_STATUS_OK;
292 }
293
294 /*
295   send an oplock break to a client
296 */
297 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
298                                       struct opendb_entry *e,
299                                       uint8_t level)
300 {
301         NTSTATUS status;
302         struct opendb_oplock_break op_break;
303         DATA_BLOB blob;
304
305         ZERO_STRUCT(op_break);
306
307         /* tell the server handling this open file about the need to send the client
308            a break */
309         op_break.file_handle    = e->file_handle;
310         op_break.level          = level;
311
312         blob = data_blob_const(&op_break, sizeof(op_break));
313
314         status = messaging_send(msg_ctx, e->server,
315                                 MSG_NTVFS_OPLOCK_BREAK, &blob);
316         NT_STATUS_NOT_OK_RETURN(status);
317
318         return NT_STATUS_OK;
319 }
320
321 static bool access_attributes_only(uint32_t access_mask,
322                                    uint32_t open_disposition,
323                                    bool break_to_none)
324 {
325         switch (open_disposition) {
326         case NTCREATEX_DISP_SUPERSEDE:
327         case NTCREATEX_DISP_OVERWRITE_IF:
328         case NTCREATEX_DISP_OVERWRITE:
329                 return false;
330         default:
331                 break;
332         }
333
334         if (break_to_none) {
335                 return false;
336         }
337
338 #define CHECK_MASK(m,g) ((m) && (((m) & ~(g))==0) && (((m) & (g)) != 0))
339         return CHECK_MASK(access_mask,
340                           SEC_STD_SYNCHRONIZE |
341                           SEC_FILE_READ_ATTRIBUTE |
342                           SEC_FILE_WRITE_ATTRIBUTE);
343 #undef CHECK_MASK
344 }
345
346 static NTSTATUS odb_tdb_open_can_internal(struct odb_context *odb,
347                                           const struct opendb_file *file,
348                                           uint32_t stream_id, uint32_t share_access,
349                                           uint32_t access_mask, bool delete_on_close,
350                                           uint32_t open_disposition, bool break_to_none,
351                                           bool *_attrs_only)
352 {
353         NTSTATUS status;
354         uint32_t i;
355         bool attrs_only = false;
356
357         /* see if anyone has an oplock, which we need to break */
358         for (i=0;i<file->num_entries;i++) {
359                 if (file->entries[i].oplock_level == OPLOCK_BATCH) {
360                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
361                         /* if this is an attribute only access
362                          * it doesn't conflict with a BACTCH oplock
363                          * but we'll not grant the oplock below
364                          */
365                         attrs_only = access_attributes_only(access_mask,
366                                                             open_disposition,
367                                                             break_to_none);
368                         if (attrs_only) {
369                                 break;
370                         }
371                         /* a batch oplock caches close calls, which
372                            means the client application might have
373                            already closed the file. We have to allow
374                            this close to propogate by sending a oplock
375                            break request and suspending this call
376                            until the break is acknowledged or the file
377                            is closed */
378                         if (break_to_none ||
379                             !file->entries[i].allow_level_II_oplock) {
380                                 oplock_return = OPLOCK_BREAK_TO_NONE;
381                         }
382                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
383                                               &file->entries[i],
384                                               oplock_return);
385                         return NT_STATUS_OPLOCK_NOT_GRANTED;
386                 }
387         }
388
389         if (file->delete_on_close) {
390                 /* while delete on close is set, no new opens are allowed */
391                 return NT_STATUS_DELETE_PENDING;
392         }
393
394         if (file->num_entries != 0 && delete_on_close) {
395                 return NT_STATUS_SHARING_VIOLATION;
396         }
397
398         /* check for sharing violations */
399         for (i=0;i<file->num_entries;i++) {
400                 status = share_conflict(&file->entries[i], stream_id,
401                                         share_access, access_mask);
402                 NT_STATUS_NOT_OK_RETURN(status);
403         }
404
405         /* we now know the open could succeed, but we need to check
406            for any exclusive oplocks. We can't grant a second open
407            till these are broken. Note that we check for batch oplocks
408            before checking for sharing violations, and check for
409            exclusive oplocks afterwards. */
410         for (i=0;i<file->num_entries;i++) {
411                 if (file->entries[i].oplock_level == OPLOCK_EXCLUSIVE) {
412                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
413                         /* if this is an attribute only access
414                          * it doesn't conflict with an EXCLUSIVE oplock
415                          * but we'll not grant the oplock below
416                          */
417                         attrs_only = access_attributes_only(access_mask,
418                                                             open_disposition,
419                                                             break_to_none);
420                         if (attrs_only) {
421                                 break;
422                         }
423                         /*
424                          * send an oplock break to the holder of the
425                          * oplock and tell caller to retry later
426                          */
427                         if (break_to_none ||
428                             !file->entries[i].allow_level_II_oplock) {
429                                 oplock_return = OPLOCK_BREAK_TO_NONE;
430                         }
431                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
432                                               &file->entries[i],
433                                               oplock_return);
434                         return NT_STATUS_OPLOCK_NOT_GRANTED;
435                 }
436         }
437
438         if (_attrs_only) {
439                 *_attrs_only = attrs_only;
440         }
441         return NT_STATUS_OK;
442 }
443
444 /*
445   register an open file in the open files database.
446   The share_access rules are implemented by odb_can_open()
447   and it's needed to call odb_can_open() before
448   odb_open_file() otherwise NT_STATUS_INTERNAL_ERROR is returned
449
450   Note that the path is only used by the delete on close logic, not
451   for comparing with other filenames
452 */
453 static NTSTATUS odb_tdb_open_file(struct odb_lock *lck,
454                                   void *file_handle, const char *path,
455                                   int *fd, NTTIME open_write_time,
456                                   bool allow_level_II_oplock,
457                                   uint32_t oplock_level, uint32_t *oplock_granted)
458 {
459         struct odb_context *odb = lck->odb;
460
461         if (!lck->can_open.e) {
462                 return NT_STATUS_INTERNAL_ERROR;
463         }
464
465         if (odb->oplocks == false) {
466                 oplock_level = OPLOCK_NONE;
467         }
468
469         if (!oplock_granted) {
470                 oplock_level = OPLOCK_NONE;
471         }
472
473         if (lck->file.path == NULL) {
474                 lck->file.path = talloc_strdup(lck, path);
475                 NT_STATUS_HAVE_NO_MEMORY(lck->file.path);
476         }
477
478         if (lck->file.open_write_time == 0) {
479                 lck->file.open_write_time = open_write_time;
480         }
481
482         /*
483           possibly grant an exclusive, batch or level2 oplock
484         */
485         if (lck->can_open.attrs_only) {
486                 oplock_level    = OPLOCK_NONE;
487         } else if (oplock_level == OPLOCK_EXCLUSIVE) {
488                 if (lck->file.num_entries == 0) {
489                         oplock_level    = OPLOCK_EXCLUSIVE;
490                 } else if (allow_level_II_oplock) {
491                         oplock_level    = OPLOCK_LEVEL_II;
492                 } else {
493                         oplock_level    = OPLOCK_NONE;
494                 }
495         } else if (oplock_level == OPLOCK_BATCH) {
496                 if (lck->file.num_entries == 0) {
497                         oplock_level    = OPLOCK_BATCH;
498                 } else if (allow_level_II_oplock) {
499                         oplock_level    = OPLOCK_LEVEL_II;
500                 } else {
501                         oplock_level    = OPLOCK_NONE;
502                 }
503         } else if (oplock_level == OPLOCK_LEVEL_II) {
504                 oplock_level    = OPLOCK_LEVEL_II;
505         } else {
506                 oplock_level    = OPLOCK_NONE;
507         }
508
509         lck->can_open.e->file_handle            = file_handle;
510         lck->can_open.e->fd                     = fd;
511         lck->can_open.e->allow_level_II_oplock  = allow_level_II_oplock;
512         lck->can_open.e->oplock_level           = oplock_level;
513
514         if (odb->lease_ctx && fd) {
515                 NTSTATUS status;
516                 status = sys_lease_setup(odb->lease_ctx, lck->can_open.e);
517                 NT_STATUS_NOT_OK_RETURN(status);
518         }
519
520         if (oplock_granted) {
521                 if (lck->can_open.e->oplock_level == OPLOCK_EXCLUSIVE) {
522                         *oplock_granted = EXCLUSIVE_OPLOCK_RETURN;
523                 } else if (lck->can_open.e->oplock_level == OPLOCK_BATCH) {
524                         *oplock_granted = BATCH_OPLOCK_RETURN;
525                 } else if (lck->can_open.e->oplock_level == OPLOCK_LEVEL_II) {
526                         *oplock_granted = LEVEL_II_OPLOCK_RETURN;
527                 } else {
528                         *oplock_granted = NO_OPLOCK_RETURN;
529                 }
530         }
531
532         /* it doesn't conflict, so add it to the end */
533         lck->file.entries = talloc_realloc(lck, lck->file.entries,
534                                            struct opendb_entry,
535                                            lck->file.num_entries+1);
536         NT_STATUS_HAVE_NO_MEMORY(lck->file.entries);
537
538         lck->file.entries[lck->file.num_entries] = *lck->can_open.e;
539         lck->file.num_entries++;
540
541         talloc_free(lck->can_open.e);
542         lck->can_open.e = NULL;
543
544         return odb_push_record(lck, &lck->file);
545 }
546
547
548 /*
549   register a pending open file in the open files database
550 */
551 static NTSTATUS odb_tdb_open_file_pending(struct odb_lock *lck, void *private_data)
552 {
553         struct odb_context *odb = lck->odb;
554
555         if (lck->file.path == NULL) {
556                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
557         }
558
559         lck->file.pending = talloc_realloc(lck, lck->file.pending,
560                                            struct opendb_pending,
561                                            lck->file.num_pending+1);
562         NT_STATUS_HAVE_NO_MEMORY(lck->file.pending);
563
564         lck->file.pending[lck->file.num_pending].server = odb->ntvfs_ctx->server_id;
565         lck->file.pending[lck->file.num_pending].notify_ptr = private_data;
566
567         lck->file.num_pending++;
568
569         return odb_push_record(lck, &lck->file);
570 }
571
572
573 /*
574   remove a opendb entry
575 */
576 static NTSTATUS odb_tdb_close_file(struct odb_lock *lck, void *file_handle,
577                                    const char **_delete_path)
578 {
579         struct odb_context *odb = lck->odb;
580         const char *delete_path = NULL;
581         int i;
582
583         if (lck->file.path == NULL) {
584                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
585         }
586
587         /* find the entry, and delete it */
588         for (i=0;i<lck->file.num_entries;i++) {
589                 if (file_handle == lck->file.entries[i].file_handle &&
590                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.entries[i].server)) {
591                         if (lck->file.entries[i].delete_on_close) {
592                                 lck->file.delete_on_close = true;
593                         }
594                         if (odb->lease_ctx && lck->file.entries[i].fd) {
595                                 NTSTATUS status;
596                                 status = sys_lease_remove(odb->lease_ctx, &lck->file.entries[i]);
597                                 NT_STATUS_NOT_OK_RETURN(status);
598                         }
599                         if (i < lck->file.num_entries-1) {
600                                 memmove(lck->file.entries+i, lck->file.entries+i+1,
601                                         (lck->file.num_entries - (i+1)) *
602                                         sizeof(struct opendb_entry));
603                         }
604                         break;
605                 }
606         }
607
608         if (i == lck->file.num_entries) {
609                 return NT_STATUS_UNSUCCESSFUL;
610         }
611
612         /* send any pending notifications, removing them once sent */
613         for (i=0;i<lck->file.num_pending;i++) {
614                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
615                                    lck->file.pending[i].server,
616                                    MSG_PVFS_RETRY_OPEN,
617                                    lck->file.pending[i].notify_ptr);
618         }
619         lck->file.num_pending = 0;
620
621         lck->file.num_entries--;
622
623         if (lck->file.num_entries == 0 && lck->file.delete_on_close) {
624                 delete_path = lck->file.path;
625         }
626
627         if (_delete_path) {
628                 *_delete_path = delete_path;
629         }
630
631         return odb_push_record(lck, &lck->file);
632 }
633
634 /*
635   update the oplock level of the client
636 */
637 static NTSTATUS odb_tdb_update_oplock(struct odb_lock *lck, void *file_handle,
638                                       uint32_t oplock_level)
639 {
640         struct odb_context *odb = lck->odb;
641         int i;
642
643         if (lck->file.path == NULL) {
644                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
645         }
646
647         /* find the entry, and update it */
648         for (i=0;i<lck->file.num_entries;i++) {
649                 if (file_handle == lck->file.entries[i].file_handle &&
650                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.entries[i].server)) {
651                         lck->file.entries[i].oplock_level = oplock_level;
652
653                         if (odb->lease_ctx && lck->file.entries[i].fd) {
654                                 NTSTATUS status;
655                                 status = sys_lease_update(odb->lease_ctx, &lck->file.entries[i]);
656                                 NT_STATUS_NOT_OK_RETURN(status);
657                         }
658
659                         break;
660                 }
661         }
662
663         if (i == lck->file.num_entries) {
664                 return NT_STATUS_UNSUCCESSFUL;
665         }
666
667         /* send any pending notifications, removing them once sent */
668         for (i=0;i<lck->file.num_pending;i++) {
669                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
670                                    lck->file.pending[i].server,
671                                    MSG_PVFS_RETRY_OPEN,
672                                    lck->file.pending[i].notify_ptr);
673         }
674         lck->file.num_pending = 0;
675
676         return odb_push_record(lck, &lck->file);
677 }
678
679 /*
680   send oplocks breaks to none to all level2 holders
681 */
682 static NTSTATUS odb_tdb_break_oplocks(struct odb_lock *lck)
683 {
684         struct odb_context *odb = lck->odb;
685         int i;
686         bool modified = false;
687
688         /* see if anyone has an oplock, which we need to break */
689         for (i=0;i<lck->file.num_entries;i++) {
690                 if (lck->file.entries[i].oplock_level == OPLOCK_LEVEL_II) {
691                         /*
692                          * there could be multiple level2 oplocks
693                          * and we just send a break to none to all of them
694                          * without waiting for a release
695                          */
696                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
697                                               &lck->file.entries[i],
698                                               OPLOCK_BREAK_TO_NONE);
699                         lck->file.entries[i].oplock_level = OPLOCK_NONE;
700                         modified = true;
701                 }
702         }
703
704         if (modified) {
705                 return odb_push_record(lck, &lck->file);
706         }
707         return NT_STATUS_OK;
708 }
709
710 /*
711   remove a pending opendb entry
712 */
713 static NTSTATUS odb_tdb_remove_pending(struct odb_lock *lck, void *private_data)
714 {
715         struct odb_context *odb = lck->odb;
716         int i;
717
718         if (lck->file.path == NULL) {
719                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
720         }
721
722         /* find the entry, and delete it */
723         for (i=0;i<lck->file.num_pending;i++) {
724                 if (private_data == lck->file.pending[i].notify_ptr &&
725                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &lck->file.pending[i].server)) {
726                         if (i < lck->file.num_pending-1) {
727                                 memmove(lck->file.pending+i, lck->file.pending+i+1,
728                                         (lck->file.num_pending - (i+1)) *
729                                         sizeof(struct opendb_pending));
730                         }
731                         break;
732                 }
733         }
734
735         if (i == lck->file.num_pending) {
736                 return NT_STATUS_UNSUCCESSFUL;
737         }
738
739         lck->file.num_pending--;
740         
741         return odb_push_record(lck, &lck->file);
742 }
743
744
745 /*
746   rename the path in a open file
747 */
748 static NTSTATUS odb_tdb_rename(struct odb_lock *lck, const char *path)
749 {
750         if (lck->file.path == NULL) {
751                 /* not having the record at all is OK */
752                 return NT_STATUS_OK;
753         }
754
755         lck->file.path = talloc_strdup(lck, path);
756         NT_STATUS_HAVE_NO_MEMORY(lck->file.path);
757
758         return odb_push_record(lck, &lck->file);
759 }
760
761 /*
762   get the path of an open file
763 */
764 static NTSTATUS odb_tdb_get_path(struct odb_lock *lck, const char **path)
765 {
766         *path = NULL;
767
768         /* we don't ignore NT_STATUS_OBJECT_NAME_NOT_FOUND here */
769         if (lck->file.path == NULL) {
770                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
771         }
772
773         *path = lck->file.path;
774
775         return NT_STATUS_OK;
776 }
777
778 /*
779   update delete on close flag on an open file
780 */
781 static NTSTATUS odb_tdb_set_delete_on_close(struct odb_lock *lck, bool del_on_close)
782 {
783         if (lck->file.path == NULL) {
784                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
785         }
786
787         lck->file.delete_on_close = del_on_close;
788
789         return odb_push_record(lck, &lck->file);
790 }
791
792 /*
793   update the write time on an open file
794 */
795 static NTSTATUS odb_tdb_set_write_time(struct odb_lock *lck,
796                                        NTTIME write_time, bool force)
797 {
798         if (lck->file.path == NULL) {
799                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
800         }
801
802         if (lck->file.changed_write_time != 0 && !force) {
803                 return NT_STATUS_OK;
804         }
805
806         lck->file.changed_write_time = write_time;
807
808         return odb_push_record(lck, &lck->file);
809 }
810
811 /*
812   return the current value of the delete_on_close bit, and how many
813   people still have the file open
814 */
815 static NTSTATUS odb_tdb_get_file_infos(struct odb_context *odb, DATA_BLOB *key,
816                                        bool *del_on_close, NTTIME *write_time)
817 {
818         struct odb_lock *lck;
819
820         if (del_on_close) {
821                 *del_on_close = false;
822         }
823         if (write_time) {
824                 *write_time = 0;
825         }
826
827         lck = odb_lock(odb, odb, key);
828         NT_STATUS_HAVE_NO_MEMORY(lck);
829
830         if (del_on_close) {
831                 *del_on_close = lck->file.delete_on_close;
832         }
833         if (write_time) {
834                 if (lck->file.changed_write_time == 0) {
835                         *write_time = lck->file.open_write_time;
836                 } else {
837                         *write_time = lck->file.changed_write_time;
838                 }
839         }
840
841         talloc_free(lck);
842
843         return NT_STATUS_OK;
844 }
845
846
847 /*
848   determine if a file can be opened with the given share_access,
849   create_options and access_mask
850 */
851 static NTSTATUS odb_tdb_can_open(struct odb_lock *lck,
852                                  uint32_t stream_id, uint32_t share_access,
853                                  uint32_t access_mask, bool delete_on_close,
854                                  uint32_t open_disposition, bool break_to_none)
855 {
856         struct odb_context *odb = lck->odb;
857         NTSTATUS status;
858
859         status = odb_tdb_open_can_internal(odb, &lck->file, stream_id,
860                                            share_access, access_mask,
861                                            delete_on_close, open_disposition,
862                                            break_to_none, &lck->can_open.attrs_only);
863         NT_STATUS_NOT_OK_RETURN(status);
864
865         lck->can_open.e = talloc(lck, struct opendb_entry);
866         NT_STATUS_HAVE_NO_MEMORY(lck->can_open.e);
867
868         lck->can_open.e->server                 = odb->ntvfs_ctx->server_id;
869         lck->can_open.e->file_handle            = NULL;
870         lck->can_open.e->fd                     = NULL;
871         lck->can_open.e->stream_id              = stream_id;
872         lck->can_open.e->share_access           = share_access;
873         lck->can_open.e->access_mask            = access_mask;
874         lck->can_open.e->delete_on_close        = delete_on_close;
875         lck->can_open.e->allow_level_II_oplock  = false;
876         lck->can_open.e->oplock_level           = OPLOCK_NONE;
877
878         return NT_STATUS_OK;
879 }
880
881
882 static const struct opendb_ops opendb_tdb_ops = {
883         .odb_init                = odb_tdb_init,
884         .odb_lock                = odb_tdb_lock,
885         .odb_get_key             = odb_tdb_get_key,
886         .odb_open_file           = odb_tdb_open_file,
887         .odb_open_file_pending   = odb_tdb_open_file_pending,
888         .odb_close_file          = odb_tdb_close_file,
889         .odb_remove_pending      = odb_tdb_remove_pending,
890         .odb_rename              = odb_tdb_rename,
891         .odb_get_path            = odb_tdb_get_path,
892         .odb_set_delete_on_close = odb_tdb_set_delete_on_close,
893         .odb_set_write_time      = odb_tdb_set_write_time,
894         .odb_get_file_infos      = odb_tdb_get_file_infos,
895         .odb_can_open            = odb_tdb_can_open,
896         .odb_update_oplock       = odb_tdb_update_oplock,
897         .odb_break_oplocks       = odb_tdb_break_oplocks
898 };
899
900
901 void odb_tdb_init_ops(void)
902 {
903         sys_lease_init();
904         odb_set_ops(&opendb_tdb_ops);
905 }