opendb: force odb_can_open() before odb_open_file()
[bbaumbach/samba-autobuild/.git] / source4 / ntvfs / common / opendb_tdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Andrew Tridgell 2004
5    Copyright (C) Stefan Metzmacher 2008
6
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the open files database, tdb backend. It implements shared
23   storage of what files are open between server instances, and
24   implements the rules of shared access to files.
25
26   The caller needs to provide a file_key, which specifies what file
27   they are talking about. This needs to be a unique key across all
28   filesystems, and is usually implemented in terms of a device/inode
29   pair.
30
31   Before any operations can be performed the caller needs to establish
32   a lock on the record associated with file_key. That is done by
33   calling odb_lock(). The caller releases this lock by calling
34   talloc_free() on the returned handle.
35
36   All other operations on a record are done by passing the odb_lock()
37   handle back to this module. The handle contains internal
38   information about what file_key is being operated on.
39 */
40
41 #include "includes.h"
42 #include "system/filesys.h"
43 #include "lib/tdb/include/tdb.h"
44 #include "messaging/messaging.h"
45 #include "tdb_wrap.h"
46 #include "lib/messaging/irpc.h"
47 #include "librpc/gen_ndr/ndr_opendb.h"
48 #include "ntvfs/ntvfs.h"
49 #include "ntvfs/common/ntvfs_common.h"
50 #include "cluster/cluster.h"
51 #include "param/param.h"
52
53 struct odb_context {
54         struct tdb_wrap *w;
55         struct ntvfs_context *ntvfs_ctx;
56         bool oplocks;
57 };
58
59 /*
60   an odb lock handle. You must obtain one of these using odb_lock() before doing
61   any other operations. 
62 */
63 struct odb_lock {
64         struct odb_context *odb;
65         TDB_DATA key;
66
67         struct {
68                 struct opendb_entry *e;
69                 bool attrs_only;
70         } can_open;
71 };
72
73 /*
74   Open up the openfiles.tdb database. Close it down using
75   talloc_free(). We need the messaging_ctx to allow for pending open
76   notifications.
77 */
78 static struct odb_context *odb_tdb_init(TALLOC_CTX *mem_ctx, 
79                                         struct ntvfs_context *ntvfs_ctx)
80 {
81         struct odb_context *odb;
82
83         odb = talloc(mem_ctx, struct odb_context);
84         if (odb == NULL) {
85                 return NULL;
86         }
87
88         odb->w = cluster_tdb_tmp_open(odb, ntvfs_ctx->lp_ctx, "openfiles.tdb", TDB_DEFAULT);
89         if (odb->w == NULL) {
90                 talloc_free(odb);
91                 return NULL;
92         }
93
94         odb->ntvfs_ctx = ntvfs_ctx;
95
96         /* leave oplocks disabled by default until the code is working */
97         odb->oplocks = lp_parm_bool(ntvfs_ctx->lp_ctx, NULL, "opendb", "oplocks", false);
98
99         return odb;
100 }
101
102 /*
103   destroy a lock on the database
104 */
105 static int odb_lock_destructor(struct odb_lock *lck)
106 {
107         tdb_chainunlock(lck->odb->w->tdb, lck->key);
108         return 0;
109 }
110
111 /*
112   get a lock on a entry in the odb. This call returns a lock handle,
113   which the caller should unlock using talloc_free().
114 */
115 static struct odb_lock *odb_tdb_lock(TALLOC_CTX *mem_ctx,
116                                      struct odb_context *odb, DATA_BLOB *file_key)
117 {
118         struct odb_lock *lck;
119
120         lck = talloc(mem_ctx, struct odb_lock);
121         if (lck == NULL) {
122                 return NULL;
123         }
124
125         lck->odb = talloc_reference(lck, odb);
126         lck->key.dptr = talloc_memdup(lck, file_key->data, file_key->length);
127         lck->key.dsize = file_key->length;
128         if (lck->key.dptr == NULL) {
129                 talloc_free(lck);
130                 return NULL;
131         }
132
133         if (tdb_chainlock(odb->w->tdb, lck->key) != 0) {
134                 talloc_free(lck);
135                 return NULL;
136         }
137
138         ZERO_STRUCT(lck->can_open);
139
140         talloc_set_destructor(lck, odb_lock_destructor);
141         
142         return lck;
143 }
144
145 static DATA_BLOB odb_tdb_get_key(TALLOC_CTX *mem_ctx, struct odb_lock *lck)
146 {
147         return data_blob_talloc(mem_ctx, lck->key.dptr, lck->key.dsize);
148 }
149
150
151 /*
152   determine if two odb_entry structures conflict
153
154   return NT_STATUS_OK on no conflict
155 */
156 static NTSTATUS share_conflict(struct opendb_entry *e1,
157                                uint32_t stream_id,
158                                uint32_t share_access,
159                                uint32_t access_mask)
160 {
161         /* if either open involves no read.write or delete access then
162            it can't conflict */
163         if (!(e1->access_mask & (SEC_FILE_WRITE_DATA |
164                                  SEC_FILE_APPEND_DATA |
165                                  SEC_FILE_READ_DATA |
166                                  SEC_FILE_EXECUTE |
167                                  SEC_STD_DELETE))) {
168                 return NT_STATUS_OK;
169         }
170         if (!(access_mask & (SEC_FILE_WRITE_DATA |
171                              SEC_FILE_APPEND_DATA |
172                              SEC_FILE_READ_DATA |
173                              SEC_FILE_EXECUTE |
174                              SEC_STD_DELETE))) {
175                 return NT_STATUS_OK;
176         }
177
178         /* data IO access masks. This is skipped if the two open handles
179            are on different streams (as in that case the masks don't
180            interact) */
181         if (e1->stream_id != stream_id) {
182                 return NT_STATUS_OK;
183         }
184
185 #define CHECK_MASK(am, right, sa, share) \
186         if (((am) & (right)) && !((sa) & (share))) return NT_STATUS_SHARING_VIOLATION
187
188         CHECK_MASK(e1->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
189                    share_access, NTCREATEX_SHARE_ACCESS_WRITE);
190         CHECK_MASK(access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
191                    e1->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
192         
193         CHECK_MASK(e1->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
194                    share_access, NTCREATEX_SHARE_ACCESS_READ);
195         CHECK_MASK(access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
196                    e1->share_access, NTCREATEX_SHARE_ACCESS_READ);
197
198         CHECK_MASK(e1->access_mask, SEC_STD_DELETE,
199                    share_access, NTCREATEX_SHARE_ACCESS_DELETE);
200         CHECK_MASK(access_mask, SEC_STD_DELETE,
201                    e1->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
202 #undef CHECK_MASK
203         return NT_STATUS_OK;
204 }
205
206 /*
207   pull a record, translating from the db format to the opendb_file structure defined
208   in opendb.idl
209 */
210 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file)
211 {
212         struct odb_context *odb = lck->odb;
213         TDB_DATA dbuf;
214         DATA_BLOB blob;
215         enum ndr_err_code ndr_err;
216
217         dbuf = tdb_fetch(odb->w->tdb, lck->key);
218         if (dbuf.dptr == NULL) {
219                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
220         }
221
222         blob.data = dbuf.dptr;
223         blob.length = dbuf.dsize;
224
225         ndr_err = ndr_pull_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_pull_flags_fn_t)ndr_pull_opendb_file);
226         free(dbuf.dptr);
227         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
228                 return ndr_map_error2ntstatus(ndr_err);
229         }
230
231         return NT_STATUS_OK;
232 }
233
234 /*
235   push a record, translating from the opendb_file structure defined in opendb.idl
236 */
237 static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file)
238 {
239         struct odb_context *odb = lck->odb;
240         TDB_DATA dbuf;
241         DATA_BLOB blob;
242         enum ndr_err_code ndr_err;
243         int ret;
244
245         if (file->num_entries == 0) {
246                 ret = tdb_delete(odb->w->tdb, lck->key);
247                 if (ret != 0) {
248                         return NT_STATUS_INTERNAL_DB_CORRUPTION;
249                 }
250                 return NT_STATUS_OK;
251         }
252
253         ndr_err = ndr_push_struct_blob(&blob, lck, lp_iconv_convenience(lck->odb->ntvfs_ctx->lp_ctx), file, (ndr_push_flags_fn_t)ndr_push_opendb_file);
254         if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
255                 return ndr_map_error2ntstatus(ndr_err);
256         }
257
258         dbuf.dptr = blob.data;
259         dbuf.dsize = blob.length;
260                 
261         ret = tdb_store(odb->w->tdb, lck->key, dbuf, TDB_REPLACE);
262         data_blob_free(&blob);
263         if (ret != 0) {
264                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
265         }
266
267         return NT_STATUS_OK;
268 }
269
270 /*
271   send an oplock break to a client
272 */
273 static NTSTATUS odb_oplock_break_send(struct messaging_context *msg_ctx,
274                                       struct opendb_entry *e,
275                                       uint8_t level)
276 {
277         NTSTATUS status;
278         struct opendb_oplock_break op_break;
279         DATA_BLOB blob;
280
281         ZERO_STRUCT(op_break);
282
283         /* tell the server handling this open file about the need to send the client
284            a break */
285         op_break.file_handle    = e->file_handle;
286         op_break.level          = level;
287
288         blob = data_blob_const(&op_break, sizeof(op_break));
289
290         status = messaging_send(msg_ctx, e->server,
291                                 MSG_NTVFS_OPLOCK_BREAK, &blob);
292         NT_STATUS_NOT_OK_RETURN(status);
293
294         return NT_STATUS_OK;
295 }
296
297 static bool access_attributes_only(uint32_t access_mask,
298                                    uint32_t open_disposition,
299                                    bool break_to_none)
300 {
301         switch (open_disposition) {
302         case NTCREATEX_DISP_SUPERSEDE:
303         case NTCREATEX_DISP_OVERWRITE_IF:
304         case NTCREATEX_DISP_OVERWRITE:
305                 return false;
306         default:
307                 break;
308         }
309
310         if (break_to_none) {
311                 return false;
312         }
313
314 #define CHECK_MASK(m,g) ((m) && (((m) & ~(g))==0) && (((m) & (g)) != 0))
315         return CHECK_MASK(access_mask,
316                           SEC_STD_SYNCHRONIZE |
317                           SEC_FILE_READ_ATTRIBUTE |
318                           SEC_FILE_WRITE_ATTRIBUTE);
319 #undef CHECK_MASK
320 }
321
322 static NTSTATUS odb_tdb_open_can_internal(struct odb_context *odb,
323                                           const struct opendb_file *file,
324                                           uint32_t stream_id, uint32_t share_access,
325                                           uint32_t access_mask, bool delete_on_close,
326                                           uint32_t open_disposition, bool break_to_none,
327                                           bool *_attrs_only)
328 {
329         NTSTATUS status;
330         uint32_t i;
331         bool attrs_only = false;
332
333         /* see if anyone has an oplock, which we need to break */
334         for (i=0;i<file->num_entries;i++) {
335                 if (file->entries[i].oplock_level == OPLOCK_BATCH) {
336                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
337                         /* if this is an attribute only access
338                          * it doesn't conflict with a BACTCH oplock
339                          * but we'll not grant the oplock below
340                          */
341                         attrs_only = access_attributes_only(access_mask,
342                                                             open_disposition,
343                                                             break_to_none);
344                         if (attrs_only) {
345                                 break;
346                         }
347                         /* a batch oplock caches close calls, which
348                            means the client application might have
349                            already closed the file. We have to allow
350                            this close to propogate by sending a oplock
351                            break request and suspending this call
352                            until the break is acknowledged or the file
353                            is closed */
354                         if (break_to_none ||
355                             !file->entries[i].allow_level_II_oplock) {
356                                 oplock_return = OPLOCK_BREAK_TO_NONE;
357                         }
358                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
359                                               &file->entries[i],
360                                               oplock_return);
361                         return NT_STATUS_OPLOCK_NOT_GRANTED;
362                 }
363         }
364
365         if (file->delete_on_close) {
366                 /* while delete on close is set, no new opens are allowed */
367                 return NT_STATUS_DELETE_PENDING;
368         }
369
370         if (file->num_entries != 0 && delete_on_close) {
371                 return NT_STATUS_SHARING_VIOLATION;
372         }
373
374         /* check for sharing violations */
375         for (i=0;i<file->num_entries;i++) {
376                 status = share_conflict(&file->entries[i], stream_id,
377                                         share_access, access_mask);
378                 NT_STATUS_NOT_OK_RETURN(status);
379         }
380
381         /* we now know the open could succeed, but we need to check
382            for any exclusive oplocks. We can't grant a second open
383            till these are broken. Note that we check for batch oplocks
384            before checking for sharing violations, and check for
385            exclusive oplocks afterwards. */
386         for (i=0;i<file->num_entries;i++) {
387                 if (file->entries[i].oplock_level == OPLOCK_EXCLUSIVE) {
388                         bool oplock_return = OPLOCK_BREAK_TO_LEVEL_II;
389                         /* if this is an attribute only access
390                          * it doesn't conflict with an EXCLUSIVE oplock
391                          * but we'll not grant the oplock below
392                          */
393                         attrs_only = access_attributes_only(access_mask,
394                                                             open_disposition,
395                                                             break_to_none);
396                         if (attrs_only) {
397                                 break;
398                         }
399                         /*
400                          * send an oplock break to the holder of the
401                          * oplock and tell caller to retry later
402                          */
403                         if (break_to_none ||
404                             !file->entries[i].allow_level_II_oplock) {
405                                 oplock_return = OPLOCK_BREAK_TO_NONE;
406                         }
407                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
408                                               &file->entries[i],
409                                               oplock_return);
410                         return NT_STATUS_OPLOCK_NOT_GRANTED;
411                 }
412         }
413
414         if (_attrs_only) {
415                 *_attrs_only = attrs_only;
416         }
417         return NT_STATUS_OK;
418 }
419
420 /*
421   register an open file in the open files database.
422   The share_access rules are implemented by odb_can_open()
423   and it's needed to call odb_can_open() before
424   odb_open_file() otherwise NT_STATUS_INTERNAL_ERROR is returned
425
426   Note that the path is only used by the delete on close logic, not
427   for comparing with other filenames
428 */
429 static NTSTATUS odb_tdb_open_file(struct odb_lock *lck,
430                                   void *file_handle, const char *path,
431                                   bool allow_level_II_oplock,
432                                   uint32_t oplock_level, uint32_t *oplock_granted)
433 {
434         struct odb_context *odb = lck->odb;
435         struct opendb_file file;
436         NTSTATUS status;
437
438         if (!lck->can_open.e) {
439                 return NT_STATUS_INTERNAL_ERROR;
440         }
441
442         if (odb->oplocks == false) {
443                 oplock_level = OPLOCK_NONE;
444         }
445
446         if (!oplock_granted) {
447                 oplock_level = OPLOCK_NONE;
448         }
449
450         status = odb_pull_record(lck, &file);
451         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
452                 /* initialise a blank structure */
453                 ZERO_STRUCT(file);
454                 file.path = path;
455         } else {
456                 NT_STATUS_NOT_OK_RETURN(status);
457         }
458
459         /*
460           possibly grant an exclusive, batch or level2 oplock
461         */
462         if (lck->can_open.attrs_only) {
463                 oplock_level    = OPLOCK_NONE;
464         } else if (oplock_level == OPLOCK_EXCLUSIVE) {
465                 if (file.num_entries == 0) {
466                         oplock_level    = OPLOCK_EXCLUSIVE;
467                 } else if (allow_level_II_oplock) {
468                         oplock_level    = OPLOCK_LEVEL_II;
469                 } else {
470                         oplock_level    = OPLOCK_NONE;
471                 }
472         } else if (oplock_level == OPLOCK_BATCH) {
473                 if (file.num_entries == 0) {
474                         oplock_level    = OPLOCK_BATCH;
475                 } else if (allow_level_II_oplock) {
476                         oplock_level    = OPLOCK_LEVEL_II;
477                 } else {
478                         oplock_level    = OPLOCK_NONE;
479                 }
480         } else if (oplock_level == OPLOCK_LEVEL_II) {
481                 oplock_level    = OPLOCK_LEVEL_II;
482         } else {
483                 oplock_level    = OPLOCK_NONE;
484         }
485
486         if (oplock_granted) {
487                 if (oplock_level == OPLOCK_EXCLUSIVE) {
488                         *oplock_granted = EXCLUSIVE_OPLOCK_RETURN;
489                 } else if (oplock_level == OPLOCK_BATCH) {
490                         *oplock_granted = BATCH_OPLOCK_RETURN;
491                 } else if (oplock_level == OPLOCK_LEVEL_II) {
492                         *oplock_granted = LEVEL_II_OPLOCK_RETURN;
493                 } else {
494                         *oplock_granted = NO_OPLOCK_RETURN;
495                 }
496         }
497
498         lck->can_open.e->file_handle            = file_handle;
499         lck->can_open.e->allow_level_II_oplock  = allow_level_II_oplock;
500         lck->can_open.e->oplock_level           = oplock_level;
501
502         /* it doesn't conflict, so add it to the end */
503         file.entries = talloc_realloc(lck, file.entries, struct opendb_entry, 
504                                       file.num_entries+1);
505         NT_STATUS_HAVE_NO_MEMORY(file.entries);
506
507         file.entries[file.num_entries] = *lck->can_open.e;
508         file.num_entries++;
509
510         return odb_push_record(lck, &file);
511 }
512
513
514 /*
515   register a pending open file in the open files database
516 */
517 static NTSTATUS odb_tdb_open_file_pending(struct odb_lock *lck, void *private)
518 {
519         struct odb_context *odb = lck->odb;
520         struct opendb_file file;
521         NTSTATUS status;
522                 
523         status = odb_pull_record(lck, &file);
524         NT_STATUS_NOT_OK_RETURN(status);
525
526         file.pending = talloc_realloc(lck, file.pending, struct opendb_pending, 
527                                       file.num_pending+1);
528         NT_STATUS_HAVE_NO_MEMORY(file.pending);
529
530         file.pending[file.num_pending].server = odb->ntvfs_ctx->server_id;
531         file.pending[file.num_pending].notify_ptr = private;
532
533         file.num_pending++;
534
535         return odb_push_record(lck, &file);
536 }
537
538
539 /*
540   remove a opendb entry
541 */
542 static NTSTATUS odb_tdb_close_file(struct odb_lock *lck, void *file_handle,
543                                    const char **_delete_path)
544 {
545         struct odb_context *odb = lck->odb;
546         struct opendb_file file;
547         const char *delete_path = NULL;
548         int i;
549         NTSTATUS status;
550
551         status = odb_pull_record(lck, &file);
552         NT_STATUS_NOT_OK_RETURN(status);
553
554         /* find the entry, and delete it */
555         for (i=0;i<file.num_entries;i++) {
556                 if (file_handle == file.entries[i].file_handle &&
557                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.entries[i].server)) {
558                         if (file.entries[i].delete_on_close) {
559                                 file.delete_on_close = true;
560                         }
561                         if (i < file.num_entries-1) {
562                                 memmove(file.entries+i, file.entries+i+1, 
563                                         (file.num_entries - (i+1)) * 
564                                         sizeof(struct opendb_entry));
565                         }
566                         break;
567                 }
568         }
569
570         if (i == file.num_entries) {
571                 return NT_STATUS_UNSUCCESSFUL;
572         }
573
574         /* send any pending notifications, removing them once sent */
575         for (i=0;i<file.num_pending;i++) {
576                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx, file.pending[i].server, 
577                                    MSG_PVFS_RETRY_OPEN, 
578                                    file.pending[i].notify_ptr);
579         }
580         file.num_pending = 0;
581
582         file.num_entries--;
583
584         if (file.num_entries == 0 && file.delete_on_close) {
585                 delete_path = talloc_strdup(lck, file.path);
586                 NT_STATUS_HAVE_NO_MEMORY(delete_path);
587         }
588
589         if (_delete_path) {
590                 *_delete_path = delete_path;
591         }
592
593         return odb_push_record(lck, &file);
594 }
595
596 /*
597   update the oplock level of the client
598 */
599 static NTSTATUS odb_tdb_update_oplock(struct odb_lock *lck, void *file_handle,
600                                       uint32_t oplock_level)
601 {
602         struct odb_context *odb = lck->odb;
603         struct opendb_file file;
604         int i;
605         NTSTATUS status;
606
607         status = odb_pull_record(lck, &file);
608         NT_STATUS_NOT_OK_RETURN(status);
609
610         /* find the entry, and update it */
611         for (i=0;i<file.num_entries;i++) {
612                 if (file_handle == file.entries[i].file_handle &&
613                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.entries[i].server)) {
614                         file.entries[i].oplock_level = oplock_level;
615                         break;
616                 }
617         }
618
619         if (i == file.num_entries) {
620                 return NT_STATUS_UNSUCCESSFUL;
621         }
622
623         /* send any pending notifications, removing them once sent */
624         for (i=0;i<file.num_pending;i++) {
625                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx,
626                                    file.pending[i].server,
627                                    MSG_PVFS_RETRY_OPEN,
628                                    file.pending[i].notify_ptr);
629         }
630         file.num_pending = 0;
631
632         return odb_push_record(lck, &file);
633 }
634
635 /*
636   send oplocks breaks to none to all level2 holders
637 */
638 static NTSTATUS odb_tdb_break_oplocks(struct odb_lock *lck)
639 {
640         struct odb_context *odb = lck->odb;
641         NTSTATUS status;
642         struct opendb_file file;
643         int i;
644         bool modified = true;
645
646         status = odb_pull_record(lck, &file);
647         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
648                 return NT_STATUS_OK;
649         }
650         NT_STATUS_NOT_OK_RETURN(status);
651
652         /* see if anyone has an oplock, which we need to break */
653         for (i=0;i<file.num_entries;i++) {
654                 if (file.entries[i].oplock_level == OPLOCK_LEVEL_II) {
655                         /*
656                          * there could be multiple level2 oplocks
657                          * and we just send a break to none to all of them
658                          * without waiting for a release
659                          */
660                         odb_oplock_break_send(odb->ntvfs_ctx->msg_ctx,
661                                               &file.entries[i],
662                                               OPLOCK_BREAK_TO_NONE);
663                         file.entries[i].oplock_level = OPLOCK_NONE;
664                         modified = true;
665                 }
666         }
667
668         if (modified) {
669                 return odb_push_record(lck, &file);
670         }
671         return NT_STATUS_OK;
672 }
673
674 /*
675   remove a pending opendb entry
676 */
677 static NTSTATUS odb_tdb_remove_pending(struct odb_lock *lck, void *private)
678 {
679         struct odb_context *odb = lck->odb;
680         int i;
681         NTSTATUS status;
682         struct opendb_file file;
683
684         status = odb_pull_record(lck, &file);
685         NT_STATUS_NOT_OK_RETURN(status);
686
687         /* find the entry, and delete it */
688         for (i=0;i<file.num_pending;i++) {
689                 if (private == file.pending[i].notify_ptr &&
690                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.pending[i].server)) {
691                         if (i < file.num_pending-1) {
692                                 memmove(file.pending+i, file.pending+i+1, 
693                                         (file.num_pending - (i+1)) * 
694                                         sizeof(struct opendb_pending));
695                         }
696                         break;
697                 }
698         }
699
700         if (i == file.num_pending) {
701                 return NT_STATUS_UNSUCCESSFUL;
702         }
703
704         file.num_pending--;
705         
706         return odb_push_record(lck, &file);
707 }
708
709
710 /*
711   rename the path in a open file
712 */
713 static NTSTATUS odb_tdb_rename(struct odb_lock *lck, const char *path)
714 {
715         struct opendb_file file;
716         NTSTATUS status;
717
718         status = odb_pull_record(lck, &file);
719         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
720                 /* not having the record at all is OK */
721                 return NT_STATUS_OK;
722         }
723         NT_STATUS_NOT_OK_RETURN(status);
724
725         file.path = path;
726         return odb_push_record(lck, &file);
727 }
728
729 /*
730   get the path of an open file
731 */
732 static NTSTATUS odb_tdb_get_path(struct odb_lock *lck, const char **path)
733 {
734         struct opendb_file file;
735         NTSTATUS status;
736
737         *path = NULL;
738
739         status = odb_pull_record(lck, &file);
740         /* we don't ignore NT_STATUS_OBJECT_NAME_NOT_FOUND here */
741         NT_STATUS_NOT_OK_RETURN(status);
742
743         *path = file.path;
744
745         return NT_STATUS_OK;
746 }
747
748 /*
749   update delete on close flag on an open file
750 */
751 static NTSTATUS odb_tdb_set_delete_on_close(struct odb_lock *lck, bool del_on_close)
752 {
753         NTSTATUS status;
754         struct opendb_file file;
755
756         status = odb_pull_record(lck, &file);
757         NT_STATUS_NOT_OK_RETURN(status);
758
759         file.delete_on_close = del_on_close;
760
761         return odb_push_record(lck, &file);
762 }
763
764 /*
765   return the current value of the delete_on_close bit, and how many
766   people still have the file open
767 */
768 static NTSTATUS odb_tdb_get_delete_on_close(struct odb_context *odb, 
769                                             DATA_BLOB *key, bool *del_on_close)
770 {
771         NTSTATUS status;
772         struct opendb_file file;
773         struct odb_lock *lck;
774
775         (*del_on_close) = false;
776
777         lck = odb_lock(odb, odb, key);
778         NT_STATUS_HAVE_NO_MEMORY(lck);
779
780         status = odb_pull_record(lck, &file);
781         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
782                 talloc_free(lck);
783                 return NT_STATUS_OK;
784         }
785         if (!NT_STATUS_IS_OK(status)) {
786                 talloc_free(lck);
787                 return status;
788         }
789
790         (*del_on_close) = file.delete_on_close;
791
792         talloc_free(lck);
793
794         return NT_STATUS_OK;
795 }
796
797
798 /*
799   determine if a file can be opened with the given share_access,
800   create_options and access_mask
801 */
802 static NTSTATUS odb_tdb_can_open(struct odb_lock *lck,
803                                  uint32_t stream_id, uint32_t share_access,
804                                  uint32_t access_mask, bool delete_on_close,
805                                  uint32_t open_disposition, bool break_to_none)
806 {
807         struct odb_context *odb = lck->odb;
808         NTSTATUS status;
809         struct opendb_file file;
810
811         status = odb_pull_record(lck, &file);
812         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
813                 goto ok;
814         }
815         NT_STATUS_NOT_OK_RETURN(status);
816
817         status = odb_tdb_open_can_internal(odb, &file, stream_id,
818                                            share_access, access_mask,
819                                            delete_on_close, open_disposition,
820                                            break_to_none, &lck->can_open.attrs_only);
821         NT_STATUS_NOT_OK_RETURN(status);
822
823 ok:
824         lck->can_open.e = talloc(lck, struct opendb_entry);
825         NT_STATUS_HAVE_NO_MEMORY(lck->can_open.e);
826
827         lck->can_open.e->server                 = odb->ntvfs_ctx->server_id;
828         lck->can_open.e->file_handle            = NULL;
829         lck->can_open.e->stream_id              = stream_id;
830         lck->can_open.e->share_access           = share_access;
831         lck->can_open.e->access_mask            = access_mask;
832         lck->can_open.e->delete_on_close        = delete_on_close;
833         lck->can_open.e->allow_level_II_oplock  = false;
834         lck->can_open.e->oplock_level           = OPLOCK_NONE;
835
836         return NT_STATUS_OK;
837 }
838
839
840 static const struct opendb_ops opendb_tdb_ops = {
841         .odb_init                = odb_tdb_init,
842         .odb_lock                = odb_tdb_lock,
843         .odb_get_key             = odb_tdb_get_key,
844         .odb_open_file           = odb_tdb_open_file,
845         .odb_open_file_pending   = odb_tdb_open_file_pending,
846         .odb_close_file          = odb_tdb_close_file,
847         .odb_remove_pending      = odb_tdb_remove_pending,
848         .odb_rename              = odb_tdb_rename,
849         .odb_get_path            = odb_tdb_get_path,
850         .odb_set_delete_on_close = odb_tdb_set_delete_on_close,
851         .odb_get_delete_on_close = odb_tdb_get_delete_on_close,
852         .odb_can_open            = odb_tdb_can_open,
853         .odb_update_oplock       = odb_tdb_update_oplock,
854         .odb_break_oplocks       = odb_tdb_break_oplocks
855 };
856
857
858 void odb_tdb_init_ops(void)
859 {
860         odb_set_ops(&opendb_tdb_ops);
861 }