ed047b76bac3b970a9250c636700fd9e3538765f
[sfrench/samba-autobuild/.git] / source4 / cluster / ctdb / opendb_ctdb.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    Copyright (C) Ronnie Sahlberg 2007
5    Copyright (C) Andrew Tridgell 2007
6    
7    This program is free software; you can redistribute it and/or modify
8    it under the terms of the GNU General Public License as published by
9    the Free Software Foundation; either version 3 of the License, or
10    (at your option) any later version.
11    
12    This program is distributed in the hope that it will be useful,
13    but WITHOUT ANY WARRANTY; without even the implied warranty of
14    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15    GNU General Public License for more details.
16    
17    You should have received a copy of the GNU General Public License
18    along with this program.  If not, see <http://www.gnu.org/licenses/>.
19 */
20
21 /*
22   this is the open files database, ctdb backend. It implements shared
23   storage of what files are open between server instances, and
24   implements the rules of shared access to files.
25
26   The caller needs to provide a file_key, which specifies what file
27   they are talking about. This needs to be a unique key across all
28   filesystems, and is usually implemented in terms of a device/inode
29   pair.
30
31   Before any operations can be performed the caller needs to establish
32   a lock on the record associated with file_key. That is done by
33   calling odb_lock(). The caller releases this lock by calling
34   talloc_free() on the returned handle.
35
36   All other operations on a record are done by passing the odb_lock()
37   handle back to this module. The handle contains internal
38   information about what file_key is being operated on.
39 */
40
41 #include "includes.h"
42 #include "system/filesys.h"
43 #include "lib/tdb/include/tdb.h"
44 #include "messaging/messaging.h"
45 #include "db_wrap.h"
46 #include "lib/messaging/irpc.h"
47 #include "librpc/gen_ndr/ndr_opendb.h"
48 #include "ntvfs/ntvfs.h"
49 #include "ntvfs/common/ntvfs_common.h"
50 #include "cluster/cluster.h"
51 #include "include/ctdb.h"
52
53 struct odb_context {
54         struct ctdb_context *ctdb;
55         struct ctdb_db_context *ctdb_db;
56         struct ntvfs_context *ntvfs_ctx;
57         BOOL oplocks;
58 };
59
60 /*
61   an odb lock handle. You must obtain one of these using odb_lock() before doing
62   any other operations. 
63 */
64 struct odb_lock {
65         struct odb_context *odb;
66         struct ctdb_record_handle *rec;
67         TDB_DATA key;
68         TDB_DATA data;
69 };
70
71 /*
72   Open up the openfiles.tdb database. Close it down using
73   talloc_free(). We need the messaging_ctx to allow for pending open
74   notifications.
75 */
76 static struct odb_context *odb_ctdb_init(TALLOC_CTX *mem_ctx, 
77                                         struct ntvfs_context *ntvfs_ctx)
78 {
79         struct odb_context *odb;
80         struct ctdb_context *ctdb = talloc_get_type(cluster_backend_handle(), 
81                                                     struct ctdb_context);
82
83         odb = talloc(mem_ctx, struct odb_context);
84         if (odb == NULL) {
85                 return NULL;
86         }
87
88         odb->ctdb = ctdb;
89         odb->ctdb_db = ctdb_db_handle(ctdb, "opendb");
90         if (!odb->ctdb_db) {
91                 DEBUG(0,("Failed to get attached ctdb db handle for opendb\n"));
92                 talloc_free(odb);
93                 return NULL;
94         }
95
96         odb->ntvfs_ctx = ntvfs_ctx;
97
98         /* leave oplocks disabled by default until the code is working */
99         odb->oplocks = lp_parm_bool(-1, "opendb", "oplocks", False);
100
101         return odb;
102 }
103
104 /*
105   get a lock on a entry in the odb. This call returns a lock handle,
106   which the caller should unlock using talloc_free().
107 */
108 static struct odb_lock *odb_ctdb_lock(TALLOC_CTX *mem_ctx,
109                                      struct odb_context *odb, DATA_BLOB *file_key)
110 {
111         struct odb_lock *lck;
112
113         lck = talloc(mem_ctx, struct odb_lock);
114         if (lck == NULL) {
115                 return NULL;
116         }
117
118         lck->odb = talloc_reference(lck, odb);
119         lck->key.dptr = talloc_memdup(lck, file_key->data, file_key->length);
120         lck->key.dsize = file_key->length;
121         if (lck->key.dptr == NULL) {
122                 talloc_free(lck);
123                 return NULL;
124         }
125
126         lck->rec = ctdb_fetch_lock(odb->ctdb_db, (TALLOC_CTX *)lck, lck->key, &lck->data);
127         if (!lck->rec) {
128                 talloc_free(lck);
129                 return NULL;
130         }
131
132         return lck;
133 }
134
135 /*
136   determine if two odb_entry structures conflict
137
138   return NT_STATUS_OK on no conflict
139 */
140 static NTSTATUS share_conflict(struct opendb_entry *e1, struct opendb_entry *e2)
141 {
142         /* if either open involves no read.write or delete access then
143            it can't conflict */
144         if (!(e1->access_mask & (SEC_FILE_WRITE_DATA |
145                                  SEC_FILE_APPEND_DATA |
146                                  SEC_FILE_READ_DATA |
147                                  SEC_FILE_EXECUTE |
148                                  SEC_STD_DELETE))) {
149                 return NT_STATUS_OK;
150         }
151         if (!(e2->access_mask & (SEC_FILE_WRITE_DATA |
152                                  SEC_FILE_APPEND_DATA |
153                                  SEC_FILE_READ_DATA |
154                                  SEC_FILE_EXECUTE |
155                                  SEC_STD_DELETE))) {
156                 return NT_STATUS_OK;
157         }
158
159         /* data IO access masks. This is skipped if the two open handles
160            are on different streams (as in that case the masks don't
161            interact) */
162         if (e1->stream_id != e2->stream_id) {
163                 return NT_STATUS_OK;
164         }
165
166 #define CHECK_MASK(am, right, sa, share) \
167         if (((am) & (right)) && !((sa) & (share))) return NT_STATUS_SHARING_VIOLATION
168
169         CHECK_MASK(e1->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
170                    e2->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
171         CHECK_MASK(e2->access_mask, SEC_FILE_WRITE_DATA | SEC_FILE_APPEND_DATA,
172                    e1->share_access, NTCREATEX_SHARE_ACCESS_WRITE);
173         
174         CHECK_MASK(e1->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
175                    e2->share_access, NTCREATEX_SHARE_ACCESS_READ);
176         CHECK_MASK(e2->access_mask, SEC_FILE_READ_DATA | SEC_FILE_EXECUTE,
177                    e1->share_access, NTCREATEX_SHARE_ACCESS_READ);
178
179         CHECK_MASK(e1->access_mask, SEC_STD_DELETE,
180                    e2->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
181         CHECK_MASK(e2->access_mask, SEC_STD_DELETE,
182                    e1->share_access, NTCREATEX_SHARE_ACCESS_DELETE);
183
184         return NT_STATUS_OK;
185 }
186
187 /*
188   pull a record, translating from the db format to the opendb_file structure defined
189   in opendb.idl
190 */
191 static NTSTATUS odb_pull_record(struct odb_lock *lck, struct opendb_file *file)
192 {
193         TDB_DATA dbuf;
194         DATA_BLOB blob;
195         NTSTATUS status;
196
197         dbuf = lck->data;
198
199         if (dbuf.dsize == 0) {
200                 /* empty record in ctdb means the record isn't there */
201                 return NT_STATUS_OBJECT_NAME_NOT_FOUND;
202         }
203
204         blob.data = dbuf.dptr;
205         blob.length = dbuf.dsize;
206
207         status = ndr_pull_struct_blob(&blob, lck, file, (ndr_pull_flags_fn_t)ndr_pull_opendb_file);
208
209         return status;
210 }
211
212 /*
213   push a record, translating from the opendb_file structure defined in opendb.idl
214 */
215 static NTSTATUS odb_push_record(struct odb_lock *lck, struct opendb_file *file)
216 {
217         TDB_DATA dbuf;
218         DATA_BLOB blob;
219         NTSTATUS status;
220         int ret;
221
222         if (!file->num_entries) {
223                 dbuf.dptr  = NULL;
224                 dbuf.dsize = 0;
225                 ctdb_record_store(lck->rec, dbuf);
226                 talloc_free(lck->rec);
227                 return NT_STATUS_OK;
228         }
229
230         status = ndr_push_struct_blob(&blob, lck, file, (ndr_push_flags_fn_t)ndr_push_opendb_file);
231         NT_STATUS_NOT_OK_RETURN(status);
232
233         dbuf.dptr = blob.data;
234         dbuf.dsize = blob.length;
235                 
236         ret = ctdb_record_store(lck->rec, dbuf);
237         talloc_free(lck->rec);
238         data_blob_free(&blob);
239         if (ret != 0) {
240                 return NT_STATUS_INTERNAL_DB_CORRUPTION;
241         }
242
243         return NT_STATUS_OK;
244 }
245
246 /*
247   send an oplock break to a client
248 */
249 static NTSTATUS odb_oplock_break_send(struct odb_context *odb, struct opendb_entry *e)
250 {
251         /* tell the server handling this open file about the need to send the client
252            a break */
253         return messaging_send_ptr(odb->ntvfs_ctx->msg_ctx, e->server, 
254                                   MSG_NTVFS_OPLOCK_BREAK, e->file_handle);
255 }
256
257 /*
258   register an open file in the open files database. This implements the share_access
259   rules
260
261   Note that the path is only used by the delete on close logic, not
262   for comparing with other filenames
263 */
264 static NTSTATUS odb_ctdb_open_file(struct odb_lock *lck, void *file_handle,
265                                   uint32_t stream_id, uint32_t share_access, 
266                                   uint32_t access_mask, BOOL delete_on_close,
267                                   const char *path, 
268                                   uint32_t oplock_level, uint32_t *oplock_granted)
269 {
270         struct odb_context *odb = lck->odb;
271         struct opendb_entry e;
272         int i;
273         struct opendb_file file;
274         NTSTATUS status;
275
276         if (odb->oplocks == False) {
277                 oplock_level = OPLOCK_NONE;
278         }
279
280         status = odb_pull_record(lck, &file);
281         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
282                 /* initialise a blank structure */
283                 ZERO_STRUCT(file);
284                 file.path = path;
285         } else {
286                 NT_STATUS_NOT_OK_RETURN(status);
287         }
288
289         /* see if it conflicts */
290         e.server          = odb->ntvfs_ctx->server_id;
291         e.file_handle     = file_handle;
292         e.stream_id       = stream_id;
293         e.share_access    = share_access;
294         e.access_mask     = access_mask;
295         e.delete_on_close = delete_on_close;
296         e.oplock_level    = OPLOCK_NONE;
297                 
298         /* see if anyone has an oplock, which we need to break */
299         for (i=0;i<file.num_entries;i++) {
300                 if (file.entries[i].oplock_level == OPLOCK_BATCH) {
301                         /* a batch oplock caches close calls, which
302                            means the client application might have
303                            already closed the file. We have to allow
304                            this close to propogate by sending a oplock
305                            break request and suspending this call
306                            until the break is acknowledged or the file
307                            is closed */
308                         odb_oplock_break_send(odb, &file.entries[i]);
309                         return NT_STATUS_OPLOCK_NOT_GRANTED;
310                 }
311         }
312
313         if (file.delete_on_close || 
314             (file.num_entries != 0 && delete_on_close)) {
315                 /* while delete on close is set, no new opens are allowed */
316                 return NT_STATUS_DELETE_PENDING;
317         }
318
319         /* check for sharing violations */
320         for (i=0;i<file.num_entries;i++) {
321                 status = share_conflict(&file.entries[i], &e);
322                 NT_STATUS_NOT_OK_RETURN(status);
323         }
324
325         /* we now know the open could succeed, but we need to check
326            for any exclusive oplocks. We can't grant a second open
327            till these are broken. Note that we check for batch oplocks
328            before checking for sharing violations, and check for
329            exclusive oplocks afterwards. */
330         for (i=0;i<file.num_entries;i++) {
331                 if (file.entries[i].oplock_level == OPLOCK_EXCLUSIVE) {
332                         odb_oplock_break_send(odb, &file.entries[i]);
333                         return NT_STATUS_OPLOCK_NOT_GRANTED;
334                 }
335         }
336
337         /*
338           possibly grant an exclusive or batch oplock if this is the only client
339           with the file open. We don't yet grant levelII oplocks.
340         */
341         if (oplock_granted != NULL) {
342                 if ((oplock_level == OPLOCK_BATCH ||
343                      oplock_level == OPLOCK_EXCLUSIVE) &&
344                     file.num_entries == 0) {
345                         (*oplock_granted) = oplock_level;
346                 } else {
347                         (*oplock_granted) = OPLOCK_NONE;
348                 }
349                 e.oplock_level = (*oplock_granted);
350         }
351
352         /* it doesn't conflict, so add it to the end */
353         file.entries = talloc_realloc(lck, file.entries, struct opendb_entry, 
354                                       file.num_entries+1);
355         NT_STATUS_HAVE_NO_MEMORY(file.entries);
356
357         file.entries[file.num_entries] = e;
358         file.num_entries++;
359
360         return odb_push_record(lck, &file);
361 }
362
363
364 /*
365   register a pending open file in the open files database
366 */
367 static NTSTATUS odb_ctdb_open_file_pending(struct odb_lock *lck, void *private)
368 {
369         struct odb_context *odb = lck->odb;
370         struct opendb_file file;
371         NTSTATUS status;
372                 
373         status = odb_pull_record(lck, &file);
374         NT_STATUS_NOT_OK_RETURN(status);
375
376         file.pending = talloc_realloc(lck, file.pending, struct opendb_pending, 
377                                       file.num_pending+1);
378         NT_STATUS_HAVE_NO_MEMORY(file.pending);
379
380         file.pending[file.num_pending].server = odb->ntvfs_ctx->server_id;
381         file.pending[file.num_pending].notify_ptr = private;
382
383         file.num_pending++;
384
385         return odb_push_record(lck, &file);
386 }
387
388
389 /*
390   remove a opendb entry
391 */
392 static NTSTATUS odb_ctdb_close_file(struct odb_lock *lck, void *file_handle)
393 {
394         struct odb_context *odb = lck->odb;
395         struct opendb_file file;
396         int i;
397         NTSTATUS status;
398
399         status = odb_pull_record(lck, &file);
400         NT_STATUS_NOT_OK_RETURN(status);
401
402         /* find the entry, and delete it */
403         for (i=0;i<file.num_entries;i++) {
404                 if (file_handle == file.entries[i].file_handle &&
405                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.entries[i].server)) {
406                         if (file.entries[i].delete_on_close) {
407                                 file.delete_on_close = True;
408                         }
409                         if (i < file.num_entries-1) {
410                                 memmove(file.entries+i, file.entries+i+1, 
411                                         (file.num_entries - (i+1)) * 
412                                         sizeof(struct opendb_entry));
413                         }
414                         break;
415                 }
416         }
417
418         if (i == file.num_entries) {
419                 return NT_STATUS_UNSUCCESSFUL;
420         }
421
422         /* send any pending notifications, removing them once sent */
423         for (i=0;i<file.num_pending;i++) {
424                 messaging_send_ptr(odb->ntvfs_ctx->msg_ctx, file.pending[i].server, 
425                                    MSG_PVFS_RETRY_OPEN, 
426                                    file.pending[i].notify_ptr);
427         }
428         file.num_pending = 0;
429
430         file.num_entries--;
431         
432         return odb_push_record(lck, &file);
433 }
434
435
436 /*
437   remove a pending opendb entry
438 */
439 static NTSTATUS odb_ctdb_remove_pending(struct odb_lock *lck, void *private)
440 {
441         struct odb_context *odb = lck->odb;
442         int i;
443         NTSTATUS status;
444         struct opendb_file file;
445
446         status = odb_pull_record(lck, &file);
447         NT_STATUS_NOT_OK_RETURN(status);
448
449         /* find the entry, and delete it */
450         for (i=0;i<file.num_pending;i++) {
451                 if (private == file.pending[i].notify_ptr &&
452                     cluster_id_equal(&odb->ntvfs_ctx->server_id, &file.pending[i].server)) {
453                         if (i < file.num_pending-1) {
454                                 memmove(file.pending+i, file.pending+i+1, 
455                                         (file.num_pending - (i+1)) * 
456                                         sizeof(struct opendb_pending));
457                         }
458                         break;
459                 }
460         }
461
462         if (i == file.num_pending) {
463                 return NT_STATUS_UNSUCCESSFUL;
464         }
465
466         file.num_pending--;
467         
468         return odb_push_record(lck, &file);
469 }
470
471
472 /*
473   rename the path in a open file
474 */
475 static NTSTATUS odb_ctdb_rename(struct odb_lock *lck, const char *path)
476 {
477         struct opendb_file file;
478         NTSTATUS status;
479
480         status = odb_pull_record(lck, &file);
481         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
482                 /* not having the record at all is OK */
483                 return NT_STATUS_OK;
484         }
485         NT_STATUS_NOT_OK_RETURN(status);
486
487         file.path = path;
488         return odb_push_record(lck, &file);
489 }
490
491 /*
492   update delete on close flag on an open file
493 */
494 static NTSTATUS odb_ctdb_set_delete_on_close(struct odb_lock *lck, BOOL del_on_close)
495 {
496         NTSTATUS status;
497         struct opendb_file file;
498
499         status = odb_pull_record(lck, &file);
500         NT_STATUS_NOT_OK_RETURN(status);
501
502         file.delete_on_close = del_on_close;
503
504         return odb_push_record(lck, &file);
505 }
506
507 /*
508   return the current value of the delete_on_close bit, and how many
509   people still have the file open
510 */
511 static NTSTATUS odb_ctdb_get_delete_on_close(struct odb_context *odb, 
512                                             DATA_BLOB *key, BOOL *del_on_close, 
513                                             int *open_count, char **path)
514 {
515         NTSTATUS status;
516         struct opendb_file file;
517         struct odb_lock *lck;
518
519         lck = odb_lock(odb, odb, key);
520         NT_STATUS_HAVE_NO_MEMORY(lck);
521
522         status = odb_pull_record(lck, &file);
523         if (NT_STATUS_EQUAL(NT_STATUS_OBJECT_NAME_NOT_FOUND, status)) {
524                 talloc_free(lck);
525                 (*del_on_close) = False;
526                 return NT_STATUS_OK;
527         }
528         if (!NT_STATUS_IS_OK(status)) {
529                 talloc_free(lck);
530                 return status;
531         }
532
533         (*del_on_close) = file.delete_on_close;
534         if (open_count != NULL) {
535                 (*open_count) = file.num_entries;
536         }
537         if (path != NULL) {
538                 *path = talloc_strdup(odb, file.path);
539                 NT_STATUS_HAVE_NO_MEMORY(*path);
540                 if (file.num_entries == 1 && file.entries[0].delete_on_close) {
541                         (*del_on_close) = True;
542                 }
543         }
544
545         talloc_free(lck);
546
547         return NT_STATUS_OK;
548 }
549
550
551 /*
552   determine if a file can be opened with the given share_access,
553   create_options and access_mask
554 */
555 static NTSTATUS odb_ctdb_can_open(struct odb_lock *lck,
556                                  uint32_t share_access, uint32_t create_options, 
557                                  uint32_t access_mask)
558 {
559         struct odb_context *odb = lck->odb;
560         NTSTATUS status;
561         struct opendb_file file;
562         struct opendb_entry e;
563         int i;
564
565         status = odb_pull_record(lck, &file);
566         if (NT_STATUS_EQUAL(status, NT_STATUS_OBJECT_NAME_NOT_FOUND)) {
567                 return NT_STATUS_OK;
568         }
569         NT_STATUS_NOT_OK_RETURN(status);
570
571         if ((create_options & NTCREATEX_OPTIONS_DELETE_ON_CLOSE) && 
572             file.num_entries != 0) {
573                 return NT_STATUS_SHARING_VIOLATION;
574         }
575
576         if (file.delete_on_close) {
577                 return NT_STATUS_DELETE_PENDING;
578         }
579
580         e.server       = odb->ntvfs_ctx->server_id;
581         e.file_handle  = NULL;
582         e.stream_id    = 0;
583         e.share_access = share_access;
584         e.access_mask  = access_mask;
585                 
586         for (i=0;i<file.num_entries;i++) {
587                 status = share_conflict(&file.entries[i], &e);
588                 if (!NT_STATUS_IS_OK(status)) {
589                         /* note that we discard the error code
590                            here. We do this as unless we are actually
591                            doing an open (which comes via a different
592                            function), we need to return a sharing
593                            violation */
594                         return NT_STATUS_SHARING_VIOLATION;
595                 }
596         }
597
598         return NT_STATUS_OK;
599 }
600
601
602 static const struct opendb_ops opendb_ctdb_ops = {
603         .odb_init                = odb_ctdb_init,
604         .odb_lock                = odb_ctdb_lock,
605         .odb_open_file           = odb_ctdb_open_file,
606         .odb_open_file_pending   = odb_ctdb_open_file_pending,
607         .odb_close_file          = odb_ctdb_close_file,
608         .odb_remove_pending      = odb_ctdb_remove_pending,
609         .odb_rename              = odb_ctdb_rename,
610         .odb_set_delete_on_close = odb_ctdb_set_delete_on_close,
611         .odb_get_delete_on_close = odb_ctdb_get_delete_on_close,
612         .odb_can_open            = odb_ctdb_can_open
613 };
614
615
616 void odb_ctdb_init_ops(void)
617 {
618         odb_set_ops(&opendb_ctdb_ops);
619 }