Merge tag 'xfs-for-linus-4.8-rc1' of git://git.kernel.org/pub/scm/linux/kernel/git...
[sfrench/cifs-2.6.git] / fs / xfs / xfs_buf.c
index a87a0d5477bdf89f18572b265bb62e4d982ead36..47a318ce82e0ab5828164e6604ffe9dd5f66a0d6 100644 (file)
@@ -79,6 +79,47 @@ xfs_buf_vmap_len(
        return (bp->b_page_count * PAGE_SIZE) - bp->b_offset;
 }
 
+/*
+ * Bump the I/O in flight count on the buftarg if we haven't yet done so for
+ * this buffer. The count is incremented once per buffer (per hold cycle)
+ * because the corresponding decrement is deferred to buffer release. Buffers
+ * can undergo I/O multiple times in a hold-release cycle and per buffer I/O
+ * tracking adds unnecessary overhead. This is used for sychronization purposes
+ * with unmount (see xfs_wait_buftarg()), so all we really need is a count of
+ * in-flight buffers.
+ *
+ * Buffers that are never released (e.g., superblock, iclog buffers) must set
+ * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count
+ * never reaches zero and unmount hangs indefinitely.
+ */
+static inline void
+xfs_buf_ioacct_inc(
+       struct xfs_buf  *bp)
+{
+       if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT))
+               return;
+
+       ASSERT(bp->b_flags & XBF_ASYNC);
+       bp->b_flags |= _XBF_IN_FLIGHT;
+       percpu_counter_inc(&bp->b_target->bt_io_count);
+}
+
+/*
+ * Clear the in-flight state on a buffer about to be released to the LRU or
+ * freed and unaccount from the buftarg.
+ */
+static inline void
+xfs_buf_ioacct_dec(
+       struct xfs_buf  *bp)
+{
+       if (!(bp->b_flags & _XBF_IN_FLIGHT))
+               return;
+
+       ASSERT(bp->b_flags & XBF_ASYNC);
+       bp->b_flags &= ~_XBF_IN_FLIGHT;
+       percpu_counter_dec(&bp->b_target->bt_io_count);
+}
+
 /*
  * When we mark a buffer stale, we remove the buffer from the LRU and clear the
  * b_lru_ref count so that the buffer is freed immediately when the buffer
@@ -102,6 +143,14 @@ xfs_buf_stale(
         */
        bp->b_flags &= ~_XBF_DELWRI_Q;
 
+       /*
+        * Once the buffer is marked stale and unlocked, a subsequent lookup
+        * could reset b_flags. There is no guarantee that the buffer is
+        * unaccounted (released to LRU) before that occurs. Drop in-flight
+        * status now to preserve accounting consistency.
+        */
+       xfs_buf_ioacct_dec(bp);
+
        spin_lock(&bp->b_lock);
        atomic_set(&bp->b_lru_ref, 0);
        if (!(bp->b_state & XFS_BSTATE_DISPOSE) &&
@@ -815,7 +864,8 @@ xfs_buf_get_uncached(
        struct xfs_buf          *bp;
        DEFINE_SINGLE_BUF_MAP(map, XFS_BUF_DADDR_NULL, numblks);
 
-       bp = _xfs_buf_alloc(target, &map, 1, 0);
+       /* flags might contain irrelevant bits, pass only what we care about */
+       bp = _xfs_buf_alloc(target, &map, 1, flags & XBF_NO_IOACCT);
        if (unlikely(bp == NULL))
                goto fail;
 
@@ -866,63 +916,85 @@ xfs_buf_hold(
 }
 
 /*
- *     Releases a hold on the specified buffer.  If the
- *     the hold count is 1, calls xfs_buf_free.
+ * Release a hold on the specified buffer. If the hold count is 1, the buffer is
+ * placed on LRU or freed (depending on b_lru_ref).
  */
 void
 xfs_buf_rele(
        xfs_buf_t               *bp)
 {
        struct xfs_perag        *pag = bp->b_pag;
+       bool                    release;
+       bool                    freebuf = false;
 
        trace_xfs_buf_rele(bp, _RET_IP_);
 
        if (!pag) {
                ASSERT(list_empty(&bp->b_lru));
                ASSERT(RB_EMPTY_NODE(&bp->b_rbnode));
-               if (atomic_dec_and_test(&bp->b_hold))
+               if (atomic_dec_and_test(&bp->b_hold)) {
+                       xfs_buf_ioacct_dec(bp);
                        xfs_buf_free(bp);
+               }
                return;
        }
 
        ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode));
 
        ASSERT(atomic_read(&bp->b_hold) > 0);
-       if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) {
-               spin_lock(&bp->b_lock);
-               if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
-                       /*
-                        * If the buffer is added to the LRU take a new
-                        * reference to the buffer for the LRU and clear the
-                        * (now stale) dispose list state flag
-                        */
-                       if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
-                               bp->b_state &= ~XFS_BSTATE_DISPOSE;
-                               atomic_inc(&bp->b_hold);
-                       }
-                       spin_unlock(&bp->b_lock);
-                       spin_unlock(&pag->pag_buf_lock);
-               } else {
-                       /*
-                        * most of the time buffers will already be removed from
-                        * the LRU, so optimise that case by checking for the
-                        * XFS_BSTATE_DISPOSE flag indicating the last list the
-                        * buffer was on was the disposal list
-                        */
-                       if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
-                               list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
-                       } else {
-                               ASSERT(list_empty(&bp->b_lru));
-                       }
-                       spin_unlock(&bp->b_lock);
 
-                       ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
-                       rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
-                       spin_unlock(&pag->pag_buf_lock);
-                       xfs_perag_put(pag);
-                       xfs_buf_free(bp);
+       release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock);
+       spin_lock(&bp->b_lock);
+       if (!release) {
+               /*
+                * Drop the in-flight state if the buffer is already on the LRU
+                * and it holds the only reference. This is racy because we
+                * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT
+                * ensures the decrement occurs only once per-buf.
+                */
+               if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru))
+                       xfs_buf_ioacct_dec(bp);
+               goto out_unlock;
+       }
+
+       /* the last reference has been dropped ... */
+       xfs_buf_ioacct_dec(bp);
+       if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
+               /*
+                * If the buffer is added to the LRU take a new reference to the
+                * buffer for the LRU and clear the (now stale) dispose list
+                * state flag
+                */
+               if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
+                       bp->b_state &= ~XFS_BSTATE_DISPOSE;
+                       atomic_inc(&bp->b_hold);
+               }
+               spin_unlock(&pag->pag_buf_lock);
+       } else {
+               /*
+                * most of the time buffers will already be removed from the
+                * LRU, so optimise that case by checking for the
+                * XFS_BSTATE_DISPOSE flag indicating the last list the buffer
+                * was on was the disposal list
+                */
+               if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
+                       list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
+               } else {
+                       ASSERT(list_empty(&bp->b_lru));
                }
+
+               ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+               rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
+               spin_unlock(&pag->pag_buf_lock);
+               xfs_perag_put(pag);
+               freebuf = true;
        }
+
+out_unlock:
+       spin_unlock(&bp->b_lock);
+
+       if (freebuf)
+               xfs_buf_free(bp);
 }
 
 
@@ -944,10 +1016,12 @@ xfs_buf_trylock(
        int                     locked;
 
        locked = down_trylock(&bp->b_sema) == 0;
-       if (locked)
+       if (locked) {
                XB_SET_OWNER(bp);
-
-       trace_xfs_buf_trylock(bp, _RET_IP_);
+               trace_xfs_buf_trylock(bp, _RET_IP_);
+       } else {
+               trace_xfs_buf_trylock_fail(bp, _RET_IP_);
+       }
        return locked;
 }
 
@@ -1339,6 +1413,7 @@ xfs_buf_submit(
         * xfs_buf_ioend too early.
         */
        atomic_set(&bp->b_io_remaining, 1);
+       xfs_buf_ioacct_inc(bp);
        _xfs_buf_ioapply(bp);
 
        /*
@@ -1524,13 +1599,19 @@ xfs_wait_buftarg(
        int loop = 0;
 
        /*
-        * We need to flush the buffer workqueue to ensure that all IO
-        * completion processing is 100% done. Just waiting on buffer locks is
-        * not sufficient for async IO as the reference count held over IO is
-        * not released until after the buffer lock is dropped. Hence we need to
-        * ensure here that all reference counts have been dropped before we
-        * start walking the LRU list.
+        * First wait on the buftarg I/O count for all in-flight buffers to be
+        * released. This is critical as new buffers do not make the LRU until
+        * they are released.
+        *
+        * Next, flush the buffer workqueue to ensure all completion processing
+        * has finished. Just waiting on buffer locks is not sufficient for
+        * async IO as the reference count held over IO is not released until
+        * after the buffer lock is dropped. Hence we need to ensure here that
+        * all reference counts have been dropped before we start walking the
+        * LRU list.
         */
+       while (percpu_counter_sum(&btp->bt_io_count))
+               delay(100);
        drain_workqueue(btp->bt_mount->m_buf_workqueue);
 
        /* loop until there is nothing left on the lru list. */
@@ -1627,6 +1708,8 @@ xfs_free_buftarg(
        struct xfs_buftarg      *btp)
 {
        unregister_shrinker(&btp->bt_shrinker);
+       ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
+       percpu_counter_destroy(&btp->bt_io_count);
        list_lru_destroy(&btp->bt_lru);
 
        if (mp->m_flags & XFS_MOUNT_BARRIER)
@@ -1691,6 +1774,9 @@ xfs_alloc_buftarg(
        if (list_lru_init(&btp->bt_lru))
                goto error;
 
+       if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
+               goto error;
+
        btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count;
        btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan;
        btp->bt_shrinker.seeks = DEFAULT_SEEKS;
@@ -1774,18 +1860,33 @@ xfs_buf_cmp(
        return 0;
 }
 
+/*
+ * submit buffers for write.
+ *
+ * When we have a large buffer list, we do not want to hold all the buffers
+ * locked while we block on the request queue waiting for IO dispatch. To avoid
+ * this problem, we lock and submit buffers in groups of 50, thereby minimising
+ * the lock hold times for lists which may contain thousands of objects.
+ *
+ * To do this, we sort the buffer list before we walk the list to lock and
+ * submit buffers, and we plug and unplug around each group of buffers we
+ * submit.
+ */
 static int
-__xfs_buf_delwri_submit(
+xfs_buf_delwri_submit_buffers(
        struct list_head        *buffer_list,
-       struct list_head        *io_list,
-       bool                    wait)
+       struct list_head        *wait_list)
 {
-       struct blk_plug         plug;
        struct xfs_buf          *bp, *n;
+       LIST_HEAD               (submit_list);
        int                     pinned = 0;
+       struct blk_plug         plug;
 
+       list_sort(NULL, buffer_list, xfs_buf_cmp);
+
+       blk_start_plug(&plug);
        list_for_each_entry_safe(bp, n, buffer_list, b_list) {
-               if (!wait) {
+               if (!wait_list) {
                        if (xfs_buf_ispinned(bp)) {
                                pinned++;
                                continue;
@@ -1808,25 +1909,21 @@ __xfs_buf_delwri_submit(
                        continue;
                }
 
-               list_move_tail(&bp->b_list, io_list);
                trace_xfs_buf_delwri_split(bp, _RET_IP_);
-       }
-
-       list_sort(NULL, io_list, xfs_buf_cmp);
-
-       blk_start_plug(&plug);
-       list_for_each_entry_safe(bp, n, io_list, b_list) {
-               bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_ASYNC | XBF_WRITE_FAIL);
-               bp->b_flags |= XBF_WRITE | XBF_ASYNC;
 
                /*
-                * we do all Io submission async. This means if we need to wait
-                * for IO completion we need to take an extra reference so the
-                * buffer is still valid on the other side.
+                * We do all IO submission async. This means if we need
+                * to wait for IO completion we need to take an extra
+                * reference so the buffer is still valid on the other
+                * side. We need to move the buffer onto the io_list
+                * at this point so the caller can still access it.
                 */
-               if (wait)
+               bp->b_flags &= ~(_XBF_DELWRI_Q | XBF_WRITE_FAIL);
+               bp->b_flags |= XBF_WRITE | XBF_ASYNC;
+               if (wait_list) {
                        xfs_buf_hold(bp);
-               else
+                       list_move_tail(&bp->b_list, wait_list);
+               } else
                        list_del_init(&bp->b_list);
 
                xfs_buf_submit(bp);
@@ -1849,8 +1946,7 @@ int
 xfs_buf_delwri_submit_nowait(
        struct list_head        *buffer_list)
 {
-       LIST_HEAD               (io_list);
-       return __xfs_buf_delwri_submit(buffer_list, &io_list, false);
+       return xfs_buf_delwri_submit_buffers(buffer_list, NULL);
 }
 
 /*
@@ -1865,15 +1961,15 @@ int
 xfs_buf_delwri_submit(
        struct list_head        *buffer_list)
 {
-       LIST_HEAD               (io_list);
+       LIST_HEAD               (wait_list);
        int                     error = 0, error2;
        struct xfs_buf          *bp;
 
-       __xfs_buf_delwri_submit(buffer_list, &io_list, true);
+       xfs_buf_delwri_submit_buffers(buffer_list, &wait_list);
 
        /* Wait for IO to complete. */
-       while (!list_empty(&io_list)) {
-               bp = list_first_entry(&io_list, struct xfs_buf, b_list);
+       while (!list_empty(&wait_list)) {
+               bp = list_first_entry(&wait_list, struct xfs_buf, b_list);
 
                list_del_init(&bp->b_list);