Merge git://git.kernel.org/pub/scm/linux/kernel/git/davem/net
[sfrench/cifs-2.6.git] / fs / ceph / caps.c
index dc988337f8413cb253b00c27ec14cace933dddf8..dc10c9dd36c1a2ac6264ed21d3248e5f62f1e330 100644 (file)
@@ -833,7 +833,9 @@ int __ceph_caps_used(struct ceph_inode_info *ci)
                used |= CEPH_CAP_PIN;
        if (ci->i_rd_ref)
                used |= CEPH_CAP_FILE_RD;
-       if (ci->i_rdcache_ref || ci->vfs_inode.i_data.nrpages)
+       if (ci->i_rdcache_ref ||
+           (!S_ISDIR(ci->vfs_inode.i_mode) && /* ignore readdir cache */
+            ci->vfs_inode.i_data.nrpages))
                used |= CEPH_CAP_FILE_CACHE;
        if (ci->i_wr_ref)
                used |= CEPH_CAP_FILE_WR;
@@ -986,8 +988,8 @@ void __ceph_remove_cap(struct ceph_cap *cap, bool queue_release)
 static int send_cap_msg(struct ceph_mds_session *session,
                        u64 ino, u64 cid, int op,
                        int caps, int wanted, int dirty,
-                       u32 seq, u64 flush_tid, u32 issue_seq, u32 mseq,
-                       u64 size, u64 max_size,
+                       u32 seq, u64 flush_tid, u64 oldest_flush_tid,
+                       u32 issue_seq, u32 mseq, u64 size, u64 max_size,
                        struct timespec *mtime, struct timespec *atime,
                        u64 time_warp_seq,
                        kuid_t uid, kgid_t gid, umode_t mode,
@@ -1001,20 +1003,23 @@ static int send_cap_msg(struct ceph_mds_session *session,
        size_t extra_len;
 
        dout("send_cap_msg %s %llx %llx caps %s wanted %s dirty %s"
-            " seq %u/%u mseq %u follows %lld size %llu/%llu"
+            " seq %u/%u tid %llu/%llu mseq %u follows %lld size %llu/%llu"
             " xattr_ver %llu xattr_len %d\n", ceph_cap_op_name(op),
             cid, ino, ceph_cap_string(caps), ceph_cap_string(wanted),
             ceph_cap_string(dirty),
-            seq, issue_seq, mseq, follows, size, max_size,
+            seq, issue_seq, flush_tid, oldest_flush_tid,
+            mseq, follows, size, max_size,
             xattr_version, xattrs_buf ? (int)xattrs_buf->vec.iov_len : 0);
 
-       /* flock buffer size + inline version + inline data size */
-       extra_len = 4 + 8 + 4;
+       /* flock buffer size + inline version + inline data size +
+        * osd_epoch_barrier + oldest_flush_tid */
+       extra_len = 4 + 8 + 4 + 4 + 8;
        msg = ceph_msg_new(CEPH_MSG_CLIENT_CAPS, sizeof(*fc) + extra_len,
                           GFP_NOFS, false);
        if (!msg)
                return -ENOMEM;
 
+       msg->hdr.version = cpu_to_le16(6);
        msg->hdr.tid = cpu_to_le64(flush_tid);
 
        fc = msg->front.iov_base;
@@ -1050,6 +1055,10 @@ static int send_cap_msg(struct ceph_mds_session *session,
        ceph_encode_64(&p, inline_data ? 0 : CEPH_INLINE_NONE);
        /* inline data size */
        ceph_encode_32(&p, 0);
+       /* osd_epoch_barrier */
+       ceph_encode_32(&p, 0);
+       /* oldest_flush_tid */
+       ceph_encode_64(&p, oldest_flush_tid);
 
        fc->xattr_version = cpu_to_le64(xattr_version);
        if (xattrs_buf) {
@@ -1097,7 +1106,8 @@ void ceph_queue_caps_release(struct inode *inode)
  * caller should hold snap_rwsem (read), s_mutex.
  */
 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
-                     int op, int used, int want, int retain, int flushing)
+                     int op, int used, int want, int retain, int flushing,
+                     u64 flush_tid, u64 oldest_flush_tid)
        __releases(cap->ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = cap->ci;
@@ -1115,8 +1125,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        u64 xattr_version = 0;
        struct ceph_buffer *xattr_blob = NULL;
        int delayed = 0;
-       u64 flush_tid = 0;
-       int i;
        int ret;
        bool inline_data;
 
@@ -1160,24 +1168,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        cap->implemented &= cap->issued | used;
        cap->mds_wanted = want;
 
-       if (flushing) {
-               /*
-                * assign a tid for flush operations so we can avoid
-                * flush1 -> dirty1 -> flush2 -> flushack1 -> mark
-                * clean type races.  track latest tid for every bit
-                * so we can handle flush AxFw, flush Fw, and have the
-                * first ack clean Ax.
-                */
-               flush_tid = ++ci->i_cap_flush_last_tid;
-               dout(" cap_flush_tid %d\n", (int)flush_tid);
-               for (i = 0; i < CEPH_CAP_BITS; i++)
-                       if (flushing & (1 << i))
-                               ci->i_cap_flush_tid[i] = flush_tid;
-
-               follows = ci->i_head_snapc->seq;
-       } else {
-               follows = 0;
-       }
+       follows = flushing ? ci->i_head_snapc->seq : 0;
 
        keep = cap->implemented;
        seq = cap->seq;
@@ -1205,7 +1196,8 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        spin_unlock(&ci->i_ceph_lock);
 
        ret = send_cap_msg(session, ceph_vino(inode).ino, cap_id,
-               op, keep, want, flushing, seq, flush_tid, issue_seq, mseq,
+               op, keep, want, flushing, seq,
+               flush_tid, oldest_flush_tid, issue_seq, mseq,
                size, max_size, &mtime, &atime, time_warp_seq,
                uid, gid, mode, xattr_version, xattr_blob,
                follows, inline_data);
@@ -1311,7 +1303,10 @@ retry:
                        goto retry;
                }
 
-               capsnap->flush_tid = ++ci->i_cap_flush_last_tid;
+               spin_lock(&mdsc->cap_dirty_lock);
+               capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
+               spin_unlock(&mdsc->cap_dirty_lock);
+
                atomic_inc(&capsnap->nref);
                if (list_empty(&capsnap->flushing_item))
                        list_add_tail(&capsnap->flushing_item,
@@ -1322,8 +1317,8 @@ retry:
                     inode, capsnap, capsnap->follows, capsnap->flush_tid);
                send_cap_msg(session, ceph_vino(inode).ino, 0,
                             CEPH_CAP_OP_FLUSHSNAP, capsnap->issued, 0,
-                            capsnap->dirty, 0, capsnap->flush_tid, 0, mseq,
-                            capsnap->size, 0,
+                            capsnap->dirty, 0, capsnap->flush_tid, 0,
+                            0, mseq, capsnap->size, 0,
                             &capsnap->mtime, &capsnap->atime,
                             capsnap->time_warp_seq,
                             capsnap->uid, capsnap->gid, capsnap->mode,
@@ -1363,7 +1358,8 @@ static void ceph_flush_snaps(struct ceph_inode_info *ci)
  * Caller is then responsible for calling __mark_inode_dirty with the
  * returned flags value.
  */
-int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
+int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask,
+                          struct ceph_cap_flush **pcf)
 {
        struct ceph_mds_client *mdsc =
                ceph_sb_to_client(ci->vfs_inode.i_sb)->mdsc;
@@ -1383,6 +1379,9 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
             ceph_cap_string(was | mask));
        ci->i_dirty_caps |= mask;
        if (was == 0) {
+               WARN_ON_ONCE(ci->i_prealloc_cap_flush);
+               swap(ci->i_prealloc_cap_flush, *pcf);
+
                if (!ci->i_head_snapc) {
                        WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
                        ci->i_head_snapc = ceph_get_snap_context(
@@ -1398,6 +1397,8 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
                        ihold(inode);
                        dirty |= I_DIRTY_SYNC;
                }
+       } else {
+               WARN_ON_ONCE(!ci->i_prealloc_cap_flush);
        }
        BUG_ON(list_empty(&ci->i_dirty_item));
        if (((was | ci->i_flushing_caps) & CEPH_CAP_FILE_BUFFER) &&
@@ -1407,6 +1408,74 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
        return dirty;
 }
 
+static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
+                                       struct ceph_cap_flush *cf)
+{
+       struct rb_node **p = &ci->i_cap_flush_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_cap_flush *other = NULL;
+
+       while (*p) {
+               parent = *p;
+               other = rb_entry(parent, struct ceph_cap_flush, i_node);
+
+               if (cf->tid < other->tid)
+                       p = &(*p)->rb_left;
+               else if (cf->tid > other->tid)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&cf->i_node, parent, p);
+       rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);
+}
+
+static void __add_cap_flushing_to_mdsc(struct ceph_mds_client *mdsc,
+                                      struct ceph_cap_flush *cf)
+{
+       struct rb_node **p = &mdsc->cap_flush_tree.rb_node;
+       struct rb_node *parent = NULL;
+       struct ceph_cap_flush *other = NULL;
+
+       while (*p) {
+               parent = *p;
+               other = rb_entry(parent, struct ceph_cap_flush, g_node);
+
+               if (cf->tid < other->tid)
+                       p = &(*p)->rb_left;
+               else if (cf->tid > other->tid)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();
+       }
+
+       rb_link_node(&cf->g_node, parent, p);
+       rb_insert_color(&cf->g_node, &mdsc->cap_flush_tree);
+}
+
+struct ceph_cap_flush *ceph_alloc_cap_flush(void)
+{
+       return kmem_cache_alloc(ceph_cap_flush_cachep, GFP_KERNEL);
+}
+
+void ceph_free_cap_flush(struct ceph_cap_flush *cf)
+{
+       if (cf)
+               kmem_cache_free(ceph_cap_flush_cachep, cf);
+}
+
+static u64 __get_oldest_flush_tid(struct ceph_mds_client *mdsc)
+{
+       struct rb_node *n = rb_first(&mdsc->cap_flush_tree);
+       if (n) {
+               struct ceph_cap_flush *cf =
+                       rb_entry(n, struct ceph_cap_flush, g_node);
+               return cf->tid;
+       }
+       return 0;
+}
+
 /*
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
@@ -1414,14 +1483,17 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
  * Called under i_ceph_lock.
  */
 static int __mark_caps_flushing(struct inode *inode,
-                                struct ceph_mds_session *session)
+                               struct ceph_mds_session *session,
+                               u64 *flush_tid, u64 *oldest_flush_tid)
 {
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_cap_flush *cf = NULL;
        int flushing;
 
        BUG_ON(ci->i_dirty_caps == 0);
        BUG_ON(list_empty(&ci->i_dirty_item));
+       BUG_ON(!ci->i_prealloc_cap_flush);
 
        flushing = ci->i_dirty_caps;
        dout("__mark_caps_flushing flushing %s, flushing_caps %s -> %s\n",
@@ -1432,22 +1504,31 @@ static int __mark_caps_flushing(struct inode *inode,
        ci->i_dirty_caps = 0;
        dout(" inode %p now !dirty\n", inode);
 
+       swap(cf, ci->i_prealloc_cap_flush);
+       cf->caps = flushing;
+       cf->kick = false;
+
        spin_lock(&mdsc->cap_dirty_lock);
        list_del_init(&ci->i_dirty_item);
 
+       cf->tid = ++mdsc->last_cap_flush_tid;
+       __add_cap_flushing_to_mdsc(mdsc, cf);
+       *oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+
        if (list_empty(&ci->i_flushing_item)) {
-               ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
                list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
                mdsc->num_cap_flushing++;
-               dout(" inode %p now flushing seq %lld\n", inode,
-                    ci->i_cap_flush_seq);
+               dout(" inode %p now flushing tid %llu\n", inode, cf->tid);
        } else {
                list_move_tail(&ci->i_flushing_item, &session->s_cap_flushing);
-               dout(" inode %p now flushing (more) seq %lld\n", inode,
-                    ci->i_cap_flush_seq);
+               dout(" inode %p now flushing (more) tid %llu\n",
+                    inode, cf->tid);
        }
        spin_unlock(&mdsc->cap_dirty_lock);
 
+       __add_cap_flushing_to_inode(ci, cf);
+
+       *flush_tid = cf->tid;
        return flushing;
 }
 
@@ -1493,6 +1574,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap *cap;
+       u64 flush_tid, oldest_flush_tid;
        int file_wanted, used, cap_used;
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
        int issued, implemented, want, retain, revoking, flushing = 0;
@@ -1571,9 +1653,10 @@ retry_locked:
         * If we fail, it's because pages are locked.... try again later.
         */
        if ((!is_delayed || mdsc->stopping) &&
-           ci->i_wrbuffer_ref == 0 &&               /* no dirty pages... */
-           inode->i_data.nrpages &&                 /* have cached pages */
-           (file_wanted == 0 ||                     /* no open files */
+           !S_ISDIR(inode->i_mode) &&          /* ignore readdir cache */
+           ci->i_wrbuffer_ref == 0 &&          /* no dirty pages... */
+           inode->i_data.nrpages &&            /* have cached pages */
+           (file_wanted == 0 ||                /* no open files */
             (revoking & (CEPH_CAP_FILE_CACHE|
                          CEPH_CAP_FILE_LAZYIO))) && /*  or revoking cache */
            !tried_invalidate) {
@@ -1711,17 +1794,25 @@ ack:
                        took_snap_rwsem = 1;
                }
 
-               if (cap == ci->i_auth_cap && ci->i_dirty_caps)
-                       flushing = __mark_caps_flushing(inode, session);
-               else
+               if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
+                       flushing = __mark_caps_flushing(inode, session,
+                                                       &flush_tid,
+                                                       &oldest_flush_tid);
+               } else {
                        flushing = 0;
+                       flush_tid = 0;
+                       spin_lock(&mdsc->cap_dirty_lock);
+                       oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+                       spin_unlock(&mdsc->cap_dirty_lock);
+               }
 
                mds = cap->mds;  /* remember mds, so we don't repeat */
                sent++;
 
                /* __send_cap drops i_ceph_lock */
                delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
-                                     want, retain, flushing);
+                                     want, retain, flushing,
+                                     flush_tid, oldest_flush_tid);
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
 
@@ -1750,12 +1841,13 @@ ack:
 /*
  * Try to flush dirty caps back to the auth mds.
  */
-static int try_flush_caps(struct inode *inode, u16 flush_tid[])
+static int try_flush_caps(struct inode *inode, u64 *ptid)
 {
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_session *session = NULL;
        int flushing = 0;
+       u64 flush_tid = 0, oldest_flush_tid = 0;
 
 retry:
        spin_lock(&ci->i_ceph_lock);
@@ -1780,46 +1872,53 @@ retry:
                if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
                        goto out;
 
-               flushing = __mark_caps_flushing(inode, session);
+               flushing = __mark_caps_flushing(inode, session, &flush_tid,
+                                               &oldest_flush_tid);
 
                /* __send_cap drops i_ceph_lock */
                delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
-                                    cap->issued | cap->implemented, flushing);
+                                    (cap->issued | cap->implemented),
+                                    flushing, flush_tid, oldest_flush_tid);
 
-               spin_lock(&ci->i_ceph_lock);
-               if (delayed)
+               if (delayed) {
+                       spin_lock(&ci->i_ceph_lock);
                        __cap_delay_requeue(mdsc, ci);
+                       spin_unlock(&ci->i_ceph_lock);
+               }
+       } else {
+               struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
+               if (n) {
+                       struct ceph_cap_flush *cf =
+                               rb_entry(n, struct ceph_cap_flush, i_node);
+                       flush_tid = cf->tid;
+               }
+               flushing = ci->i_flushing_caps;
+               spin_unlock(&ci->i_ceph_lock);
        }
-
-       flushing = ci->i_flushing_caps;
-       if (flushing)
-               memcpy(flush_tid, ci->i_cap_flush_tid,
-                      sizeof(ci->i_cap_flush_tid));
 out:
-       spin_unlock(&ci->i_ceph_lock);
        if (session)
                mutex_unlock(&session->s_mutex);
+
+       *ptid = flush_tid;
        return flushing;
 }
 
 /*
  * Return true if we've flushed caps through the given flush_tid.
  */
-static int caps_are_flushed(struct inode *inode, u16 flush_tid[])
+static int caps_are_flushed(struct inode *inode, u64 flush_tid)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int i, ret = 1;
+       struct ceph_cap_flush *cf;
+       struct rb_node *n;
+       int ret = 1;
 
        spin_lock(&ci->i_ceph_lock);
-       for (i = 0; i < CEPH_CAP_BITS; i++) {
-               if (!(ci->i_flushing_caps & (1 << i)))
-                       continue;
-               // tid only has 16 bits. we need to handle wrapping
-               if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) {
-                       /* still flushing this bit */
+       n = rb_first(&ci->i_cap_flush_tree);
+       if (n) {
+               cf = rb_entry(n, struct ceph_cap_flush, i_node);
+               if (cf->tid <= flush_tid)
                        ret = 0;
-                       break;
-               }
        }
        spin_unlock(&ci->i_ceph_lock);
        return ret;
@@ -1922,7 +2021,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
        struct inode *inode = file->f_mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       u16 flush_tid[CEPH_CAP_BITS];
+       u64 flush_tid;
        int ret;
        int dirty;
 
@@ -1938,7 +2037,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 
        mutex_lock(&inode->i_mutex);
 
-       dirty = try_flush_caps(inode, flush_tid);
+       dirty = try_flush_caps(inode, &flush_tid);
        dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
        ret = unsafe_dirop_wait(inode);
@@ -1967,14 +2066,14 @@ out:
 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       u16 flush_tid[CEPH_CAP_BITS];
+       u64 flush_tid;
        int err = 0;
        int dirty;
        int wait = wbc->sync_mode == WB_SYNC_ALL;
 
        dout("write_inode %p wait=%d\n", inode, wait);
        if (wait) {
-               dirty = try_flush_caps(inode, flush_tid);
+               dirty = try_flush_caps(inode, &flush_tid);
                if (dirty)
                        err = wait_event_interruptible(ci->i_cap_wq,
                                       caps_are_flushed(inode, flush_tid));
@@ -2022,6 +2121,104 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
        }
 }
 
+static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
+                               struct ceph_mds_session *session,
+                               struct ceph_inode_info *ci,
+                               bool kick_all)
+{
+       struct inode *inode = &ci->vfs_inode;
+       struct ceph_cap *cap;
+       struct ceph_cap_flush *cf;
+       struct rb_node *n;
+       int delayed = 0;
+       u64 first_tid = 0;
+       u64 oldest_flush_tid;
+
+       spin_lock(&mdsc->cap_dirty_lock);
+       oldest_flush_tid = __get_oldest_flush_tid(mdsc);
+       spin_unlock(&mdsc->cap_dirty_lock);
+
+       while (true) {
+               spin_lock(&ci->i_ceph_lock);
+               cap = ci->i_auth_cap;
+               if (!(cap && cap->session == session)) {
+                       pr_err("%p auth cap %p not mds%d ???\n", inode,
+                                       cap, session->s_mds);
+                       spin_unlock(&ci->i_ceph_lock);
+                       break;
+               }
+
+               for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
+                       cf = rb_entry(n, struct ceph_cap_flush, i_node);
+                       if (cf->tid < first_tid)
+                               continue;
+                       if (kick_all || cf->kick)
+                               break;
+               }
+               if (!n) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       break;
+               }
+
+               cf = rb_entry(n, struct ceph_cap_flush, i_node);
+               cf->kick = false;
+
+               first_tid = cf->tid + 1;
+
+               dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
+                    cap, cf->tid, ceph_cap_string(cf->caps));
+               delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+                                     __ceph_caps_used(ci),
+                                     __ceph_caps_wanted(ci),
+                                     cap->issued | cap->implemented,
+                                     cf->caps, cf->tid, oldest_flush_tid);
+       }
+       return delayed;
+}
+
+void ceph_early_kick_flushing_caps(struct ceph_mds_client *mdsc,
+                                  struct ceph_mds_session *session)
+{
+       struct ceph_inode_info *ci;
+       struct ceph_cap *cap;
+       struct ceph_cap_flush *cf;
+       struct rb_node *n;
+
+       dout("early_kick_flushing_caps mds%d\n", session->s_mds);
+       list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
+               spin_lock(&ci->i_ceph_lock);
+               cap = ci->i_auth_cap;
+               if (!(cap && cap->session == session)) {
+                       pr_err("%p auth cap %p not mds%d ???\n",
+                               &ci->vfs_inode, cap, session->s_mds);
+                       spin_unlock(&ci->i_ceph_lock);
+                       continue;
+               }
+
+
+               /*
+                * if flushing caps were revoked, we re-send the cap flush
+                * in client reconnect stage. This guarantees the MDS processes
+                * the cap flush message before issuing the flushing caps to
+                * other client.
+                */
+               if ((cap->issued & ci->i_flushing_caps) !=
+                   ci->i_flushing_caps) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       if (!__kick_flushing_caps(mdsc, session, ci, true))
+                               continue;
+                       spin_lock(&ci->i_ceph_lock);
+               }
+
+               for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {
+                       cf = rb_entry(n, struct ceph_cap_flush, i_node);
+                       cf->kick = true;
+               }
+
+               spin_unlock(&ci->i_ceph_lock);
+       }
+}
+
 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
                             struct ceph_mds_session *session)
 {
@@ -2031,28 +2228,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 
        dout("kick_flushing_caps mds%d\n", session->s_mds);
        list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
-               struct inode *inode = &ci->vfs_inode;
-               struct ceph_cap *cap;
-               int delayed = 0;
-
-               spin_lock(&ci->i_ceph_lock);
-               cap = ci->i_auth_cap;
-               if (cap && cap->session == session) {
-                       dout("kick_flushing_caps %p cap %p %s\n", inode,
-                            cap, ceph_cap_string(ci->i_flushing_caps));
-                       delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-                                            __ceph_caps_used(ci),
-                                            __ceph_caps_wanted(ci),
-                                            cap->issued | cap->implemented,
-                                            ci->i_flushing_caps);
-                       if (delayed) {
-                               spin_lock(&ci->i_ceph_lock);
-                               __cap_delay_requeue(mdsc, ci);
-                               spin_unlock(&ci->i_ceph_lock);
-                       }
-               } else {
-                       pr_err("%p auth cap %p not mds%d ???\n", inode,
-                              cap, session->s_mds);
+               int delayed = __kick_flushing_caps(mdsc, session, ci, false);
+               if (delayed) {
+                       spin_lock(&ci->i_ceph_lock);
+                       __cap_delay_requeue(mdsc, ci);
                        spin_unlock(&ci->i_ceph_lock);
                }
        }
@@ -2064,26 +2243,25 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_cap *cap;
-       int delayed = 0;
 
        spin_lock(&ci->i_ceph_lock);
        cap = ci->i_auth_cap;
-       dout("kick_flushing_inode_caps %p flushing %s flush_seq %lld\n", inode,
-            ceph_cap_string(ci->i_flushing_caps), ci->i_cap_flush_seq);
+       dout("kick_flushing_inode_caps %p flushing %s\n", inode,
+            ceph_cap_string(ci->i_flushing_caps));
 
        __ceph_flush_snaps(ci, &session, 1);
 
        if (ci->i_flushing_caps) {
+               int delayed;
+
                spin_lock(&mdsc->cap_dirty_lock);
                list_move_tail(&ci->i_flushing_item,
                               &cap->session->s_cap_flushing);
                spin_unlock(&mdsc->cap_dirty_lock);
 
-               delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-                                    __ceph_caps_used(ci),
-                                    __ceph_caps_wanted(ci),
-                                    cap->issued | cap->implemented,
-                                    ci->i_flushing_caps);
+               spin_unlock(&ci->i_ceph_lock);
+
+               delayed = __kick_flushing_caps(mdsc, session, ci, true);
                if (delayed) {
                        spin_lock(&ci->i_ceph_lock);
                        __cap_delay_requeue(mdsc, ci);
@@ -2630,7 +2808,8 @@ static void handle_cap_grant(struct ceph_mds_client *mdsc,
         * try to invalidate (once).  (If there are dirty buffers, we
         * will invalidate _after_ writeback.)
         */
-       if (((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
+       if (!S_ISDIR(inode->i_mode) && /* don't invalidate readdir cache */
+           ((cap->issued & ~newcaps) & CEPH_CAP_FILE_CACHE) &&
            (newcaps & CEPH_CAP_FILE_LAZYIO) == 0 &&
            !ci->i_wrbuffer_ref) {
                if (try_nonblocking_invalidate(inode)) {
@@ -2836,16 +3015,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+       struct ceph_cap_flush *cf;
+       struct rb_node *n;
+       LIST_HEAD(to_remove);
        unsigned seq = le32_to_cpu(m->seq);
        int dirty = le32_to_cpu(m->dirty);
        int cleaned = 0;
        int drop = 0;
-       int i;
 
-       for (i = 0; i < CEPH_CAP_BITS; i++)
-               if ((dirty & (1 << i)) &&
-                   (u16)flush_tid == ci->i_cap_flush_tid[i])
-                       cleaned |= 1 << i;
+       n = rb_first(&ci->i_cap_flush_tree);
+       while (n) {
+               cf = rb_entry(n, struct ceph_cap_flush, i_node);
+               n = rb_next(&cf->i_node);
+               if (cf->tid == flush_tid)
+                       cleaned = cf->caps;
+               if (cf->tid <= flush_tid) {
+                       rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+                       list_add_tail(&cf->list, &to_remove);
+               } else {
+                       cleaned &= ~cf->caps;
+                       if (!cleaned)
+                               break;
+               }
+       }
 
        dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
             " flushing %s -> %s\n",
@@ -2853,12 +3045,23 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
             ceph_cap_string(cleaned), ceph_cap_string(ci->i_flushing_caps),
             ceph_cap_string(ci->i_flushing_caps & ~cleaned));
 
-       if (ci->i_flushing_caps == (ci->i_flushing_caps & ~cleaned))
+       if (list_empty(&to_remove) && !cleaned)
                goto out;
 
        ci->i_flushing_caps &= ~cleaned;
 
        spin_lock(&mdsc->cap_dirty_lock);
+
+       if (!list_empty(&to_remove)) {
+               list_for_each_entry(cf, &to_remove, list)
+                       rb_erase(&cf->g_node, &mdsc->cap_flush_tree);
+
+               n = rb_first(&mdsc->cap_flush_tree);
+               cf = n ? rb_entry(n, struct ceph_cap_flush, g_node) : NULL;
+               if (!cf || cf->tid > flush_tid)
+                       wake_up_all(&mdsc->cap_flushing_wq);
+       }
+
        if (ci->i_flushing_caps == 0) {
                list_del_init(&ci->i_flushing_item);
                if (!list_empty(&session->s_cap_flushing))
@@ -2868,7 +3071,6 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                                         struct ceph_inode_info,
                                         i_flushing_item)->vfs_inode);
                mdsc->num_cap_flushing--;
-               wake_up_all(&mdsc->cap_flushing_wq);
                dout(" inode %p now !flushing\n", inode);
 
                if (ci->i_dirty_caps == 0) {
@@ -2890,6 +3092,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 
 out:
        spin_unlock(&ci->i_ceph_lock);
+
+       while (!list_empty(&to_remove)) {
+               cf = list_first_entry(&to_remove,
+                                     struct ceph_cap_flush, list);
+               list_del(&cf->list);
+               ceph_free_cap_flush(cf);
+       }
        if (drop)
                iput(inode);
 }