ceph: strengthen rsize/wsize/readdir_max_bytes validation
[sfrench/cifs-2.6.git] / fs / ceph / caps.c
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/fs.h>
5 #include <linux/kernel.h>
6 #include <linux/sched/signal.h>
7 #include <linux/slab.h>
8 #include <linux/vmalloc.h>
9 #include <linux/wait.h>
10 #include <linux/writeback.h>
11
12 #include "super.h"
13 #include "mds_client.h"
14 #include "cache.h"
15 #include <linux/ceph/decode.h>
16 #include <linux/ceph/messenger.h>
17
18 /*
19  * Capability management
20  *
21  * The Ceph metadata servers control client access to inode metadata
22  * and file data by issuing capabilities, granting clients permission
23  * to read and/or write both inode field and file data to OSDs
24  * (storage nodes).  Each capability consists of a set of bits
25  * indicating which operations are allowed.
26  *
27  * If the client holds a *_SHARED cap, the client has a coherent value
28  * that can be safely read from the cached inode.
29  *
30  * In the case of a *_EXCL (exclusive) or FILE_WR capabilities, the
31  * client is allowed to change inode attributes (e.g., file size,
32  * mtime), note its dirty state in the ceph_cap, and asynchronously
33  * flush that metadata change to the MDS.
34  *
35  * In the event of a conflicting operation (perhaps by another
36  * client), the MDS will revoke the conflicting client capabilities.
37  *
38  * In order for a client to cache an inode, it must hold a capability
39  * with at least one MDS server.  When inodes are released, release
40  * notifications are batched and periodically sent en masse to the MDS
41  * cluster to release server state.
42  */
43
44 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc);
45 static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
46                                  struct ceph_mds_session *session,
47                                  struct ceph_inode_info *ci,
48                                  u64 oldest_flush_tid);
49
50 /*
51  * Generate readable cap strings for debugging output.
52  */
53 #define MAX_CAP_STR 20
54 static char cap_str[MAX_CAP_STR][40];
55 static DEFINE_SPINLOCK(cap_str_lock);
56 static int last_cap_str;
57
58 static char *gcap_string(char *s, int c)
59 {
60         if (c & CEPH_CAP_GSHARED)
61                 *s++ = 's';
62         if (c & CEPH_CAP_GEXCL)
63                 *s++ = 'x';
64         if (c & CEPH_CAP_GCACHE)
65                 *s++ = 'c';
66         if (c & CEPH_CAP_GRD)
67                 *s++ = 'r';
68         if (c & CEPH_CAP_GWR)
69                 *s++ = 'w';
70         if (c & CEPH_CAP_GBUFFER)
71                 *s++ = 'b';
72         if (c & CEPH_CAP_GWREXTEND)
73                 *s++ = 'a';
74         if (c & CEPH_CAP_GLAZYIO)
75                 *s++ = 'l';
76         return s;
77 }
78
79 const char *ceph_cap_string(int caps)
80 {
81         int i;
82         char *s;
83         int c;
84
85         spin_lock(&cap_str_lock);
86         i = last_cap_str++;
87         if (last_cap_str == MAX_CAP_STR)
88                 last_cap_str = 0;
89         spin_unlock(&cap_str_lock);
90
91         s = cap_str[i];
92
93         if (caps & CEPH_CAP_PIN)
94                 *s++ = 'p';
95
96         c = (caps >> CEPH_CAP_SAUTH) & 3;
97         if (c) {
98                 *s++ = 'A';
99                 s = gcap_string(s, c);
100         }
101
102         c = (caps >> CEPH_CAP_SLINK) & 3;
103         if (c) {
104                 *s++ = 'L';
105                 s = gcap_string(s, c);
106         }
107
108         c = (caps >> CEPH_CAP_SXATTR) & 3;
109         if (c) {
110                 *s++ = 'X';
111                 s = gcap_string(s, c);
112         }
113
114         c = caps >> CEPH_CAP_SFILE;
115         if (c) {
116                 *s++ = 'F';
117                 s = gcap_string(s, c);
118         }
119
120         if (s == cap_str[i])
121                 *s++ = '-';
122         *s = 0;
123         return cap_str[i];
124 }
125
126 void ceph_caps_init(struct ceph_mds_client *mdsc)
127 {
128         INIT_LIST_HEAD(&mdsc->caps_list);
129         spin_lock_init(&mdsc->caps_list_lock);
130 }
131
132 void ceph_caps_finalize(struct ceph_mds_client *mdsc)
133 {
134         struct ceph_cap *cap;
135
136         spin_lock(&mdsc->caps_list_lock);
137         while (!list_empty(&mdsc->caps_list)) {
138                 cap = list_first_entry(&mdsc->caps_list,
139                                        struct ceph_cap, caps_item);
140                 list_del(&cap->caps_item);
141                 kmem_cache_free(ceph_cap_cachep, cap);
142         }
143         mdsc->caps_total_count = 0;
144         mdsc->caps_avail_count = 0;
145         mdsc->caps_use_count = 0;
146         mdsc->caps_reserve_count = 0;
147         mdsc->caps_min_count = 0;
148         spin_unlock(&mdsc->caps_list_lock);
149 }
150
151 void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
152 {
153         spin_lock(&mdsc->caps_list_lock);
154         mdsc->caps_min_count += delta;
155         BUG_ON(mdsc->caps_min_count < 0);
156         spin_unlock(&mdsc->caps_list_lock);
157 }
158
159 /*
160  * Called under mdsc->mutex.
161  */
162 int ceph_reserve_caps(struct ceph_mds_client *mdsc,
163                       struct ceph_cap_reservation *ctx, int need)
164 {
165         int i, j;
166         struct ceph_cap *cap;
167         int have;
168         int alloc = 0;
169         int max_caps;
170         bool trimmed = false;
171         struct ceph_mds_session *s;
172         LIST_HEAD(newcaps);
173
174         dout("reserve caps ctx=%p need=%d\n", ctx, need);
175
176         /* first reserve any caps that are already allocated */
177         spin_lock(&mdsc->caps_list_lock);
178         if (mdsc->caps_avail_count >= need)
179                 have = need;
180         else
181                 have = mdsc->caps_avail_count;
182         mdsc->caps_avail_count -= have;
183         mdsc->caps_reserve_count += have;
184         BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
185                                          mdsc->caps_reserve_count +
186                                          mdsc->caps_avail_count);
187         spin_unlock(&mdsc->caps_list_lock);
188
189         for (i = have; i < need; ) {
190                 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
191                 if (cap) {
192                         list_add(&cap->caps_item, &newcaps);
193                         alloc++;
194                         i++;
195                         continue;
196                 }
197
198                 if (!trimmed) {
199                         for (j = 0; j < mdsc->max_sessions; j++) {
200                                 s = __ceph_lookup_mds_session(mdsc, j);
201                                 if (!s)
202                                         continue;
203                                 mutex_unlock(&mdsc->mutex);
204
205                                 mutex_lock(&s->s_mutex);
206                                 max_caps = s->s_nr_caps - (need - i);
207                                 ceph_trim_caps(mdsc, s, max_caps);
208                                 mutex_unlock(&s->s_mutex);
209
210                                 ceph_put_mds_session(s);
211                                 mutex_lock(&mdsc->mutex);
212                         }
213                         trimmed = true;
214
215                         spin_lock(&mdsc->caps_list_lock);
216                         if (mdsc->caps_avail_count) {
217                                 int more_have;
218                                 if (mdsc->caps_avail_count >= need - i)
219                                         more_have = need - i;
220                                 else
221                                         more_have = mdsc->caps_avail_count;
222
223                                 i += more_have;
224                                 have += more_have;
225                                 mdsc->caps_avail_count -= more_have;
226                                 mdsc->caps_reserve_count += more_have;
227
228                         }
229                         spin_unlock(&mdsc->caps_list_lock);
230
231                         continue;
232                 }
233
234                 pr_warn("reserve caps ctx=%p ENOMEM need=%d got=%d\n",
235                         ctx, need, have + alloc);
236                 goto out_nomem;
237         }
238         BUG_ON(have + alloc != need);
239
240         spin_lock(&mdsc->caps_list_lock);
241         mdsc->caps_total_count += alloc;
242         mdsc->caps_reserve_count += alloc;
243         list_splice(&newcaps, &mdsc->caps_list);
244
245         BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
246                                          mdsc->caps_reserve_count +
247                                          mdsc->caps_avail_count);
248         spin_unlock(&mdsc->caps_list_lock);
249
250         ctx->count = need;
251         dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
252              ctx, mdsc->caps_total_count, mdsc->caps_use_count,
253              mdsc->caps_reserve_count, mdsc->caps_avail_count);
254         return 0;
255
256 out_nomem:
257
258         spin_lock(&mdsc->caps_list_lock);
259         mdsc->caps_avail_count += have;
260         mdsc->caps_reserve_count -= have;
261
262         while (!list_empty(&newcaps)) {
263                 cap = list_first_entry(&newcaps,
264                                 struct ceph_cap, caps_item);
265                 list_del(&cap->caps_item);
266
267                 /* Keep some preallocated caps around (ceph_min_count), to
268                  * avoid lots of free/alloc churn. */
269                 if (mdsc->caps_avail_count >=
270                     mdsc->caps_reserve_count + mdsc->caps_min_count) {
271                         kmem_cache_free(ceph_cap_cachep, cap);
272                 } else {
273                         mdsc->caps_avail_count++;
274                         mdsc->caps_total_count++;
275                         list_add(&cap->caps_item, &mdsc->caps_list);
276                 }
277         }
278
279         BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
280                                          mdsc->caps_reserve_count +
281                                          mdsc->caps_avail_count);
282         spin_unlock(&mdsc->caps_list_lock);
283         return -ENOMEM;
284 }
285
286 int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
287                         struct ceph_cap_reservation *ctx)
288 {
289         int i;
290         struct ceph_cap *cap;
291
292         dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
293         if (ctx->count) {
294                 spin_lock(&mdsc->caps_list_lock);
295                 BUG_ON(mdsc->caps_reserve_count < ctx->count);
296                 mdsc->caps_reserve_count -= ctx->count;
297                 if (mdsc->caps_avail_count >=
298                     mdsc->caps_reserve_count + mdsc->caps_min_count) {
299                         mdsc->caps_total_count -= ctx->count;
300                         for (i = 0; i < ctx->count; i++) {
301                                 cap = list_first_entry(&mdsc->caps_list,
302                                         struct ceph_cap, caps_item);
303                                 list_del(&cap->caps_item);
304                                 kmem_cache_free(ceph_cap_cachep, cap);
305                         }
306                 } else {
307                         mdsc->caps_avail_count += ctx->count;
308                 }
309                 ctx->count = 0;
310                 dout("unreserve caps %d = %d used + %d resv + %d avail\n",
311                      mdsc->caps_total_count, mdsc->caps_use_count,
312                      mdsc->caps_reserve_count, mdsc->caps_avail_count);
313                 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
314                                                  mdsc->caps_reserve_count +
315                                                  mdsc->caps_avail_count);
316                 spin_unlock(&mdsc->caps_list_lock);
317         }
318         return 0;
319 }
320
321 struct ceph_cap *ceph_get_cap(struct ceph_mds_client *mdsc,
322                               struct ceph_cap_reservation *ctx)
323 {
324         struct ceph_cap *cap = NULL;
325
326         /* temporary, until we do something about cap import/export */
327         if (!ctx) {
328                 cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
329                 if (cap) {
330                         spin_lock(&mdsc->caps_list_lock);
331                         mdsc->caps_use_count++;
332                         mdsc->caps_total_count++;
333                         spin_unlock(&mdsc->caps_list_lock);
334                 } else {
335                         spin_lock(&mdsc->caps_list_lock);
336                         if (mdsc->caps_avail_count) {
337                                 BUG_ON(list_empty(&mdsc->caps_list));
338
339                                 mdsc->caps_avail_count--;
340                                 mdsc->caps_use_count++;
341                                 cap = list_first_entry(&mdsc->caps_list,
342                                                 struct ceph_cap, caps_item);
343                                 list_del(&cap->caps_item);
344
345                                 BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
346                                        mdsc->caps_reserve_count + mdsc->caps_avail_count);
347                         }
348                         spin_unlock(&mdsc->caps_list_lock);
349                 }
350
351                 return cap;
352         }
353
354         spin_lock(&mdsc->caps_list_lock);
355         dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
356              ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
357              mdsc->caps_reserve_count, mdsc->caps_avail_count);
358         BUG_ON(!ctx->count);
359         BUG_ON(ctx->count > mdsc->caps_reserve_count);
360         BUG_ON(list_empty(&mdsc->caps_list));
361
362         ctx->count--;
363         mdsc->caps_reserve_count--;
364         mdsc->caps_use_count++;
365
366         cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
367         list_del(&cap->caps_item);
368
369         BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
370                mdsc->caps_reserve_count + mdsc->caps_avail_count);
371         spin_unlock(&mdsc->caps_list_lock);
372         return cap;
373 }
374
375 void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
376 {
377         spin_lock(&mdsc->caps_list_lock);
378         dout("put_cap %p %d = %d used + %d resv + %d avail\n",
379              cap, mdsc->caps_total_count, mdsc->caps_use_count,
380              mdsc->caps_reserve_count, mdsc->caps_avail_count);
381         mdsc->caps_use_count--;
382         /*
383          * Keep some preallocated caps around (ceph_min_count), to
384          * avoid lots of free/alloc churn.
385          */
386         if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
387                                       mdsc->caps_min_count) {
388                 mdsc->caps_total_count--;
389                 kmem_cache_free(ceph_cap_cachep, cap);
390         } else {
391                 mdsc->caps_avail_count++;
392                 list_add(&cap->caps_item, &mdsc->caps_list);
393         }
394
395         BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
396                mdsc->caps_reserve_count + mdsc->caps_avail_count);
397         spin_unlock(&mdsc->caps_list_lock);
398 }
399
400 void ceph_reservation_status(struct ceph_fs_client *fsc,
401                              int *total, int *avail, int *used, int *reserved,
402                              int *min)
403 {
404         struct ceph_mds_client *mdsc = fsc->mdsc;
405
406         spin_lock(&mdsc->caps_list_lock);
407
408         if (total)
409                 *total = mdsc->caps_total_count;
410         if (avail)
411                 *avail = mdsc->caps_avail_count;
412         if (used)
413                 *used = mdsc->caps_use_count;
414         if (reserved)
415                 *reserved = mdsc->caps_reserve_count;
416         if (min)
417                 *min = mdsc->caps_min_count;
418
419         spin_unlock(&mdsc->caps_list_lock);
420 }
421
422 /*
423  * Find ceph_cap for given mds, if any.
424  *
425  * Called with i_ceph_lock held.
426  */
427 static struct ceph_cap *__get_cap_for_mds(struct ceph_inode_info *ci, int mds)
428 {
429         struct ceph_cap *cap;
430         struct rb_node *n = ci->i_caps.rb_node;
431
432         while (n) {
433                 cap = rb_entry(n, struct ceph_cap, ci_node);
434                 if (mds < cap->mds)
435                         n = n->rb_left;
436                 else if (mds > cap->mds)
437                         n = n->rb_right;
438                 else
439                         return cap;
440         }
441         return NULL;
442 }
443
444 struct ceph_cap *ceph_get_cap_for_mds(struct ceph_inode_info *ci, int mds)
445 {
446         struct ceph_cap *cap;
447
448         spin_lock(&ci->i_ceph_lock);
449         cap = __get_cap_for_mds(ci, mds);
450         spin_unlock(&ci->i_ceph_lock);
451         return cap;
452 }
453
454 /*
455  * Return id of any MDS with a cap, preferably FILE_WR|BUFFER|EXCL, else -1.
456  */
457 static int __ceph_get_cap_mds(struct ceph_inode_info *ci)
458 {
459         struct ceph_cap *cap;
460         int mds = -1;
461         struct rb_node *p;
462
463         /* prefer mds with WR|BUFFER|EXCL caps */
464         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
465                 cap = rb_entry(p, struct ceph_cap, ci_node);
466                 mds = cap->mds;
467                 if (cap->issued & (CEPH_CAP_FILE_WR |
468                                    CEPH_CAP_FILE_BUFFER |
469                                    CEPH_CAP_FILE_EXCL))
470                         break;
471         }
472         return mds;
473 }
474
475 int ceph_get_cap_mds(struct inode *inode)
476 {
477         struct ceph_inode_info *ci = ceph_inode(inode);
478         int mds;
479         spin_lock(&ci->i_ceph_lock);
480         mds = __ceph_get_cap_mds(ceph_inode(inode));
481         spin_unlock(&ci->i_ceph_lock);
482         return mds;
483 }
484
485 /*
486  * Called under i_ceph_lock.
487  */
488 static void __insert_cap_node(struct ceph_inode_info *ci,
489                               struct ceph_cap *new)
490 {
491         struct rb_node **p = &ci->i_caps.rb_node;
492         struct rb_node *parent = NULL;
493         struct ceph_cap *cap = NULL;
494
495         while (*p) {
496                 parent = *p;
497                 cap = rb_entry(parent, struct ceph_cap, ci_node);
498                 if (new->mds < cap->mds)
499                         p = &(*p)->rb_left;
500                 else if (new->mds > cap->mds)
501                         p = &(*p)->rb_right;
502                 else
503                         BUG();
504         }
505
506         rb_link_node(&new->ci_node, parent, p);
507         rb_insert_color(&new->ci_node, &ci->i_caps);
508 }
509
510 /*
511  * (re)set cap hold timeouts, which control the delayed release
512  * of unused caps back to the MDS.  Should be called on cap use.
513  */
514 static void __cap_set_timeouts(struct ceph_mds_client *mdsc,
515                                struct ceph_inode_info *ci)
516 {
517         struct ceph_mount_options *ma = mdsc->fsc->mount_options;
518
519         ci->i_hold_caps_min = round_jiffies(jiffies +
520                                             ma->caps_wanted_delay_min * HZ);
521         ci->i_hold_caps_max = round_jiffies(jiffies +
522                                             ma->caps_wanted_delay_max * HZ);
523         dout("__cap_set_timeouts %p min %lu max %lu\n", &ci->vfs_inode,
524              ci->i_hold_caps_min - jiffies, ci->i_hold_caps_max - jiffies);
525 }
526
527 /*
528  * (Re)queue cap at the end of the delayed cap release list.
529  *
530  * If I_FLUSH is set, leave the inode at the front of the list.
531  *
532  * Caller holds i_ceph_lock
533  *    -> we take mdsc->cap_delay_lock
534  */
535 static void __cap_delay_requeue(struct ceph_mds_client *mdsc,
536                                 struct ceph_inode_info *ci)
537 {
538         __cap_set_timeouts(mdsc, ci);
539         dout("__cap_delay_requeue %p flags %d at %lu\n", &ci->vfs_inode,
540              ci->i_ceph_flags, ci->i_hold_caps_max);
541         if (!mdsc->stopping) {
542                 spin_lock(&mdsc->cap_delay_lock);
543                 if (!list_empty(&ci->i_cap_delay_list)) {
544                         if (ci->i_ceph_flags & CEPH_I_FLUSH)
545                                 goto no_change;
546                         list_del_init(&ci->i_cap_delay_list);
547                 }
548                 list_add_tail(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
549 no_change:
550                 spin_unlock(&mdsc->cap_delay_lock);
551         }
552 }
553
554 /*
555  * Queue an inode for immediate writeback.  Mark inode with I_FLUSH,
556  * indicating we should send a cap message to flush dirty metadata
557  * asap, and move to the front of the delayed cap list.
558  */
559 static void __cap_delay_requeue_front(struct ceph_mds_client *mdsc,
560                                       struct ceph_inode_info *ci)
561 {
562         dout("__cap_delay_requeue_front %p\n", &ci->vfs_inode);
563         spin_lock(&mdsc->cap_delay_lock);
564         ci->i_ceph_flags |= CEPH_I_FLUSH;
565         if (!list_empty(&ci->i_cap_delay_list))
566                 list_del_init(&ci->i_cap_delay_list);
567         list_add(&ci->i_cap_delay_list, &mdsc->cap_delay_list);
568         spin_unlock(&mdsc->cap_delay_lock);
569 }
570
571 /*
572  * Cancel delayed work on cap.
573  *
574  * Caller must hold i_ceph_lock.
575  */
576 static void __cap_delay_cancel(struct ceph_mds_client *mdsc,
577                                struct ceph_inode_info *ci)
578 {
579         dout("__cap_delay_cancel %p\n", &ci->vfs_inode);
580         if (list_empty(&ci->i_cap_delay_list))
581                 return;
582         spin_lock(&mdsc->cap_delay_lock);
583         list_del_init(&ci->i_cap_delay_list);
584         spin_unlock(&mdsc->cap_delay_lock);
585 }
586
587 /*
588  * Common issue checks for add_cap, handle_cap_grant.
589  */
590 static void __check_cap_issue(struct ceph_inode_info *ci, struct ceph_cap *cap,
591                               unsigned issued)
592 {
593         unsigned had = __ceph_caps_issued(ci, NULL);
594
595         /*
596          * Each time we receive FILE_CACHE anew, we increment
597          * i_rdcache_gen.
598          */
599         if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
600             (had & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0) {
601                 ci->i_rdcache_gen++;
602         }
603
604         /*
605          * If FILE_SHARED is newly issued, mark dir not complete. We don't
606          * know what happened to this directory while we didn't have the cap.
607          * If FILE_SHARED is being revoked, also mark dir not complete. It
608          * stops on-going cached readdir.
609          */
610         if ((issued & CEPH_CAP_FILE_SHARED) != (had & CEPH_CAP_FILE_SHARED)) {
611                 if (issued & CEPH_CAP_FILE_SHARED)
612                         atomic_inc(&ci->i_shared_gen);
613                 if (S_ISDIR(ci->vfs_inode.i_mode)) {
614                         dout(" marking %p NOT complete\n", &ci->vfs_inode);
615                         __ceph_dir_clear_complete(ci);
616                 }
617         }
618 }
619
620 /*
621  * Add a capability under the given MDS session.
622  *
623  * Caller should hold session snap_rwsem (read) and s_mutex.
624  *
625  * @fmode is the open file mode, if we are opening a file, otherwise
626  * it is < 0.  (This is so we can atomically add the cap and add an
627  * open file reference to it.)
628  */
629 void ceph_add_cap(struct inode *inode,
630                   struct ceph_mds_session *session, u64 cap_id,
631                   int fmode, unsigned issued, unsigned wanted,
632                   unsigned seq, unsigned mseq, u64 realmino, int flags,
633                   struct ceph_cap **new_cap)
634 {
635         struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
636         struct ceph_inode_info *ci = ceph_inode(inode);
637         struct ceph_cap *cap;
638         int mds = session->s_mds;
639         int actual_wanted;
640
641         dout("add_cap %p mds%d cap %llx %s seq %d\n", inode,
642              session->s_mds, cap_id, ceph_cap_string(issued), seq);
643
644         /*
645          * If we are opening the file, include file mode wanted bits
646          * in wanted.
647          */
648         if (fmode >= 0)
649                 wanted |= ceph_caps_for_mode(fmode);
650
651         cap = __get_cap_for_mds(ci, mds);
652         if (!cap) {
653                 cap = *new_cap;
654                 *new_cap = NULL;
655
656                 cap->issued = 0;
657                 cap->implemented = 0;
658                 cap->mds = mds;
659                 cap->mds_wanted = 0;
660                 cap->mseq = 0;
661
662                 cap->ci = ci;
663                 __insert_cap_node(ci, cap);
664
665                 /* add to session cap list */
666                 cap->session = session;
667                 spin_lock(&session->s_cap_lock);
668                 list_add_tail(&cap->session_caps, &session->s_caps);
669                 session->s_nr_caps++;
670                 spin_unlock(&session->s_cap_lock);
671         } else {
672                 /*
673                  * auth mds of the inode changed. we received the cap export
674                  * message, but still haven't received the cap import message.
675                  * handle_cap_export() updated the new auth MDS' cap.
676                  *
677                  * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing
678                  * a message that was send before the cap import message. So
679                  * don't remove caps.
680                  */
681                 if (ceph_seq_cmp(seq, cap->seq) <= 0) {
682                         WARN_ON(cap != ci->i_auth_cap);
683                         WARN_ON(cap->cap_id != cap_id);
684                         seq = cap->seq;
685                         mseq = cap->mseq;
686                         issued |= cap->issued;
687                         flags |= CEPH_CAP_FLAG_AUTH;
688                 }
689         }
690
691         if (!ci->i_snap_realm ||
692             ((flags & CEPH_CAP_FLAG_AUTH) &&
693              realmino != (u64)-1 && ci->i_snap_realm->ino != realmino)) {
694                 /*
695                  * add this inode to the appropriate snap realm
696                  */
697                 struct ceph_snap_realm *realm = ceph_lookup_snap_realm(mdsc,
698                                                                realmino);
699                 if (realm) {
700                         struct ceph_snap_realm *oldrealm = ci->i_snap_realm;
701                         if (oldrealm) {
702                                 spin_lock(&oldrealm->inodes_with_caps_lock);
703                                 list_del_init(&ci->i_snap_realm_item);
704                                 spin_unlock(&oldrealm->inodes_with_caps_lock);
705                         }
706
707                         spin_lock(&realm->inodes_with_caps_lock);
708                         list_add(&ci->i_snap_realm_item,
709                                  &realm->inodes_with_caps);
710                         ci->i_snap_realm = realm;
711                         if (realm->ino == ci->i_vino.ino)
712                                 realm->inode = inode;
713                         spin_unlock(&realm->inodes_with_caps_lock);
714
715                         if (oldrealm)
716                                 ceph_put_snap_realm(mdsc, oldrealm);
717                 } else {
718                         pr_err("ceph_add_cap: couldn't find snap realm %llx\n",
719                                realmino);
720                         WARN_ON(!realm);
721                 }
722         }
723
724         __check_cap_issue(ci, cap, issued);
725
726         /*
727          * If we are issued caps we don't want, or the mds' wanted
728          * value appears to be off, queue a check so we'll release
729          * later and/or update the mds wanted value.
730          */
731         actual_wanted = __ceph_caps_wanted(ci);
732         if ((wanted & ~actual_wanted) ||
733             (issued & ~actual_wanted & CEPH_CAP_ANY_WR)) {
734                 dout(" issued %s, mds wanted %s, actual %s, queueing\n",
735                      ceph_cap_string(issued), ceph_cap_string(wanted),
736                      ceph_cap_string(actual_wanted));
737                 __cap_delay_requeue(mdsc, ci);
738         }
739
740         if (flags & CEPH_CAP_FLAG_AUTH) {
741                 if (!ci->i_auth_cap ||
742                     ceph_seq_cmp(ci->i_auth_cap->mseq, mseq) < 0) {
743                         ci->i_auth_cap = cap;
744                         cap->mds_wanted = wanted;
745                 }
746         } else {
747                 WARN_ON(ci->i_auth_cap == cap);
748         }
749
750         dout("add_cap inode %p (%llx.%llx) cap %p %s now %s seq %d mds%d\n",
751              inode, ceph_vinop(inode), cap, ceph_cap_string(issued),
752              ceph_cap_string(issued|cap->issued), seq, mds);
753         cap->cap_id = cap_id;
754         cap->issued = issued;
755         cap->implemented |= issued;
756         if (ceph_seq_cmp(mseq, cap->mseq) > 0)
757                 cap->mds_wanted = wanted;
758         else
759                 cap->mds_wanted |= wanted;
760         cap->seq = seq;
761         cap->issue_seq = seq;
762         cap->mseq = mseq;
763         cap->cap_gen = session->s_cap_gen;
764
765         if (fmode >= 0)
766                 __ceph_get_fmode(ci, fmode);
767 }
768
769 /*
770  * Return true if cap has not timed out and belongs to the current
771  * generation of the MDS session (i.e. has not gone 'stale' due to
772  * us losing touch with the mds).
773  */
774 static int __cap_is_valid(struct ceph_cap *cap)
775 {
776         unsigned long ttl;
777         u32 gen;
778
779         spin_lock(&cap->session->s_gen_ttl_lock);
780         gen = cap->session->s_cap_gen;
781         ttl = cap->session->s_cap_ttl;
782         spin_unlock(&cap->session->s_gen_ttl_lock);
783
784         if (cap->cap_gen < gen || time_after_eq(jiffies, ttl)) {
785                 dout("__cap_is_valid %p cap %p issued %s "
786                      "but STALE (gen %u vs %u)\n", &cap->ci->vfs_inode,
787                      cap, ceph_cap_string(cap->issued), cap->cap_gen, gen);
788                 return 0;
789         }
790
791         return 1;
792 }
793
794 /*
795  * Return set of valid cap bits issued to us.  Note that caps time
796  * out, and may be invalidated in bulk if the client session times out
797  * and session->s_cap_gen is bumped.
798  */
799 int __ceph_caps_issued(struct ceph_inode_info *ci, int *implemented)
800 {
801         int have = ci->i_snap_caps;
802         struct ceph_cap *cap;
803         struct rb_node *p;
804
805         if (implemented)
806                 *implemented = 0;
807         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
808                 cap = rb_entry(p, struct ceph_cap, ci_node);
809                 if (!__cap_is_valid(cap))
810                         continue;
811                 dout("__ceph_caps_issued %p cap %p issued %s\n",
812                      &ci->vfs_inode, cap, ceph_cap_string(cap->issued));
813                 have |= cap->issued;
814                 if (implemented)
815                         *implemented |= cap->implemented;
816         }
817         /*
818          * exclude caps issued by non-auth MDS, but are been revoking
819          * by the auth MDS. The non-auth MDS should be revoking/exporting
820          * these caps, but the message is delayed.
821          */
822         if (ci->i_auth_cap) {
823                 cap = ci->i_auth_cap;
824                 have &= ~cap->implemented | cap->issued;
825         }
826         return have;
827 }
828
829 /*
830  * Get cap bits issued by caps other than @ocap
831  */
832 int __ceph_caps_issued_other(struct ceph_inode_info *ci, struct ceph_cap *ocap)
833 {
834         int have = ci->i_snap_caps;
835         struct ceph_cap *cap;
836         struct rb_node *p;
837
838         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
839                 cap = rb_entry(p, struct ceph_cap, ci_node);
840                 if (cap == ocap)
841                         continue;
842                 if (!__cap_is_valid(cap))
843                         continue;
844                 have |= cap->issued;
845         }
846         return have;
847 }
848
849 /*
850  * Move a cap to the end of the LRU (oldest caps at list head, newest
851  * at list tail).
852  */
853 static void __touch_cap(struct ceph_cap *cap)
854 {
855         struct ceph_mds_session *s = cap->session;
856
857         spin_lock(&s->s_cap_lock);
858         if (!s->s_cap_iterator) {
859                 dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
860                      s->s_mds);
861                 list_move_tail(&cap->session_caps, &s->s_caps);
862         } else {
863                 dout("__touch_cap %p cap %p mds%d NOP, iterating over caps\n",
864                      &cap->ci->vfs_inode, cap, s->s_mds);
865         }
866         spin_unlock(&s->s_cap_lock);
867 }
868
869 /*
870  * Check if we hold the given mask.  If so, move the cap(s) to the
871  * front of their respective LRUs.  (This is the preferred way for
872  * callers to check for caps they want.)
873  */
874 int __ceph_caps_issued_mask(struct ceph_inode_info *ci, int mask, int touch)
875 {
876         struct ceph_cap *cap;
877         struct rb_node *p;
878         int have = ci->i_snap_caps;
879
880         if ((have & mask) == mask) {
881                 dout("__ceph_caps_issued_mask %p snap issued %s"
882                      " (mask %s)\n", &ci->vfs_inode,
883                      ceph_cap_string(have),
884                      ceph_cap_string(mask));
885                 return 1;
886         }
887
888         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
889                 cap = rb_entry(p, struct ceph_cap, ci_node);
890                 if (!__cap_is_valid(cap))
891                         continue;
892                 if ((cap->issued & mask) == mask) {
893                         dout("__ceph_caps_issued_mask %p cap %p issued %s"
894                              " (mask %s)\n", &ci->vfs_inode, cap,
895                              ceph_cap_string(cap->issued),
896                              ceph_cap_string(mask));
897                         if (touch)
898                                 __touch_cap(cap);
899                         return 1;
900                 }
901
902                 /* does a combination of caps satisfy mask? */
903                 have |= cap->issued;
904                 if ((have & mask) == mask) {
905                         dout("__ceph_caps_issued_mask %p combo issued %s"
906                              " (mask %s)\n", &ci->vfs_inode,
907                              ceph_cap_string(cap->issued),
908                              ceph_cap_string(mask));
909                         if (touch) {
910                                 struct rb_node *q;
911
912                                 /* touch this + preceding caps */
913                                 __touch_cap(cap);
914                                 for (q = rb_first(&ci->i_caps); q != p;
915                                      q = rb_next(q)) {
916                                         cap = rb_entry(q, struct ceph_cap,
917                                                        ci_node);
918                                         if (!__cap_is_valid(cap))
919                                                 continue;
920                                         __touch_cap(cap);
921                                 }
922                         }
923                         return 1;
924                 }
925         }
926
927         return 0;
928 }
929
930 /*
931  * Return true if mask caps are currently being revoked by an MDS.
932  */
933 int __ceph_caps_revoking_other(struct ceph_inode_info *ci,
934                                struct ceph_cap *ocap, int mask)
935 {
936         struct ceph_cap *cap;
937         struct rb_node *p;
938
939         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
940                 cap = rb_entry(p, struct ceph_cap, ci_node);
941                 if (cap != ocap &&
942                     (cap->implemented & ~cap->issued & mask))
943                         return 1;
944         }
945         return 0;
946 }
947
948 int ceph_caps_revoking(struct ceph_inode_info *ci, int mask)
949 {
950         struct inode *inode = &ci->vfs_inode;
951         int ret;
952
953         spin_lock(&ci->i_ceph_lock);
954         ret = __ceph_caps_revoking_other(ci, NULL, mask);
955         spin_unlock(&ci->i_ceph_lock);
956         dout("ceph_caps_revoking %p %s = %d\n", inode,
957              ceph_cap_string(mask), ret);
958         return ret;
959 }
960
961 int __ceph_caps_used(struct ceph_inode_info *ci)
962 {
963         int used = 0;
964         if (ci->i_pin_ref)
965                 used |= CEPH_CAP_PIN;
966         if (ci->i_rd_ref)
967                 used |= CEPH_CAP_FILE_RD;
968         if (ci->i_rdcache_ref ||
969             (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
970              ci->vfs_inode.i_data.nrpages))
971                 used |= CEPH_CAP_FILE_CACHE;
972         if (ci->i_wr_ref)
973                 used |= CEPH_CAP_FILE_WR;
974         if (ci->i_wb_ref || ci->i_wrbuffer_ref)
975                 used |= CEPH_CAP_FILE_BUFFER;
976         return used;
977 }
978
979 /*
980  * wanted, by virtue of open file modes
981  */
982 int __ceph_caps_file_wanted(struct ceph_inode_info *ci)
983 {
984         int i, bits = 0;
985         for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
986                 if (ci->i_nr_by_mode[i])
987                         bits |= 1 << i;
988         }
989         if (bits == 0)
990                 return 0;
991         return ceph_caps_for_mode(bits >> 1);
992 }
993
994 /*
995  * Return caps we have registered with the MDS(s) as 'wanted'.
996  */
997 int __ceph_caps_mds_wanted(struct ceph_inode_info *ci, bool check)
998 {
999         struct ceph_cap *cap;
1000         struct rb_node *p;
1001         int mds_wanted = 0;
1002
1003         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
1004                 cap = rb_entry(p, struct ceph_cap, ci_node);
1005                 if (check && !__cap_is_valid(cap))
1006                         continue;
1007                 if (cap == ci->i_auth_cap)
1008                         mds_wanted |= cap->mds_wanted;
1009                 else
1010                         mds_wanted |= (cap->mds_wanted & ~CEPH_CAP_ANY_FILE_WR);
1011         }
1012         return mds_wanted;
1013 }
1014
1015 /*
1016  * called under i_ceph_lock
1017  */
1018 static int __ceph_is_single_caps(struct ceph_inode_info *ci)
1019 {
1020         return rb_first(&ci->i_caps) == rb_last(&ci->i_caps);
1021 }
1022
1023 static int __ceph_is_any_caps(struct ceph_inode_info *ci)
1024 {
1025         return !RB_EMPTY_ROOT(&ci->i_caps);
1026 }
1027
1028 int ceph_is_any_caps(struct inode *inode)
1029 {
1030         struct ceph_inode_info *ci = ceph_inode(inode);
1031         int ret;
1032
1033         spin_lock(&ci->i_ceph_lock);
1034         ret = __ceph_is_any_caps(ci);
1035         spin_unlock(&ci->i_ceph_lock);
1036
1037         return ret;
1038 }
1039
1040 static void drop_inode_snap_realm(struct ceph_inode_info *ci)
1041 {
1042         struct ceph_snap_realm *realm = ci->i_snap_realm;
1043         spin_lock(&realm->inodes_with_caps_lock);
1044         list_del_init(&ci->i_snap_realm_item);
1045         ci->i_snap_realm_counter++;
1046         ci->i_snap_realm = NULL;
1047         spin_unlock(&realm->inodes_with_caps_lock);
1048         ceph_put_snap_realm(ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc,
1049                             realm);
1050 }
1051
1052 /*
1053  * Remove a cap.  Take steps to deal with a racing iterate_session_caps.
1054  *
1055  * caller should hold i_ceph_lock.
1056  * caller will not hold session s_mutex if called from destroy_inode.
1057  */
1058 void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
1059 {
1060         struct ceph_mds_session *session = cap->session;
1061         struct ceph_inode_info *ci = cap->ci;
1062         struct ceph_mds_client *mdsc =
1063                 ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
1064         int removed = 0;
1065
1066         dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
1067
1068         /* remove from session list */
1069         spin_lock(&session->s_cap_lock);
1070         if (session->s_cap_iterator == cap) {
1071                 /* not yet, we are iterating over this very cap */
1072                 dout("__ceph_remove_cap  delaying %p removal from session %p\n",
1073                      cap, cap->session);
1074         } else {
1075                 list_del_init(&cap->session_caps);
1076                 session->s_nr_caps--;
1077                 cap->session = NULL;
1078                 removed = 1;
1079         }
1080         /* protect backpointer with s_cap_lock: see iterate_session_caps */
1081         cap->ci = NULL;
1082
1083         /*
1084          * s_cap_reconnect is protected by s_cap_lock. no one changes
1085          * s_cap_gen while session is in the reconnect state.
1086          */
1087         if (queue_release &&
1088             (!session->s_cap_reconnect || cap->cap_gen == session->s_cap_gen)) {
1089                 cap->queue_release = 1;
1090                 if (removed) {
1091                         list_add_tail(&cap->session_caps,
1092                                       &session->s_cap_releases);
1093                         session->s_num_cap_releases++;
1094                         removed = 0;
1095                 }
1096         } else {
1097                 cap->queue_release = 0;
1098         }
1099         cap->cap_ino = ci->i_vino.ino;
1100
1101         spin_unlock(&session->s_cap_lock);
1102
1103         /* remove from inode list */
1104         rb_erase(&cap->ci_node, &ci->i_caps);
1105         if (ci->i_auth_cap == cap)
1106                 ci->i_auth_cap = NULL;
1107
1108         if (removed)
1109                 ceph_put_cap(mdsc, cap);
1110
1111         /* when reconnect denied, we remove session caps forcibly,
1112          * i_wr_ref can be non-zero. If there are ongoing write,
1113          * keep i_snap_realm.
1114          */
1115         if (!__ceph_is_any_caps(ci) && ci->i_wr_ref == 0 && ci->i_snap_realm)
1116                 drop_inode_snap_realm(ci);
1117
1118         if (!__ceph_is_any_real_caps(ci))
1119                 __cap_delay_cancel(mdsc, ci);
1120 }
1121
1122 struct cap_msg_args {
1123         struct ceph_mds_session *session;
1124         u64                     ino, cid, follows;
1125         u64                     flush_tid, oldest_flush_tid, size, max_size;
1126         u64                     xattr_version;
1127         struct ceph_buffer      *xattr_buf;
1128         struct timespec         atime, mtime, ctime;
1129         int                     op, caps, wanted, dirty;
1130         u32                     seq, issue_seq, mseq, time_warp_seq;
1131         u32                     flags;
1132         kuid_t                  uid;
1133         kgid_t                  gid;
1134         umode_t                 mode;
1135         bool                    inline_data;
1136 };
1137
1138 /*
1139  * Build and send a cap message to the given MDS.
1140  *
1141  * Caller should be holding s_mutex.
1142  */
1143 static int send_cap_msg(struct cap_msg_args *arg)
1144 {
1145         struct ceph_mds_caps *fc;
1146         struct ceph_msg *msg;
1147         void *p;
1148         size_t extra_len;
1149         struct timespec zerotime = {0};
1150         struct ceph_osd_client *osdc = &arg->session->s_mdsc->fsc->client->osdc;
1151
1152         dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
1153              " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
1154              " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(arg->op),
1155              arg->cid, arg->ino, ceph_cap_string(arg->caps),
1156              ceph_cap_string(arg->wanted), ceph_cap_string(arg->dirty),
1157              arg->seq, arg->issue_seq, arg->flush_tid, arg->oldest_flush_tid,
1158              arg->mseq, arg->follows, arg->size, arg->max_size,
1159              arg->xattr_version,
1160              arg->xattr_buf ? (int)arg->xattr_buf->vec.iov_len : 0);
1161
1162         /* flock buffer size + inline version + inline data size +
1163          * osd_epoch_barrier + oldest_flush_tid */
1164         extra_len = 4 + 8 + 4 + 4 + 8 + 4 + 4 + 4 + 8 + 8 + 4;
1165         msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
1166                            GFP_NOFS, false);
1167         if (!msg)
1168                 return -ENOMEM;
1169
1170         msg->hdr.version = cpu_to_le16(10);
1171         msg->hdr.tid = cpu_to_le64(arg->flush_tid);
1172
1173         fc = msg->front.iov_base;
1174         memset(fc, 0, sizeof(*fc));
1175
1176         fc->cap_id = cpu_to_le64(arg->cid);
1177         fc->op = cpu_to_le32(arg->op);
1178         fc->seq = cpu_to_le32(arg->seq);
1179         fc->issue_seq = cpu_to_le32(arg->issue_seq);
1180         fc->migrate_seq = cpu_to_le32(arg->mseq);
1181         fc->caps = cpu_to_le32(arg->caps);
1182         fc->wanted = cpu_to_le32(arg->wanted);
1183         fc->dirty = cpu_to_le32(arg->dirty);
1184         fc->ino = cpu_to_le64(arg->ino);
1185         fc->snap_follows = cpu_to_le64(arg->follows);
1186
1187         fc->size = cpu_to_le64(arg->size);
1188         fc->max_size = cpu_to_le64(arg->max_size);
1189         ceph_encode_timespec(&fc->mtime, &arg->mtime);
1190         ceph_encode_timespec(&fc->atime, &arg->atime);
1191         ceph_encode_timespec(&fc->ctime, &arg->ctime);
1192         fc->time_warp_seq = cpu_to_le32(arg->time_warp_seq);
1193
1194         fc->uid = cpu_to_le32(from_kuid(&init_user_ns, arg->uid));
1195         fc->gid = cpu_to_le32(from_kgid(&init_user_ns, arg->gid));
1196         fc->mode = cpu_to_le32(arg->mode);
1197
1198         fc->xattr_version = cpu_to_le64(arg->xattr_version);
1199         if (arg->xattr_buf) {
1200                 msg->middle = ceph_buffer_get(arg->xattr_buf);
1201                 fc->xattr_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
1202                 msg->hdr.middle_len = cpu_to_le32(arg->xattr_buf->vec.iov_len);
1203         }
1204
1205         p = fc + 1;
1206         /* flock buffer size (version 2) */
1207         ceph_encode_32(&p, 0);
1208         /* inline version (version 4) */
1209         ceph_encode_64(&p, arg->inline_data ? 0 : CEPH_INLINE_NONE);
1210         /* inline data size */
1211         ceph_encode_32(&p, 0);
1212         /*
1213          * osd_epoch_barrier (version 5)
1214          * The epoch_barrier is protected osdc->lock, so READ_ONCE here in
1215          * case it was recently changed
1216          */
1217         ceph_encode_32(&p, READ_ONCE(osdc->epoch_barrier));
1218         /* oldest_flush_tid (version 6) */
1219         ceph_encode_64(&p, arg->oldest_flush_tid);
1220
1221         /*
1222          * caller_uid/caller_gid (version 7)
1223          *
1224          * Currently, we don't properly track which caller dirtied the caps
1225          * last, and force a flush of them when there is a conflict. For now,
1226          * just set this to 0:0, to emulate how the MDS has worked up to now.
1227          */
1228         ceph_encode_32(&p, 0);
1229         ceph_encode_32(&p, 0);
1230
1231         /* pool namespace (version 8) (mds always ignores this) */
1232         ceph_encode_32(&p, 0);
1233
1234         /*
1235          * btime and change_attr (version 9)
1236          *
1237          * We just zero these out for now, as the MDS ignores them unless
1238          * the requisite feature flags are set (which we don't do yet).
1239          */
1240         ceph_encode_timespec(p, &zerotime);
1241         p += sizeof(struct ceph_timespec);
1242         ceph_encode_64(&p, 0);
1243
1244         /* Advisory flags (version 10) */
1245         ceph_encode_32(&p, arg->flags);
1246
1247         ceph_con_send(&arg->session->s_con, msg);
1248         return 0;
1249 }
1250
1251 /*
1252  * Queue cap releases when an inode is dropped from our cache.  Since
1253  * inode is about to be destroyed, there is no need for i_ceph_lock.
1254  */
1255 void ceph_queue_caps_release(struct inode *inode)
1256 {
1257         struct ceph_inode_info *ci = ceph_inode(inode);
1258         struct rb_node *p;
1259
1260         p = rb_first(&ci->i_caps);
1261         while (p) {
1262                 struct ceph_cap *cap = rb_entry(p, struct ceph_cap, ci_node);
1263                 p = rb_next(p);
1264                 __ceph_remove_cap(cap, true);
1265         }
1266 }
1267
1268 /*
1269  * Send a cap msg on the given inode.  Update our caps state, then
1270  * drop i_ceph_lock and send the message.
1271  *
1272  * Make note of max_size reported/requested from mds, revoked caps
1273  * that have now been implemented.
1274  *
1275  * Make half-hearted attempt ot to invalidate page cache if we are
1276  * dropping RDCACHE.  Note that this will leave behind locked pages
1277  * that we'll then need to deal with elsewhere.
1278  *
1279  * Return non-zero if delayed release, or we experienced an error
1280  * such that the caller should requeue + retry later.
1281  *
1282  * called with i_ceph_lock, then drops it.
1283  * caller should hold snap_rwsem (read), s_mutex.
1284  */
1285 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
1286                       int op, bool sync, int used, int want, int retain,
1287                       int flushing, u64 flush_tid, u64 oldest_flush_tid)
1288         __releases(cap->ci->i_ceph_lock)
1289 {
1290         struct ceph_inode_info *ci = cap->ci;
1291         struct inode *inode = &ci->vfs_inode;
1292         struct cap_msg_args arg;
1293         int held, revoking;
1294         int wake = 0;
1295         int delayed = 0;
1296         int ret;
1297
1298         held = cap->issued | cap->implemented;
1299         revoking = cap->implemented & ~cap->issued;
1300         retain &= ~revoking;
1301
1302         dout("__send_cap %p cap %p session %p %s -> %s (revoking %s)\n",
1303              inode, cap, cap->session,
1304              ceph_cap_string(held), ceph_cap_string(held & retain),
1305              ceph_cap_string(revoking));
1306         BUG_ON((retain & CEPH_CAP_PIN) == 0);
1307
1308         arg.session = cap->session;
1309
1310         /* don't release wanted unless we've waited a bit. */
1311         if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
1312             time_before(jiffies, ci->i_hold_caps_min)) {
1313                 dout(" delaying issued %s -> %s, wanted %s -> %s on send\n",
1314                      ceph_cap_string(cap->issued),
1315                      ceph_cap_string(cap->issued & retain),
1316                      ceph_cap_string(cap->mds_wanted),
1317                      ceph_cap_string(want));
1318                 want |= cap->mds_wanted;
1319                 retain |= cap->issued;
1320                 delayed = 1;
1321         }
1322         ci->i_ceph_flags &= ~(CEPH_I_NODELAY | CEPH_I_FLUSH);
1323         if (want & ~cap->mds_wanted) {
1324                 /* user space may open/close single file frequently.
1325                  * This avoids droping mds_wanted immediately after
1326                  * requesting new mds_wanted.
1327                  */
1328                 __cap_set_timeouts(mdsc, ci);
1329         }
1330
1331         cap->issued &= retain;  /* drop bits we don't want */
1332         if (cap->implemented & ~cap->issued) {
1333                 /*
1334                  * Wake up any waiters on wanted -> needed transition.
1335                  * This is due to the weird transition from buffered
1336                  * to sync IO... we need to flush dirty pages _before_
1337                  * allowing sync writes to avoid reordering.
1338                  */
1339                 wake = 1;
1340         }
1341         cap->implemented &= cap->issued | used;
1342         cap->mds_wanted = want;
1343
1344         arg.ino = ceph_vino(inode).ino;
1345         arg.cid = cap->cap_id;
1346         arg.follows = flushing ? ci->i_head_snapc->seq : 0;
1347         arg.flush_tid = flush_tid;
1348         arg.oldest_flush_tid = oldest_flush_tid;
1349
1350         arg.size = inode->i_size;
1351         ci->i_reported_size = arg.size;
1352         arg.max_size = ci->i_wanted_max_size;
1353         ci->i_requested_max_size = arg.max_size;
1354
1355         if (flushing & CEPH_CAP_XATTR_EXCL) {
1356                 __ceph_build_xattrs_blob(ci);
1357                 arg.xattr_version = ci->i_xattrs.version;
1358                 arg.xattr_buf = ci->i_xattrs.blob;
1359         } else {
1360                 arg.xattr_buf = NULL;
1361         }
1362
1363         arg.mtime = inode->i_mtime;
1364         arg.atime = inode->i_atime;
1365         arg.ctime = inode->i_ctime;
1366
1367         arg.op = op;
1368         arg.caps = cap->implemented;
1369         arg.wanted = want;
1370         arg.dirty = flushing;
1371
1372         arg.seq = cap->seq;
1373         arg.issue_seq = cap->issue_seq;
1374         arg.mseq = cap->mseq;
1375         arg.time_warp_seq = ci->i_time_warp_seq;
1376
1377         arg.uid = inode->i_uid;
1378         arg.gid = inode->i_gid;
1379         arg.mode = inode->i_mode;
1380
1381         arg.inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
1382         if (list_empty(&ci->i_cap_snaps))
1383                 arg.flags = CEPH_CLIENT_CAPS_NO_CAPSNAP;
1384         else
1385                 arg.flags = CEPH_CLIENT_CAPS_PENDING_CAPSNAP;
1386         if (sync)
1387                 arg.flags |= CEPH_CLIENT_CAPS_SYNC;
1388
1389         spin_unlock(&ci->i_ceph_lock);
1390
1391         ret = send_cap_msg(&arg);
1392         if (ret < 0) {
1393                 dout("error sending cap msg, must requeue %p\n", inode);
1394                 delayed = 1;
1395         }
1396
1397         if (wake)
1398                 wake_up_all(&ci->i_cap_wq);
1399
1400         return delayed;
1401 }
1402
1403 static inline int __send_flush_snap(struct inode *inode,
1404                                     struct ceph_mds_session *session,
1405                                     struct ceph_cap_snap *capsnap,
1406                                     u32 mseq, u64 oldest_flush_tid)
1407 {
1408         struct cap_msg_args     arg;
1409
1410         arg.session = session;
1411         arg.ino = ceph_vino(inode).ino;
1412         arg.cid = 0;
1413         arg.follows = capsnap->follows;
1414         arg.flush_tid = capsnap->cap_flush.tid;
1415         arg.oldest_flush_tid = oldest_flush_tid;
1416
1417         arg.size = capsnap->size;
1418         arg.max_size = 0;
1419         arg.xattr_version = capsnap->xattr_version;
1420         arg.xattr_buf = capsnap->xattr_blob;
1421
1422         arg.atime = capsnap->atime;
1423         arg.mtime = capsnap->mtime;
1424         arg.ctime = capsnap->ctime;
1425
1426         arg.op = CEPH_CAP_OP_FLUSHSNAP;
1427         arg.caps = capsnap->issued;
1428         arg.wanted = 0;
1429         arg.dirty = capsnap->dirty;
1430
1431         arg.seq = 0;
1432         arg.issue_seq = 0;
1433         arg.mseq = mseq;
1434         arg.time_warp_seq = capsnap->time_warp_seq;
1435
1436         arg.uid = capsnap->uid;
1437         arg.gid = capsnap->gid;
1438         arg.mode = capsnap->mode;
1439
1440         arg.inline_data = capsnap->inline_data;
1441         arg.flags = 0;
1442
1443         return send_cap_msg(&arg);
1444 }
1445
1446 /*
1447  * When a snapshot is taken, clients accumulate dirty metadata on
1448  * inodes with capabilities in ceph_cap_snaps to describe the file
1449  * state at the time the snapshot was taken.  This must be flushed
1450  * asynchronously back to the MDS once sync writes complete and dirty
1451  * data is written out.
1452  *
1453  * Called under i_ceph_lock.  Takes s_mutex as needed.
1454  */
1455 static void __ceph_flush_snaps(struct ceph_inode_info *ci,
1456                                struct ceph_mds_session *session)
1457                 __releases(ci->i_ceph_lock)
1458                 __acquires(ci->i_ceph_lock)
1459 {
1460         struct inode *inode = &ci->vfs_inode;
1461         struct ceph_mds_client *mdsc = session->s_mdsc;
1462         struct ceph_cap_snap *capsnap;
1463         u64 oldest_flush_tid = 0;
1464         u64 first_tid = 1, last_tid = 0;
1465
1466         dout("__flush_snaps %p session %p\n", inode, session);
1467
1468         list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
1469                 /*
1470                  * we need to wait for sync writes to complete and for dirty
1471                  * pages to be written out.
1472                  */
1473                 if (capsnap->dirty_pages || capsnap->writing)
1474                         break;
1475
1476                 /* should be removed by ceph_try_drop_cap_snap() */
1477                 BUG_ON(!capsnap->need_flush);
1478
1479                 /* only flush each capsnap once */
1480                 if (capsnap->cap_flush.tid > 0) {
1481                         dout(" already flushed %p, skipping\n", capsnap);
1482                         continue;
1483                 }
1484
1485                 spin_lock(&mdsc->cap_dirty_lock);
1486                 capsnap->cap_flush.tid = ++mdsc->last_cap_flush_tid;
1487                 list_add_tail(&capsnap->cap_flush.g_list,
1488                               &mdsc->cap_flush_list);
1489                 if (oldest_flush_tid == 0)
1490                         oldest_flush_tid = __get_oldest_flush_tid(mdsc);
1491                 if (list_empty(&ci->i_flushing_item)) {
1492                         list_add_tail(&ci->i_flushing_item,
1493                                       &session->s_cap_flushing);
1494                 }
1495                 spin_unlock(&mdsc->cap_dirty_lock);
1496
1497                 list_add_tail(&capsnap->cap_flush.i_list,
1498                               &ci->i_cap_flush_list);
1499
1500                 if (first_tid == 1)
1501                         first_tid = capsnap->cap_flush.tid;
1502                 last_tid = capsnap->cap_flush.tid;
1503         }
1504
1505         ci->i_ceph_flags &= ~CEPH_I_FLUSH_SNAPS;
1506
1507         while (first_tid <= last_tid) {
1508                 struct ceph_cap *cap = ci->i_auth_cap;
1509                 struct ceph_cap_flush *cf;
1510                 int ret;
1511
1512                 if (!(cap && cap->session == session)) {
1513                         dout("__flush_snaps %p auth cap %p not mds%d, "
1514                              "stop\n", inode, cap, session->s_mds);
1515                         break;
1516                 }
1517
1518                 ret = -ENOENT;
1519                 list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
1520                         if (cf->tid >= first_tid) {
1521                                 ret = 0;
1522                                 break;
1523                         }
1524                 }
1525                 if (ret < 0)
1526                         break;
1527
1528                 first_tid = cf->tid + 1;
1529
1530                 capsnap = container_of(cf, struct ceph_cap_snap, cap_flush);
1531                 refcount_inc(&capsnap->nref);
1532                 spin_unlock(&ci->i_ceph_lock);
1533
1534                 dout("__flush_snaps %p capsnap %p tid %llu %s\n",
1535                      inode, capsnap, cf->tid, ceph_cap_string(capsnap->dirty));
1536
1537                 ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
1538                                         oldest_flush_tid);
1539                 if (ret < 0) {
1540                         pr_err("__flush_snaps: error sending cap flushsnap, "
1541                                "ino (%llx.%llx) tid %llu follows %llu\n",
1542                                 ceph_vinop(inode), cf->tid, capsnap->follows);
1543                 }
1544
1545                 ceph_put_cap_snap(capsnap);
1546                 spin_lock(&ci->i_ceph_lock);
1547         }
1548 }
1549
1550 void ceph_flush_snaps(struct ceph_inode_info *ci,
1551                       struct ceph_mds_session **psession)
1552 {
1553         struct inode *inode = &ci->vfs_inode;
1554         struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
1555         struct ceph_mds_session *session = NULL;
1556         int mds;
1557
1558         dout("ceph_flush_snaps %p\n", inode);
1559         if (psession)
1560                 session = *psession;
1561 retry:
1562         spin_lock(&ci->i_ceph_lock);
1563         if (!(ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)) {
1564                 dout(" no capsnap needs flush, doing nothing\n");
1565                 goto out;
1566         }
1567         if (!ci->i_auth_cap) {
1568                 dout(" no auth cap (migrating?), doing nothing\n");
1569                 goto out;
1570         }
1571
1572         mds = ci->i_auth_cap->session->s_mds;
1573         if (session && session->s_mds != mds) {
1574                 dout(" oops, wrong session %p mutex\n", session);
1575                 mutex_unlock(&session->s_mutex);
1576                 ceph_put_mds_session(session);
1577                 session = NULL;
1578         }
1579         if (!session) {
1580                 spin_unlock(&ci->i_ceph_lock);
1581                 mutex_lock(&mdsc->mutex);
1582                 session = __ceph_lookup_mds_session(mdsc, mds);
1583                 mutex_unlock(&mdsc->mutex);
1584                 if (session) {
1585                         dout(" inverting session/ino locks on %p\n", session);
1586                         mutex_lock(&session->s_mutex);
1587                 }
1588                 goto retry;
1589         }
1590
1591         // make sure flushsnap messages are sent in proper order.
1592         if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
1593                 __kick_flushing_caps(mdsc, session, ci, 0);
1594                 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
1595         }
1596
1597         __ceph_flush_snaps(ci, session);
1598 out:
1599         spin_unlock(&ci->i_ceph_lock);
1600
1601         if (psession) {
1602                 *psession = session;
1603         } else if (session) {
1604                 mutex_unlock(&session->s_mutex);
1605                 ceph_put_mds_session(session);
1606         }
1607         /* we flushed them all; remove this inode from the queue */
1608         spin_lock(&mdsc->snap_flush_lock);
1609         list_del_init(&ci->i_snap_flush_item);
1610         spin_unlock(&mdsc->snap_flush_lock);
1611 }
1612
1613 /*
1614  * Mark caps dirty.  If inode is newly dirty, return the dirty flags.
1615  * Caller is then responsible for calling __mark_inode_dirty with the
1616  * returned flags value.
1617  */
1618 int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
1619                            struct ceph_cap_flush **pcf)
1620 {
1621         struct ceph_mds_client *mdsc =
1622                 ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
1623         struct inode *inode = &ci->vfs_inode;
1624         int was = ci->i_dirty_caps;
1625         int dirty = 0;
1626
1627         if (!ci->i_auth_cap) {
1628                 pr_warn("__mark_dirty_caps %p %llx mask %s, "
1629                         "but no auth cap (session was closed?)\n",
1630                         inode, ceph_ino(inode), ceph_cap_string(mask));
1631                 return 0;
1632         }
1633
1634         dout("__mark_dirty_caps %p %s dirty %s -> %s\n", &ci->vfs_inode,
1635              ceph_cap_string(mask), ceph_cap_string(was),
1636              ceph_cap_string(was | mask));
1637         ci->i_dirty_caps |= mask;
1638         if (was == 0) {
1639                 WARN_ON_ONCE(ci->i_prealloc_cap_flush);
1640                 swap(ci->i_prealloc_cap_flush, *pcf);
1641
1642                 if (!ci->i_head_snapc) {
1643                         WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
1644                         ci->i_head_snapc = ceph_get_snap_context(
1645                                 ci->i_snap_realm->cached_context);
1646                 }
1647                 dout(" inode %p now dirty snapc %p auth cap %p\n",
1648                      &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
1649                 BUG_ON(!list_empty(&ci->i_dirty_item));
1650                 spin_lock(&mdsc->cap_dirty_lock);
1651                 list_add(&ci->i_dirty_item, &mdsc->cap_dirty);
1652                 spin_unlock(&mdsc->cap_dirty_lock);
1653                 if (ci->i_flushing_caps == 0) {
1654                         ihold(inode);
1655                         dirty |= I_DIRTY_SYNC;
1656                 }
1657         } else {
1658                 WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
1659         }
1660         BUG_ON(list_empty(&ci->i_dirty_item));
1661         if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
1662             (mask & CEPH_CAP_FILE_BUFFER))
1663                 dirty |= I_DIRTY_DATASYNC;
1664         __cap_delay_requeue(mdsc, ci);
1665         return dirty;
1666 }
1667
1668 struct ceph_cap_flush *ceph_alloc_cap_flush(void)
1669 {
1670         return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
1671 }
1672
1673 void ceph_free_cap_flush(struct ceph_cap_flush *cf)
1674 {
1675         if (cf)
1676                 kmem_cache_free(ceph_cap_flush_cachep, cf);
1677 }
1678
1679 static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
1680 {
1681         if (!list_empty(&mdsc->cap_flush_list)) {
1682                 struct ceph_cap_flush *cf =
1683                         list_first_entry(&mdsc->cap_flush_list,
1684                                          struct ceph_cap_flush, g_list);
1685                 return cf->tid;
1686         }
1687         return 0;
1688 }
1689
1690 /*
1691  * Remove cap_flush from the mdsc's or inode's flushing cap list.
1692  * Return true if caller needs to wake up flush waiters.
1693  */
1694 static bool __finish_cap_flush(struct ceph_mds_client *mdsc,
1695                                struct ceph_inode_info *ci,
1696                                struct ceph_cap_flush *cf)
1697 {
1698         struct ceph_cap_flush *prev;
1699         bool wake = cf->wake;
1700         if (mdsc) {
1701                 /* are there older pending cap flushes? */
1702                 if (wake && cf->g_list.prev != &mdsc->cap_flush_list) {
1703                         prev = list_prev_entry(cf, g_list);
1704                         prev->wake = true;
1705                         wake = false;
1706                 }
1707                 list_del(&cf->g_list);
1708         } else if (ci) {
1709                 if (wake && cf->i_list.prev != &ci->i_cap_flush_list) {
1710                         prev = list_prev_entry(cf, i_list);
1711                         prev->wake = true;
1712                         wake = false;
1713                 }
1714                 list_del(&cf->i_list);
1715         } else {
1716                 BUG_ON(1);
1717         }
1718         return wake;
1719 }
1720
1721 /*
1722  * Add dirty inode to the flushing list.  Assigned a seq number so we
1723  * can wait for caps to flush without starving.
1724  *
1725  * Called under i_ceph_lock.
1726  */
1727 static int __mark_caps_flushing(struct inode *inode,
1728                                 struct ceph_mds_session *session, bool wake,
1729                                 u64 *flush_tid, u64 *oldest_flush_tid)
1730 {
1731         struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
1732         struct ceph_inode_info *ci = ceph_inode(inode);
1733         struct ceph_cap_flush *cf = NULL;
1734         int flushing;
1735
1736         BUG_ON(ci->i_dirty_caps == 0);
1737         BUG_ON(list_empty(&ci->i_dirty_item));
1738         BUG_ON(!ci->i_prealloc_cap_flush);
1739
1740         flushing = ci->i_dirty_caps;
1741         dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
1742              ceph_cap_string(flushing),
1743              ceph_cap_string(ci->i_flushing_caps),
1744              ceph_cap_string(ci->i_flushing_caps | flushing));
1745         ci->i_flushing_caps |= flushing;
1746         ci->i_dirty_caps = 0;
1747         dout(" inode %p now !dirty\n", inode);
1748
1749         swap(cf, ci->i_prealloc_cap_flush);
1750         cf->caps = flushing;
1751         cf->wake = wake;
1752
1753         spin_lock(&mdsc->cap_dirty_lock);
1754         list_del_init(&ci->i_dirty_item);
1755
1756         cf->tid = ++mdsc->last_cap_flush_tid;
1757         list_add_tail(&cf->g_list, &mdsc->cap_flush_list);
1758         *oldest_flush_tid = __get_oldest_flush_tid(mdsc);
1759
1760         if (list_empty(&ci->i_flushing_item)) {
1761                 list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
1762                 mdsc->num_cap_flushing++;
1763         }
1764         spin_unlock(&mdsc->cap_dirty_lock);
1765
1766         list_add_tail(&cf->i_list, &ci->i_cap_flush_list);
1767
1768         *flush_tid = cf->tid;
1769         return flushing;
1770 }
1771
1772 /*
1773  * try to invalidate mapping pages without blocking.
1774  */
1775 static int try_nonblocking_invalidate(struct inode *inode)
1776 {
1777         struct ceph_inode_info *ci = ceph_inode(inode);
1778         u32 invalidating_gen = ci->i_rdcache_gen;
1779
1780         spin_unlock(&ci->i_ceph_lock);
1781         invalidate_mapping_pages(&inode->i_data, 0, -1);
1782         spin_lock(&ci->i_ceph_lock);
1783
1784         if (inode->i_data.nrpages == 0 &&
1785             invalidating_gen == ci->i_rdcache_gen) {
1786                 /* success. */
1787                 dout("try_nonblocking_invalidate %p success\n", inode);
1788                 /* save any racing async invalidate some trouble */
1789                 ci->i_rdcache_revoking = ci->i_rdcache_gen - 1;
1790                 return 0;
1791         }
1792         dout("try_nonblocking_invalidate %p failed\n", inode);
1793         return -1;
1794 }
1795
1796 bool __ceph_should_report_size(struct ceph_inode_info *ci)
1797 {
1798         loff_t size = ci->vfs_inode.i_size;
1799         /* mds will adjust max size according to the reported size */
1800         if (ci->i_flushing_caps & CEPH_CAP_FILE_WR)
1801                 return false;
1802         if (size >= ci->i_max_size)
1803                 return true;
1804         /* half of previous max_size increment has been used */
1805         if (ci->i_max_size > ci->i_reported_size &&
1806             (size << 1) >= ci->i_max_size + ci->i_reported_size)
1807                 return true;
1808         return false;
1809 }
1810
1811 /*
1812  * Swiss army knife function to examine currently used and wanted
1813  * versus held caps.  Release, flush, ack revoked caps to mds as
1814  * appropriate.
1815  *
1816  *  CHECK_CAPS_NODELAY - caller is delayed work and we should not delay
1817  *    cap release further.
1818  *  CHECK_CAPS_AUTHONLY - we should only check the auth cap
1819  *  CHECK_CAPS_FLUSH - we should flush any dirty caps immediately, without
1820  *    further delay.
1821  */
1822 void ceph_check_caps(struct ceph_inode_info *ci, int flags,
1823                      struct ceph_mds_session *session)
1824 {
1825         struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
1826         struct ceph_mds_client *mdsc = fsc->mdsc;
1827         struct inode *inode = &ci->vfs_inode;
1828         struct ceph_cap *cap;
1829         u64 flush_tid, oldest_flush_tid;
1830         int file_wanted, used, cap_used;
1831         int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
1832         int issued, implemented, want, retain, revoking, flushing = 0;
1833         int mds = -1;   /* keep track of how far we've gone through i_caps list
1834                            to avoid an infinite loop on retry */
1835         struct rb_node *p;
1836         int delayed = 0, sent = 0;
1837         bool no_delay = flags & CHECK_CAPS_NODELAY;
1838         bool queue_invalidate = false;
1839         bool tried_invalidate = false;
1840
1841         /* if we are unmounting, flush any unused caps immediately. */
1842         if (mdsc->stopping)
1843                 no_delay = true;
1844
1845         spin_lock(&ci->i_ceph_lock);
1846
1847         if (ci->i_ceph_flags & CEPH_I_FLUSH)
1848                 flags |= CHECK_CAPS_FLUSH;
1849
1850         if (!(flags & CHECK_CAPS_AUTHONLY) ||
1851             (ci->i_auth_cap && __ceph_is_single_caps(ci)))
1852                 __cap_delay_cancel(mdsc, ci);
1853
1854         goto retry_locked;
1855 retry:
1856         spin_lock(&ci->i_ceph_lock);
1857 retry_locked:
1858         file_wanted = __ceph_caps_file_wanted(ci);
1859         used = __ceph_caps_used(ci);
1860         issued = __ceph_caps_issued(ci, &implemented);
1861         revoking = implemented & ~issued;
1862
1863         want = file_wanted;
1864         retain = file_wanted | used | CEPH_CAP_PIN;
1865         if (!mdsc->stopping && inode->i_nlink > 0) {
1866                 if (file_wanted) {
1867                         retain |= CEPH_CAP_ANY;       /* be greedy */
1868                 } else if (S_ISDIR(inode->i_mode) &&
1869                            (issued & CEPH_CAP_FILE_SHARED) &&
1870                             __ceph_dir_is_complete(ci)) {
1871                         /*
1872                          * If a directory is complete, we want to keep
1873                          * the exclusive cap. So that MDS does not end up
1874                          * revoking the shared cap on every create/unlink
1875                          * operation.
1876                          */
1877                         want = CEPH_CAP_ANY_SHARED | CEPH_CAP_FILE_EXCL;
1878                         retain |= want;
1879                 } else {
1880
1881                         retain |= CEPH_CAP_ANY_SHARED;
1882                         /*
1883                          * keep RD only if we didn't have the file open RW,
1884                          * because then the mds would revoke it anyway to
1885                          * journal max_size=0.
1886                          */
1887                         if (ci->i_max_size == 0)
1888                                 retain |= CEPH_CAP_ANY_RD;
1889                 }
1890         }
1891
1892         dout("check_caps %p file_want %s used %s dirty %s flushing %s"
1893              " issued %s revoking %s retain %s %s%s%s\n", inode,
1894              ceph_cap_string(file_wanted),
1895              ceph_cap_string(used), ceph_cap_string(ci->i_dirty_caps),
1896              ceph_cap_string(ci->i_flushing_caps),
1897              ceph_cap_string(issued), ceph_cap_string(revoking),
1898              ceph_cap_string(retain),
1899              (flags & CHECK_CAPS_AUTHONLY) ? " AUTHONLY" : "",
1900              (flags & CHECK_CAPS_NODELAY) ? " NODELAY" : "",
1901              (flags & CHECK_CAPS_FLUSH) ? " FLUSH" : "");
1902
1903         /*
1904          * If we no longer need to hold onto old our caps, and we may
1905          * have cached pages, but don't want them, then try to invalidate.
1906          * If we fail, it's because pages are locked.... try again later.
1907          */
1908         if ((!no_delay || mdsc->stopping) &&
1909             !S_ISDIR(inode->i_mode) &&          /* ignore readdir cache */
1910             !(ci->i_wb_ref || ci->i_wrbuffer_ref) &&   /* no dirty pages... */
1911             inode->i_data.nrpages &&            /* have cached pages */
1912             (revoking & (CEPH_CAP_FILE_CACHE|
1913                          CEPH_CAP_FILE_LAZYIO)) && /*  or revoking cache */
1914             !tried_invalidate) {
1915                 dout("check_caps trying to invalidate on %p\n", inode);
1916                 if (try_nonblocking_invalidate(inode) < 0) {
1917                         dout("check_caps queuing invalidate\n");
1918                         queue_invalidate = true;
1919                         ci->i_rdcache_revoking = ci->i_rdcache_gen;
1920                 }
1921                 tried_invalidate = true;
1922                 goto retry_locked;
1923         }
1924
1925         for (p = rb_first(&ci->i_caps); p; p = rb_next(p)) {
1926                 cap = rb_entry(p, struct ceph_cap, ci_node);
1927
1928                 /* avoid looping forever */
1929                 if (mds >= cap->mds ||
1930                     ((flags & CHECK_CAPS_AUTHONLY) && cap != ci->i_auth_cap))
1931                         continue;
1932
1933                 /* NOTE: no side-effects allowed, until we take s_mutex */
1934
1935                 cap_used = used;
1936                 if (ci->i_auth_cap && cap != ci->i_auth_cap)
1937                         cap_used &= ~ci->i_auth_cap->issued;
1938
1939                 revoking = cap->implemented & ~cap->issued;
1940                 dout(" mds%d cap %p used %s issued %s implemented %s revoking %s\n",
1941                      cap->mds, cap, ceph_cap_string(cap_used),
1942                      ceph_cap_string(cap->issued),
1943                      ceph_cap_string(cap->implemented),
1944                      ceph_cap_string(revoking));
1945
1946                 if (cap == ci->i_auth_cap &&
1947                     (cap->issued & CEPH_CAP_FILE_WR)) {
1948                         /* request larger max_size from MDS? */
1949                         if (ci->i_wanted_max_size > ci->i_max_size &&
1950                             ci->i_wanted_max_size > ci->i_requested_max_size) {
1951                                 dout("requesting new max_size\n");
1952                                 goto ack;
1953                         }
1954
1955                         /* approaching file_max? */
1956                         if (__ceph_should_report_size(ci)) {
1957                                 dout("i_size approaching max_size\n");
1958                                 goto ack;
1959                         }
1960                 }
1961                 /* flush anything dirty? */
1962                 if (cap == ci->i_auth_cap) {
1963                         if ((flags & CHECK_CAPS_FLUSH) && ci->i_dirty_caps) {
1964                                 dout("flushing dirty caps\n");
1965                                 goto ack;
1966                         }
1967                         if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS) {
1968                                 dout("flushing snap caps\n");
1969                                 goto ack;
1970                         }
1971                 }
1972
1973                 /* completed revocation? going down and there are no caps? */
1974                 if (revoking && (revoking & cap_used) == 0) {
1975                         dout("completed revocation of %s\n",
1976                              ceph_cap_string(cap->implemented & ~cap->issued));
1977                         goto ack;
1978                 }
1979
1980                 /* want more caps from mds? */
1981                 if (want & ~(cap->mds_wanted | cap->issued))
1982                         goto ack;
1983
1984                 /* things we might delay */
1985                 if ((cap->issued & ~retain) == 0 &&
1986                     cap->mds_wanted == want)
1987                         continue;     /* nope, all good */
1988
1989                 if (no_delay)
1990                         goto ack;
1991
1992                 /* delay? */
1993                 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0 &&
1994                     time_before(jiffies, ci->i_hold_caps_max)) {
1995                         dout(" delaying issued %s -> %s, wanted %s -> %s\n",
1996                              ceph_cap_string(cap->issued),
1997                              ceph_cap_string(cap->issued & retain),
1998                              ceph_cap_string(cap->mds_wanted),
1999                              ceph_cap_string(want));
2000                         delayed++;
2001                         continue;
2002                 }
2003
2004 ack:
2005                 if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
2006                         dout(" skipping %p I_NOFLUSH set\n", inode);
2007                         continue;
2008                 }
2009
2010                 if (session && session != cap->session) {
2011                         dout("oops, wrong session %p mutex\n", session);
2012                         mutex_unlock(&session->s_mutex);
2013                         session = NULL;
2014                 }
2015                 if (!session) {
2016                         session = cap->session;
2017                         if (mutex_trylock(&session->s_mutex) == 0) {
2018                                 dout("inverting session/ino locks on %p\n",
2019                                      session);
2020                                 spin_unlock(&ci->i_ceph_lock);
2021                                 if (took_snap_rwsem) {
2022                                         up_read(&mdsc->snap_rwsem);
2023                                         took_snap_rwsem = 0;
2024                                 }
2025                                 mutex_lock(&session->s_mutex);
2026                                 goto retry;
2027                         }
2028                 }
2029
2030                 /* kick flushing and flush snaps before sending normal
2031                  * cap message */
2032                 if (cap == ci->i_auth_cap &&
2033                     (ci->i_ceph_flags &
2034                      (CEPH_I_KICK_FLUSH | CEPH_I_FLUSH_SNAPS))) {
2035                         if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
2036                                 __kick_flushing_caps(mdsc, session, ci, 0);
2037                                 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
2038                         }
2039                         if (ci->i_ceph_flags & CEPH_I_FLUSH_SNAPS)
2040                                 __ceph_flush_snaps(ci, session);
2041
2042                         goto retry_locked;
2043                 }
2044
2045                 /* take snap_rwsem after session mutex */
2046                 if (!took_snap_rwsem) {
2047                         if (down_read_trylock(&mdsc->snap_rwsem) == 0) {
2048                                 dout("inverting snap/in locks on %p\n",
2049                                      inode);
2050                                 spin_unlock(&ci->i_ceph_lock);
2051                                 down_read(&mdsc->snap_rwsem);
2052                                 took_snap_rwsem = 1;
2053                                 goto retry;
2054                         }
2055                         took_snap_rwsem = 1;
2056                 }
2057
2058                 if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
2059                         flushing = __mark_caps_flushing(inode, session, false,
2060                                                         &flush_tid,
2061                                                         &oldest_flush_tid);
2062                 } else {
2063                         flushing = 0;
2064                         flush_tid = 0;
2065                         spin_lock(&mdsc->cap_dirty_lock);
2066                         oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2067                         spin_unlock(&mdsc->cap_dirty_lock);
2068                 }
2069
2070                 mds = cap->mds;  /* remember mds, so we don't repeat */
2071                 sent++;
2072
2073                 /* __send_cap drops i_ceph_lock */
2074                 delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, false,
2075                                 cap_used, want, retain, flushing,
2076                                 flush_tid, oldest_flush_tid);
2077                 goto retry; /* retake i_ceph_lock and restart our cap scan. */
2078         }
2079
2080         /* Reschedule delayed caps release if we delayed anything */
2081         if (delayed)
2082                 __cap_delay_requeue(mdsc, ci);
2083
2084         spin_unlock(&ci->i_ceph_lock);
2085
2086         if (queue_invalidate)
2087                 ceph_queue_invalidate(inode);
2088
2089         if (session)
2090                 mutex_unlock(&session->s_mutex);
2091         if (took_snap_rwsem)
2092                 up_read(&mdsc->snap_rwsem);
2093 }
2094
2095 /*
2096  * Try to flush dirty caps back to the auth mds.
2097  */
2098 static int try_flush_caps(struct inode *inode, u64 *ptid)
2099 {
2100         struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
2101         struct ceph_inode_info *ci = ceph_inode(inode);
2102         struct ceph_mds_session *session = NULL;
2103         int flushing = 0;
2104         u64 flush_tid = 0, oldest_flush_tid = 0;
2105
2106 retry:
2107         spin_lock(&ci->i_ceph_lock);
2108         if (ci->i_ceph_flags & CEPH_I_NOFLUSH) {
2109                 spin_unlock(&ci->i_ceph_lock);
2110                 dout("try_flush_caps skipping %p I_NOFLUSH set\n", inode);
2111                 goto out;
2112         }
2113         if (ci->i_dirty_caps && ci->i_auth_cap) {
2114                 struct ceph_cap *cap = ci->i_auth_cap;
2115                 int used = __ceph_caps_used(ci);
2116                 int want = __ceph_caps_wanted(ci);
2117                 int delayed;
2118
2119                 if (!session || session != cap->session) {
2120                         spin_unlock(&ci->i_ceph_lock);
2121                         if (session)
2122                                 mutex_unlock(&session->s_mutex);
2123                         session = cap->session;
2124                         mutex_lock(&session->s_mutex);
2125                         goto retry;
2126                 }
2127                 if (cap->session->s_state < CEPH_MDS_SESSION_OPEN) {
2128                         spin_unlock(&ci->i_ceph_lock);
2129                         goto out;
2130                 }
2131
2132                 flushing = __mark_caps_flushing(inode, session, true,
2133                                                 &flush_tid, &oldest_flush_tid);
2134
2135                 /* __send_cap drops i_ceph_lock */
2136                 delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, true,
2137                                 used, want, (cap->issued | cap->implemented),
2138                                 flushing, flush_tid, oldest_flush_tid);
2139
2140                 if (delayed) {
2141                         spin_lock(&ci->i_ceph_lock);
2142                         __cap_delay_requeue(mdsc, ci);
2143                         spin_unlock(&ci->i_ceph_lock);
2144                 }
2145         } else {
2146                 if (!list_empty(&ci->i_cap_flush_list)) {
2147                         struct ceph_cap_flush *cf =
2148                                 list_last_entry(&ci->i_cap_flush_list,
2149                                                 struct ceph_cap_flush, i_list);
2150                         cf->wake = true;
2151                         flush_tid = cf->tid;
2152                 }
2153                 flushing = ci->i_flushing_caps;
2154                 spin_unlock(&ci->i_ceph_lock);
2155         }
2156 out:
2157         if (session)
2158                 mutex_unlock(&session->s_mutex);
2159
2160         *ptid = flush_tid;
2161         return flushing;
2162 }
2163
2164 /*
2165  * Return true if we've flushed caps through the given flush_tid.
2166  */
2167 static int caps_are_flushed(struct inode *inode, u64 flush_tid)
2168 {
2169         struct ceph_inode_info *ci = ceph_inode(inode);
2170         int ret = 1;
2171
2172         spin_lock(&ci->i_ceph_lock);
2173         if (!list_empty(&ci->i_cap_flush_list)) {
2174                 struct ceph_cap_flush * cf =
2175                         list_first_entry(&ci->i_cap_flush_list,
2176                                          struct ceph_cap_flush, i_list);
2177                 if (cf->tid <= flush_tid)
2178                         ret = 0;
2179         }
2180         spin_unlock(&ci->i_ceph_lock);
2181         return ret;
2182 }
2183
2184 /*
2185  * wait for any unsafe requests to complete.
2186  */
2187 static int unsafe_request_wait(struct inode *inode)
2188 {
2189         struct ceph_inode_info *ci = ceph_inode(inode);
2190         struct ceph_mds_request *req1 = NULL, *req2 = NULL;
2191         int ret, err = 0;
2192
2193         spin_lock(&ci->i_unsafe_lock);
2194         if (S_ISDIR(inode->i_mode) && !list_empty(&ci->i_unsafe_dirops)) {
2195                 req1 = list_last_entry(&ci->i_unsafe_dirops,
2196                                         struct ceph_mds_request,
2197                                         r_unsafe_dir_item);
2198                 ceph_mdsc_get_request(req1);
2199         }
2200         if (!list_empty(&ci->i_unsafe_iops)) {
2201                 req2 = list_last_entry(&ci->i_unsafe_iops,
2202                                         struct ceph_mds_request,
2203                                         r_unsafe_target_item);
2204                 ceph_mdsc_get_request(req2);
2205         }
2206         spin_unlock(&ci->i_unsafe_lock);
2207
2208         dout("unsafe_request_wait %p wait on tid %llu %llu\n",
2209              inode, req1 ? req1->r_tid : 0ULL, req2 ? req2->r_tid : 0ULL);
2210         if (req1) {
2211                 ret = !wait_for_completion_timeout(&req1->r_safe_completion,
2212                                         ceph_timeout_jiffies(req1->r_timeout));
2213                 if (ret)
2214                         err = -EIO;
2215                 ceph_mdsc_put_request(req1);
2216         }
2217         if (req2) {
2218                 ret = !wait_for_completion_timeout(&req2->r_safe_completion,
2219                                         ceph_timeout_jiffies(req2->r_timeout));
2220                 if (ret)
2221                         err = -EIO;
2222                 ceph_mdsc_put_request(req2);
2223         }
2224         return err;
2225 }
2226
2227 int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
2228 {
2229         struct inode *inode = file->f_mapping->host;
2230         struct ceph_inode_info *ci = ceph_inode(inode);
2231         u64 flush_tid;
2232         int ret;
2233         int dirty;
2234
2235         dout("fsync %p%s\n", inode, datasync ? " datasync" : "");
2236
2237         ret = file_write_and_wait_range(file, start, end);
2238         if (ret < 0)
2239                 goto out;
2240
2241         if (datasync)
2242                 goto out;
2243
2244         inode_lock(inode);
2245
2246         dirty = try_flush_caps(inode, &flush_tid);
2247         dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
2248
2249         ret = unsafe_request_wait(inode);
2250
2251         /*
2252          * only wait on non-file metadata writeback (the mds
2253          * can recover size and mtime, so we don't need to
2254          * wait for that)
2255          */
2256         if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
2257                 ret = wait_event_interruptible(ci->i_cap_wq,
2258                                         caps_are_flushed(inode, flush_tid));
2259         }
2260         inode_unlock(inode);
2261 out:
2262         dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
2263         return ret;
2264 }
2265
2266 /*
2267  * Flush any dirty caps back to the mds.  If we aren't asked to wait,
2268  * queue inode for flush but don't do so immediately, because we can
2269  * get by with fewer MDS messages if we wait for data writeback to
2270  * complete first.
2271  */
2272 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
2273 {
2274         struct ceph_inode_info *ci = ceph_inode(inode);
2275         u64 flush_tid;
2276         int err = 0;
2277         int dirty;
2278         int wait = (wbc->sync_mode == WB_SYNC_ALL && !wbc->for_sync);
2279
2280         dout("write_inode %p wait=%d\n", inode, wait);
2281         if (wait) {
2282                 dirty = try_flush_caps(inode, &flush_tid);
2283                 if (dirty)
2284                         err = wait_event_interruptible(ci->i_cap_wq,
2285                                        caps_are_flushed(inode, flush_tid));
2286         } else {
2287                 struct ceph_mds_client *mdsc =
2288                         ceph_sb_to_client(inode->i_sb)->mdsc;
2289
2290                 spin_lock(&ci->i_ceph_lock);
2291                 if (__ceph_caps_dirty(ci))
2292                         __cap_delay_requeue_front(mdsc, ci);
2293                 spin_unlock(&ci->i_ceph_lock);
2294         }
2295         return err;
2296 }
2297
2298 static void __kick_flushing_caps(struct ceph_mds_client *mdsc,
2299                                  struct ceph_mds_session *session,
2300                                  struct ceph_inode_info *ci,
2301                                  u64 oldest_flush_tid)
2302         __releases(ci->i_ceph_lock)
2303         __acquires(ci->i_ceph_lock)
2304 {
2305         struct inode *inode = &ci->vfs_inode;
2306         struct ceph_cap *cap;
2307         struct ceph_cap_flush *cf;
2308         int ret;
2309         u64 first_tid = 0;
2310
2311         list_for_each_entry(cf, &ci->i_cap_flush_list, i_list) {
2312                 if (cf->tid < first_tid)
2313                         continue;
2314
2315                 cap = ci->i_auth_cap;
2316                 if (!(cap && cap->session == session)) {
2317                         pr_err("%p auth cap %p not mds%d ???\n",
2318                                inode, cap, session->s_mds);
2319                         break;
2320                 }
2321
2322                 first_tid = cf->tid + 1;
2323
2324                 if (cf->caps) {
2325                         dout("kick_flushing_caps %p cap %p tid %llu %s\n",
2326                              inode, cap, cf->tid, ceph_cap_string(cf->caps));
2327                         ci->i_ceph_flags |= CEPH_I_NODELAY;
2328                         ret = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
2329                                           false, __ceph_caps_used(ci),
2330                                           __ceph_caps_wanted(ci),
2331                                           cap->issued | cap->implemented,
2332                                           cf->caps, cf->tid, oldest_flush_tid);
2333                         if (ret) {
2334                                 pr_err("kick_flushing_caps: error sending "
2335                                         "cap flush, ino (%llx.%llx) "
2336                                         "tid %llu flushing %s\n",
2337                                         ceph_vinop(inode), cf->tid,
2338                                         ceph_cap_string(cf->caps));
2339                         }
2340                 } else {
2341                         struct ceph_cap_snap *capsnap =
2342                                         container_of(cf, struct ceph_cap_snap,
2343                                                     cap_flush);
2344                         dout("kick_flushing_caps %p capsnap %p tid %llu %s\n",
2345                              inode, capsnap, cf->tid,
2346                              ceph_cap_string(capsnap->dirty));
2347
2348                         refcount_inc(&capsnap->nref);
2349                         spin_unlock(&ci->i_ceph_lock);
2350
2351                         ret = __send_flush_snap(inode, session, capsnap, cap->mseq,
2352                                                 oldest_flush_tid);
2353                         if (ret < 0) {
2354                                 pr_err("kick_flushing_caps: error sending "
2355                                         "cap flushsnap, ino (%llx.%llx) "
2356                                         "tid %llu follows %llu\n",
2357                                         ceph_vinop(inode), cf->tid,
2358                                         capsnap->follows);
2359                         }
2360
2361                         ceph_put_cap_snap(capsnap);
2362                 }
2363
2364                 spin_lock(&ci->i_ceph_lock);
2365         }
2366 }
2367
2368 void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
2369                                    struct ceph_mds_session *session)
2370 {
2371         struct ceph_inode_info *ci;
2372         struct ceph_cap *cap;
2373         u64 oldest_flush_tid;
2374
2375         dout("early_kick_flushing_caps mds%d\n", session->s_mds);
2376
2377         spin_lock(&mdsc->cap_dirty_lock);
2378         oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2379         spin_unlock(&mdsc->cap_dirty_lock);
2380
2381         list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
2382                 spin_lock(&ci->i_ceph_lock);
2383                 cap = ci->i_auth_cap;
2384                 if (!(cap && cap->session == session)) {
2385                         pr_err("%p auth cap %p not mds%d ???\n",
2386                                 &ci->vfs_inode, cap, session->s_mds);
2387                         spin_unlock(&ci->i_ceph_lock);
2388                         continue;
2389                 }
2390
2391
2392                 /*
2393                  * if flushing caps were revoked, we re-send the cap flush
2394                  * in client reconnect stage. This guarantees MDS * processes
2395                  * the cap flush message before issuing the flushing caps to
2396                  * other client.
2397                  */
2398                 if ((cap->issued & ci->i_flushing_caps) !=
2399                     ci->i_flushing_caps) {
2400                         ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
2401                         __kick_flushing_caps(mdsc, session, ci,
2402                                              oldest_flush_tid);
2403                 } else {
2404                         ci->i_ceph_flags |= CEPH_I_KICK_FLUSH;
2405                 }
2406
2407                 spin_unlock(&ci->i_ceph_lock);
2408         }
2409 }
2410
2411 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
2412                              struct ceph_mds_session *session)
2413 {
2414         struct ceph_inode_info *ci;
2415         struct ceph_cap *cap;
2416         u64 oldest_flush_tid;
2417
2418         dout("kick_flushing_caps mds%d\n", session->s_mds);
2419
2420         spin_lock(&mdsc->cap_dirty_lock);
2421         oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2422         spin_unlock(&mdsc->cap_dirty_lock);
2423
2424         list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
2425                 spin_lock(&ci->i_ceph_lock);
2426                 cap = ci->i_auth_cap;
2427                 if (!(cap && cap->session == session)) {
2428                         pr_err("%p auth cap %p not mds%d ???\n",
2429                                 &ci->vfs_inode, cap, session->s_mds);
2430                         spin_unlock(&ci->i_ceph_lock);
2431                         continue;
2432                 }
2433                 if (ci->i_ceph_flags & CEPH_I_KICK_FLUSH) {
2434                         ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
2435                         __kick_flushing_caps(mdsc, session, ci,
2436                                              oldest_flush_tid);
2437                 }
2438                 spin_unlock(&ci->i_ceph_lock);
2439         }
2440 }
2441
2442 static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
2443                                      struct ceph_mds_session *session,
2444                                      struct inode *inode)
2445         __releases(ci->i_ceph_lock)
2446 {
2447         struct ceph_inode_info *ci = ceph_inode(inode);
2448         struct ceph_cap *cap;
2449
2450         cap = ci->i_auth_cap;
2451         dout("kick_flushing_inode_caps %p flushing %s\n", inode,
2452              ceph_cap_string(ci->i_flushing_caps));
2453
2454         if (!list_empty(&ci->i_cap_flush_list)) {
2455                 u64 oldest_flush_tid;
2456                 spin_lock(&mdsc->cap_dirty_lock);
2457                 list_move_tail(&ci->i_flushing_item,
2458                                &cap->session->s_cap_flushing);
2459                 oldest_flush_tid = __get_oldest_flush_tid(mdsc);
2460                 spin_unlock(&mdsc->cap_dirty_lock);
2461
2462                 ci->i_ceph_flags &= ~CEPH_I_KICK_FLUSH;
2463                 __kick_flushing_caps(mdsc, session, ci, oldest_flush_tid);
2464                 spin_unlock(&ci->i_ceph_lock);
2465         } else {
2466                 spin_unlock(&ci->i_ceph_lock);
2467         }
2468 }
2469
2470
2471 /*
2472  * Take references to capabilities we hold, so that we don't release
2473  * them to the MDS prematurely.
2474  *
2475  * Protected by i_ceph_lock.
2476  */
2477 static void __take_cap_refs(struct ceph_inode_info *ci, int got,
2478                             bool snap_rwsem_locked)
2479 {
2480         if (got & CEPH_CAP_PIN)
2481                 ci->i_pin_ref++;
2482         if (got & CEPH_CAP_FILE_RD)
2483                 ci->i_rd_ref++;
2484         if (got & CEPH_CAP_FILE_CACHE)
2485                 ci->i_rdcache_ref++;
2486         if (got & CEPH_CAP_FILE_WR) {
2487                 if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
2488                         BUG_ON(!snap_rwsem_locked);
2489                         ci->i_head_snapc = ceph_get_snap_context(
2490                                         ci->i_snap_realm->cached_context);
2491                 }
2492                 ci->i_wr_ref++;
2493         }
2494         if (got & CEPH_CAP_FILE_BUFFER) {
2495                 if (ci->i_wb_ref == 0)
2496                         ihold(&ci->vfs_inode);
2497                 ci->i_wb_ref++;
2498                 dout("__take_cap_refs %p wb %d -> %d (?)\n",
2499                      &ci->vfs_inode, ci->i_wb_ref-1, ci->i_wb_ref);
2500         }
2501 }
2502
2503 /*
2504  * Try to grab cap references.  Specify those refs we @want, and the
2505  * minimal set we @need.  Also include the larger offset we are writing
2506  * to (when applicable), and check against max_size here as well.
2507  * Note that caller is responsible for ensuring max_size increases are
2508  * requested from the MDS.
2509  */
2510 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
2511                             loff_t endoff, bool nonblock, int *got, int *err)
2512 {
2513         struct inode *inode = &ci->vfs_inode;
2514         struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
2515         int ret = 0;
2516         int have, implemented;
2517         int file_wanted;
2518         bool snap_rwsem_locked = false;
2519
2520         dout("get_cap_refs %p need %s want %s\n", inode,
2521              ceph_cap_string(need), ceph_cap_string(want));
2522
2523 again:
2524         spin_lock(&ci->i_ceph_lock);
2525
2526         /* make sure file is actually open */
2527         file_wanted = __ceph_caps_file_wanted(ci);
2528         if ((file_wanted & need) != need) {
2529                 dout("try_get_cap_refs need %s file_wanted %s, EBADF\n",
2530                      ceph_cap_string(need), ceph_cap_string(file_wanted));
2531                 *err = -EBADF;
2532                 ret = 1;
2533                 goto out_unlock;
2534         }
2535
2536         /* finish pending truncate */
2537         while (ci->i_truncate_pending) {
2538                 spin_unlock(&ci->i_ceph_lock);
2539                 if (snap_rwsem_locked) {
2540                         up_read(&mdsc->snap_rwsem);
2541                         snap_rwsem_locked = false;
2542                 }
2543                 __ceph_do_pending_vmtruncate(inode);
2544                 spin_lock(&ci->i_ceph_lock);
2545         }
2546
2547         have = __ceph_caps_issued(ci, &implemented);
2548
2549         if (have & need & CEPH_CAP_FILE_WR) {
2550                 if (endoff >= 0 && endoff > (loff_t)ci->i_max_size) {
2551                         dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
2552                              inode, endoff, ci->i_max_size);
2553                         if (endoff > ci->i_requested_max_size) {
2554                                 *err = -EAGAIN;
2555                                 ret = 1;
2556                         }
2557                         goto out_unlock;
2558                 }
2559                 /*
2560                  * If a sync write is in progress, we must wait, so that we
2561                  * can get a final snapshot value for size+mtime.
2562                  */
2563                 if (__ceph_have_pending_cap_snap(ci)) {
2564                         dout("get_cap_refs %p cap_snap_pending\n", inode);
2565                         goto out_unlock;
2566                 }
2567         }
2568
2569         if ((have & need) == need) {
2570                 /*
2571                  * Look at (implemented & ~have & not) so that we keep waiting
2572                  * on transition from wanted -> needed caps.  This is needed
2573                  * for WRBUFFER|WR -> WR to avoid a new WR sync write from
2574                  * going before a prior buffered writeback happens.
2575                  */
2576                 int not = want & ~(have & need);
2577                 int revoking = implemented & ~have;
2578                 dout("get_cap_refs %p have %s but not %s (revoking %s)\n",
2579                      inode, ceph_cap_string(have), ceph_cap_string(not),
2580                      ceph_cap_string(revoking));
2581                 if ((revoking & not) == 0) {
2582                         if (!snap_rwsem_locked &&
2583                             !ci->i_head_snapc &&
2584                             (need & CEPH_CAP_FILE_WR)) {
2585                                 if (!down_read_trylock(&mdsc->snap_rwsem)) {
2586                                         /*
2587                                          * we can not call down_read() when
2588                                          * task isn't in TASK_RUNNING state
2589                                          */
2590                                         if (nonblock) {
2591                                                 *err = -EAGAIN;
2592                                                 ret = 1;
2593                                                 goto out_unlock;
2594                                         }
2595
2596                                         spin_unlock(&ci->i_ceph_lock);
2597                                         down_read(&mdsc->snap_rwsem);
2598                                         snap_rwsem_locked = true;
2599                                         goto again;
2600                                 }
2601                                 snap_rwsem_locked = true;
2602                         }
2603                         *got = need | (have & want);
2604                         if ((need & CEPH_CAP_FILE_RD) &&
2605                             !(*got & CEPH_CAP_FILE_CACHE))
2606                                 ceph_disable_fscache_readpage(ci);
2607                         __take_cap_refs(ci, *got, true);
2608                         ret = 1;
2609                 }
2610         } else {
2611                 int session_readonly = false;
2612                 if ((need & CEPH_CAP_FILE_WR) && ci->i_auth_cap) {
2613                         struct ceph_mds_session *s = ci->i_auth_cap->session;
2614                         spin_lock(&s->s_cap_lock);
2615                         session_readonly = s->s_readonly;
2616                         spin_unlock(&s->s_cap_lock);
2617                 }
2618                 if (session_readonly) {
2619                         dout("get_cap_refs %p needed %s but mds%d readonly\n",
2620                              inode, ceph_cap_string(need), ci->i_auth_cap->mds);
2621                         *err = -EROFS;
2622                         ret = 1;
2623                         goto out_unlock;
2624                 }
2625
2626                 if (ci->i_ceph_flags & CEPH_I_CAP_DROPPED) {
2627                         int mds_wanted;
2628                         if (READ_ONCE(mdsc->fsc->mount_state) ==
2629                             CEPH_MOUNT_SHUTDOWN) {
2630                                 dout("get_cap_refs %p forced umount\n", inode);
2631                                 *err = -EIO;
2632                                 ret = 1;
2633                                 goto out_unlock;
2634                         }
2635                         mds_wanted = __ceph_caps_mds_wanted(ci, false);
2636                         if (need & ~(mds_wanted & need)) {
2637                                 dout("get_cap_refs %p caps were dropped"
2638                                      " (session killed?)\n", inode);
2639                                 *err = -ESTALE;
2640                                 ret = 1;
2641                                 goto out_unlock;
2642                         }
2643                         if (!(file_wanted & ~mds_wanted))
2644                                 ci->i_ceph_flags &= ~CEPH_I_CAP_DROPPED;
2645                 }
2646
2647                 dout("get_cap_refs %p have %s needed %s\n", inode,
2648                      ceph_cap_string(have), ceph_cap_string(need));
2649         }
2650 out_unlock:
2651         spin_unlock(&ci->i_ceph_lock);
2652         if (snap_rwsem_locked)
2653                 up_read(&mdsc->snap_rwsem);
2654
2655         dout("get_cap_refs %p ret %d got %s\n", inode,
2656              ret, ceph_cap_string(*got));
2657         return ret;
2658 }
2659
2660 /*
2661  * Check the offset we are writing up to against our current
2662  * max_size.  If necessary, tell the MDS we want to write to
2663  * a larger offset.
2664  */
2665 static void check_max_size(struct inode *inode, loff_t endoff)
2666 {
2667         struct ceph_inode_info *ci = ceph_inode(inode);
2668         int check = 0;
2669
2670         /* do we need to explicitly request a larger max_size? */
2671         spin_lock(&ci->i_ceph_lock);
2672         if (endoff >= ci->i_max_size && endoff > ci->i_wanted_max_size) {
2673                 dout("write %p at large endoff %llu, req max_size\n",
2674                      inode, endoff);
2675                 ci->i_wanted_max_size = endoff;
2676         }
2677         /* duplicate ceph_check_caps()'s logic */
2678         if (ci->i_auth_cap &&
2679             (ci->i_auth_cap->issued & CEPH_CAP_FILE_WR) &&
2680             ci->i_wanted_max_size > ci->i_max_size &&
2681             ci->i_wanted_max_size > ci->i_requested_max_size)
2682                 check = 1;
2683         spin_unlock(&ci->i_ceph_lock);
2684         if (check)
2685                 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
2686 }
2687
2688 int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
2689 {
2690         int ret, err = 0;
2691
2692         BUG_ON(need & ~CEPH_CAP_FILE_RD);
2693         BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
2694         ret = ceph_pool_perm_check(ci, need);
2695         if (ret < 0)
2696                 return ret;
2697
2698         ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
2699         if (ret) {
2700                 if (err == -EAGAIN) {
2701                         ret = 0;
2702                 } else if (err < 0) {
2703                         ret = err;
2704                 }
2705         }
2706         return ret;
2707 }
2708
2709 /*
2710  * Wait for caps, and take cap references.  If we can't get a WR cap
2711  * due to a small max_size, make sure we check_max_size (and possibly
2712  * ask the mds) so we don't get hung up indefinitely.
2713  */
2714 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
2715                   loff_t endoff, int *got, struct page **pinned_page)
2716 {
2717         int _got, ret, err = 0;
2718
2719         ret = ceph_pool_perm_check(ci, need);
2720         if (ret < 0)
2721                 return ret;
2722
2723         while (true) {
2724                 if (endoff > 0)
2725                         check_max_size(&ci->vfs_inode, endoff);
2726
2727                 err = 0;
2728                 _got = 0;
2729                 ret = try_get_cap_refs(ci, need, want, endoff,
2730                                        false, &_got, &err);
2731                 if (ret) {
2732                         if (err == -EAGAIN)
2733                                 continue;
2734                         if (err < 0)
2735                                 ret = err;
2736                 } else {
2737                         DEFINE_WAIT_FUNC(wait, woken_wake_function);
2738                         add_wait_queue(&ci->i_cap_wq, &wait);
2739
2740                         while (!try_get_cap_refs(ci, need, want, endoff,
2741                                                  true, &_got, &err)) {
2742                                 if (signal_pending(current)) {
2743                                         ret = -ERESTARTSYS;
2744                                         break;
2745                                 }
2746                                 wait_woken(&wait, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
2747                         }
2748
2749                         remove_wait_queue(&ci->i_cap_wq, &wait);
2750
2751                         if (err == -EAGAIN)
2752                                 continue;
2753                         if (err < 0)
2754                                 ret = err;
2755                 }
2756                 if (ret < 0) {
2757                         if (err == -ESTALE) {
2758                                 /* session was killed, try renew caps */
2759                                 ret = ceph_renew_caps(&ci->vfs_inode);
2760                                 if (ret == 0)
2761                                         continue;
2762                         }
2763                         return ret;
2764                 }
2765
2766                 if (ci->i_inline_version != CEPH_INLINE_NONE &&
2767                     (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
2768                     i_size_read(&ci->vfs_inode) > 0) {
2769                         struct page *page =
2770                                 find_get_page(ci->vfs_inode.i_mapping, 0);
2771                         if (page) {
2772                                 if (PageUptodate(page)) {
2773                                         *pinned_page = page;
2774                                         break;
2775                                 }
2776                                 put_page(page);
2777                         }
2778                         /*
2779                          * drop cap refs first because getattr while
2780                          * holding * caps refs can cause deadlock.
2781                          */
2782                         ceph_put_cap_refs(ci, _got);
2783                         _got = 0;
2784
2785                         /*
2786                          * getattr request will bring inline data into
2787                          * page cache
2788                          */
2789                         ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
2790                                                 CEPH_STAT_CAP_INLINE_DATA,
2791                                                 true);
2792                         if (ret < 0)
2793                                 return ret;
2794                         continue;
2795                 }
2796                 break;
2797         }
2798
2799         if ((_got & CEPH_CAP_FILE_RD) && (_got & CEPH_CAP_FILE_CACHE))
2800                 ceph_fscache_revalidate_cookie(ci);
2801
2802         *got = _got;
2803         return 0;
2804 }
2805
2806 /*
2807  * Take cap refs.  Caller must already know we hold at least one ref
2808  * on the caps in question or we don't know this is safe.
2809  */
2810 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
2811 {
2812         spin_lock(&ci->i_ceph_lock);
2813         __take_cap_refs(ci, caps, false);
2814         spin_unlock(&ci->i_ceph_lock);
2815 }
2816
2817
2818 /*
2819  * drop cap_snap that is not associated with any snapshot.
2820  * we don't need to send FLUSHSNAP message for it.
2821  */
2822 static int ceph_try_drop_cap_snap(struct ceph_inode_info *ci,
2823                                   struct ceph_cap_snap *capsnap)
2824 {
2825         if (!capsnap->need_flush &&
2826             !capsnap->writing && !capsnap->dirty_pages) {
2827                 dout("dropping cap_snap %p follows %llu\n",
2828                      capsnap, capsnap->follows);
2829                 BUG_ON(capsnap->cap_flush.tid > 0);
2830                 ceph_put_snap_context(capsnap->context);
2831                 if (!list_is_last(&capsnap->ci_item, &ci->i_cap_snaps))
2832                         ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
2833
2834                 list_del(&capsnap->ci_item);
2835                 ceph_put_cap_snap(capsnap);
2836                 return 1;
2837         }
2838         return 0;
2839 }
2840
2841 /*
2842  * Release cap refs.
2843  *
2844  * If we released the last ref on any given cap, call ceph_check_caps
2845  * to release (or schedule a release).
2846  *
2847  * If we are releasing a WR cap (from a sync write), finalize any affected
2848  * cap_snap, and wake up any waiters.
2849  */
2850 void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
2851 {
2852         struct inode *inode = &ci->vfs_inode;
2853         int last = 0, put = 0, flushsnaps = 0, wake = 0;
2854
2855         spin_lock(&ci->i_ceph_lock);
2856         if (had & CEPH_CAP_PIN)
2857                 --ci->i_pin_ref;
2858         if (had & CEPH_CAP_FILE_RD)
2859                 if (--ci->i_rd_ref == 0)
2860                         last++;
2861         if (had & CEPH_CAP_FILE_CACHE)
2862                 if (--ci->i_rdcache_ref == 0)
2863                         last++;
2864         if (had & CEPH_CAP_FILE_BUFFER) {
2865                 if (--ci->i_wb_ref == 0) {
2866                         last++;
2867                         put++;
2868                 }
2869                 dout("put_cap_refs %p wb %d -> %d (?)\n",
2870                      inode, ci->i_wb_ref+1, ci->i_wb_ref);
2871         }
2872         if (had & CEPH_CAP_FILE_WR)
2873                 if (--ci->i_wr_ref == 0) {
2874                         last++;
2875                         if (__ceph_have_pending_cap_snap(ci)) {
2876                                 struct ceph_cap_snap *capsnap =
2877                                         list_last_entry(&ci->i_cap_snaps,
2878                                                         struct ceph_cap_snap,
2879                                                         ci_item);
2880                                 capsnap->writing = 0;
2881                                 if (ceph_try_drop_cap_snap(ci, capsnap))
2882                                         put++;
2883                                 else if (__ceph_finish_cap_snap(ci, capsnap))
2884                                         flushsnaps = 1;
2885                                 wake = 1;
2886                         }
2887                         if (ci->i_wrbuffer_ref_head == 0 &&
2888                             ci->i_dirty_caps == 0 &&
2889                             ci->i_flushing_caps == 0) {
2890                                 BUG_ON(!ci->i_head_snapc);
2891                                 ceph_put_snap_context(ci->i_head_snapc);
2892                                 ci->i_head_snapc = NULL;
2893                         }
2894                         /* see comment in __ceph_remove_cap() */
2895                         if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
2896                                 drop_inode_snap_realm(ci);
2897                 }
2898         spin_unlock(&ci->i_ceph_lock);
2899
2900         dout("put_cap_refs %p had %s%s%s\n", inode, ceph_cap_string(had),
2901              last ? " last" : "", put ? " put" : "");
2902
2903         if (last && !flushsnaps)
2904                 ceph_check_caps(ci, 0, NULL);
2905         else if (flushsnaps)
2906                 ceph_flush_snaps(ci, NULL);
2907         if (wake)
2908                 wake_up_all(&ci->i_cap_wq);
2909         while (put-- > 0)
2910                 iput(inode);
2911 }
2912
2913 /*
2914  * Release @nr WRBUFFER refs on dirty pages for the given @snapc snap
2915  * context.  Adjust per-snap dirty page accounting as appropriate.
2916  * Once all dirty data for a cap_snap is flushed, flush snapped file
2917  * metadata back to the MDS.  If we dropped the last ref, call
2918  * ceph_check_caps.
2919  */
2920 void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
2921                                 struct ceph_snap_context *snapc)
2922 {
2923         struct inode *inode = &ci->vfs_inode;
2924         struct ceph_cap_snap *capsnap = NULL;
2925         int put = 0;
2926         bool last = false;
2927         bool found = false;
2928         bool flush_snaps = false;
2929         bool complete_capsnap = false;
2930
2931         spin_lock(&ci->i_ceph_lock);
2932         ci->i_wrbuffer_ref -= nr;
2933         if (ci->i_wrbuffer_ref == 0) {
2934                 last = true;
2935                 put++;
2936         }
2937
2938         if (ci->i_head_snapc == snapc) {
2939                 ci->i_wrbuffer_ref_head -= nr;
2940                 if (ci->i_wrbuffer_ref_head == 0 &&
2941                     ci->i_wr_ref == 0 &&
2942                     ci->i_dirty_caps == 0 &&
2943                     ci->i_flushing_caps == 0) {
2944                         BUG_ON(!ci->i_head_snapc);
2945                         ceph_put_snap_context(ci->i_head_snapc);
2946                         ci->i_head_snapc = NULL;
2947                 }
2948                 dout("put_wrbuffer_cap_refs on %p head %d/%d -> %d/%d %s\n",
2949                      inode,
2950                      ci->i_wrbuffer_ref+nr, ci->i_wrbuffer_ref_head+nr,
2951                      ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
2952                      last ? " LAST" : "");
2953         } else {
2954                 list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
2955                         if (capsnap->context == snapc) {
2956                                 found = true;
2957                                 break;
2958                         }
2959                 }
2960                 BUG_ON(!found);
2961                 capsnap->dirty_pages -= nr;
2962                 if (capsnap->dirty_pages == 0) {
2963                         complete_capsnap = true;
2964                         if (!capsnap->writing) {
2965                                 if (ceph_try_drop_cap_snap(ci, capsnap)) {
2966                                         put++;
2967                                 } else {
2968                                         ci->i_ceph_flags |= CEPH_I_FLUSH_SNAPS;
2969                                         flush_snaps = true;
2970                                 }
2971                         }
2972                 }
2973                 dout("put_wrbuffer_cap_refs on %p cap_snap %p "
2974                      " snap %lld %d/%d -> %d/%d %s%s\n",
2975                      inode, capsnap, capsnap->context->seq,
2976                      ci->i_wrbuffer_ref+nr, capsnap->dirty_pages + nr,
2977                      ci->i_wrbuffer_ref, capsnap->dirty_pages,
2978                      last ? " (wrbuffer last)" : "",
2979                      complete_capsnap ? " (complete capsnap)" : "");
2980         }
2981
2982         spin_unlock(&ci->i_ceph_lock);
2983
2984         if (last) {
2985                 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
2986         } else if (flush_snaps) {
2987                 ceph_flush_snaps(ci, NULL);
2988         }
2989         if (complete_capsnap)
2990                 wake_up_all(&ci->i_cap_wq);
2991         while (put-- > 0)
2992                 iput(inode);
2993 }
2994
2995 /*
2996  * Invalidate unlinked inode's aliases, so we can drop the inode ASAP.
2997  */
2998 static void invalidate_aliases(struct inode *inode)
2999 {
3000         struct dentry *dn, *prev = NULL;
3001
3002         dout("invalidate_aliases inode %p\n", inode);
3003         d_prune_aliases(inode);
3004         /*
3005          * For non-directory inode, d_find_alias() only returns
3006          * hashed dentry. After calling d_invalidate(), the
3007          * dentry becomes unhashed.
3008          *
3009          * For directory inode, d_find_alias() can return
3010          * unhashed dentry. But directory inode should have
3011          * one alias at most.
3012          */
3013         while ((dn = d_find_alias(inode))) {
3014                 if (dn == prev) {
3015                         dput(dn);
3016                         break;
3017                 }
3018                 d_invalidate(dn);
3019                 if (prev)
3020                         dput(prev);
3021                 prev = dn;
3022         }
3023         if (prev)
3024                 dput(prev);
3025 }
3026
3027 struct cap_extra_info {
3028         struct ceph_string *pool_ns;
3029         /* inline data */
3030         u64 inline_version;
3031         void *inline_data;
3032         u32 inline_len;
3033         /* dirstat */
3034         bool dirstat_valid;
3035         u64 nfiles;
3036         u64 nsubdirs;
3037         /* currently issued */
3038         int issued;
3039 };
3040
3041 /*
3042  * Handle a cap GRANT message from the MDS.  (Note that a GRANT may
3043  * actually be a revocation if it specifies a smaller cap set.)
3044  *
3045  * caller holds s_mutex and i_ceph_lock, we drop both.
3046  */
3047 static void handle_cap_grant(struct inode *inode,
3048                              struct ceph_mds_session *session,
3049                              struct ceph_cap *cap,
3050                              struct ceph_mds_caps *grant,
3051                              struct ceph_buffer *xattr_buf,
3052                              struct cap_extra_info *extra_info)
3053         __releases(ci->i_ceph_lock)
3054         __releases(session->s_mdsc->snap_rwsem)
3055 {
3056         struct ceph_inode_info *ci = ceph_inode(inode);
3057         int seq = le32_to_cpu(grant->seq);
3058         int newcaps = le32_to_cpu(grant->caps);
3059         int used, wanted, dirty;
3060         u64 size = le64_to_cpu(grant->size);
3061         u64 max_size = le64_to_cpu(grant->max_size);
3062         int check_caps = 0;
3063         bool wake = false;
3064         bool writeback = false;
3065         bool queue_trunc = false;
3066         bool queue_invalidate = false;
3067         bool deleted_inode = false;
3068         bool fill_inline = false;
3069
3070         dout("handle_cap_grant inode %p cap %p mds%d seq %d %s\n",
3071              inode, cap, session->s_mds, seq, ceph_cap_string(newcaps));
3072         dout(" size %llu max_size %llu, i_size %llu\n", size, max_size,
3073                 inode->i_size);
3074
3075
3076         /*
3077          * auth mds of the inode changed. we received the cap export message,
3078          * but still haven't received the cap import message. handle_cap_export
3079          * updated the new auth MDS' cap.
3080          *
3081          * "ceph_seq_cmp(seq, cap->seq) <= 0" means we are processing a message
3082          * that was sent before the cap import message. So don't remove caps.
3083          */
3084         if (ceph_seq_cmp(seq, cap->seq) <= 0) {
3085                 WARN_ON(cap != ci->i_auth_cap);
3086                 WARN_ON(cap->cap_id != le64_to_cpu(grant->cap_id));
3087                 seq = cap->seq;
3088                 newcaps |= cap->issued;
3089         }
3090
3091         /*
3092          * If CACHE is being revoked, and we have no dirty buffers,
3093          * try to invalidate (once).  (If there are dirty buffers, we
3094          * will invalidate _after_ writeback.)
3095          */
3096         if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
3097             ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
3098             (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
3099             !(ci->i_wrbuffer_ref || ci->i_wb_ref)) {
3100                 if (try_nonblocking_invalidate(inode)) {
3101                         /* there were locked pages.. invalidate later
3102                            in a separate thread. */
3103                         if (ci->i_rdcache_revoking != ci->i_rdcache_gen) {
3104                                 queue_invalidate = true;
3105                                 ci->i_rdcache_revoking = ci->i_rdcache_gen;
3106                         }
3107                 }
3108         }
3109
3110         /* side effects now are allowed */
3111         cap->cap_gen = session->s_cap_gen;
3112         cap->seq = seq;
3113
3114         __check_cap_issue(ci, cap, newcaps);
3115
3116         if ((newcaps & CEPH_CAP_AUTH_SHARED) &&
3117             (extra_info->issued & CEPH_CAP_AUTH_EXCL) == 0) {
3118                 inode->i_mode = le32_to_cpu(grant->mode);
3119                 inode->i_uid = make_kuid(&init_user_ns, le32_to_cpu(grant->uid));
3120                 inode->i_gid = make_kgid(&init_user_ns, le32_to_cpu(grant->gid));
3121                 dout("%p mode 0%o uid.gid %d.%d\n", inode, inode->i_mode,
3122                      from_kuid(&init_user_ns, inode->i_uid),
3123                      from_kgid(&init_user_ns, inode->i_gid));
3124         }
3125
3126         if ((newcaps & CEPH_CAP_LINK_SHARED) &&
3127             (extra_info->issued & CEPH_CAP_LINK_EXCL) == 0) {
3128                 set_nlink(inode, le32_to_cpu(grant->nlink));
3129                 if (inode->i_nlink == 0 &&
3130                     (newcaps & (CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL)))
3131                         deleted_inode = true;
3132         }
3133
3134         if ((extra_info->issued & CEPH_CAP_XATTR_EXCL) == 0 &&
3135             grant->xattr_len) {
3136                 int len = le32_to_cpu(grant->xattr_len);
3137                 u64 version = le64_to_cpu(grant->xattr_version);
3138
3139                 if (version > ci->i_xattrs.version) {
3140                         dout(" got new xattrs v%llu on %p len %d\n",
3141                              version, inode, len);
3142                         if (ci->i_xattrs.blob)
3143                                 ceph_buffer_put(ci->i_xattrs.blob);
3144                         ci->i_xattrs.blob = ceph_buffer_get(xattr_buf);
3145                         ci->i_xattrs.version = version;
3146                         ceph_forget_all_cached_acls(inode);
3147                 }
3148         }
3149
3150         if (newcaps & CEPH_CAP_ANY_RD) {
3151                 struct timespec mtime, atime, ctime;
3152                 /* ctime/mtime/atime? */
3153                 ceph_decode_timespec(&mtime, &grant->mtime);
3154                 ceph_decode_timespec(&atime, &grant->atime);
3155                 ceph_decode_timespec(&ctime, &grant->ctime);
3156                 ceph_fill_file_time(inode, extra_info->issued,
3157                                     le32_to_cpu(grant->time_warp_seq),
3158                                     &ctime, &mtime, &atime);
3159         }
3160
3161         if ((newcaps & CEPH_CAP_FILE_SHARED) && extra_info->dirstat_valid) {
3162                 ci->i_files = extra_info->nfiles;
3163                 ci->i_subdirs = extra_info->nsubdirs;
3164         }
3165
3166         if (newcaps & (CEPH_CAP_ANY_FILE_RD | CEPH_CAP_ANY_FILE_WR)) {
3167                 /* file layout may have changed */
3168                 s64 old_pool = ci->i_layout.pool_id;
3169                 struct ceph_string *old_ns;
3170
3171                 ceph_file_layout_from_legacy(&ci->i_layout, &grant->layout);
3172                 old_ns = rcu_dereference_protected(ci->i_layout.pool_ns,
3173                                         lockdep_is_held(&ci->i_ceph_lock));
3174                 rcu_assign_pointer(ci->i_layout.pool_ns, extra_info->pool_ns);
3175
3176                 if (ci->i_layout.pool_id != old_pool ||
3177                     extra_info->pool_ns != old_ns)
3178                         ci->i_ceph_flags &= ~CEPH_I_POOL_PERM;
3179
3180                 extra_info->pool_ns = old_ns;
3181
3182                 /* size/truncate_seq? */
3183                 queue_trunc = ceph_fill_file_size(inode, extra_info->issued,
3184                                         le32_to_cpu(grant->truncate_seq),
3185                                         le64_to_cpu(grant->truncate_size),
3186                                         size);
3187         }
3188
3189         if (ci->i_auth_cap == cap && (newcaps & CEPH_CAP_ANY_FILE_WR)) {
3190                 if (max_size != ci->i_max_size) {
3191                         dout("max_size %lld -> %llu\n",
3192                              ci->i_max_size, max_size);
3193                         ci->i_max_size = max_size;
3194                         if (max_size >= ci->i_wanted_max_size) {
3195                                 ci->i_wanted_max_size = 0;  /* reset */
3196                                 ci->i_requested_max_size = 0;
3197                         }
3198                         wake = true;
3199                 } else if (ci->i_wanted_max_size > ci->i_max_size &&
3200                            ci->i_wanted_max_size > ci->i_requested_max_size) {
3201                         /* CEPH_CAP_OP_IMPORT */
3202                         wake = true;
3203                 }
3204         }
3205
3206         /* check cap bits */
3207         wanted = __ceph_caps_wanted(ci);
3208         used = __ceph_caps_used(ci);
3209         dirty = __ceph_caps_dirty(ci);
3210         dout(" my wanted = %s, used = %s, dirty %s\n",
3211              ceph_cap_string(wanted),
3212              ceph_cap_string(used),
3213              ceph_cap_string(dirty));
3214         if (wanted != le32_to_cpu(grant->wanted)) {
3215                 dout("mds wanted %s -> %s\n",
3216                      ceph_cap_string(le32_to_cpu(grant->wanted)),
3217                      ceph_cap_string(wanted));
3218                 /* imported cap may not have correct mds_wanted */
3219                 if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT)
3220                         check_caps = 1;
3221         }
3222
3223         /* revocation, grant, or no-op? */
3224         if (cap->issued & ~newcaps) {
3225                 int revoking = cap->issued & ~newcaps;
3226
3227                 dout("revocation: %s -> %s (revoking %s)\n",
3228                      ceph_cap_string(cap->issued),
3229                      ceph_cap_string(newcaps),
3230                      ceph_cap_string(revoking));
3231                 if (revoking & used & CEPH_CAP_FILE_BUFFER)
3232                         writeback = true;  /* initiate writeback; will delay ack */
3233                 else if (revoking == CEPH_CAP_FILE_CACHE &&
3234                          (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
3235                          queue_invalidate)
3236                         ; /* do nothing yet, invalidation will be queued */
3237                 else if (cap == ci->i_auth_cap)
3238                         check_caps = 1; /* check auth cap only */
3239                 else
3240                         check_caps = 2; /* check all caps */
3241                 cap->issued = newcaps;
3242                 cap->implemented |= newcaps;
3243         } else if (cap->issued == newcaps) {
3244                 dout("caps unchanged: %s -> %s\n",
3245                      ceph_cap_string(cap->issued), ceph_cap_string(newcaps));
3246         } else {
3247                 dout("grant: %s -> %s\n", ceph_cap_string(cap->issued),
3248                      ceph_cap_string(newcaps));
3249                 /* non-auth MDS is revoking the newly grant caps ? */
3250                 if (cap == ci->i_auth_cap &&
3251                     __ceph_caps_revoking_other(ci, cap, newcaps))
3252                     check_caps = 2;
3253
3254                 cap->issued = newcaps;
3255                 cap->implemented |= newcaps; /* add bits only, to
3256                                               * avoid stepping on a
3257                                               * pending revocation */
3258                 wake = true;
3259         }
3260         BUG_ON(cap->issued & ~cap->implemented);
3261
3262         if (extra_info->inline_version > 0 &&
3263             extra_info->inline_version >= ci->i_inline_version) {
3264                 ci->i_inline_version = extra_info->inline_version;
3265                 if (ci->i_inline_version != CEPH_INLINE_NONE &&
3266                     (newcaps & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)))
3267                         fill_inline = true;
3268         }
3269
3270         if (le32_to_cpu(grant->op) == CEPH_CAP_OP_IMPORT) {
3271                 if (newcaps & ~extra_info->issued)
3272                         wake = true;
3273                 kick_flushing_inode_caps(session->s_mdsc, session, inode);
3274                 up_read(&session->s_mdsc->snap_rwsem);
3275         } else {
3276                 spin_unlock(&ci->i_ceph_lock);
3277         }
3278
3279         if (fill_inline)
3280                 ceph_fill_inline_data(inode, NULL, extra_info->inline_data,
3281                                       extra_info->inline_len);
3282
3283         if (queue_trunc)
3284                 ceph_queue_vmtruncate(inode);
3285
3286         if (writeback)
3287                 /*
3288                  * queue inode for writeback: we can't actually call
3289                  * filemap_write_and_wait, etc. from message handler
3290                  * context.
3291                  */
3292                 ceph_queue_writeback(inode);
3293         if (queue_invalidate)
3294                 ceph_queue_invalidate(inode);
3295         if (deleted_inode)
3296                 invalidate_aliases(inode);
3297         if (wake)
3298                 wake_up_all(&ci->i_cap_wq);
3299
3300         if (check_caps == 1)
3301                 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_AUTHONLY,
3302                                 session);
3303         else if (check_caps == 2)
3304                 ceph_check_caps(ci, CHECK_CAPS_NODELAY, session);
3305         else
3306                 mutex_unlock(&session->s_mutex);
3307 }
3308
3309 /*
3310  * Handle FLUSH_ACK from MDS, indicating that metadata we sent to the
3311  * MDS has been safely committed.
3312  */
3313 static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
3314                                  struct ceph_mds_caps *m,
3315                                  struct ceph_mds_session *session,
3316                                  struct ceph_cap *cap)
3317         __releases(ci->i_ceph_lock)
3318 {
3319         struct ceph_inode_info *ci = ceph_inode(inode);
3320         struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
3321         struct ceph_cap_flush *cf, *tmp_cf;
3322         LIST_HEAD(to_remove);
3323         unsigned seq = le32_to_cpu(m->seq);
3324         int dirty = le32_to_cpu(m->dirty);
3325         int cleaned = 0;
3326         bool drop = false;
3327         bool wake_ci = false;
3328         bool wake_mdsc = false;
3329
3330         list_for_each_entry_safe(cf, tmp_cf, &ci->i_cap_flush_list, i_list) {
3331                 if (cf->tid == flush_tid)
3332                         cleaned = cf->caps;
3333                 if (cf->caps == 0) /* capsnap */
3334                         continue;
3335                 if (cf->tid <= flush_tid) {
3336                         if (__finish_cap_flush(NULL, ci, cf))
3337                                 wake_ci = true;
3338                         list_add_tail(&cf->i_list, &to_remove);
3339                 } else {
3340                         cleaned &= ~cf->caps;
3341                         if (!cleaned)
3342                                 break;
3343                 }
3344         }
3345
3346         dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
3347              " flushing %s -> %s\n",
3348              inode, session->s_mds, seq, ceph_cap_string(dirty),
3349              ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps),
3350              ceph_cap_string(ci->i_flushing_caps & ~cleaned));
3351
3352         if (list_empty(&to_remove) && !cleaned)
3353                 goto out;
3354
3355         ci->i_flushing_caps &= ~cleaned;
3356
3357         spin_lock(&mdsc->cap_dirty_lock);
3358
3359         list_for_each_entry(cf, &to_remove, i_list) {
3360                 if (__finish_cap_flush(mdsc, NULL, cf))
3361                         wake_mdsc = true;
3362         }
3363
3364         if (ci->i_flushing_caps == 0) {
3365                 if (list_empty(&ci->i_cap_flush_list)) {
3366                         list_del_init(&ci->i_flushing_item);
3367                         if (!list_empty(&session->s_cap_flushing)) {
3368                                 dout(" mds%d still flushing cap on %p\n",
3369                                      session->s_mds,
3370                                      &list_first_entry(&session->s_cap_flushing,
3371                                                 struct ceph_inode_info,
3372                                                 i_flushing_item)->vfs_inode);
3373                         }
3374                 }
3375                 mdsc->num_cap_flushing--;
3376                 dout(" inode %p now !flushing\n", inode);
3377
3378                 if (ci->i_dirty_caps == 0) {
3379                         dout(" inode %p now clean\n", inode);
3380                         BUG_ON(!list_empty(&ci->i_dirty_item));
3381                         drop = true;
3382                         if (ci->i_wr_ref == 0 &&
3383                             ci->i_wrbuffer_ref_head == 0) {
3384                                 BUG_ON(!ci->i_head_snapc);
3385                                 ceph_put_snap_context(ci->i_head_snapc);
3386                                 ci->i_head_snapc = NULL;
3387                         }
3388                 } else {
3389                         BUG_ON(list_empty(&ci->i_dirty_item));
3390                 }
3391         }
3392         spin_unlock(&mdsc->cap_dirty_lock);
3393
3394 out:
3395         spin_unlock(&ci->i_ceph_lock);
3396
3397         while (!list_empty(&to_remove)) {
3398                 cf = list_first_entry(&to_remove,
3399                                       struct ceph_cap_flush, i_list);
3400                 list_del(&cf->i_list);
3401                 ceph_free_cap_flush(cf);
3402         }
3403
3404         if (wake_ci)
3405                 wake_up_all(&ci->i_cap_wq);
3406         if (wake_mdsc)
3407                 wake_up_all(&mdsc->cap_flushing_wq);
3408         if (drop)
3409                 iput(inode);
3410 }
3411
3412 /*
3413  * Handle FLUSHSNAP_ACK.  MDS has flushed snap data to disk and we can
3414  * throw away our cap_snap.
3415  *
3416  * Caller hold s_mutex.
3417  */
3418 static void handle_cap_flushsnap_ack(struct inode *inode, u64 flush_tid,
3419                                      struct ceph_mds_caps *m,
3420                                      struct ceph_mds_session *session)
3421 {
3422         struct ceph_inode_info *ci = ceph_inode(inode);
3423         struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
3424         u64 follows = le64_to_cpu(m->snap_follows);
3425         struct ceph_cap_snap *capsnap;
3426         bool flushed = false;
3427         bool wake_ci = false;
3428         bool wake_mdsc = false;
3429
3430         dout("handle_cap_flushsnap_ack inode %p ci %p mds%d follows %lld\n",
3431              inode, ci, session->s_mds, follows);
3432
3433         spin_lock(&ci->i_ceph_lock);
3434         list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
3435                 if (capsnap->follows == follows) {
3436                         if (capsnap->cap_flush.tid != flush_tid) {
3437                                 dout(" cap_snap %p follows %lld tid %lld !="
3438                                      " %lld\n", capsnap, follows,
3439                                      flush_tid, capsnap->cap_flush.tid);
3440                                 break;
3441                         }
3442                         flushed = true;
3443                         break;
3444                 } else {
3445                         dout(" skipping cap_snap %p follows %lld\n",
3446                              capsnap, capsnap->follows);
3447                 }
3448         }
3449         if (flushed) {
3450                 WARN_ON(capsnap->dirty_pages || capsnap->writing);
3451                 dout(" removing %p cap_snap %p follows %lld\n",
3452                      inode, capsnap, follows);
3453                 list_del(&capsnap->ci_item);
3454                 if (__finish_cap_flush(NULL, ci, &capsnap->cap_flush))
3455                         wake_ci = true;
3456
3457                 spin_lock(&mdsc->cap_dirty_lock);
3458
3459                 if (list_empty(&ci->i_cap_flush_list))
3460                         list_del_init(&ci->i_flushing_item);
3461
3462                 if (__finish_cap_flush(mdsc, NULL, &capsnap->cap_flush))
3463                         wake_mdsc = true;
3464
3465                 spin_unlock(&mdsc->cap_dirty_lock);
3466         }
3467         spin_unlock(&ci->i_ceph_lock);
3468         if (flushed) {
3469                 ceph_put_snap_context(capsnap->context);
3470                 ceph_put_cap_snap(capsnap);
3471                 if (wake_ci)
3472                         wake_up_all(&ci->i_cap_wq);
3473                 if (wake_mdsc)
3474                         wake_up_all(&mdsc->cap_flushing_wq);
3475                 iput(inode);
3476         }
3477 }
3478
3479 /*
3480  * Handle TRUNC from MDS, indicating file truncation.
3481  *
3482  * caller hold s_mutex.
3483  */
3484 static void handle_cap_trunc(struct inode *inode,
3485                              struct ceph_mds_caps *trunc,
3486                              struct ceph_mds_session *session)
3487         __releases(ci->i_ceph_lock)
3488 {
3489         struct ceph_inode_info *ci = ceph_inode(inode);
3490         int mds = session->s_mds;
3491         int seq = le32_to_cpu(trunc->seq);
3492         u32 truncate_seq = le32_to_cpu(trunc->truncate_seq);
3493         u64 truncate_size = le64_to_cpu(trunc->truncate_size);
3494         u64 size = le64_to_cpu(trunc->size);
3495         int implemented = 0;
3496         int dirty = __ceph_caps_dirty(ci);
3497         int issued = __ceph_caps_issued(ceph_inode(inode), &implemented);
3498         int queue_trunc = 0;
3499
3500         issued |= implemented | dirty;
3501
3502         dout("handle_cap_trunc inode %p mds%d seq %d to %lld seq %d\n",
3503              inode, mds, seq, truncate_size, truncate_seq);
3504         queue_trunc = ceph_fill_file_size(inode, issued,
3505                                           truncate_seq, truncate_size, size);
3506         spin_unlock(&ci->i_ceph_lock);
3507
3508         if (queue_trunc)
3509                 ceph_queue_vmtruncate(inode);
3510 }
3511
3512 /*
3513  * Handle EXPORT from MDS.  Cap is being migrated _from_ this mds to a
3514  * different one.  If we are the most recent migration we've seen (as
3515  * indicated by mseq), make note of the migrating cap bits for the
3516  * duration (until we see the corresponding IMPORT).
3517  *
3518  * caller holds s_mutex
3519  */
3520 static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
3521                               struct ceph_mds_cap_peer *ph,
3522                               struct ceph_mds_session *session)
3523 {
3524         struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
3525         struct ceph_mds_session *tsession = NULL;
3526         struct ceph_cap *cap, *tcap, *new_cap = NULL;
3527         struct ceph_inode_info *ci = ceph_inode(inode);
3528         u64 t_cap_id;
3529         unsigned mseq = le32_to_cpu(ex->migrate_seq);
3530         unsigned t_seq, t_mseq;
3531         int target, issued;
3532         int mds = session->s_mds;
3533
3534         if (ph) {
3535                 t_cap_id = le64_to_cpu(ph->cap_id);
3536                 t_seq = le32_to_cpu(ph->seq);
3537                 t_mseq = le32_to_cpu(ph->mseq);
3538                 target = le32_to_cpu(ph->mds);
3539         } else {
3540                 t_cap_id = t_seq = t_mseq = 0;
3541                 target = -1;
3542         }
3543
3544         dout("handle_cap_export inode %p ci %p mds%d mseq %d target %d\n",
3545              inode, ci, mds, mseq, target);
3546 retry:
3547         spin_lock(&ci->i_ceph_lock);
3548         cap = __get_cap_for_mds(ci, mds);
3549         if (!cap || cap->cap_id != le64_to_cpu(ex->cap_id))
3550                 goto out_unlock;
3551
3552         if (target < 0) {
3553                 __ceph_remove_cap(cap, false);
3554                 if (!ci->i_auth_cap)
3555                         ci->i_ceph_flags |= CEPH_I_CAP_DROPPED;
3556                 goto out_unlock;
3557         }
3558
3559         /*
3560          * now we know we haven't received the cap import message yet
3561          * because the exported cap still exist.
3562          */
3563
3564         issued = cap->issued;
3565         if (issued != cap->implemented)
3566                 pr_err_ratelimited("handle_cap_export: issued != implemented: "
3567                                 "ino (%llx.%llx) mds%d seq %d mseq %d "
3568                                 "issued %s implemented %s\n",
3569                                 ceph_vinop(inode), mds, cap->seq, cap->mseq,
3570                                 ceph_cap_string(issued),
3571                                 ceph_cap_string(cap->implemented));
3572
3573
3574         tcap = __get_cap_for_mds(ci, target);
3575         if (tcap) {
3576                 /* already have caps from the target */
3577                 if (tcap->cap_id == t_cap_id &&
3578                     ceph_seq_cmp(tcap->seq, t_seq) < 0) {
3579                         dout(" updating import cap %p mds%d\n", tcap, target);
3580                         tcap->cap_id = t_cap_id;
3581                         tcap->seq = t_seq - 1;
3582                         tcap->issue_seq = t_seq - 1;
3583                         tcap->mseq = t_mseq;
3584                         tcap->issued |= issued;
3585                         tcap->implemented |= issued;
3586                         if (cap == ci->i_auth_cap)
3587                                 ci->i_auth_cap = tcap;
3588
3589                         if (!list_empty(&ci->i_cap_flush_list) &&
3590                             ci->i_auth_cap == tcap) {
3591                                 spin_lock(&mdsc->cap_dirty_lock);
3592                                 list_move_tail(&ci->i_flushing_item,
3593                                                &tcap->session->s_cap_flushing);
3594                                 spin_unlock(&mdsc->cap_dirty_lock);
3595                         }
3596                 }
3597                 __ceph_remove_cap(cap, false);
3598                 goto out_unlock;
3599         } else if (tsession) {
3600                 /* add placeholder for the export tagert */
3601                 int flag = (cap == ci->i_auth_cap) ? CEPH_CAP_FLAG_AUTH : 0;
3602                 tcap = new_cap;
3603                 ceph_add_cap(inode, tsession, t_cap_id, -1, issued, 0,
3604                              t_seq - 1, t_mseq, (u64)-1, flag, &new_cap);
3605
3606                 if (!list_empty(&ci->i_cap_flush_list) &&
3607                     ci->i_auth_cap == tcap) {
3608                         spin_lock(&mdsc->cap_dirty_lock);
3609                         list_move_tail(&ci->i_flushing_item,
3610                                        &tcap->session->s_cap_flushing);
3611                         spin_unlock(&mdsc->cap_dirty_lock);
3612                 }
3613
3614                 __ceph_remove_cap(cap, false);
3615                 goto out_unlock;
3616         }
3617
3618         spin_unlock(&ci->i_ceph_lock);
3619         mutex_unlock(&session->s_mutex);
3620
3621         /* open target session */
3622         tsession = ceph_mdsc_open_export_target_session(mdsc, target);
3623         if (!IS_ERR(tsession)) {
3624                 if (mds > target) {
3625                         mutex_lock(&session->s_mutex);
3626                         mutex_lock_nested(&tsession->s_mutex,
3627                                           SINGLE_DEPTH_NESTING);
3628                 } else {
3629                         mutex_lock(&tsession->s_mutex);
3630                         mutex_lock_nested(&session->s_mutex,
3631                                           SINGLE_DEPTH_NESTING);
3632                 }
3633                 new_cap = ceph_get_cap(mdsc, NULL);
3634         } else {
3635                 WARN_ON(1);
3636                 tsession = NULL;
3637                 target = -1;
3638         }
3639         goto retry;
3640
3641 out_unlock:
3642         spin_unlock(&ci->i_ceph_lock);
3643         mutex_unlock(&session->s_mutex);
3644         if (tsession) {
3645                 mutex_unlock(&tsession->s_mutex);
3646                 ceph_put_mds_session(tsession);
3647         }
3648         if (new_cap)
3649                 ceph_put_cap(mdsc, new_cap);
3650 }
3651
3652 /*
3653  * Handle cap IMPORT.
3654  *
3655  * caller holds s_mutex. acquires i_ceph_lock
3656  */
3657 static void handle_cap_import(struct ceph_mds_client *mdsc,
3658                               struct inode *inode, struct ceph_mds_caps *im,
3659                               struct ceph_mds_cap_peer *ph,
3660                               struct ceph_mds_session *session,
3661                               struct ceph_cap **target_cap, int *old_issued)
3662         __acquires(ci->i_ceph_lock)
3663 {
3664         struct ceph_inode_info *ci = ceph_inode(inode);
3665         struct ceph_cap *cap, *ocap, *new_cap = NULL;
3666         int mds = session->s_mds;
3667         int issued;
3668         unsigned caps = le32_to_cpu(im->caps);
3669         unsigned wanted = le32_to_cpu(im->wanted);
3670         unsigned seq = le32_to_cpu(im->seq);
3671         unsigned mseq = le32_to_cpu(im->migrate_seq);
3672         u64 realmino = le64_to_cpu(im->realm);
3673         u64 cap_id = le64_to_cpu(im->cap_id);
3674         u64 p_cap_id;
3675         int peer;
3676
3677         if (ph) {
3678                 p_cap_id = le64_to_cpu(ph->cap_id);
3679                 peer = le32_to_cpu(ph->mds);
3680         } else {
3681                 p_cap_id = 0;
3682                 peer = -1;
3683         }
3684
3685         dout("handle_cap_import inode %p ci %p mds%d mseq %d peer %d\n",
3686              inode, ci, mds, mseq, peer);
3687
3688 retry:
3689         spin_lock(&ci->i_ceph_lock);
3690         cap = __get_cap_for_mds(ci, mds);
3691         if (!cap) {
3692                 if (!new_cap) {
3693                         spin_unlock(&ci->i_ceph_lock);
3694                         new_cap = ceph_get_cap(mdsc, NULL);
3695                         goto retry;
3696                 }
3697                 cap = new_cap;
3698         } else {
3699                 if (new_cap) {
3700                         ceph_put_cap(mdsc, new_cap);
3701                         new_cap = NULL;
3702                 }
3703         }
3704
3705         __ceph_caps_issued(ci, &issued);
3706         issued |= __ceph_caps_dirty(ci);
3707
3708         ceph_add_cap(inode, session, cap_id, -1, caps, wanted, seq, mseq,
3709                      realmino, CEPH_CAP_FLAG_AUTH, &new_cap);
3710
3711         ocap = peer >= 0 ? __get_cap_for_mds(ci, peer) : NULL;
3712         if (ocap && ocap->cap_id == p_cap_id) {
3713                 dout(" remove export cap %p mds%d flags %d\n",
3714                      ocap, peer, ph->flags);
3715                 if ((ph->flags & CEPH_CAP_FLAG_AUTH) &&
3716                     (ocap->seq != le32_to_cpu(ph->seq) ||
3717                      ocap->mseq != le32_to_cpu(ph->mseq))) {
3718                         pr_err_ratelimited("handle_cap_import: "
3719                                         "mismatched seq/mseq: ino (%llx.%llx) "
3720                                         "mds%d seq %d mseq %d importer mds%d "
3721                                         "has peer seq %d mseq %d\n",
3722                                         ceph_vinop(inode), peer, ocap->seq,
3723                                         ocap->mseq, mds, le32_to_cpu(ph->seq),
3724                                         le32_to_cpu(ph->mseq));
3725                 }
3726                 __ceph_remove_cap(ocap, (ph->flags & CEPH_CAP_FLAG_RELEASE));
3727         }
3728
3729         /* make sure we re-request max_size, if necessary */
3730         ci->i_requested_max_size = 0;
3731
3732         *old_issued = issued;
3733         *target_cap = cap;
3734 }
3735
3736 /*
3737  * Handle a caps message from the MDS.
3738  *
3739  * Identify the appropriate session, inode, and call the right handler
3740  * based on the cap op.
3741  */
3742 void ceph_handle_caps(struct ceph_mds_session *session,
3743                       struct ceph_msg *msg)
3744 {
3745         struct ceph_mds_client *mdsc = session->s_mdsc;
3746         struct inode *inode;
3747         struct ceph_inode_info *ci;
3748         struct ceph_cap *cap;
3749         struct ceph_mds_caps *h;
3750         struct ceph_mds_cap_peer *peer = NULL;
3751         struct ceph_snap_realm *realm = NULL;
3752         int op;
3753         int msg_version = le16_to_cpu(msg->hdr.version);
3754         u32 seq, mseq;
3755         struct ceph_vino vino;
3756         void *snaptrace;
3757         size_t snaptrace_len;
3758         void *p, *end;
3759         struct cap_extra_info extra_info = {};
3760
3761         dout("handle_caps from mds%d\n", session->s_mds);
3762
3763         /* decode */
3764         end = msg->front.iov_base + msg->front.iov_len;
3765         if (msg->front.iov_len < sizeof(*h))
3766                 goto bad;
3767         h = msg->front.iov_base;
3768         op = le32_to_cpu(h->op);
3769         vino.ino = le64_to_cpu(h->ino);
3770         vino.snap = CEPH_NOSNAP;
3771         seq = le32_to_cpu(h->seq);
3772         mseq = le32_to_cpu(h->migrate_seq);
3773
3774         snaptrace = h + 1;
3775         snaptrace_len = le32_to_cpu(h->snap_trace_len);
3776         p = snaptrace + snaptrace_len;
3777
3778         if (msg_version >= 2) {
3779                 u32 flock_len;
3780                 ceph_decode_32_safe(&p, end, flock_len, bad);
3781                 if (p + flock_len > end)
3782                         goto bad;
3783                 p += flock_len;
3784         }
3785
3786         if (msg_version >= 3) {
3787                 if (op == CEPH_CAP_OP_IMPORT) {
3788                         if (p + sizeof(*peer) > end)
3789                                 goto bad;
3790                         peer = p;
3791                         p += sizeof(*peer);
3792                 } else if (op == CEPH_CAP_OP_EXPORT) {
3793                         /* recorded in unused fields */
3794                         peer = (void *)&h->size;
3795                 }
3796         }
3797
3798         if (msg_version >= 4) {
3799                 ceph_decode_64_safe(&p, end, extra_info.inline_version, bad);
3800                 ceph_decode_32_safe(&p, end, extra_info.inline_len, bad);
3801                 if (p + extra_info.inline_len > end)
3802                         goto bad;
3803                 extra_info.inline_data = p;
3804                 p += extra_info.inline_len;
3805         }
3806
3807         if (msg_version >= 5) {
3808                 struct ceph_osd_client  *osdc = &mdsc->fsc->client->osdc;
3809                 u32                     epoch_barrier;
3810
3811                 ceph_decode_32_safe(&p, end, epoch_barrier, bad);
3812                 ceph_osdc_update_epoch_barrier(osdc, epoch_barrier);
3813         }
3814
3815         if (msg_version >= 8) {
3816                 u64 flush_tid;
3817                 u32 caller_uid, caller_gid;
3818                 u32 pool_ns_len;
3819
3820                 /* version >= 6 */
3821                 ceph_decode_64_safe(&p, end, flush_tid, bad);
3822                 /* version >= 7 */
3823                 ceph_decode_32_safe(&p, end, caller_uid, bad);
3824                 ceph_decode_32_safe(&p, end, caller_gid, bad);
3825                 /* version >= 8 */
3826                 ceph_decode_32_safe(&p, end, pool_ns_len, bad);
3827                 if (pool_ns_len > 0) {
3828                         ceph_decode_need(&p, end, pool_ns_len, bad);
3829                         extra_info.pool_ns =
3830                                 ceph_find_or_create_string(p, pool_ns_len);
3831                         p += pool_ns_len;
3832                 }
3833         }
3834
3835         if (msg_version >= 11) {
3836                 struct ceph_timespec *btime;
3837                 u64 change_attr;
3838                 u32 flags;
3839
3840                 /* version >= 9 */
3841                 if (p + sizeof(*btime) > end)
3842                         goto bad;
3843                 btime = p;
3844                 p += sizeof(*btime);
3845                 ceph_decode_64_safe(&p, end, change_attr, bad);
3846                 /* version >= 10 */
3847                 ceph_decode_32_safe(&p, end, flags, bad);
3848                 /* version >= 11 */
3849                 extra_info.dirstat_valid = true;
3850                 ceph_decode_64_safe(&p, end, extra_info.nfiles, bad);
3851                 ceph_decode_64_safe(&p, end, extra_info.nsubdirs, bad);
3852         }
3853
3854         /* lookup ino */
3855         inode = ceph_find_inode(mdsc->fsc->sb, vino);
3856         ci = ceph_inode(inode);
3857         dout(" op %s ino %llx.%llx inode %p\n", ceph_cap_op_name(op), vino.ino,
3858              vino.snap, inode);
3859
3860         mutex_lock(&session->s_mutex);
3861         session->s_seq++;
3862         dout(" mds%d seq %lld cap seq %u\n", session->s_mds, session->s_seq,
3863              (unsigned)seq);
3864
3865         if (!inode) {
3866                 dout(" i don't have ino %llx\n", vino.ino);
3867
3868                 if (op == CEPH_CAP_OP_IMPORT) {
3869                         cap = ceph_get_cap(mdsc, NULL);
3870                         cap->cap_ino = vino.ino;
3871                         cap->queue_release = 1;
3872                         cap->cap_id = le64_to_cpu(h->cap_id);
3873                         cap->mseq = mseq;
3874                         cap->seq = seq;
3875                         cap->issue_seq = seq;
3876                         spin_lock(&session->s_cap_lock);
3877                         list_add_tail(&cap->session_caps,
3878                                         &session->s_cap_releases);
3879                         session->s_num_cap_releases++;
3880                         spin_unlock(&session->s_cap_lock);
3881                 }
3882                 goto flush_cap_releases;
3883         }
3884
3885         /* these will work even if we don't have a cap yet */
3886         switch (op) {
3887         case CEPH_CAP_OP_FLUSHSNAP_ACK:
3888                 handle_cap_flushsnap_ack(inode, le64_to_cpu(msg->hdr.tid),
3889                                          h, session);
3890                 goto done;
3891
3892         case CEPH_CAP_OP_EXPORT:
3893                 handle_cap_export(inode, h, peer, session);
3894                 goto done_unlocked;
3895
3896         case CEPH_CAP_OP_IMPORT:
3897                 realm = NULL;
3898                 if (snaptrace_len) {
3899                         down_write(&mdsc->snap_rwsem);
3900                         ceph_update_snap_trace(mdsc, snaptrace,
3901                                                snaptrace + snaptrace_len,
3902                                                false, &realm);
3903                         downgrade_write(&mdsc->snap_rwsem);
3904                 } else {
3905                         down_read(&mdsc->snap_rwsem);
3906                 }
3907                 handle_cap_import(mdsc, inode, h, peer, session,
3908                                   &cap, &extra_info.issued);
3909                 handle_cap_grant(inode, session, cap,
3910                                  h, msg->middle, &extra_info);
3911                 if (realm)
3912                         ceph_put_snap_realm(mdsc, realm);
3913                 goto done_unlocked;
3914         }
3915
3916         /* the rest require a cap */
3917         spin_lock(&ci->i_ceph_lock);
3918         cap = __get_cap_for_mds(ceph_inode(inode), session->s_mds);
3919         if (!cap) {
3920                 dout(" no cap on %p ino %llx.%llx from mds%d\n",
3921                      inode, ceph_ino(inode), ceph_snap(inode),
3922                      session->s_mds);
3923                 spin_unlock(&ci->i_ceph_lock);
3924                 goto flush_cap_releases;
3925         }
3926
3927         /* note that each of these drops i_ceph_lock for us */
3928         switch (op) {
3929         case CEPH_CAP_OP_REVOKE:
3930         case CEPH_CAP_OP_GRANT:
3931                 __ceph_caps_issued(ci, &extra_info.issued);
3932                 extra_info.issued |= __ceph_caps_dirty(ci);
3933                 handle_cap_grant(inode, session, cap,
3934                                  h, msg->middle, &extra_info);
3935                 goto done_unlocked;
3936
3937         case CEPH_CAP_OP_FLUSH_ACK:
3938                 handle_cap_flush_ack(inode, le64_to_cpu(msg->hdr.tid),
3939                                      h, session, cap);
3940                 break;
3941
3942         case CEPH_CAP_OP_TRUNC:
3943                 handle_cap_trunc(inode, h, session);
3944                 break;
3945
3946         default:
3947                 spin_unlock(&ci->i_ceph_lock);
3948                 pr_err("ceph_handle_caps: unknown cap op %d %s\n", op,
3949                        ceph_cap_op_name(op));
3950         }
3951
3952         goto done;
3953
3954 flush_cap_releases:
3955         /*
3956          * send any cap release message to try to move things
3957          * along for the mds (who clearly thinks we still have this
3958          * cap).
3959          */
3960         ceph_send_cap_releases(mdsc, session);
3961
3962 done:
3963         mutex_unlock(&session->s_mutex);
3964 done_unlocked:
3965         iput(inode);
3966         ceph_put_string(extra_info.pool_ns);
3967         return;
3968
3969 bad:
3970         pr_err("ceph_handle_caps: corrupt message\n");
3971         ceph_msg_dump(msg);
3972         return;
3973 }
3974
3975 /*
3976  * Delayed work handler to process end of delayed cap release LRU list.
3977  */
3978 void ceph_check_delayed_caps(struct ceph_mds_client *mdsc)
3979 {
3980         struct inode *inode;
3981         struct ceph_inode_info *ci;
3982         int flags = CHECK_CAPS_NODELAY;
3983
3984         dout("check_delayed_caps\n");
3985         while (1) {
3986                 spin_lock(&mdsc->cap_delay_lock);
3987                 if (list_empty(&mdsc->cap_delay_list))
3988                         break;
3989                 ci = list_first_entry(&mdsc->cap_delay_list,
3990                                       struct ceph_inode_info,
3991                                       i_cap_delay_list);
3992                 if ((ci->i_ceph_flags & CEPH_I_FLUSH) == 0 &&
3993                     time_before(jiffies, ci->i_hold_caps_max))
3994                         break;
3995                 list_del_init(&ci->i_cap_delay_list);
3996
3997                 inode = igrab(&ci->vfs_inode);
3998                 spin_unlock(&mdsc->cap_delay_lock);
3999
4000                 if (inode) {
4001                         dout("check_delayed_caps on %p\n", inode);
4002                         ceph_check_caps(ci, flags, NULL);
4003                         iput(inode);
4004                 }
4005         }
4006         spin_unlock(&mdsc->cap_delay_lock);
4007 }
4008
4009 /*
4010  * Flush all dirty caps to the mds
4011  */
4012 void ceph_flush_dirty_caps(struct ceph_mds_client *mdsc)
4013 {
4014         struct ceph_inode_info *ci;
4015         struct inode *inode;
4016
4017         dout("flush_dirty_caps\n");
4018         spin_lock(&mdsc->cap_dirty_lock);
4019         while (!list_empty(&mdsc->cap_dirty)) {
4020                 ci = list_first_entry(&mdsc->cap_dirty, struct ceph_inode_info,
4021                                       i_dirty_item);
4022                 inode = &ci->vfs_inode;
4023                 ihold(inode);
4024                 dout("flush_dirty_caps %p\n", inode);
4025                 spin_unlock(&mdsc->cap_dirty_lock);
4026                 ceph_check_caps(ci, CHECK_CAPS_NODELAY|CHECK_CAPS_FLUSH, NULL);
4027                 iput(inode);
4028                 spin_lock(&mdsc->cap_dirty_lock);
4029         }
4030         spin_unlock(&mdsc->cap_dirty_lock);
4031         dout("flush_dirty_caps done\n");
4032 }
4033
4034 void __ceph_get_fmode(struct ceph_inode_info *ci, int fmode)
4035 {
4036         int i;
4037         int bits = (fmode << 1) | 1;
4038         for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
4039                 if (bits & (1 << i))
4040                         ci->i_nr_by_mode[i]++;
4041         }
4042 }
4043
4044 /*
4045  * Drop open file reference.  If we were the last open file,
4046  * we may need to release capabilities to the MDS (or schedule
4047  * their delayed release).
4048  */
4049 void ceph_put_fmode(struct ceph_inode_info *ci, int fmode)
4050 {
4051         int i, last = 0;
4052         int bits = (fmode << 1) | 1;
4053         spin_lock(&ci->i_ceph_lock);
4054         for (i = 0; i < CEPH_FILE_MODE_BITS; i++) {
4055                 if (bits & (1 << i)) {
4056                         BUG_ON(ci->i_nr_by_mode[i] == 0);
4057                         if (--ci->i_nr_by_mode[i] == 0)
4058                                 last++;
4059                 }
4060         }
4061         dout("put_fmode %p fmode %d {%d,%d,%d,%d}\n",
4062              &ci->vfs_inode, fmode,
4063              ci->i_nr_by_mode[0], ci->i_nr_by_mode[1],
4064              ci->i_nr_by_mode[2], ci->i_nr_by_mode[3]);
4065         spin_unlock(&ci->i_ceph_lock);
4066
4067         if (last && ci->i_vino.snap == CEPH_NOSNAP)
4068                 ceph_check_caps(ci, 0, NULL);
4069 }
4070
4071 /*
4072  * For a soon-to-be unlinked file, drop the AUTH_RDCACHE caps. If it
4073  * looks like the link count will hit 0, drop any other caps (other
4074  * than PIN) we don't specifically want (due to the file still being
4075  * open).
4076  */
4077 int ceph_drop_caps_for_unlink(struct inode *inode)
4078 {
4079         struct ceph_inode_info *ci = ceph_inode(inode);
4080         int drop = CEPH_CAP_LINK_SHARED | CEPH_CAP_LINK_EXCL;
4081
4082         spin_lock(&ci->i_ceph_lock);
4083         if (inode->i_nlink == 1) {
4084                 drop |= ~(__ceph_caps_wanted(ci) | CEPH_CAP_PIN);
4085
4086                 ci->i_ceph_flags |= CEPH_I_NODELAY;
4087                 if (__ceph_caps_dirty(ci)) {
4088                         struct ceph_mds_client *mdsc =
4089                                 ceph_inode_to_client(inode)->mdsc;
4090                         __cap_delay_requeue_front(mdsc, ci);
4091                 }
4092         }
4093         spin_unlock(&ci->i_ceph_lock);
4094         return drop;
4095 }
4096
4097 /*
4098  * Helpers for embedding cap and dentry lease releases into mds
4099  * requests.
4100  *
4101  * @force is used by dentry_release (below) to force inclusion of a
4102  * record for the directory inode, even when there aren't any caps to
4103  * drop.
4104  */
4105 int ceph_encode_inode_release(void **p, struct inode *inode,
4106                               int mds, int drop, int unless, int force)
4107 {
4108         struct ceph_inode_info *ci = ceph_inode(inode);
4109         struct ceph_cap *cap;
4110         struct ceph_mds_request_release *rel = *p;
4111         int used, dirty;
4112         int ret = 0;
4113
4114         spin_lock(&ci->i_ceph_lock);
4115         used = __ceph_caps_used(ci);
4116         dirty = __ceph_caps_dirty(ci);
4117
4118         dout("encode_inode_release %p mds%d used|dirty %s drop %s unless %s\n",
4119              inode, mds, ceph_cap_string(used|dirty), ceph_cap_string(drop),
4120              ceph_cap_string(unless));
4121
4122         /* only drop unused, clean caps */
4123         drop &= ~(used | dirty);
4124
4125         cap = __get_cap_for_mds(ci, mds);
4126         if (cap && __cap_is_valid(cap)) {
4127                 unless &= cap->issued;
4128                 if (unless) {
4129                         if (unless & CEPH_CAP_AUTH_EXCL)
4130                                 drop &= ~CEPH_CAP_AUTH_SHARED;
4131                         if (unless & CEPH_CAP_LINK_EXCL)
4132                                 drop &= ~CEPH_CAP_LINK_SHARED;
4133                         if (unless & CEPH_CAP_XATTR_EXCL)
4134                                 drop &= ~CEPH_CAP_XATTR_SHARED;
4135                         if (unless & CEPH_CAP_FILE_EXCL)
4136                                 drop &= ~CEPH_CAP_FILE_SHARED;
4137                 }
4138
4139                 if (force || (cap->issued & drop)) {
4140                         if (cap->issued & drop) {
4141                                 int wanted = __ceph_caps_wanted(ci);
4142                                 if ((ci->i_ceph_flags & CEPH_I_NODELAY) == 0)
4143                                         wanted |= cap->mds_wanted;
4144                                 dout("encode_inode_release %p cap %p "
4145                                      "%s -> %s, wanted %s -> %s\n", inode, cap,
4146                                      ceph_cap_string(cap->issued),
4147                                      ceph_cap_string(cap->issued & ~drop),
4148                                      ceph_cap_string(cap->mds_wanted),
4149                                      ceph_cap_string(wanted));
4150
4151                                 cap->issued &= ~drop;
4152                                 cap->implemented &= ~drop;
4153                                 cap->mds_wanted = wanted;
4154                         } else {
4155                                 dout("encode_inode_release %p cap %p %s"
4156                                      " (force)\n", inode, cap,
4157                                      ceph_cap_string(cap->issued));
4158                         }
4159
4160                         rel->ino = cpu_to_le64(ceph_ino(inode));
4161                         rel->cap_id = cpu_to_le64(cap->cap_id);
4162                         rel->seq = cpu_to_le32(cap->seq);
4163                         rel->issue_seq = cpu_to_le32(cap->issue_seq);
4164                         rel->mseq = cpu_to_le32(cap->mseq);
4165                         rel->caps = cpu_to_le32(cap->implemented);
4166                         rel->wanted = cpu_to_le32(cap->mds_wanted);
4167                         rel->dname_len = 0;
4168                         rel->dname_seq = 0;
4169                         *p += sizeof(*rel);
4170                         ret = 1;
4171                 } else {
4172                         dout("encode_inode_release %p cap %p %s (noop)\n",
4173                              inode, cap, ceph_cap_string(cap->issued));
4174                 }
4175         }
4176         spin_unlock(&ci->i_ceph_lock);
4177         return ret;
4178 }
4179
4180 int ceph_encode_dentry_release(void **p, struct dentry *dentry,
4181                                struct inode *dir,
4182                                int mds, int drop, int unless)
4183 {
4184         struct dentry *parent = NULL;
4185         struct ceph_mds_request_release *rel = *p;
4186         struct ceph_dentry_info *di = ceph_dentry(dentry);
4187         int force = 0;
4188         int ret;
4189
4190         /*
4191          * force an record for the directory caps if we have a dentry lease.
4192          * this is racy (can't take i_ceph_lock and d_lock together), but it
4193          * doesn't have to be perfect; the mds will revoke anything we don't
4194          * release.
4195          */
4196         spin_lock(&dentry->d_lock);
4197         if (di->lease_session && di->lease_session->s_mds == mds)
4198                 force = 1;
4199         if (!dir) {
4200                 parent = dget(dentry->d_parent);
4201                 dir = d_inode(parent);
4202         }
4203         spin_unlock(&dentry->d_lock);
4204
4205         ret = ceph_encode_inode_release(p, dir, mds, drop, unless, force);
4206         dput(parent);
4207
4208         spin_lock(&dentry->d_lock);
4209         if (ret && di->lease_session && di->lease_session->s_mds == mds) {
4210                 dout("encode_dentry_release %p mds%d seq %d\n",
4211                      dentry, mds, (int)di->lease_seq);
4212                 rel->dname_len = cpu_to_le32(dentry->d_name.len);
4213                 memcpy(*p, dentry->d_name.name, dentry->d_name.len);
4214                 *p += dentry->d_name.len;
4215                 rel->dname_seq = cpu_to_le32(di->lease_seq);
4216                 __ceph_mdsc_drop_dentry_lease(dentry);
4217         }
4218         spin_unlock(&dentry->d_lock);
4219         return ret;
4220 }