opendb_tdb: pass down struct messaging_context directly to odb_oplock_break_send()
[bbaumbach/samba-autobuild/.git] / source4 / ntvfs / common / opendb_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the open files database, tdb backend. It implements shared
23   storage of what files are open between server instances, and
24   implements the rules of shared access to files.
25
26   The caller needs to provide a file_key, which specifies what file
27   they are talking about. This needs to be a unique key across all
28   filesystems, and is usually implemented in terms of a device/inode
29   pair.
30
31   Before any operations can be performed the caller needs to establish
32   a lock on the record associated with file_key. That is done by
33   calling odb_lock(). The caller releases this lock by calling
34   talloc_free() on the returned handle.
35
36   All other operations on a record are done by passing the odb_lock()
37   handle back to this module. The handle contains internal
38   information about what file_key is being operated on.
39 */
40
41 #include "includes.h"
42 #include "system/filesys.h"
43 #include "lib/tdb/include/tdb.h"
44 #include "messaging/messaging.h"
45 #include "tdb_wrap.h"
46 #include "lib/messaging/irpc.h"
47 #include "librpc/gen_ndr/ndr_opendb.h"
48 #include "ntvfs/ntvfs.h"
49 #include "ntvfs/common/ntvfs_common.h"
50 #include "cluster/cluster.h"
51 #include "param/param.h"
52
53 struct odb_context {
54         struct tdb_wrap *w;
55         struct ntvfs_context *ntvfs_ctx;
56         bool oplocks;
57 };
58
59 /*
60   an odb lock handle. You must obtain one of these using odb_lock() before doing
61   any other operations. 
62 */
63 struct odb_lock {
64         struct odb_context *odb;
65         TDB_DATA key;
66 };
67
68 /*
69   Open up the openfiles.tdb database. Close it down using
70   talloc_free(). We need the messaging_ctx to allow for pending open
71   notifications.
72 */
73 static struct odb_context *odb_tdb_init(TALLOC_CTX *mem_ctx, 
74                                         struct ntvfs_context *ntvfs_ctx)
75 {
76         struct odb_context *odb;
77
78         odb = talloc(mem_ctx, struct odb_context);
79         if (odb == NULL) {
80                 return NULL;
81         }
82
83         odb->w = cluster_tdb_tmp_open(odb, ntvfs_ctx->lp_ctx, "openfiles.tdb", TDB_DEFAULT);
84         if (odb->w == NULL) {
85                 talloc_free(odb);
86                 return NULL;
87         }
88
89         odb->ntvfs_ctx = ntvfs_ctx;
90
91         /* leave oplocks disabled by default until the code is working */
92         odb->oplocks = lp_parm_bool(ntvfs_ctx->lp_ctx, NULL, "opendb", "oplocks", false);
93
94         return odb;
95 }
96
97 /*
98   destroy a lock on the database
99 */
100 static int odb_lock_destructor(struct odb_lock *lck)
101 {
102         tdb_chainunlock(lck->odb->w->tdb, lck->key);
103         return 0;
104 }
105
106 /*
107   get a lock on a entry in the odb. This call returns a lock handle,
108   which the caller should unlock using talloc_free().
109 */
110 static struct odb_lock *odb_tdb_lock(TALLOC_CTX *mem_ctx,
111                                      struct odb_context *odb, DATA_BLOB *file_key)
112 {
113         struct odb_lock *lck;
114
115         lck = talloc(mem_ctx, struct odb_lock);
116         if (lck == NULL) {
117                 return NULL;
118         }
119
120         lck->odb = talloc_reference(lck, odb);
121         lck->key.dptr = talloc_memdup(lck, file_key->data, file_key->length);
122         lck->key.dsize = file_key->length;
123         if (lck->key.dptr == NULL) {
124                 talloc_free(lck);
125                 return NULL;
126         }
127
128         if (tdb_chainlock(odb->w->tdb, lck->key) != 0) {
129                 talloc_free(lck);
130                 return NULL;
131         }
132
133         talloc_set_destructor(lck, odb_lock_destructor);
134         
135         return lck;
136 }
137
138 static DATA_BLOB odb_tdb_get_key(TALLOC_CTX *mem_ctx, struct odb_lock *lck)
139 {
140         return data_blob_talloc(mem_ctx, lck->key.dptr, lck->key.dsize);
141 }
142
143
144 /*
145   determine if two odb_entry structures conflict
146
147   return NT_STATUS_OK on no conflict
148 */
149 static NTSTATUS share_conflict(struct opendb_entry *e1,
150                                uint32_t stream_id,
151                                uint32_t share_access,
152                                uint32_t access_mask)
153 {
154         /* if either open involves no read.write or delete access then
155            it can't conflict */
156         if (!(e1->access_mask & (SEC_FILE_WRITE_DATA |
157                                  SEC_FILE_APPEND_DATA |
158                                  SEC_FILE_READ_DATA |
159                                  SEC_FILE_EXECUTE |
160                                  SEC_STD_DELETE))) {
161                 return NT_STATUS_OK;
162         }
163         if (!(access_mask & (SEC_FILE_WRITE_DATA |
164                              SEC_FILE_APPEND_DATA |
165                              SEC_FILE_READ_DATA |
166                              SEC_FILE_EXECUTE |
167                              SEC_STD_DELETE))) {
168                 return NT_STATUS_OK;
169         }
170
171         /* data IO access masks. This is skipped if the two open handles
172            are on different streams (as in that case the masks don't
173            interact) */
174         if (e1->stream_id != stream_id) {
175                 return NT_STATUS_OK;
176         }
177
178 #define CHECK_MASK(am, right, sa, share) \
179         if (((am) & (right)) && !((sa) & (share))) return NT_STATUS_SHARING_VIOLATION
180
181         CHECK_MASK(e1->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
182                    share_access, NTCREATEX_SHARE_ACCESS_WRITE);
183         CHECK_MASK(access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
184                    e1->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
185         
186         CHECK_MASK(e1->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
187                    share_access, NTCREATEX_SHARE_ACCESS_READ);
188         CHECK_MASK(access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
189                    e1->share_access, NTCREATEX_SHARE_ACCESS_READ);
190
191         CHECK_MASK(e1->access_mask, SEC_STD_DELETE,
192                    share_access, NTCREATEX_SHARE_ACCESS_DELETE);
193         CHECK_MASK(access_mask, SEC_STD_DELETE,
194                    e1->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
195 #undef CHECK_MASK
196         return NT_STATUS_OK;
197 }
198
199 /*
200   pull a record, translating from the db format to the opendb_file structure defined
201   in opendb.idl
202 */
203 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file)
204 {
205         struct odb_context *odb = lck->odb;
206         TDB_DATA dbuf;
207         DATA_BLOB blob;
208         enum ndr_err_code ndr_err;
209
210         dbuf = tdb_fetch(odb->w->tdb, lck->key);
211         if (dbuf.dptr == NULL) {
212                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
213         }
214
215         blob.data = dbuf.dptr;
216         blob.length = dbuf.dsize;
217
218         ndr_err = ndr_pull_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_pull_flags_fn_t)ndr_pull_opendb_file);
219         free(dbuf.dptr);
220         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
221                 return ndr_map_error2ntstatus(ndr_err);
222         }
223
224         return NT_STATUS_OK;
225 }
226
227 /*
228   push a record, translating from the opendb_file structure defined in opendb.idl
229 */
230 static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file)
231 {
232         struct odb_context *odb = lck->odb;
233         TDB_DATA dbuf;
234         DATA_BLOB blob;
235         enum ndr_err_code ndr_err;
236         int ret;
237
238         if (file->num_entries == 0) {
239                 ret = tdb_delete(odb->w->tdb, lck->key);
240                 if (ret != 0) {
241                         return NT_STATUS_INTERNAL_DB_CORRUPTION;
242                 }
243                 return NT_STATUS_OK;
244         }
245
246         ndr_err = ndr_push_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_push_flags_fn_t)ndr_push_opendb_file);
247         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
248                 return ndr_map_error2ntstatus(ndr_err);
249         }
250
251         dbuf.dptr = blob.data;
252         dbuf.dsize = blob.length;
253                 
254         ret = tdb_store(odb->w->tdb, lck->key, dbuf, TDB_REPLACE);
255         data_blob_free(&blob);
256         if (ret != 0) {
257                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
258         }
259
260         return NT_STATUS_OK;
261 }
262
263 /*
264   send an oplock break to a client
265 */
266 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
267                                       struct opendb_entry *e,
268                                       uint8_t level)
269 {
270         NTSTATUS status;
271         struct opendb_oplock_break op_break;
272         DATA_BLOB blob;
273
274         ZERO_STRUCT(op_break);
275
276         /* tell the server handling this open file about the need to send the client
277            a break */
278         op_break.file_handle    = e->file_handle;
279         op_break.level          = level;
280
281         blob = data_blob_const(&op_break, sizeof(op_break));
282
283         status = messaging_send(msg_ctx, e->server,
284                                 MSG_NTVFS_OPLOCK_BREAK, &blob);
285         NT_STATUS_NOT_OK_RETURN(status);
286
287         return NT_STATUS_OK;
288 }
289
290 static bool access_attributes_only(uint32_t access_mask,
291                                    uint32_t open_disposition,
292                                    bool break_to_none)
293 {
294         switch (open_disposition) {
295         case NTCREATEX_DISP_SUPERSEDE:
296         case NTCREATEX_DISP_OVERWRITE_IF:
297         case NTCREATEX_DISP_OVERWRITE:
298                 return false;
299         default:
300                 break;
301         }
302
303         if (break_to_none) {
304                 return false;
305         }
306
307 #define CHECK_MASK(m,g) ((m) && (((m) & ~(g))==0) && (((m) & (g)) != 0))
308         return CHECK_MASK(access_mask,
309                           SEC_STD_SYNCHRONIZE |
310                           SEC_FILE_READ_ATTRIBUTE |
311                           SEC_FILE_WRITE_ATTRIBUTE);
312 #undef CHECK_MASK
313 }
314
315 static NTSTATUS odb_tdb_open_can_internal(struct odb_context *odb,
316                                           const struct opendb_file *file,
317                                           uint32_t stream_id, uint32_t share_access,
318                                           uint32_t access_mask, bool delete_on_close,
319                                           uint32_t open_disposition, bool break_to_none,
320                                           bool *_attrs_only)
321 {
322         NTSTATUS status;
323         uint32_t i;
324         bool attrs_only = false;
325
326         /* see if anyone has an oplock, which we need to break */
327         for (i=0;i<file->num_entries;i++) {
328                 if (file->entries[i].oplock_level == OPLOCK_BATCH) {
329                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
330                         /* if this is an attribute only access
331                          * it doesn't conflict with a BACTCH oplock
332                          * but we'll not grant the oplock below
333                          */
334                         attrs_only = access_attributes_only(access_mask,
335                                                             open_disposition,
336                                                             break_to_none);
337                         if (attrs_only) {
338                                 break;
339                         }
340                         /* a batch oplock caches close calls, which
341                            means the client application might have
342                            already closed the file. We have to allow
343                            this close to propogate by sending a oplock
344                            break request and suspending this call
345                            until the break is acknowledged or the file
346                            is closed */
347                         if (break_to_none ||
348                             !file->entries[i].allow_level_II_oplock) {
349                                 oplock_return = OPLOCK_BREAK_TO_NONE;
350                         }
351                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
352                                               &file->entries[i],
353                                               oplock_return);
354                         return NT_STATUS_OPLOCK_NOT_GRANTED;
355                 }
356         }
357
358         if (file->delete_on_close) {
359                 /* while delete on close is set, no new opens are allowed */
360                 return NT_STATUS_DELETE_PENDING;
361         }
362
363         if (file->num_entries != 0 && delete_on_close) {
364                 return NT_STATUS_SHARING_VIOLATION;
365         }
366
367         /* check for sharing violations */
368         for (i=0;i<file->num_entries;i++) {
369                 status = share_conflict(&file->entries[i], stream_id,
370                                         share_access, access_mask);
371                 NT_STATUS_NOT_OK_RETURN(status);
372         }
373
374         /* we now know the open could succeed, but we need to check
375            for any exclusive oplocks. We can't grant a second open
376            till these are broken. Note that we check for batch oplocks
377            before checking for sharing violations, and check for
378            exclusive oplocks afterwards. */
379         for (i=0;i<file->num_entries;i++) {
380                 if (file->entries[i].oplock_level == OPLOCK_EXCLUSIVE) {
381                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
382                         /* if this is an attribute only access
383                          * it doesn't conflict with an EXCLUSIVE oplock
384                          * but we'll not grant the oplock below
385                          */
386                         attrs_only = access_attributes_only(access_mask,
387                                                             open_disposition,
388                                                             break_to_none);
389                         if (attrs_only) {
390                                 break;
391                         }
392                         /*
393                          * send an oplock break to the holder of the
394                          * oplock and tell caller to retry later
395                          */
396                         if (break_to_none ||
397                             !file->entries[i].allow_level_II_oplock) {
398                                 oplock_return = OPLOCK_BREAK_TO_NONE;
399                         }
400                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
401                                               &file->entries[i],
402                                               oplock_return);
403                         return NT_STATUS_OPLOCK_NOT_GRANTED;
404                 }
405         }
406
407         if (_attrs_only) {
408                 *_attrs_only = attrs_only;
409         }
410         return NT_STATUS_OK;
411 }
412
413 /*
414   register an open file in the open files database. This implements the share_access
415   rules
416
417   Note that the path is only used by the delete on close logic, not
418   for comparing with other filenames
419 */
420 static NTSTATUS odb_tdb_open_file(struct odb_lock *lck,
421                                   void *file_handle, const char *path,
422                                   uint32_t stream_id, uint32_t share_access,
423                                   uint32_t access_mask, bool delete_on_close,
424                                   uint32_t open_disposition, bool break_to_none,
425                                   bool allow_level_II_oplock,
426                                   uint32_t oplock_level, uint32_t *oplock_granted)
427 {
428         struct odb_context *odb = lck->odb;
429         struct opendb_entry e;
430         struct opendb_file file;
431         NTSTATUS status;
432         bool attrs_only = false;
433
434         if (odb->oplocks == false) {
435                 oplock_level = OPLOCK_NONE;
436         }
437
438         status = odb_pull_record(lck, &file);
439         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
440                 /* initialise a blank structure */
441                 ZERO_STRUCT(file);
442                 file.path = path;
443         } else {
444                 NT_STATUS_NOT_OK_RETURN(status);
445         }
446
447         /* see if it conflicts */
448         status = odb_tdb_open_can_internal(odb, &file, stream_id,
449                                            share_access, access_mask,
450                                            delete_on_close, open_disposition,
451                                            break_to_none, &attrs_only);
452         NT_STATUS_NOT_OK_RETURN(status);
453
454         /* see if it conflicts */
455         e.server                = odb->ntvfs_ctx->server_id;
456         e.file_handle           = file_handle;
457         e.stream_id             = stream_id;
458         e.share_access          = share_access;
459         e.access_mask           = access_mask;
460         e.delete_on_close       = delete_on_close;
461         e.allow_level_II_oplock = allow_level_II_oplock;
462         e.oplock_level          = OPLOCK_NONE;
463
464         /*
465           possibly grant an exclusive, batch or level2 oplock
466         */
467         if (oplock_granted) {
468                 if (attrs_only) {
469                         e.oplock_level  = OPLOCK_NONE;
470                         *oplock_granted = NO_OPLOCK_RETURN;
471                 } else if (oplock_level == OPLOCK_EXCLUSIVE) {
472                         if (file.num_entries == 0) {
473                                 e.oplock_level  = OPLOCK_EXCLUSIVE;
474                                 *oplock_granted = EXCLUSIVE_OPLOCK_RETURN;
475                         } else if (allow_level_II_oplock) {
476                                 e.oplock_level  = OPLOCK_LEVEL_II;
477                                 *oplock_granted = LEVEL_II_OPLOCK_RETURN;
478                         } else {
479                                 e.oplock_level  = OPLOCK_NONE;
480                                 *oplock_granted = NO_OPLOCK_RETURN;
481                         }
482                 } else if (oplock_level == OPLOCK_BATCH) {
483                         if (file.num_entries == 0) {
484                                 e.oplock_level  = OPLOCK_BATCH;
485                                 *oplock_granted = BATCH_OPLOCK_RETURN;
486                         } else if (allow_level_II_oplock) {
487                                 e.oplock_level  = OPLOCK_LEVEL_II;
488                                 *oplock_granted = LEVEL_II_OPLOCK_RETURN;
489                         } else {
490                                 e.oplock_level  = OPLOCK_NONE;
491                                 *oplock_granted = NO_OPLOCK_RETURN;
492                         }
493                 } else if (oplock_level == OPLOCK_LEVEL_II) {
494                         e.oplock_level  = OPLOCK_LEVEL_II;
495                         *oplock_granted = LEVEL_II_OPLOCK_RETURN;
496                 } else {
497                         e.oplock_level  = OPLOCK_NONE;
498                         *oplock_granted = NO_OPLOCK_RETURN;
499                 }
500         }
501
502         /* it doesn't conflict, so add it to the end */
503         file.entries = talloc_realloc(lck, file.entries, struct opendb_entry, 
504                                       file.num_entries+1);
505         NT_STATUS_HAVE_NO_MEMORY(file.entries);
506
507         file.entries[file.num_entries] = e;
508         file.num_entries++;
509
510         return odb_push_record(lck, &file);
511 }
512
513
514 /*
515   register a pending open file in the open files database
516 */
517 static NTSTATUS odb_tdb_open_file_pending(struct odb_lock *lck, void *private)
518 {
519         struct odb_context *odb = lck->odb;
520         struct opendb_file file;
521         NTSTATUS status;
522                 
523         status = odb_pull_record(lck, &file);
524         NT_STATUS_NOT_OK_RETURN(status);
525
526         file.pending = talloc_realloc(lck, file.pending, struct opendb_pending, 
527                                       file.num_pending+1);
528         NT_STATUS_HAVE_NO_MEMORY(file.pending);
529
530         file.pending[file.num_pending].server = odb->ntvfs_ctx->server_id;
531         file.pending[file.num_pending].notify_ptr = private;
532
533         file.num_pending++;
534
535         return odb_push_record(lck, &file);
536 }
537
538
539 /*
540   remove a opendb entry
541 */
542 static NTSTATUS odb_tdb_close_file(struct odb_lock *lck, void *file_handle,
543                                    const char **_delete_path)
544 {
545         struct odb_context *odb = lck->odb;
546         struct opendb_file file;
547         const char *delete_path = NULL;
548         int i;
549         NTSTATUS status;
550
551         status = odb_pull_record(lck, &file);
552         NT_STATUS_NOT_OK_RETURN(status);
553
554         /* find the entry, and delete it */
555         for (i=0;i<file.num_entries;i++) {
556                 if (file_handle == file.entries[i].file_handle &&
557                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.entries[i].server)) {
558                         if (file.entries[i].delete_on_close) {
559                                 file.delete_on_close = true;
560                         }
561                         if (i < file.num_entries-1) {
562                                 memmove(file.entries+i, file.entries+i+1, 
563                                         (file.num_entries - (i+1)) * 
564                                         sizeof(struct opendb_entry));
565                         }
566                         break;
567                 }
568         }
569
570         if (i == file.num_entries) {
571                 return NT_STATUS_UNSUCCESSFUL;
572         }
573
574         /* send any pending notifications, removing them once sent */
575         for (i=0;i<file.num_pending;i++) {
576                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx, file.pending[i].server, 
577                                    MSG_PVFS_RETRY_OPEN, 
578                                    file.pending[i].notify_ptr);
579         }
580         file.num_pending = 0;
581
582         file.num_entries--;
583
584         if (file.num_entries == 0 && file.delete_on_close) {
585                 delete_path = talloc_strdup(lck, file.path);
586                 NT_STATUS_HAVE_NO_MEMORY(delete_path);
587         }
588
589         if (_delete_path) {
590                 *_delete_path = delete_path;
591         }
592
593         return odb_push_record(lck, &file);
594 }
595
596 /*
597   update the oplock level of the client
598 */
599 static NTSTATUS odb_tdb_update_oplock(struct odb_lock *lck, void *file_handle,
600                                       uint32_t oplock_level)
601 {
602         struct odb_context *odb = lck->odb;
603         struct opendb_file file;
604         int i;
605         NTSTATUS status;
606
607         status = odb_pull_record(lck, &file);
608         NT_STATUS_NOT_OK_RETURN(status);
609
610         /* find the entry, and update it */
611         for (i=0;i<file.num_entries;i++) {
612                 if (file_handle == file.entries[i].file_handle &&
613                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.entries[i].server)) {
614                         file.entries[i].oplock_level = oplock_level;
615                         break;
616                 }
617         }
618
619         if (i == file.num_entries) {
620                 return NT_STATUS_UNSUCCESSFUL;
621         }
622
623         /* send any pending notifications, removing them once sent */
624         for (i=0;i<file.num_pending;i++) {
625                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
626                                    file.pending[i].server,
627                                    MSG_PVFS_RETRY_OPEN,
628                                    file.pending[i].notify_ptr);
629         }
630         file.num_pending = 0;
631
632         return odb_push_record(lck, &file);
633 }
634
635 /*
636   send oplocks breaks to none to all level2 holders
637 */
638 static NTSTATUS odb_tdb_break_oplocks(struct odb_lock *lck)
639 {
640         struct odb_context *odb = lck->odb;
641         NTSTATUS status;
642         struct opendb_file file;
643         int i;
644         bool modified = true;
645
646         status = odb_pull_record(lck, &file);
647         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
648                 return NT_STATUS_OK;
649         }
650         NT_STATUS_NOT_OK_RETURN(status);
651
652         /* see if anyone has an oplock, which we need to break */
653         for (i=0;i<file.num_entries;i++) {
654                 if (file.entries[i].oplock_level == OPLOCK_LEVEL_II) {
655                         /*
656                          * there could be multiple level2 oplocks
657                          * and we just send a break to none to all of them
658                          * without waiting for a release
659                          */
660                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
661                                               &file.entries[i],
662                                               OPLOCK_BREAK_TO_NONE);
663                         file.entries[i].oplock_level = OPLOCK_NONE;
664                         modified = true;
665                 }
666         }
667
668         if (modified) {
669                 return odb_push_record(lck, &file);
670         }
671         return NT_STATUS_OK;
672 }
673
674 /*
675   remove a pending opendb entry
676 */
677 static NTSTATUS odb_tdb_remove_pending(struct odb_lock *lck, void *private)
678 {
679         struct odb_context *odb = lck->odb;
680         int i;
681         NTSTATUS status;
682         struct opendb_file file;
683
684         status = odb_pull_record(lck, &file);
685         NT_STATUS_NOT_OK_RETURN(status);
686
687         /* find the entry, and delete it */
688         for (i=0;i<file.num_pending;i++) {
689                 if (private == file.pending[i].notify_ptr &&
690                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.pending[i].server)) {
691                         if (i < file.num_pending-1) {
692                                 memmove(file.pending+i, file.pending+i+1, 
693                                         (file.num_pending - (i+1)) * 
694                                         sizeof(struct opendb_pending));
695                         }
696                         break;
697                 }
698         }
699
700         if (i == file.num_pending) {
701                 return NT_STATUS_UNSUCCESSFUL;
702         }
703
704         file.num_pending--;
705         
706         return odb_push_record(lck, &file);
707 }
708
709
710 /*
711   rename the path in a open file
712 */
713 static NTSTATUS odb_tdb_rename(struct odb_lock *lck, const char *path)
714 {
715         struct opendb_file file;
716         NTSTATUS status;
717
718         status = odb_pull_record(lck, &file);
719         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
720                 /* not having the record at all is OK */
721                 return NT_STATUS_OK;
722         }
723         NT_STATUS_NOT_OK_RETURN(status);
724
725         file.path = path;
726         return odb_push_record(lck, &file);
727 }
728
729 /*
730   get the path of an open file
731 */
732 static NTSTATUS odb_tdb_get_path(struct odb_lock *lck, const char **path)
733 {
734         struct opendb_file file;
735         NTSTATUS status;
736
737         *path = NULL;
738
739         status = odb_pull_record(lck, &file);
740         /* we don't ignore NT_STATUS_OBJECT_NAME_NOT_FOUND here */
741         NT_STATUS_NOT_OK_RETURN(status);
742
743         *path = file.path;
744
745         return NT_STATUS_OK;
746 }
747
748 /*
749   update delete on close flag on an open file
750 */
751 static NTSTATUS odb_tdb_set_delete_on_close(struct odb_lock *lck, bool del_on_close)
752 {
753         NTSTATUS status;
754         struct opendb_file file;
755
756         status = odb_pull_record(lck, &file);
757         NT_STATUS_NOT_OK_RETURN(status);
758
759         file.delete_on_close = del_on_close;
760
761         return odb_push_record(lck, &file);
762 }
763
764 /*
765   return the current value of the delete_on_close bit, and how many
766   people still have the file open
767 */
768 static NTSTATUS odb_tdb_get_delete_on_close(struct odb_context *odb, 
769                                             DATA_BLOB *key, bool *del_on_close)
770 {
771         NTSTATUS status;
772         struct opendb_file file;
773         struct odb_lock *lck;
774
775         (*del_on_close) = false;
776
777         lck = odb_lock(odb, odb, key);
778         NT_STATUS_HAVE_NO_MEMORY(lck);
779
780         status = odb_pull_record(lck, &file);
781         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
782                 talloc_free(lck);
783                 return NT_STATUS_OK;
784         }
785         if (!NT_STATUS_IS_OK(status)) {
786                 talloc_free(lck);
787                 return status;
788         }
789
790         (*del_on_close) = file.delete_on_close;
791
792         talloc_free(lck);
793
794         return NT_STATUS_OK;
795 }
796
797
798 /*
799   determine if a file can be opened with the given share_access,
800   create_options and access_mask
801 */
802 static NTSTATUS odb_tdb_can_open(struct odb_lock *lck,
803                                  uint32_t stream_id, uint32_t share_access,
804                                  uint32_t access_mask, bool delete_on_close,
805                                  uint32_t open_disposition, bool break_to_none)
806 {
807         struct odb_context *odb = lck->odb;
808         NTSTATUS status;
809         struct opendb_file file;
810         bool attrs_only = false;
811
812         status = odb_pull_record(lck, &file);
813         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
814                 return NT_STATUS_OK;
815         }
816         NT_STATUS_NOT_OK_RETURN(status);
817
818         status = odb_tdb_open_can_internal(odb, &file, stream_id,
819                                            share_access, access_mask,
820                                            delete_on_close, open_disposition,
821                                            break_to_none, &attrs_only);
822         NT_STATUS_NOT_OK_RETURN(status);
823
824         return NT_STATUS_OK;
825 }
826
827
828 static const struct opendb_ops opendb_tdb_ops = {
829         .odb_init                = odb_tdb_init,
830         .odb_lock                = odb_tdb_lock,
831         .odb_get_key             = odb_tdb_get_key,
832         .odb_open_file           = odb_tdb_open_file,
833         .odb_open_file_pending   = odb_tdb_open_file_pending,
834         .odb_close_file          = odb_tdb_close_file,
835         .odb_remove_pending      = odb_tdb_remove_pending,
836         .odb_rename              = odb_tdb_rename,
837         .odb_get_path            = odb_tdb_get_path,
838         .odb_set_delete_on_close = odb_tdb_set_delete_on_close,
839         .odb_get_delete_on_close = odb_tdb_get_delete_on_close,
840         .odb_can_open            = odb_tdb_can_open,
841         .odb_update_oplock       = odb_tdb_update_oplock,
842         .odb_break_oplocks       = odb_tdb_break_oplocks
843 };
844
845
846 void odb_tdb_init_ops(void)
847 {
848         odb_set_ops(&opendb_tdb_ops);
849 }