Merge branch 'v4-0-test' of git://git.samba.org/samba into 4-0-local
[samba.git] / source4 / ntvfs / common / opendb_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the open files database, tdb backend. It implements shared
23   storage of what files are open between server instances, and
24   implements the rules of shared access to files.
25
26   The caller needs to provide a file_key, which specifies what file
27   they are talking about. This needs to be a unique key across all
28   filesystems, and is usually implemented in terms of a device/inode
29   pair.
30
31   Before any operations can be performed the caller needs to establish
32   a lock on the record associated with file_key. That is done by
33   calling odb_lock(). The caller releases this lock by calling
34   talloc_free() on the returned handle.
35
36   All other operations on a record are done by passing the odb_lock()
37   handle back to this module. The handle contains internal
38   information about what file_key is being operated on.
39 */
40
41 #include "includes.h"
42 #include "system/filesys.h"
43 #include "lib/tdb/include/tdb.h"
44 #include "messaging/messaging.h"
45 #include "tdb_wrap.h"
46 #include "lib/messaging/irpc.h"
47 #include "librpc/gen_ndr/ndr_opendb.h"
48 #include "ntvfs/ntvfs.h"
49 #include "ntvfs/common/ntvfs_common.h"
50 #include "cluster/cluster.h"
51 #include "param/param.h"
52
53 struct odb_context {
54         struct tdb_wrap *w;
55         struct ntvfs_context *ntvfs_ctx;
56         bool oplocks;
57 };
58
59 /*
60   an odb lock handle. You must obtain one of these using odb_lock() before doing
61   any other operations. 
62 */
63 struct odb_lock {
64         struct odb_context *odb;
65         TDB_DATA key;
66 };
67
68 /*
69   Open up the openfiles.tdb database. Close it down using
70   talloc_free(). We need the messaging_ctx to allow for pending open
71   notifications.
72 */
73 static struct odb_context *odb_tdb_init(TALLOC_CTX *mem_ctx, 
74                                         struct ntvfs_context *ntvfs_ctx)
75 {
76         struct odb_context *odb;
77
78         odb = talloc(mem_ctx, struct odb_context);
79         if (odb == NULL) {
80                 return NULL;
81         }
82
83         odb->w = cluster_tdb_tmp_open(odb, ntvfs_ctx->lp_ctx, "openfiles.tdb", TDB_DEFAULT);
84         if (odb->w == NULL) {
85                 talloc_free(odb);
86                 return NULL;
87         }
88
89         odb->ntvfs_ctx = ntvfs_ctx;
90
91         /* leave oplocks disabled by default until the code is working */
92         odb->oplocks = lp_parm_bool(ntvfs_ctx->lp_ctx, NULL, "opendb", "oplocks", false);
93
94         return odb;
95 }
96
97 /*
98   destroy a lock on the database
99 */
100 static int odb_lock_destructor(struct odb_lock *lck)
101 {
102         tdb_chainunlock(lck->odb->w->tdb, lck->key);
103         return 0;
104 }
105
106 /*
107   get a lock on a entry in the odb. This call returns a lock handle,
108   which the caller should unlock using talloc_free().
109 */
110 static struct odb_lock *odb_tdb_lock(TALLOC_CTX *mem_ctx,
111                                      struct odb_context *odb, DATA_BLOB *file_key)
112 {
113         struct odb_lock *lck;
114
115         lck = talloc(mem_ctx, struct odb_lock);
116         if (lck == NULL) {
117                 return NULL;
118         }
119
120         lck->odb = talloc_reference(lck, odb);
121         lck->key.dptr = talloc_memdup(lck, file_key->data, file_key->length);
122         lck->key.dsize = file_key->length;
123         if (lck->key.dptr == NULL) {
124                 talloc_free(lck);
125                 return NULL;
126         }
127
128         if (tdb_chainlock(odb->w->tdb, lck->key) != 0) {
129                 talloc_free(lck);
130                 return NULL;
131         }
132
133         talloc_set_destructor(lck, odb_lock_destructor);
134         
135         return lck;
136 }
137
138 static DATA_BLOB odb_tdb_get_key(TALLOC_CTX *mem_ctx, struct odb_lock *lck)
139 {
140         return data_blob_talloc(mem_ctx, lck->key.dptr, lck->key.dsize);
141 }
142
143
144 /*
145   determine if two odb_entry structures conflict
146
147   return NT_STATUS_OK on no conflict
148 */
149 static NTSTATUS share_conflict(struct opendb_entry *e1,
150                                uint32_t stream_id,
151                                uint32_t share_access,
152                                uint32_t access_mask)
153 {
154         /* if either open involves no read.write or delete access then
155            it can't conflict */
156         if (!(e1->access_mask & (SEC_FILE_WRITE_DATA |
157                                  SEC_FILE_APPEND_DATA |
158                                  SEC_FILE_READ_DATA |
159                                  SEC_FILE_EXECUTE |
160                                  SEC_STD_DELETE))) {
161                 return NT_STATUS_OK;
162         }
163         if (!(access_mask & (SEC_FILE_WRITE_DATA |
164                              SEC_FILE_APPEND_DATA |
165                              SEC_FILE_READ_DATA |
166                              SEC_FILE_EXECUTE |
167                              SEC_STD_DELETE))) {
168                 return NT_STATUS_OK;
169         }
170
171         /* data IO access masks. This is skipped if the two open handles
172            are on different streams (as in that case the masks don't
173            interact) */
174         if (e1->stream_id != stream_id) {
175                 return NT_STATUS_OK;
176         }
177
178 #define CHECK_MASK(am, right, sa, share) \
179         if (((am) & (right)) && !((sa) & (share))) return NT_STATUS_SHARING_VIOLATION
180
181         CHECK_MASK(e1->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
182                    share_access, NTCREATEX_SHARE_ACCESS_WRITE);
183         CHECK_MASK(access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
184                    e1->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
185         
186         CHECK_MASK(e1->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
187                    share_access, NTCREATEX_SHARE_ACCESS_READ);
188         CHECK_MASK(access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
189                    e1->share_access, NTCREATEX_SHARE_ACCESS_READ);
190
191         CHECK_MASK(e1->access_mask, SEC_STD_DELETE,
192                    share_access, NTCREATEX_SHARE_ACCESS_DELETE);
193         CHECK_MASK(access_mask, SEC_STD_DELETE,
194                    e1->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
195 #undef CHECK_MASK
196         return NT_STATUS_OK;
197 }
198
199 /*
200   pull a record, translating from the db format to the opendb_file structure defined
201   in opendb.idl
202 */
203 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file)
204 {
205         struct odb_context *odb = lck->odb;
206         TDB_DATA dbuf;
207         DATA_BLOB blob;
208         enum ndr_err_code ndr_err;
209
210         dbuf = tdb_fetch(odb->w->tdb, lck->key);
211         if (dbuf.dptr == NULL) {
212                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
213         }
214
215         blob.data = dbuf.dptr;
216         blob.length = dbuf.dsize;
217
218         ndr_err = ndr_pull_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_pull_flags_fn_t)ndr_pull_opendb_file);
219         free(dbuf.dptr);
220         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
221                 return ndr_map_error2ntstatus(ndr_err);
222         }
223
224         return NT_STATUS_OK;
225 }
226
227 /*
228   push a record, translating from the opendb_file structure defined in opendb.idl
229 */
230 static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file)
231 {
232         struct odb_context *odb = lck->odb;
233         TDB_DATA dbuf;
234         DATA_BLOB blob;
235         enum ndr_err_code ndr_err;
236         int ret;
237
238         if (file->num_entries == 0) {
239                 ret = tdb_delete(odb->w->tdb, lck->key);
240                 if (ret != 0) {
241                         return NT_STATUS_INTERNAL_DB_CORRUPTION;
242                 }
243                 return NT_STATUS_OK;
244         }
245
246         ndr_err = ndr_push_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_push_flags_fn_t)ndr_push_opendb_file);
247         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
248                 return ndr_map_error2ntstatus(ndr_err);
249         }
250
251         dbuf.dptr = blob.data;
252         dbuf.dsize = blob.length;
253                 
254         ret = tdb_store(odb->w->tdb, lck->key, dbuf, TDB_REPLACE);
255         data_blob_free(&blob);
256         if (ret != 0) {
257                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
258         }
259
260         return NT_STATUS_OK;
261 }
262
263 /*
264   send an oplock break to a client
265 */
266 static NTSTATUS odb_oplock_break_send(struct odb_context *odb,
267                                       struct opendb_entry *e,
268                                       uint8_t level)
269 {
270         NTSTATUS status;
271         struct opendb_oplock_break op_break;
272         DATA_BLOB blob;
273
274         ZERO_STRUCT(op_break);
275
276         /* tell the server handling this open file about the need to send the client
277            a break */
278         op_break.file_handle    = e->file_handle;
279         op_break.level          = level;
280
281         blob = data_blob_const(&op_break, sizeof(op_break));
282
283         status = messaging_send(odb->ntvfs_ctx->msg_ctx, e->server,
284                                 MSG_NTVFS_OPLOCK_BREAK, &blob);
285         NT_STATUS_NOT_OK_RETURN(status);
286
287         return NT_STATUS_OK;
288 }
289
290 static bool access_attributes_only(uint32_t access_mask,
291                                    uint32_t open_disposition,
292                                    bool break_to_none)
293 {
294         switch (open_disposition) {
295         case NTCREATEX_DISP_SUPERSEDE:
296         case NTCREATEX_DISP_OVERWRITE_IF:
297         case NTCREATEX_DISP_OVERWRITE:
298                 return false;
299         default:
300                 break;
301         }
302
303         if (break_to_none) {
304                 return false;
305         }
306
307 #define CHECK_MASK(m,g) ((m) && (((m) & ~(g))==0) && (((m) & (g)) != 0))
308         return CHECK_MASK(access_mask,
309                           SEC_STD_SYNCHRONIZE |
310                           SEC_FILE_READ_ATTRIBUTE |
311                           SEC_FILE_WRITE_ATTRIBUTE);
312 #undef CHECK_MASK
313 }
314
315 static NTSTATUS odb_tdb_open_can_internal(struct odb_context *odb,
316                                           const struct opendb_file *file,
317                                           uint32_t stream_id, uint32_t share_access,
318                                           uint32_t access_mask, bool delete_on_close,
319                                           uint32_t open_disposition, bool break_to_none,
320                                           bool *_attrs_only)
321 {
322         NTSTATUS status;
323         uint32_t i;
324         bool attrs_only = false;
325
326         /* see if anyone has an oplock, which we need to break */
327         for (i=0;i<file->num_entries;i++) {
328                 if (file->entries[i].oplock_level == OPLOCK_BATCH) {
329                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
330                         /* if this is an attribute only access
331                          * it doesn't conflict with a BACTCH oplock
332                          * but we'll not grant the oplock below
333                          */
334                         attrs_only = access_attributes_only(access_mask,
335                                                             open_disposition,
336                                                             break_to_none);
337                         if (attrs_only) {
338                                 break;
339                         }
340                         /* a batch oplock caches close calls, which
341                            means the client application might have
342                            already closed the file. We have to allow
343                            this close to propogate by sending a oplock
344                            break request and suspending this call
345                            until the break is acknowledged or the file
346                            is closed */
347                         if (break_to_none ||
348                             !file->entries[i].allow_level_II_oplock) {
349                                 oplock_return = OPLOCK_BREAK_TO_NONE;
350                         }
351                         odb_oplock_break_send(odb, &file->entries[i],
352                                               oplock_return);
353                         return NT_STATUS_OPLOCK_NOT_GRANTED;
354                 }
355         }
356
357         if (file->delete_on_close) {
358                 /* while delete on close is set, no new opens are allowed */
359                 return NT_STATUS_DELETE_PENDING;
360         }
361
362         if (file->num_entries != 0 && delete_on_close) {
363                 return NT_STATUS_SHARING_VIOLATION;
364         }
365
366         /* check for sharing violations */
367         for (i=0;i<file->num_entries;i++) {
368                 status = share_conflict(&file->entries[i], stream_id,
369                                         share_access, access_mask);
370                 NT_STATUS_NOT_OK_RETURN(status);
371         }
372
373         /* we now know the open could succeed, but we need to check
374            for any exclusive oplocks. We can't grant a second open
375            till these are broken. Note that we check for batch oplocks
376            before checking for sharing violations, and check for
377            exclusive oplocks afterwards. */
378         for (i=0;i<file->num_entries;i++) {
379                 if (file->entries[i].oplock_level == OPLOCK_EXCLUSIVE) {
380                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
381                         /* if this is an attribute only access
382                          * it doesn't conflict with an EXCLUSIVE oplock
383                          * but we'll not grant the oplock below
384                          */
385                         attrs_only = access_attributes_only(access_mask,
386                                                             open_disposition,
387                                                             break_to_none);
388                         if (attrs_only) {
389                                 break;
390                         }
391                         /*
392                          * send an oplock break to the holder of the
393                          * oplock and tell caller to retry later
394                          */
395                         if (break_to_none ||
396                             !file->entries[i].allow_level_II_oplock) {
397                                 oplock_return = OPLOCK_BREAK_TO_NONE;
398                         }
399                         odb_oplock_break_send(odb, &file->entries[i],
400                                               oplock_return);
401                         return NT_STATUS_OPLOCK_NOT_GRANTED;
402                 }
403         }
404
405         if (_attrs_only) {
406                 *_attrs_only = attrs_only;
407         }
408         return NT_STATUS_OK;
409 }
410
411 /*
412   register an open file in the open files database. This implements the share_access
413   rules
414
415   Note that the path is only used by the delete on close logic, not
416   for comparing with other filenames
417 */
418 static NTSTATUS odb_tdb_open_file(struct odb_lock *lck,
419                                   void *file_handle, const char *path,
420                                   uint32_t stream_id, uint32_t share_access,
421                                   uint32_t access_mask, bool delete_on_close,
422                                   uint32_t open_disposition, bool break_to_none,
423                                   bool allow_level_II_oplock,
424                                   uint32_t oplock_level, uint32_t *oplock_granted)
425 {
426         struct odb_context *odb = lck->odb;
427         struct opendb_entry e;
428         struct opendb_file file;
429         NTSTATUS status;
430         bool attrs_only = false;
431
432         if (odb->oplocks == false) {
433                 oplock_level = OPLOCK_NONE;
434         }
435
436         status = odb_pull_record(lck, &file);
437         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
438                 /* initialise a blank structure */
439                 ZERO_STRUCT(file);
440                 file.path = path;
441         } else {
442                 NT_STATUS_NOT_OK_RETURN(status);
443         }
444
445         /* see if it conflicts */
446         status = odb_tdb_open_can_internal(odb, &file, stream_id,
447                                            share_access, access_mask,
448                                            delete_on_close, open_disposition,
449                                            break_to_none, &attrs_only);
450         NT_STATUS_NOT_OK_RETURN(status);
451
452         /* see if it conflicts */
453         e.server                = odb->ntvfs_ctx->server_id;
454         e.file_handle           = file_handle;
455         e.stream_id             = stream_id;
456         e.share_access          = share_access;
457         e.access_mask           = access_mask;
458         e.delete_on_close       = delete_on_close;
459         e.allow_level_II_oplock = allow_level_II_oplock;
460         e.oplock_level          = OPLOCK_NONE;
461
462         /*
463           possibly grant an exclusive, batch or level2 oplock
464         */
465         if (oplock_granted) {
466                 if (attrs_only) {
467                         e.oplock_level  = OPLOCK_NONE;
468                         *oplock_granted = NO_OPLOCK_RETURN;
469                 } else if (oplock_level == OPLOCK_EXCLUSIVE) {
470                         if (file.num_entries == 0) {
471                                 e.oplock_level  = OPLOCK_EXCLUSIVE;
472                                 *oplock_granted = EXCLUSIVE_OPLOCK_RETURN;
473                         } else if (allow_level_II_oplock) {
474                                 e.oplock_level  = OPLOCK_LEVEL_II;
475                                 *oplock_granted = LEVEL_II_OPLOCK_RETURN;
476                         } else {
477                                 e.oplock_level  = OPLOCK_NONE;
478                                 *oplock_granted = NO_OPLOCK_RETURN;
479                         }
480                 } else if (oplock_level == OPLOCK_BATCH) {
481                         if (file.num_entries == 0) {
482                                 e.oplock_level  = OPLOCK_BATCH;
483                                 *oplock_granted = BATCH_OPLOCK_RETURN;
484                         } else if (allow_level_II_oplock) {
485                                 e.oplock_level  = OPLOCK_LEVEL_II;
486                                 *oplock_granted = LEVEL_II_OPLOCK_RETURN;
487                         } else {
488                                 e.oplock_level  = OPLOCK_NONE;
489                                 *oplock_granted = NO_OPLOCK_RETURN;
490                         }
491                 } else if (oplock_level == OPLOCK_LEVEL_II) {
492                         e.oplock_level  = OPLOCK_LEVEL_II;
493                         *oplock_granted = LEVEL_II_OPLOCK_RETURN;
494                 } else {
495                         e.oplock_level  = OPLOCK_NONE;
496                         *oplock_granted = NO_OPLOCK_RETURN;
497                 }
498         }
499
500         /* it doesn't conflict, so add it to the end */
501         file.entries = talloc_realloc(lck, file.entries, struct opendb_entry, 
502                                       file.num_entries+1);
503         NT_STATUS_HAVE_NO_MEMORY(file.entries);
504
505         file.entries[file.num_entries] = e;
506         file.num_entries++;
507
508         return odb_push_record(lck, &file);
509 }
510
511
512 /*
513   register a pending open file in the open files database
514 */
515 static NTSTATUS odb_tdb_open_file_pending(struct odb_lock *lck, void *private)
516 {
517         struct odb_context *odb = lck->odb;
518         struct opendb_file file;
519         NTSTATUS status;
520                 
521         status = odb_pull_record(lck, &file);
522         NT_STATUS_NOT_OK_RETURN(status);
523
524         file.pending = talloc_realloc(lck, file.pending, struct opendb_pending, 
525                                       file.num_pending+1);
526         NT_STATUS_HAVE_NO_MEMORY(file.pending);
527
528         file.pending[file.num_pending].server = odb->ntvfs_ctx->server_id;
529         file.pending[file.num_pending].notify_ptr = private;
530
531         file.num_pending++;
532
533         return odb_push_record(lck, &file);
534 }
535
536
537 /*
538   remove a opendb entry
539 */
540 static NTSTATUS odb_tdb_close_file(struct odb_lock *lck, void *file_handle,
541                                    const char **_delete_path)
542 {
543         struct odb_context *odb = lck->odb;
544         struct opendb_file file;
545         const char *delete_path = NULL;
546         int i;
547         NTSTATUS status;
548
549         status = odb_pull_record(lck, &file);
550         NT_STATUS_NOT_OK_RETURN(status);
551
552         /* find the entry, and delete it */
553         for (i=0;i<file.num_entries;i++) {
554                 if (file_handle == file.entries[i].file_handle &&
555                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.entries[i].server)) {
556                         if (file.entries[i].delete_on_close) {
557                                 file.delete_on_close = true;
558                         }
559                         if (i < file.num_entries-1) {
560                                 memmove(file.entries+i, file.entries+i+1, 
561                                         (file.num_entries - (i+1)) * 
562                                         sizeof(struct opendb_entry));
563                         }
564                         break;
565                 }
566         }
567
568         if (i == file.num_entries) {
569                 return NT_STATUS_UNSUCCESSFUL;
570         }
571
572         /* send any pending notifications, removing them once sent */
573         for (i=0;i<file.num_pending;i++) {
574                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx, file.pending[i].server, 
575                                    MSG_PVFS_RETRY_OPEN, 
576                                    file.pending[i].notify_ptr);
577         }
578         file.num_pending = 0;
579
580         file.num_entries--;
581
582         if (file.num_entries == 0 && file.delete_on_close) {
583                 delete_path = talloc_strdup(lck, file.path);
584                 NT_STATUS_HAVE_NO_MEMORY(delete_path);
585         }
586
587         if (_delete_path) {
588                 *_delete_path = delete_path;
589         }
590
591         return odb_push_record(lck, &file);
592 }
593
594 /*
595   update the oplock level of the client
596 */
597 static NTSTATUS odb_tdb_update_oplock(struct odb_lock *lck, void *file_handle,
598                                       uint32_t oplock_level)
599 {
600         struct odb_context *odb = lck->odb;
601         struct opendb_file file;
602         int i;
603         NTSTATUS status;
604
605         status = odb_pull_record(lck, &file);
606         NT_STATUS_NOT_OK_RETURN(status);
607
608         /* find the entry, and update it */
609         for (i=0;i<file.num_entries;i++) {
610                 if (file_handle == file.entries[i].file_handle &&
611                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.entries[i].server)) {
612                         file.entries[i].oplock_level = oplock_level;
613                         break;
614                 }
615         }
616
617         if (i == file.num_entries) {
618                 return NT_STATUS_UNSUCCESSFUL;
619         }
620
621         /* send any pending notifications, removing them once sent */
622         for (i=0;i<file.num_pending;i++) {
623                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
624                                    file.pending[i].server,
625                                    MSG_PVFS_RETRY_OPEN,
626                                    file.pending[i].notify_ptr);
627         }
628         file.num_pending = 0;
629
630         return odb_push_record(lck, &file);
631 }
632
633 /*
634   send oplocks breaks to none to all level2 holders
635 */
636 static NTSTATUS odb_tdb_break_oplocks(struct odb_lock *lck)
637 {
638         struct odb_context *odb = lck->odb;
639         NTSTATUS status;
640         struct opendb_file file;
641         int i;
642         bool modified = true;
643
644         status = odb_pull_record(lck, &file);
645         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
646                 return NT_STATUS_OK;
647         }
648         NT_STATUS_NOT_OK_RETURN(status);
649
650         /* see if anyone has an oplock, which we need to break */
651         for (i=0;i<file.num_entries;i++) {
652                 if (file.entries[i].oplock_level == OPLOCK_LEVEL_II) {
653                         /*
654                          * there could be multiple level2 oplocks
655                          * and we just send a break to none to all of them
656                          * without waiting for a release
657                          */
658                         odb_oplock_break_send(odb, &file.entries[i],
659                                               OPLOCK_BREAK_TO_NONE);
660                         file.entries[i].oplock_level = OPLOCK_NONE;
661                         modified = true;
662                 }
663         }
664
665         if (modified) {
666                 return odb_push_record(lck, &file);
667         }
668         return NT_STATUS_OK;
669 }
670
671 /*
672   remove a pending opendb entry
673 */
674 static NTSTATUS odb_tdb_remove_pending(struct odb_lock *lck, void *private)
675 {
676         struct odb_context *odb = lck->odb;
677         int i;
678         NTSTATUS status;
679         struct opendb_file file;
680
681         status = odb_pull_record(lck, &file);
682         NT_STATUS_NOT_OK_RETURN(status);
683
684         /* find the entry, and delete it */
685         for (i=0;i<file.num_pending;i++) {
686                 if (private == file.pending[i].notify_ptr &&
687                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.pending[i].server)) {
688                         if (i < file.num_pending-1) {
689                                 memmove(file.pending+i, file.pending+i+1, 
690                                         (file.num_pending - (i+1)) * 
691                                         sizeof(struct opendb_pending));
692                         }
693                         break;
694                 }
695         }
696
697         if (i == file.num_pending) {
698                 return NT_STATUS_UNSUCCESSFUL;
699         }
700
701         file.num_pending--;
702         
703         return odb_push_record(lck, &file);
704 }
705
706
707 /*
708   rename the path in a open file
709 */
710 static NTSTATUS odb_tdb_rename(struct odb_lock *lck, const char *path)
711 {
712         struct opendb_file file;
713         NTSTATUS status;
714
715         status = odb_pull_record(lck, &file);
716         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
717                 /* not having the record at all is OK */
718                 return NT_STATUS_OK;
719         }
720         NT_STATUS_NOT_OK_RETURN(status);
721
722         file.path = path;
723         return odb_push_record(lck, &file);
724 }
725
726 /*
727   get the path of an open file
728 */
729 static NTSTATUS odb_tdb_get_path(struct odb_lock *lck, const char **path)
730 {
731         struct opendb_file file;
732         NTSTATUS status;
733
734         *path = NULL;
735
736         status = odb_pull_record(lck, &file);
737         /* we don't ignore NT_STATUS_OBJECT_NAME_NOT_FOUND here */
738         NT_STATUS_NOT_OK_RETURN(status);
739
740         *path = file.path;
741
742         return NT_STATUS_OK;
743 }
744
745 /*
746   update delete on close flag on an open file
747 */
748 static NTSTATUS odb_tdb_set_delete_on_close(struct odb_lock *lck, bool del_on_close)
749 {
750         NTSTATUS status;
751         struct opendb_file file;
752
753         status = odb_pull_record(lck, &file);
754         NT_STATUS_NOT_OK_RETURN(status);
755
756         file.delete_on_close = del_on_close;
757
758         return odb_push_record(lck, &file);
759 }
760
761 /*
762   return the current value of the delete_on_close bit, and how many
763   people still have the file open
764 */
765 static NTSTATUS odb_tdb_get_delete_on_close(struct odb_context *odb, 
766                                             DATA_BLOB *key, bool *del_on_close)
767 {
768         NTSTATUS status;
769         struct opendb_file file;
770         struct odb_lock *lck;
771
772         (*del_on_close) = false;
773
774         lck = odb_lock(odb, odb, key);
775         NT_STATUS_HAVE_NO_MEMORY(lck);
776
777         status = odb_pull_record(lck, &file);
778         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
779                 talloc_free(lck);
780                 return NT_STATUS_OK;
781         }
782         if (!NT_STATUS_IS_OK(status)) {
783                 talloc_free(lck);
784                 return status;
785         }
786
787         (*del_on_close) = file.delete_on_close;
788
789         talloc_free(lck);
790
791         return NT_STATUS_OK;
792 }
793
794
795 /*
796   determine if a file can be opened with the given share_access,
797   create_options and access_mask
798 */
799 static NTSTATUS odb_tdb_can_open(struct odb_lock *lck,
800                                  uint32_t stream_id, uint32_t share_access,
801                                  uint32_t access_mask, bool delete_on_close,
802                                  uint32_t open_disposition, bool break_to_none)
803 {
804         struct odb_context *odb = lck->odb;
805         NTSTATUS status;
806         struct opendb_file file;
807         bool attrs_only = false;
808
809         status = odb_pull_record(lck, &file);
810         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
811                 return NT_STATUS_OK;
812         }
813         NT_STATUS_NOT_OK_RETURN(status);
814
815         status = odb_tdb_open_can_internal(odb, &file, stream_id,
816                                            share_access, access_mask,
817                                            delete_on_close, open_disposition,
818                                            break_to_none, &attrs_only);
819         NT_STATUS_NOT_OK_RETURN(status);
820
821         return NT_STATUS_OK;
822 }
823
824
825 static const struct opendb_ops opendb_tdb_ops = {
826         .odb_init                = odb_tdb_init,
827         .odb_lock                = odb_tdb_lock,
828         .odb_get_key             = odb_tdb_get_key,
829         .odb_open_file           = odb_tdb_open_file,
830         .odb_open_file_pending   = odb_tdb_open_file_pending,
831         .odb_close_file          = odb_tdb_close_file,
832         .odb_remove_pending      = odb_tdb_remove_pending,
833         .odb_rename              = odb_tdb_rename,
834         .odb_get_path            = odb_tdb_get_path,
835         .odb_set_delete_on_close = odb_tdb_set_delete_on_close,
836         .odb_get_delete_on_close = odb_tdb_get_delete_on_close,
837         .odb_can_open            = odb_tdb_can_open,
838         .odb_update_oplock       = odb_tdb_update_oplock,
839         .odb_break_oplocks       = odb_tdb_break_oplocks
840 };
841
842
843 void odb_tdb_init_ops(void)
844 {
845         odb_set_ops(&opendb_tdb_ops);
846 }