Merge tag 'xfs-for-linus-4.8-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git...
fs/xfs/xfs_buf.c
index efa2a734268f90ad2e3167ba1fff459d6c4f412f..47a318ce82e0ab5828164e6604ffe9dd5f66a0d6 100644 (file)
@@ -79,6 +79,47 @@ xfs_buf_vmap_len(
        return (bp->b_page_count * PAGE_SIZE) - bp->b_offset;
 }
 
+/*
+ * Bump the I/O in flight count on the buftarg if we haven't yet done so for
+ * this buffer. The count is incremented once per buffer (per hold cycle)
+ * because the corresponding decrement is deferred to buffer release. Buffers
+ * can undergo I/O multiple times in a hold-release cycle and per-buffer I/O
+ * tracking adds unnecessary overhead. This is used for synchronization purposes
+ * with unmount (see xfs_wait_buftarg()), so all we really need is a count of
+ * in-flight buffers.
+ *
+ * Buffers that are never released (e.g., superblock, iclog buffers) must set
+ * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count
+ * never reaches zero and unmount hangs indefinitely.
+ */
+static inline void
+xfs_buf_ioacct_inc(
+       struct xfs_buf  *bp)
+{
+       if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT))
+               return;
+
+       ASSERT(bp->b_flags & XBF_ASYNC);
+       bp->b_flags |= _XBF_IN_FLIGHT;
+       percpu_counter_inc(&bp->b_target->bt_io_count);
+}
+
+/*
+ * Clear the in-flight state on a buffer about to be released to the LRU or
+ * freed, and unaccount it from the buftarg.
+ */
+static inline void
+xfs_buf_ioacct_dec(
+       struct xfs_buf  *bp)
+{
+       if (!(bp->b_flags & _XBF_IN_FLIGHT))
+               return;
+
+       ASSERT(bp->b_flags & XBF_ASYNC);
+       bp->b_flags &= ~_XBF_IN_FLIGHT;
+       percpu_counter_dec(&bp->b_target->bt_io_count);
+}
+
 /*
  * When we mark a buffer stale, we remove the buffer from the LRU and clear the
  * b_lru_ref count so that the buffer is freed immediately when the buffer
@@ -102,6 +143,14 @@ xfs_buf_stale(
         */
        bp->b_flags &= ~_XBF_DELWRI_Q;
 
+       /*
+        * Once the buffer is marked stale and unlocked, a subsequent lookup
+        * could reset b_flags. There is no guarantee that the buffer is
+        * unaccounted (released to LRU) before that occurs. Drop in-flight
+        * status now to preserve accounting consistency.
+        */
+       xfs_buf_ioacct_dec(bp);
+
        spin_lock(&bp->b_lock);
        atomic_set(&bp->b_lru_ref, 0);
        if (!(bp->b_state & XFS_BSTATE_DISPOSE) &&
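For illustration only (not part of the patch): a minimal userspace model of the
flag-guarded, once-per-hold-cycle accounting that the xfs_buf_ioacct_inc()/
xfs_buf_ioacct_dec() helpers added above implement. The sim_* names and the
plain integer counter are simplified stand-ins for the XBF_* flags and the
bt_io_count percpu counter, not kernel code.

#include <assert.h>

/* Simplified stand-ins for the kernel flags (illustrative only). */
#define SIM_IN_FLIGHT	(1 << 0)	/* models _XBF_IN_FLIGHT */
#define SIM_NO_IOACCT	(1 << 1)	/* models XBF_NO_IOACCT */

struct sim_buftarg {
	long			io_count;	/* models bt_io_count */
};

struct sim_buf {
	unsigned int		flags;
	struct sim_buftarg	*target;
};

/* Count the buffer at most once per hold cycle; re-submissions are no-ops. */
static void sim_ioacct_inc(struct sim_buf *bp)
{
	if (bp->flags & (SIM_NO_IOACCT | SIM_IN_FLIGHT))
		return;
	bp->flags |= SIM_IN_FLIGHT;
	bp->target->io_count++;
}

/* Undo the accounting exactly once, on release (or when the buffer is staled). */
static void sim_ioacct_dec(struct sim_buf *bp)
{
	if (!(bp->flags & SIM_IN_FLIGHT))
		return;
	bp->flags &= ~SIM_IN_FLIGHT;
	bp->target->io_count--;
}

int main(void)
{
	struct sim_buftarg tgt = { 0 };
	struct sim_buf bp = { 0, &tgt };

	sim_ioacct_inc(&bp);	/* first submission in this hold cycle: counted */
	sim_ioacct_inc(&bp);	/* re-submission: not counted again */
	assert(tgt.io_count == 1);

	sim_ioacct_dec(&bp);	/* release: the in-flight count drains */
	sim_ioacct_dec(&bp);	/* second decrement is a no-op */
	assert(tgt.io_count == 0);
	return 0;
}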
@@ -815,7 +864,8 @@ xfs_buf_get_uncached(
        struct xfs_buf          *bp;
        DEFINE_SINGLE_BUF_MAP(map, XFS_BUF_DADDR_NULL, numblks);
 
-       bp = _xfs_buf_alloc(target, &map, 1, 0);
+       /* flags might contain irrelevant bits, pass only what we care about */
+       bp = _xfs_buf_alloc(target, &map, 1, flags & XBF_NO_IOACCT);
        if (unlikely(bp == NULL))
                goto fail;
 
@@ -866,63 +916,85 @@ xfs_buf_hold(
 }
 
 /*
- *     Releases a hold on the specified buffer.  If the
- *     the hold count is 1, calls xfs_buf_free.
+ * Release a hold on the specified buffer. If the hold count is 1, the buffer is
+ * placed on the LRU or freed (depending on b_lru_ref).
  */
 void
 xfs_buf_rele(
        xfs_buf_t               *bp)
 {
        struct xfs_perag        *pag = bp->b_pag;
+       bool                    release;
+       bool                    freebuf = false;
 
        trace_xfs_buf_rele(bp, _RET_IP_);
 
        if (!pag) {
                ASSERT(list_empty(&bp->b_lru));
                ASSERT(RB_EMPTY_NODE(&bp->b_rbnode));
-               if (atomic_dec_and_test(&bp->b_hold))
+               if (atomic_dec_and_test(&bp->b_hold)) {
+                       xfs_buf_ioacct_dec(bp);
                        xfs_buf_free(bp);
+               }
                return;
        }
 
        ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode));
 
        ASSERT(atomic_read(&bp->b_hold) > 0);
-       if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) {
-               spin_lock(&bp->b_lock);
-               if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
-                       /*
-                        * If the buffer is added to the LRU take a new
-                        * reference to the buffer for the LRU and clear the
-                        * (now stale) dispose list state flag
-                        */
-                       if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
-                               bp->b_state &= ~XFS_BSTATE_DISPOSE;
-                               atomic_inc(&bp->b_hold);
-                       }
-                       spin_unlock(&bp->b_lock);
-                       spin_unlock(&pag->pag_buf_lock);
-               } else {
-                       /*
-                        * most of the time buffers will already be removed from
-                        * the LRU, so optimise that case by checking for the
-                        * XFS_BSTATE_DISPOSE flag indicating the last list the
-                        * buffer was on was the disposal list
-                        */
-                       if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
-                               list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
-                       } else {
-                               ASSERT(list_empty(&bp->b_lru));
-                       }
-                       spin_unlock(&bp->b_lock);
 
-                       ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
-                       rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
-                       spin_unlock(&pag->pag_buf_lock);
-                       xfs_perag_put(pag);
-                       xfs_buf_free(bp);
+       release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock);
+       spin_lock(&bp->b_lock);
+       if (!release) {
+               /*
+                * Drop the in-flight state if the buffer is already on the LRU
+                * and it holds the only reference. This is racy because we
+                * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT
+                * ensures the decrement occurs only once per-buf.
+                */
+               if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru))
+                       xfs_buf_ioacct_dec(bp);
+               goto out_unlock;
+       }
+
+       /* the last reference has been dropped ... */
+       xfs_buf_ioacct_dec(bp);
+       if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
+               /*
+                * If the buffer is added to the LRU, take a new reference to
+                * the buffer for the LRU and clear the (now stale) dispose
+                * list state flag.
+                */
+               if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
+                       bp->b_state &= ~XFS_BSTATE_DISPOSE;
+                       atomic_inc(&bp->b_hold);
+               }
+               spin_unlock(&pag->pag_buf_lock);
+       } else {
+               /*
+                * Most of the time buffers will already be removed from the
+                * LRU, so optimise that case by checking for the
+                * XFS_BSTATE_DISPOSE flag, which indicates that the last list
+                * the buffer was on was the disposal list.
+                */
+               if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
+                       list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
+               } else {
+                       ASSERT(list_empty(&bp->b_lru));
                }
+
+               ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+               rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
+               spin_unlock(&pag->pag_buf_lock);
+               xfs_perag_put(pag);
+               freebuf = true;
        }
+
+out_unlock:
+       spin_unlock(&bp->b_lock);
+
+       if (freebuf)
+               xfs_buf_free(bp);
 }
 
 
@@ -944,10 +1016,12 @@ xfs_buf_trylock(
        int                     locked;
 
        locked = down_trylock(&bp->b_sema) == 0;
-       if (locked)
+       if (locked) {
                XB_SET_OWNER(bp);
-
-       trace_xfs_buf_trylock(bp, _RET_IP_);
+               trace_xfs_buf_trylock(bp, _RET_IP_);
+       } else {
+               trace_xfs_buf_trylock_fail(bp, _RET_IP_);
+       }
        return locked;
 }
 
@@ -1127,7 +1201,8 @@ xfs_buf_ioapply_map(
        int             map,
        int             *buf_offset,
        int             *count,
-       int             rw)
+       int             op,
+       int             op_flags)
 {
        int             page_index;
        int             total_nr_pages = bp->b_page_count;
@@ -1157,16 +1232,14 @@ xfs_buf_ioapply_map(
 
 next_chunk:
        atomic_inc(&bp->b_io_remaining);
-       nr_pages = BIO_MAX_SECTORS >> (PAGE_SHIFT - BBSHIFT);
-       if (nr_pages > total_nr_pages)
-               nr_pages = total_nr_pages;
+       nr_pages = min(total_nr_pages, BIO_MAX_PAGES);
 
        bio = bio_alloc(GFP_NOIO, nr_pages);
        bio->bi_bdev = bp->b_target->bt_bdev;
        bio->bi_iter.bi_sector = sector;
        bio->bi_end_io = xfs_buf_bio_end_io;
        bio->bi_private = bp;
-
+       bio_set_op_attrs(bio, op, op_flags);
 
        for (; size && nr_pages; nr_pages--, page_index++) {
                int     rbytes, nbytes = PAGE_SIZE - offset;
@@ -1190,7 +1263,7 @@ next_chunk:
                        flush_kernel_vmap_range(bp->b_addr,
                                                xfs_buf_vmap_len(bp));
                }
-               submit_bio(rw, bio);
+               submit_bio(bio);
                if (size)
                        goto next_chunk;
        } else {
@@ -1210,7 +1283,8 @@ _xfs_buf_ioapply(
        struct xfs_buf  *bp)
 {
        struct blk_plug plug;
-       int             rw;
+       int             op;
+       int             op_flags = 0;
        int             offset;
        int             size;
        int             i;
@@ -1229,14 +1303,13 @@ _xfs_buf_ioapply(
                bp->b_ioend_wq = bp->b_target->bt_mount->m_buf_workqueue;
 
        if (bp->b_flags & XBF_WRITE) {
+               op = REQ_OP_WRITE;
                if (bp->b_flags & XBF_SYNCIO)
-                       rw = WRITE_SYNC;
-               else
-                       rw = WRITE;
+                       op_flags = WRITE_SYNC;
                if (bp->b_flags & XBF_FUA)
-                       rw |= REQ_FUA;
+                       op_flags |= REQ_FUA;
                if (bp->b_flags & XBF_FLUSH)
-                       rw |= REQ_FLUSH;
+                       op_flags |= REQ_PREFLUSH;
 
                /*
                 * Run the write verifier callback function if it exists. If
@@ -1266,13 +1339,14 @@ _xfs_buf_ioapply(
                        }
                }
        } else if (bp->b_flags & XBF_READ_AHEAD) {
-               rw = READA;
+               op = REQ_OP_READ;
+               op_flags = REQ_RAHEAD;
        } else {
-               rw = READ;
+               op = REQ_OP_READ;
        }
 
        /* we only use the buffer cache for meta-data */
-       rw |= REQ_META;
+       op_flags |= REQ_META;
 
        /*
         * Walk all the vectors issuing IO on them. Set up the initial offset
@@ -1284,7 +1358,7 @@ _xfs_buf_ioapply(
        size = BBTOB(bp->b_io_length);
        blk_start_plug(&plug);
        for (i = 0; i < bp->b_map_count; i++) {
-               xfs_buf_ioapply_map(bp, i, &offset, &size, rw);
+               xfs_buf_ioapply_map(bp, i, &offset, &size, op, op_flags);
                if (bp->b_error)
                        break;
                if (size <= 0)
@@ -1339,6 +1413,7 @@ xfs_buf_submit(
         * xfs_buf_ioend too early.
         */
        atomic_set(&bp->b_io_remaining, 1);
+       xfs_buf_ioacct_inc(bp);
        _xfs_buf_ioapply(bp);
 
        /*
@@ -1524,13 +1599,19 @@ xfs_wait_buftarg(
        int loop = 0;
 
        /*
-        * We need to flush the buffer workqueue to ensure that all IO
-        * completion processing is 100% done. Just waiting on buffer locks is
-        * not sufficient for async IO as the reference count held over IO is
-        * not released until after the buffer lock is dropped. Hence we need to
-        * ensure here that all reference counts have been dropped before we
-        * start walking the LRU list.
+        * First wait on the buftarg I/O count for all in-flight buffers to be
+        * released. This is critical as new buffers do not make the LRU until
+        * they are released.
+        *
+        * Next, flush the buffer workqueue to ensure all completion processing
+        * has finished. Just waiting on buffer locks is not sufficient for
+        * async IO as the reference count held over IO is not released until
+        * after the buffer lock is dropped. Hence we need to ensure here that
+        * all reference counts have been dropped before we start walking the
+        * LRU list.
         */
+       while (percpu_counter_sum(&btp->bt_io_count))
+               delay(100);
        drain_workqueue(btp->bt_mount->m_buf_workqueue);
 
        /* loop until there is nothing left on the lru list. */
@@ -1627,6 +1708,8 @@ xfs_free_buftarg(
        struct xfs_buftarg      *btp)
 {
        unregister_shrinker(&btp->bt_shrinker);
+       ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
+       percpu_counter_destroy(&btp->bt_io_count);
        list_lru_destroy(&btp->bt_lru);
 
        if (mp->m_flags & XFS_MOUNT_BARRIER)
@@ -1691,6 +1774,9 @@ xfs_alloc_buftarg(
        if (list_lru_init(&btp->bt_lru))
                goto error;
 
+       if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
+               goto error;
+
        btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count;
        btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan;
        btp->bt_shrinker.seeks = DEFAULT_SEEKS;
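For reference only (not part of the patch): the bt_io_count counter initialised
above follows the standard percpu_counter lifecycle used throughout this change,
i.e. init with a start value and GFP flags, inc/dec on the hot paths, an accurate
percpu_counter_sum() when draining, and destroy at teardown. A minimal,
hypothetical module sketch of that lifecycle (demo_io_count is an invented name):

#include <linux/module.h>
#include <linux/percpu_counter.h>

static struct percpu_counter demo_io_count;	/* hypothetical counter */

static int __init demo_init(void)
{
	int error;

	error = percpu_counter_init(&demo_io_count, 0, GFP_KERNEL);
	if (error)
		return error;

	/* Paired increment/decrement, as xfs_buf_ioacct_inc/dec do for bt_io_count. */
	percpu_counter_inc(&demo_io_count);
	percpu_counter_dec(&demo_io_count);

	/*
	 * percpu_counter_sum() folds in every CPU's local delta, so a zero
	 * result means "nothing outstanding" -- the condition that
	 * xfs_wait_buftarg() polls for before tearing down the LRU.
	 */
	pr_info("demo_io_count sum = %lld\n",
		(long long)percpu_counter_sum(&demo_io_count));
	return 0;
}

static void __exit demo_exit(void)
{
	percpu_counter_destroy(&demo_io_count);
}

module_init(demo_init);
module_exit(demo_exit);
MODULE_LICENSE("GPL");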
@@ -1832,7 +1918,7 @@ xfs_buf_delwri_submit_buffers(
                 * side. We need to move the buffer onto the io_list
                 * at this point so the caller can still access it.
                 */
-               bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_ASYNC | XBF_WRITE_FAIL);
+               bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_WRITE_FAIL);
                bp->b_flags |= XBF_WRITE | XBF_ASYNC;
                if (wait_list) {
                        xfs_buf_hold(bp);