Merge branch 'for-3.17/drivers' of git://git.kernel.dk/linux-block
authorLinus Torvalds <torvalds@linux-foundation.org>
Thu, 14 Aug 2014 15:10:21 +0000 (09:10 -0600)
committerLinus Torvalds <torvalds@linux-foundation.org>
Thu, 14 Aug 2014 15:10:21 +0000 (09:10 -0600)
Pull block driver changes from Jens Axboe:
 "Nothing out of the ordinary here, this pull request contains:

   - A big round of fixes for bcache from Kent Overstreet, Slava Pestov,
     and Surbhi Palande.  No new features, just a lot of fixes.

   - The usual round of drbd updates from Andreas Gruenbacher, Lars
     Ellenberg, and Philipp Reisner.

   - virtio_blk was converted to blk-mq back in 3.13, but now Ming Lei
     has taken it one step further and added support for actually using
     more than one queue.

   - Addition of an explicit SG_FLAG_Q_AT_HEAD for block/bsg, to
     complement the default behavior of adding to the tail of the
     queue.  From Douglas Gilbert"

* 'for-3.17/drivers' of git://git.kernel.dk/linux-block: (86 commits)
  bcache: Drop unneeded blk_sync_queue() calls
  bcache: add mutex lock for bch_is_open
  bcache: Correct printing of btree_gc_max_duration_ms
  bcache: try to set b->parent properly
  bcache: fix memory corruption in init error path
  bcache: fix crash with incomplete cache set
  bcache: Fix more early shutdown bugs
  bcache: fix use-after-free in btree_gc_coalesce()
  bcache: Fix an infinite loop in journal replay
  bcache: fix crash in bcache_btree_node_alloc_fail tracepoint
  bcache: bcache_write tracepoint was crashing
  bcache: fix typo in bch_bkey_equal_header
  bcache: Allocate bounce buffers with GFP_NOWAIT
  bcache: Make sure to pass GFP_WAIT to mempool_alloc()
  bcache: fix uninterruptible sleep in writeback thread
  bcache: wait for buckets when allocating new btree root
  bcache: fix crash on shutdown in passthrough mode
  bcache: fix lockdep warnings on shutdown
  bcache allocator: send discards with correct size
  bcache: Fix to remove the rcu_sched stalls.
  ...

39 files changed:
block/scsi_ioctl.c
drivers/block/drbd/Makefile
drivers/block/drbd/drbd_actlog.c
drivers/block/drbd/drbd_bitmap.c
drivers/block/drbd/drbd_debugfs.c [new file with mode: 0644]
drivers/block/drbd/drbd_debugfs.h [new file with mode: 0644]
drivers/block/drbd/drbd_int.h
drivers/block/drbd/drbd_interval.h
drivers/block/drbd/drbd_main.c
drivers/block/drbd/drbd_nl.c
drivers/block/drbd/drbd_proc.c
drivers/block/drbd/drbd_receiver.c
drivers/block/drbd/drbd_req.c
drivers/block/drbd/drbd_req.h
drivers/block/drbd/drbd_state.c
drivers/block/drbd/drbd_worker.c
drivers/block/virtio_blk.c
drivers/md/bcache/alloc.c
drivers/md/bcache/bcache.h
drivers/md/bcache/bset.c
drivers/md/bcache/bset.h
drivers/md/bcache/btree.c
drivers/md/bcache/btree.h
drivers/md/bcache/extents.c
drivers/md/bcache/extents.h
drivers/md/bcache/journal.c
drivers/md/bcache/request.c
drivers/md/bcache/super.c
drivers/md/bcache/util.h
drivers/md/bcache/writeback.c
drivers/md/bcache/writeback.h
include/linux/drbd.h
include/linux/drbd_genl.h
include/linux/drbd_limits.h
include/scsi/sg.h
include/trace/events/bcache.h
include/uapi/linux/bsg.h
include/uapi/linux/virtio_blk.h
lib/lru_cache.c

index bda1497add4c7d1e54a4017d4f16b966574f8f1e..51bf5155ee756a4ac479e9c49fcf88824b0aeedc 100644 (file)
@@ -290,6 +290,7 @@ static int sg_io(struct request_queue *q, struct gendisk *bd_disk,
        unsigned long start_time;
        ssize_t ret = 0;
        int writing = 0;
+       int at_head = 0;
        struct request *rq;
        char sense[SCSI_SENSE_BUFFERSIZE];
        struct bio *bio;
@@ -313,6 +314,8 @@ static int sg_io(struct request_queue *q, struct gendisk *bd_disk,
                case SG_DXFER_FROM_DEV:
                        break;
                }
+       if (hdr->flags & SG_FLAG_Q_AT_HEAD)
+               at_head = 1;
 
        rq = blk_get_request(q, writing ? WRITE : READ, GFP_KERNEL);
        if (!rq)
@@ -369,7 +372,7 @@ static int sg_io(struct request_queue *q, struct gendisk *bd_disk,
         * (if he doesn't check that is his problem).
         * N.B. a non-zero SCSI status is _not_ necessarily an error.
         */
-       blk_execute_rq(q, bd_disk, rq, 0);
+       blk_execute_rq(q, bd_disk, rq, at_head);
 
        hdr->duration = jiffies_to_msecs(jiffies - start_time);
 
index 8b450338075eca905b91c04e16ce3567e54f73d9..4464e353c1e81fd2faf3fe823bf244807fa0afd3 100644 (file)
@@ -3,5 +3,6 @@ drbd-y += drbd_worker.o drbd_receiver.o drbd_req.o drbd_actlog.o
 drbd-y += drbd_main.o drbd_strings.o drbd_nl.o
 drbd-y += drbd_interval.o drbd_state.o
 drbd-y += drbd_nla.o
+drbd-$(CONFIG_DEBUG_FS) += drbd_debugfs.o
 
 obj-$(CONFIG_BLK_DEV_DRBD)     += drbd.o
index 05a1780ffa850483cdf4d73a89b3a9a78964ed4c..d26a3fa6368849ce95d59a7847c8853c11861026 100644 (file)
@@ -92,34 +92,26 @@ struct __packed al_transaction_on_disk {
        __be32  context[AL_CONTEXT_PER_TRANSACTION];
 };
 
-struct update_odbm_work {
-       struct drbd_work w;
-       struct drbd_device *device;
-       unsigned int enr;
-};
-
-struct update_al_work {
-       struct drbd_work w;
-       struct drbd_device *device;
-       struct completion event;
-       int err;
-};
-
-
-void *drbd_md_get_buffer(struct drbd_device *device)
+void *drbd_md_get_buffer(struct drbd_device *device, const char *intent)
 {
        int r;
 
        wait_event(device->misc_wait,
-                  (r = atomic_cmpxchg(&device->md_io_in_use, 0, 1)) == 0 ||
+                  (r = atomic_cmpxchg(&device->md_io.in_use, 0, 1)) == 0 ||
                   device->state.disk <= D_FAILED);
 
-       return r ? NULL : page_address(device->md_io_page);
+       if (r)
+               return NULL;
+
+       device->md_io.current_use = intent;
+       device->md_io.start_jif = jiffies;
+       device->md_io.submit_jif = device->md_io.start_jif - 1;
+       return page_address(device->md_io.page);
 }
 
 void drbd_md_put_buffer(struct drbd_device *device)
 {
-       if (atomic_dec_and_test(&device->md_io_in_use))
+       if (atomic_dec_and_test(&device->md_io.in_use))
                wake_up(&device->misc_wait);
 }
 
@@ -145,10 +137,11 @@ void wait_until_done_or_force_detached(struct drbd_device *device, struct drbd_b
 
 static int _drbd_md_sync_page_io(struct drbd_device *device,
                                 struct drbd_backing_dev *bdev,
-                                struct page *page, sector_t sector,
-                                int rw, int size)
+                                sector_t sector, int rw)
 {
        struct bio *bio;
+       /* we do all our meta data IO in aligned 4k blocks. */
+       const int size = 4096;
        int err;
 
        device->md_io.done = 0;
@@ -156,15 +149,15 @@ static int _drbd_md_sync_page_io(struct drbd_device *device,
 
        if ((rw & WRITE) && !test_bit(MD_NO_FUA, &device->flags))
                rw |= REQ_FUA | REQ_FLUSH;
-       rw |= REQ_SYNC;
+       rw |= REQ_SYNC | REQ_NOIDLE;
 
        bio = bio_alloc_drbd(GFP_NOIO);
        bio->bi_bdev = bdev->md_bdev;
        bio->bi_iter.bi_sector = sector;
        err = -EIO;
-       if (bio_add_page(bio, page, size, 0) != size)
+       if (bio_add_page(bio, device->md_io.page, size, 0) != size)
                goto out;
-       bio->bi_private = &device->md_io;
+       bio->bi_private = device;
        bio->bi_end_io = drbd_md_io_complete;
        bio->bi_rw = rw;
 
@@ -179,7 +172,8 @@ static int _drbd_md_sync_page_io(struct drbd_device *device,
        }
 
        bio_get(bio); /* one bio_put() is in the completion handler */
-       atomic_inc(&device->md_io_in_use); /* drbd_md_put_buffer() is in the completion handler */
+       atomic_inc(&device->md_io.in_use); /* drbd_md_put_buffer() is in the completion handler */
+       device->md_io.submit_jif = jiffies;
        if (drbd_insert_fault(device, (rw & WRITE) ? DRBD_FAULT_MD_WR : DRBD_FAULT_MD_RD))
                bio_endio(bio, -EIO);
        else
@@ -197,9 +191,7 @@ int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bd
                         sector_t sector, int rw)
 {
        int err;
-       struct page *iop = device->md_io_page;
-
-       D_ASSERT(device, atomic_read(&device->md_io_in_use) == 1);
+       D_ASSERT(device, atomic_read(&device->md_io.in_use) == 1);
 
        BUG_ON(!bdev->md_bdev);
 
@@ -214,8 +206,7 @@ int drbd_md_sync_page_io(struct drbd_device *device, struct drbd_backing_dev *bd
                     current->comm, current->pid, __func__,
                     (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ");
 
-       /* we do all our meta data IO in aligned 4k blocks. */
-       err = _drbd_md_sync_page_io(device, bdev, iop, sector, rw, 4096);
+       err = _drbd_md_sync_page_io(device, bdev, sector, rw);
        if (err) {
                drbd_err(device, "drbd_md_sync_page_io(,%llus,%s) failed with error %d\n",
                    (unsigned long long)sector, (rw & WRITE) ? "WRITE" : "READ", err);
@@ -297,26 +288,12 @@ bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *
        return need_transaction;
 }
 
-static int al_write_transaction(struct drbd_device *device, bool delegate);
-
-/* When called through generic_make_request(), we must delegate
- * activity log I/O to the worker thread: a further request
- * submitted via generic_make_request() within the same task
- * would be queued on current->bio_list, and would only start
- * after this function returns (see generic_make_request()).
- *
- * However, if we *are* the worker, we must not delegate to ourselves.
- */
+static int al_write_transaction(struct drbd_device *device);
 
-/*
- * @delegate:   delegate activity log I/O to the worker thread
- */
-void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
+void drbd_al_begin_io_commit(struct drbd_device *device)
 {
        bool locked = false;
 
-       BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task);
-
        /* Serialize multiple transactions.
         * This uses test_and_set_bit, memory barrier is implicit.
         */
@@ -335,7 +312,7 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
                        rcu_read_unlock();
 
                        if (write_al_updates)
-                               al_write_transaction(device, delegate);
+                               al_write_transaction(device);
                        spin_lock_irq(&device->al_lock);
                        /* FIXME
                        if (err)
@@ -352,12 +329,10 @@ void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate)
 /*
  * @delegate:   delegate activity log I/O to the worker thread
  */
-void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate)
+void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i)
 {
-       BUG_ON(delegate && current == first_peer_device(device)->connection->worker.task);
-
        if (drbd_al_begin_io_prepare(device, i))
-               drbd_al_begin_io_commit(device, delegate);
+               drbd_al_begin_io_commit(device);
 }
 
 int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i)
@@ -380,8 +355,19 @@ int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *
        /* We want all necessary updates for a given request within the same transaction
         * We could first check how many updates are *actually* needed,
         * and use that instead of the worst-case nr_al_extents */
-       if (available_update_slots < nr_al_extents)
-               return -EWOULDBLOCK;
+       if (available_update_slots < nr_al_extents) {
+               /* Too many activity log extents are currently "hot".
+                *
+                * If we have accumulated pending changes already,
+                * we made progress.
+                *
+                * If we cannot get even a single pending change through,
+                * stop the fast path until we made some progress,
+                * or requests to "cold" extents could be starved. */
+               if (!al->pending_changes)
+                       __set_bit(__LC_STARVING, &device->act_log->flags);
+               return -ENOBUFS;
+       }
 
        /* Is resync active in this area? */
        for (enr = first; enr <= last; enr++) {
@@ -452,15 +438,6 @@ static unsigned int al_extent_to_bm_page(unsigned int al_enr)
                 (AL_EXTENT_SHIFT - BM_BLOCK_SHIFT));
 }
 
-static unsigned int rs_extent_to_bm_page(unsigned int rs_enr)
-{
-       return rs_enr >>
-               /* bit to page */
-               ((PAGE_SHIFT + 3) -
-               /* resync extent number to bit */
-                (BM_EXT_SHIFT - BM_BLOCK_SHIFT));
-}
-
 static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device)
 {
        const unsigned int stripes = device->ldev->md.al_stripes;
@@ -479,8 +456,7 @@ static sector_t al_tr_number_to_on_disk_sector(struct drbd_device *device)
        return device->ldev->md.md_offset + device->ldev->md.al_offset + t;
 }
 
-static int
-_al_write_transaction(struct drbd_device *device)
+int al_write_transaction(struct drbd_device *device)
 {
        struct al_transaction_on_disk *buffer;
        struct lc_element *e;
@@ -505,7 +481,8 @@ _al_write_transaction(struct drbd_device *device)
                return -EIO;
        }
 
-       buffer = drbd_md_get_buffer(device); /* protects md_io_buffer, al_tr_cycle, ... */
+       /* protects md_io_buffer, al_tr_cycle, ... */
+       buffer = drbd_md_get_buffer(device, __func__);
        if (!buffer) {
                drbd_err(device, "disk failed while waiting for md_io buffer\n");
                put_ldev(device);
@@ -590,38 +567,6 @@ _al_write_transaction(struct drbd_device *device)
        return err;
 }
 
-
-static int w_al_write_transaction(struct drbd_work *w, int unused)
-{
-       struct update_al_work *aw = container_of(w, struct update_al_work, w);
-       struct drbd_device *device = aw->device;
-       int err;
-
-       err = _al_write_transaction(device);
-       aw->err = err;
-       complete(&aw->event);
-
-       return err != -EIO ? err : 0;
-}
-
-/* Calls from worker context (see w_restart_disk_io()) need to write the
-   transaction directly. Others came through generic_make_request(),
-   those need to delegate it to the worker. */
-static int al_write_transaction(struct drbd_device *device, bool delegate)
-{
-       if (delegate) {
-               struct update_al_work al_work;
-               init_completion(&al_work.event);
-               al_work.w.cb = w_al_write_transaction;
-               al_work.device = device;
-               drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
-                                     &al_work.w);
-               wait_for_completion(&al_work.event);
-               return al_work.err;
-       } else
-               return _al_write_transaction(device);
-}
-
 static int _try_lc_del(struct drbd_device *device, struct lc_element *al_ext)
 {
        int rv;
@@ -682,72 +627,56 @@ int drbd_initialize_al(struct drbd_device *device, void *buffer)
        return 0;
 }
 
-static int w_update_odbm(struct drbd_work *w, int unused)
-{
-       struct update_odbm_work *udw = container_of(w, struct update_odbm_work, w);
-       struct drbd_device *device = udw->device;
-       struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
-
-       if (!get_ldev(device)) {
-               if (__ratelimit(&drbd_ratelimit_state))
-                       drbd_warn(device, "Can not update on disk bitmap, local IO disabled.\n");
-               kfree(udw);
-               return 0;
-       }
-
-       drbd_bm_write_page(device, rs_extent_to_bm_page(udw->enr));
-       put_ldev(device);
-
-       kfree(udw);
-
-       if (drbd_bm_total_weight(device) <= device->rs_failed) {
-               switch (device->state.conn) {
-               case C_SYNC_SOURCE:  case C_SYNC_TARGET:
-               case C_PAUSED_SYNC_S: case C_PAUSED_SYNC_T:
-                       drbd_resync_finished(device);
-               default:
-                       /* nothing to do */
-                       break;
-               }
-       }
-       drbd_bcast_event(device, &sib);
-
-       return 0;
-}
-
+static const char *drbd_change_sync_fname[] = {
+       [RECORD_RS_FAILED] = "drbd_rs_failed_io",
+       [SET_IN_SYNC] = "drbd_set_in_sync",
+       [SET_OUT_OF_SYNC] = "drbd_set_out_of_sync"
+};
 
 /* ATTENTION. The AL's extents are 4MB each, while the extents in the
  * resync LRU-cache are 16MB each.
  * The caller of this function has to hold an get_ldev() reference.
  *
+ * Adjusts the caching members ->rs_left (success) or ->rs_failed (!success),
+ * potentially pulling in (and recounting the corresponding bits)
+ * this resync extent into the resync extent lru cache.
+ *
+ * Returns whether all bits have been cleared for this resync extent,
+ * precisely: (rs_left <= rs_failed)
+ *
  * TODO will be obsoleted once we have a caching lru of the on disk bitmap
  */
-static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t sector,
-                                     int count, int success)
+static bool update_rs_extent(struct drbd_device *device,
+               unsigned int enr, int count,
+               enum update_sync_bits_mode mode)
 {
        struct lc_element *e;
-       struct update_odbm_work *udw;
-
-       unsigned int enr;
 
        D_ASSERT(device, atomic_read(&device->local_cnt));
 
-       /* I simply assume that a sector/size pair never crosses
-        * a 16 MB extent border. (Currently this is true...) */
-       enr = BM_SECT_TO_EXT(sector);
-
-       e = lc_get(device->resync, enr);
+       /* When setting out-of-sync bits,
+        * we don't need it cached (lc_find).
+        * But if it is present in the cache,
+        * we should update the cached bit count.
+        * Otherwise, that extent should be in the resync extent lru cache
+        * already -- or we want to pull it in if necessary -- (lc_get),
+        * then update and check rs_left and rs_failed. */
+       if (mode == SET_OUT_OF_SYNC)
+               e = lc_find(device->resync, enr);
+       else
+               e = lc_get(device->resync, enr);
        if (e) {
                struct bm_extent *ext = lc_entry(e, struct bm_extent, lce);
                if (ext->lce.lc_number == enr) {
-                       if (success)
+                       if (mode == SET_IN_SYNC)
                                ext->rs_left -= count;
+                       else if (mode == SET_OUT_OF_SYNC)
+                               ext->rs_left += count;
                        else
                                ext->rs_failed += count;
                        if (ext->rs_left < ext->rs_failed) {
-                               drbd_warn(device, "BAD! sector=%llus enr=%u rs_left=%d "
+                               drbd_warn(device, "BAD! enr=%u rs_left=%d "
                                    "rs_failed=%d count=%d cstate=%s\n",
-                                    (unsigned long long)sector,
                                     ext->lce.lc_number, ext->rs_left,
                                     ext->rs_failed, count,
                                     drbd_conn_str(device->state.conn));
@@ -781,34 +710,27 @@ static void drbd_try_clear_on_disk_bm(struct drbd_device *device, sector_t secto
                                     ext->lce.lc_number, ext->rs_failed);
                        }
                        ext->rs_left = rs_left;
-                       ext->rs_failed = success ? 0 : count;
+                       ext->rs_failed = (mode == RECORD_RS_FAILED) ? count : 0;
                        /* we don't keep a persistent log of the resync lru,
                         * we can commit any change right away. */
                        lc_committed(device->resync);
                }
-               lc_put(device->resync, &ext->lce);
+               if (mode != SET_OUT_OF_SYNC)
+                       lc_put(device->resync, &ext->lce);
                /* no race, we are within the al_lock! */
 
-               if (ext->rs_left == ext->rs_failed) {
+               if (ext->rs_left <= ext->rs_failed) {
                        ext->rs_failed = 0;
-
-                       udw = kmalloc(sizeof(*udw), GFP_ATOMIC);
-                       if (udw) {
-                               udw->enr = ext->lce.lc_number;
-                               udw->w.cb = w_update_odbm;
-                               udw->device = device;
-                               drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
-                                                     &udw->w);
-                       } else {
-                               drbd_warn(device, "Could not kmalloc an udw\n");
-                       }
+                       return true;
                }
-       } else {
+       } else if (mode != SET_OUT_OF_SYNC) {
+               /* be quiet if lc_find() did not find it. */
                drbd_err(device, "lc_get() failed! locked=%d/%d flags=%lu\n",
                    device->resync_locked,
                    device->resync->nr_elements,
                    device->resync->flags);
        }
+       return false;
 }
 
 void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go)
@@ -827,105 +749,105 @@ void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go
        }
 }
 
-/* clear the bit corresponding to the piece of storage in question:
- * size byte of data starting from sector.  Only clear a bits of the affected
- * one ore more _aligned_ BM_BLOCK_SIZE blocks.
- *
- * called by worker on C_SYNC_TARGET and receiver on SyncSource.
- *
- */
-void __drbd_set_in_sync(struct drbd_device *device, sector_t sector, int size,
-                      const char *file, const unsigned int line)
+/* It is called lazy update, so don't do write-out too often. */
+static bool lazy_bitmap_update_due(struct drbd_device *device)
 {
-       /* Is called from worker and receiver context _only_ */
-       unsigned long sbnr, ebnr, lbnr;
-       unsigned long count = 0;
-       sector_t esector, nr_sectors;
-       int wake_up = 0;
-       unsigned long flags;
+       return time_after(jiffies, device->rs_last_bcast + 2*HZ);
+}
 
-       if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
-               drbd_err(device, "drbd_set_in_sync: sector=%llus size=%d nonsense!\n",
-                               (unsigned long long)sector, size);
+static void maybe_schedule_on_disk_bitmap_update(struct drbd_device *device, bool rs_done)
+{
+       if (rs_done)
+               set_bit(RS_DONE, &device->flags);
+               /* and also set RS_PROGRESS below */
+       else if (!lazy_bitmap_update_due(device))
                return;
-       }
-
-       if (!get_ldev(device))
-               return; /* no disk, no metadata, no bitmap to clear bits in */
-
-       nr_sectors = drbd_get_capacity(device->this_bdev);
-       esector = sector + (size >> 9) - 1;
-
-       if (!expect(sector < nr_sectors))
-               goto out;
-       if (!expect(esector < nr_sectors))
-               esector = nr_sectors - 1;
-
-       lbnr = BM_SECT_TO_BIT(nr_sectors-1);
-
-       /* we clear it (in sync).
-        * round up start sector, round down end sector.  we make sure we only
-        * clear full, aligned, BM_BLOCK_SIZE (4K) blocks */
-       if (unlikely(esector < BM_SECT_PER_BIT-1))
-               goto out;
-       if (unlikely(esector == (nr_sectors-1)))
-               ebnr = lbnr;
-       else
-               ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
-       sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
 
-       if (sbnr > ebnr)
-               goto out;
+       drbd_device_post_work(device, RS_PROGRESS);
+}
 
+static int update_sync_bits(struct drbd_device *device,
+               unsigned long sbnr, unsigned long ebnr,
+               enum update_sync_bits_mode mode)
+{
        /*
-        * ok, (capacity & 7) != 0 sometimes, but who cares...
-        * we count rs_{total,left} in bits, not sectors.
+        * We keep a count of set bits per resync-extent in the ->rs_left
+        * caching member, so we need to loop and work within the resync extent
+        * alignment. Typically this loop will execute exactly once.
         */
-       count = drbd_bm_clear_bits(device, sbnr, ebnr);
-       if (count) {
-               drbd_advance_rs_marks(device, drbd_bm_total_weight(device));
-               spin_lock_irqsave(&device->al_lock, flags);
-               drbd_try_clear_on_disk_bm(device, sector, count, true);
-               spin_unlock_irqrestore(&device->al_lock, flags);
-
-               /* just wake_up unconditional now, various lc_chaged(),
-                * lc_put() in drbd_try_clear_on_disk_bm(). */
-               wake_up = 1;
+       unsigned long flags;
+       unsigned long count = 0;
+       unsigned int cleared = 0;
+       while (sbnr <= ebnr) {
+               /* set temporary boundary bit number to last bit number within
+                * the resync extent of the current start bit number,
+                * but cap at provided end bit number */
+               unsigned long tbnr = min(ebnr, sbnr | BM_BLOCKS_PER_BM_EXT_MASK);
+               unsigned long c;
+
+               if (mode == RECORD_RS_FAILED)
+                       /* Only called from drbd_rs_failed_io(), bits
+                        * supposedly still set.  Recount, maybe some
+                        * of the bits have been successfully cleared
+                        * by application IO meanwhile.
+                        */
+                       c = drbd_bm_count_bits(device, sbnr, tbnr);
+               else if (mode == SET_IN_SYNC)
+                       c = drbd_bm_clear_bits(device, sbnr, tbnr);
+               else /* if (mode == SET_OUT_OF_SYNC) */
+                       c = drbd_bm_set_bits(device, sbnr, tbnr);
+
+               if (c) {
+                       spin_lock_irqsave(&device->al_lock, flags);
+                       cleared += update_rs_extent(device, BM_BIT_TO_EXT(sbnr), c, mode);
+                       spin_unlock_irqrestore(&device->al_lock, flags);
+                       count += c;
+               }
+               sbnr = tbnr + 1;
        }
-out:
-       put_ldev(device);
-       if (wake_up)
+       if (count) {
+               if (mode == SET_IN_SYNC) {
+                       unsigned long still_to_go = drbd_bm_total_weight(device);
+                       bool rs_is_done = (still_to_go <= device->rs_failed);
+                       drbd_advance_rs_marks(device, still_to_go);
+                       if (cleared || rs_is_done)
+                               maybe_schedule_on_disk_bitmap_update(device, rs_is_done);
+               } else if (mode == RECORD_RS_FAILED)
+                       device->rs_failed += count;
                wake_up(&device->al_wait);
+       }
+       return count;
 }
 
-/*
- * this is intended to set one request worth of data out of sync.
- * affects at least 1 bit,
- * and at most 1+DRBD_MAX_BIO_SIZE/BM_BLOCK_SIZE bits.
+/* clear the bit corresponding to the piece of storage in question:
+ * size byte of data starting from sector.  Only clear a bits of the affected
+ * one ore more _aligned_ BM_BLOCK_SIZE blocks.
+ *
+ * called by worker on C_SYNC_TARGET and receiver on SyncSource.
  *
- * called by tl_clear and drbd_send_dblock (==drbd_make_request).
- * so this can be _any_ process.
  */
-int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size,
-                           const char *file, const unsigned int line)
+int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size,
+               enum update_sync_bits_mode mode,
+               const char *file, const unsigned int line)
 {
-       unsigned long sbnr, ebnr, flags;
+       /* Is called from worker and receiver context _only_ */
+       unsigned long sbnr, ebnr, lbnr;
+       unsigned long count = 0;
        sector_t esector, nr_sectors;
-       unsigned int enr, count = 0;
-       struct lc_element *e;
 
-       /* this should be an empty REQ_FLUSH */
-       if (size == 0)
+       /* This would be an empty REQ_FLUSH, be silent. */
+       if ((mode == SET_OUT_OF_SYNC) && size == 0)
                return 0;
 
-       if (size < 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
-               drbd_err(device, "sector: %llus, size: %d\n",
-                       (unsigned long long)sector, size);
+       if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
+               drbd_err(device, "%s: sector=%llus size=%d nonsense!\n",
+                               drbd_change_sync_fname[mode],
+                               (unsigned long long)sector, size);
                return 0;
        }
 
        if (!get_ldev(device))
-               return 0; /* no disk, no metadata, no bitmap to set bits in */
+               return 0; /* no disk, no metadata, no bitmap to manipulate bits in */
 
        nr_sectors = drbd_get_capacity(device->this_bdev);
        esector = sector + (size >> 9) - 1;
@@ -935,25 +857,28 @@ int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector, int size
        if (!expect(esector < nr_sectors))
                esector = nr_sectors - 1;
 
-       /* we set it out of sync,
-        * we do not need to round anything here */
-       sbnr = BM_SECT_TO_BIT(sector);
-       ebnr = BM_SECT_TO_BIT(esector);
-
-       /* ok, (capacity & 7) != 0 sometimes, but who cares...
-        * we count rs_{total,left} in bits, not sectors.  */
-       spin_lock_irqsave(&device->al_lock, flags);
-       count = drbd_bm_set_bits(device, sbnr, ebnr);
+       lbnr = BM_SECT_TO_BIT(nr_sectors-1);
 
-       enr = BM_SECT_TO_EXT(sector);
-       e = lc_find(device->resync, enr);
-       if (e)
-               lc_entry(e, struct bm_extent, lce)->rs_left += count;
-       spin_unlock_irqrestore(&device->al_lock, flags);
+       if (mode == SET_IN_SYNC) {
+               /* Round up start sector, round down end sector.  We make sure
+                * we only clear full, aligned, BM_BLOCK_SIZE blocks. */
+               if (unlikely(esector < BM_SECT_PER_BIT-1))
+                       goto out;
+               if (unlikely(esector == (nr_sectors-1)))
+                       ebnr = lbnr;
+               else
+                       ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
+               sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
+       } else {
+               /* We set it out of sync, or record resync failure.
+                * Should not round anything here. */
+               sbnr = BM_SECT_TO_BIT(sector);
+               ebnr = BM_SECT_TO_BIT(esector);
+       }
 
+       count = update_sync_bits(device, sbnr, ebnr, mode);
 out:
        put_ldev(device);
-
        return count;
 }
 
@@ -1075,6 +1000,15 @@ int drbd_try_rs_begin_io(struct drbd_device *device, sector_t sector)
        struct lc_element *e;
        struct bm_extent *bm_ext;
        int i;
+       bool throttle = drbd_rs_should_slow_down(device, sector, true);
+
+       /* If we need to throttle, a half-locked (only marked BME_NO_WRITES,
+        * not yet BME_LOCKED) extent needs to be kicked out explicitly if we
+        * need to throttle. There is at most one such half-locked extent,
+        * which is remembered in resync_wenr. */
+
+       if (throttle && device->resync_wenr != enr)
+               return -EAGAIN;
 
        spin_lock_irq(&device->al_lock);
        if (device->resync_wenr != LC_FREE && device->resync_wenr != enr) {
@@ -1098,8 +1032,10 @@ int drbd_try_rs_begin_io(struct drbd_device *device, sector_t sector)
                        D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags));
                        clear_bit(BME_NO_WRITES, &bm_ext->flags);
                        device->resync_wenr = LC_FREE;
-                       if (lc_put(device->resync, &bm_ext->lce) == 0)
+                       if (lc_put(device->resync, &bm_ext->lce) == 0) {
+                               bm_ext->flags = 0;
                                device->resync_locked--;
+                       }
                        wake_up(&device->al_wait);
                } else {
                        drbd_alert(device, "LOGIC BUG\n");
@@ -1161,8 +1097,20 @@ proceed:
        return 0;
 
 try_again:
-       if (bm_ext)
-               device->resync_wenr = enr;
+       if (bm_ext) {
+               if (throttle) {
+                       D_ASSERT(device, !test_bit(BME_LOCKED, &bm_ext->flags));
+                       D_ASSERT(device, test_bit(BME_NO_WRITES, &bm_ext->flags));
+                       clear_bit(BME_NO_WRITES, &bm_ext->flags);
+                       device->resync_wenr = LC_FREE;
+                       if (lc_put(device->resync, &bm_ext->lce) == 0) {
+                               bm_ext->flags = 0;
+                               device->resync_locked--;
+                       }
+                       wake_up(&device->al_wait);
+               } else
+                       device->resync_wenr = enr;
+       }
        spin_unlock_irq(&device->al_lock);
        return -EAGAIN;
 }
@@ -1270,69 +1218,3 @@ int drbd_rs_del_all(struct drbd_device *device)
 
        return 0;
 }
-
-/**
- * drbd_rs_failed_io() - Record information on a failure to resync the specified blocks
- * @device:    DRBD device.
- * @sector:    The sector number.
- * @size:      Size of failed IO operation, in byte.
- */
-void drbd_rs_failed_io(struct drbd_device *device, sector_t sector, int size)
-{
-       /* Is called from worker and receiver context _only_ */
-       unsigned long sbnr, ebnr, lbnr;
-       unsigned long count;
-       sector_t esector, nr_sectors;
-       int wake_up = 0;
-
-       if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_DISCARD_SIZE) {
-               drbd_err(device, "drbd_rs_failed_io: sector=%llus size=%d nonsense!\n",
-                               (unsigned long long)sector, size);
-               return;
-       }
-       nr_sectors = drbd_get_capacity(device->this_bdev);
-       esector = sector + (size >> 9) - 1;
-
-       if (!expect(sector < nr_sectors))
-               return;
-       if (!expect(esector < nr_sectors))
-               esector = nr_sectors - 1;
-
-       lbnr = BM_SECT_TO_BIT(nr_sectors-1);
-
-       /*
-        * round up start sector, round down end sector.  we make sure we only
-        * handle full, aligned, BM_BLOCK_SIZE (4K) blocks */
-       if (unlikely(esector < BM_SECT_PER_BIT-1))
-               return;
-       if (unlikely(esector == (nr_sectors-1)))
-               ebnr = lbnr;
-       else
-               ebnr = BM_SECT_TO_BIT(esector - (BM_SECT_PER_BIT-1));
-       sbnr = BM_SECT_TO_BIT(sector + BM_SECT_PER_BIT-1);
-
-       if (sbnr > ebnr)
-               return;
-
-       /*
-        * ok, (capacity & 7) != 0 sometimes, but who cares...
-        * we count rs_{total,left} in bits, not sectors.
-        */
-       spin_lock_irq(&device->al_lock);
-       count = drbd_bm_count_bits(device, sbnr, ebnr);
-       if (count) {
-               device->rs_failed += count;
-
-               if (get_ldev(device)) {
-                       drbd_try_clear_on_disk_bm(device, sector, count, false);
-                       put_ldev(device);
-               }
-
-               /* just wake_up unconditional now, various lc_chaged(),
-                * lc_put() in drbd_try_clear_on_disk_bm(). */
-               wake_up = 1;
-       }
-       spin_unlock_irq(&device->al_lock);
-       if (wake_up)
-               wake_up(&device->al_wait);
-}
index 1aa29f8fdfe1feb0f6d5d534cbdaf3fac90a530c..426c97aef9002193c6c7790040c95bdb2ec71eac 100644 (file)
@@ -22,6 +22,8 @@
    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
+#define pr_fmt(fmt)    KBUILD_MODNAME ": " fmt
+
 #include <linux/bitops.h>
 #include <linux/vmalloc.h>
 #include <linux/string.h>
@@ -353,9 +355,8 @@ static void bm_free_pages(struct page **pages, unsigned long number)
 
        for (i = 0; i < number; i++) {
                if (!pages[i]) {
-                       printk(KERN_ALERT "drbd: bm_free_pages tried to free "
-                                         "a NULL pointer; i=%lu n=%lu\n",
-                                         i, number);
+                       pr_alert("bm_free_pages tried to free a NULL pointer; i=%lu n=%lu\n",
+                                i, number);
                        continue;
                }
                __free_page(pages[i]);
@@ -592,7 +593,7 @@ static void bm_memset(struct drbd_bitmap *b, size_t offset, int c, size_t len)
        end = offset + len;
 
        if (end > b->bm_words) {
-               printk(KERN_ALERT "drbd: bm_memset end > bm_words\n");
+               pr_alert("bm_memset end > bm_words\n");
                return;
        }
 
@@ -602,7 +603,7 @@ static void bm_memset(struct drbd_bitmap *b, size_t offset, int c, size_t len)
                p_addr = bm_map_pidx(b, idx);
                bm = p_addr + MLPP(offset);
                if (bm+do_now > p_addr + LWPP) {
-                       printk(KERN_ALERT "drbd: BUG BUG BUG! p_addr:%p bm:%p do_now:%d\n",
+                       pr_alert("BUG BUG BUG! p_addr:%p bm:%p do_now:%d\n",
                               p_addr, bm, (int)do_now);
                } else
                        memset(bm, c, do_now * sizeof(long));
@@ -927,22 +928,14 @@ void drbd_bm_clear_all(struct drbd_device *device)
        spin_unlock_irq(&b->bm_lock);
 }
 
-struct bm_aio_ctx {
-       struct drbd_device *device;
-       atomic_t in_flight;
-       unsigned int done;
-       unsigned flags;
-#define BM_AIO_COPY_PAGES      1
-#define BM_AIO_WRITE_HINTED    2
-#define BM_WRITE_ALL_PAGES     4
-       int error;
-       struct kref kref;
-};
-
-static void bm_aio_ctx_destroy(struct kref *kref)
+static void drbd_bm_aio_ctx_destroy(struct kref *kref)
 {
-       struct bm_aio_ctx *ctx = container_of(kref, struct bm_aio_ctx, kref);
+       struct drbd_bm_aio_ctx *ctx = container_of(kref, struct drbd_bm_aio_ctx, kref);
+       unsigned long flags;
 
+       spin_lock_irqsave(&ctx->device->resource->req_lock, flags);
+       list_del(&ctx->list);
+       spin_unlock_irqrestore(&ctx->device->resource->req_lock, flags);
        put_ldev(ctx->device);
        kfree(ctx);
 }
@@ -950,7 +943,7 @@ static void bm_aio_ctx_destroy(struct kref *kref)
 /* bv_page may be a copy, or may be the original */
 static void bm_async_io_complete(struct bio *bio, int error)
 {
-       struct bm_aio_ctx *ctx = bio->bi_private;
+       struct drbd_bm_aio_ctx *ctx = bio->bi_private;
        struct drbd_device *device = ctx->device;
        struct drbd_bitmap *b = device->bitmap;
        unsigned int idx = bm_page_to_idx(bio->bi_io_vec[0].bv_page);
@@ -993,17 +986,18 @@ static void bm_async_io_complete(struct bio *bio, int error)
        if (atomic_dec_and_test(&ctx->in_flight)) {
                ctx->done = 1;
                wake_up(&device->misc_wait);
-               kref_put(&ctx->kref, &bm_aio_ctx_destroy);
+               kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy);
        }
 }
 
-static void bm_page_io_async(struct bm_aio_ctx *ctx, int page_nr, int rw) __must_hold(local)
+static void bm_page_io_async(struct drbd_bm_aio_ctx *ctx, int page_nr) __must_hold(local)
 {
        struct bio *bio = bio_alloc_drbd(GFP_NOIO);
        struct drbd_device *device = ctx->device;
        struct drbd_bitmap *b = device->bitmap;
        struct page *page;
        unsigned int len;
+       unsigned int rw = (ctx->flags & BM_AIO_READ) ? READ : WRITE;
 
        sector_t on_disk_sector =
                device->ldev->md.md_offset + device->ldev->md.bm_offset;
@@ -1049,9 +1043,9 @@ static void bm_page_io_async(struct bm_aio_ctx *ctx, int page_nr, int rw) __must
 /*
  * bm_rw: read/write the whole bitmap from/to its on disk location.
  */
-static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned lazy_writeout_upper_idx) __must_hold(local)
+static int bm_rw(struct drbd_device *device, const unsigned int flags, unsigned lazy_writeout_upper_idx) __must_hold(local)
 {
-       struct bm_aio_ctx *ctx;
+       struct drbd_bm_aio_ctx *ctx;
        struct drbd_bitmap *b = device->bitmap;
        int num_pages, i, count = 0;
        unsigned long now;
@@ -1067,12 +1061,13 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
         * as we submit copies of pages anyways.
         */
 
-       ctx = kmalloc(sizeof(struct bm_aio_ctx), GFP_NOIO);
+       ctx = kmalloc(sizeof(struct drbd_bm_aio_ctx), GFP_NOIO);
        if (!ctx)
                return -ENOMEM;
 
-       *ctx = (struct bm_aio_ctx) {
+       *ctx = (struct drbd_bm_aio_ctx) {
                .device = device,
+               .start_jif = jiffies,
                .in_flight = ATOMIC_INIT(1),
                .done = 0,
                .flags = flags,
@@ -1080,15 +1075,21 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
                .kref = { ATOMIC_INIT(2) },
        };
 
-       if (!get_ldev_if_state(device, D_ATTACHING)) {  /* put is in bm_aio_ctx_destroy() */
+       if (!get_ldev_if_state(device, D_ATTACHING)) {  /* put is in drbd_bm_aio_ctx_destroy() */
                drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in bm_rw()\n");
                kfree(ctx);
                return -ENODEV;
        }
+       /* Here D_ATTACHING is sufficient since drbd_bm_read() is called only from
+          drbd_adm_attach(), after device->ldev was assigned. */
 
-       if (!ctx->flags)
+       if (0 == (ctx->flags & ~BM_AIO_READ))
                WARN_ON(!(BM_LOCKED_MASK & b->bm_flags));
 
+       spin_lock_irq(&device->resource->req_lock);
+       list_add_tail(&ctx->list, &device->pending_bitmap_io);
+       spin_unlock_irq(&device->resource->req_lock);
+
        num_pages = b->bm_number_of_pages;
 
        now = jiffies;
@@ -1098,13 +1099,13 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
                /* ignore completely unchanged pages */
                if (lazy_writeout_upper_idx && i == lazy_writeout_upper_idx)
                        break;
-               if (rw & WRITE) {
+               if (!(flags & BM_AIO_READ)) {
                        if ((flags & BM_AIO_WRITE_HINTED) &&
                            !test_and_clear_bit(BM_PAGE_HINT_WRITEOUT,
                                    &page_private(b->bm_pages[i])))
                                continue;
 
-                       if (!(flags & BM_WRITE_ALL_PAGES) &&
+                       if (!(flags & BM_AIO_WRITE_ALL_PAGES) &&
                            bm_test_page_unchanged(b->bm_pages[i])) {
                                dynamic_drbd_dbg(device, "skipped bm write for idx %u\n", i);
                                continue;
@@ -1118,7 +1119,7 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
                        }
                }
                atomic_inc(&ctx->in_flight);
-               bm_page_io_async(ctx, i, rw);
+               bm_page_io_async(ctx, i);
                ++count;
                cond_resched();
        }
@@ -1134,12 +1135,12 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
        if (!atomic_dec_and_test(&ctx->in_flight))
                wait_until_done_or_force_detached(device, device->ldev, &ctx->done);
        else
-               kref_put(&ctx->kref, &bm_aio_ctx_destroy);
+               kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy);
 
        /* summary for global bitmap IO */
        if (flags == 0)
                drbd_info(device, "bitmap %s of %u pages took %lu jiffies\n",
-                        rw == WRITE ? "WRITE" : "READ",
+                        (flags & BM_AIO_READ) ? "READ" : "WRITE",
                         count, jiffies - now);
 
        if (ctx->error) {
@@ -1152,20 +1153,18 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
                err = -EIO; /* Disk timeout/force-detach during IO... */
 
        now = jiffies;
-       if (rw == WRITE) {
-               drbd_md_flush(device);
-       } else /* rw == READ */ {
+       if (flags & BM_AIO_READ) {
                b->bm_set = bm_count_bits(b);
                drbd_info(device, "recounting of set bits took additional %lu jiffies\n",
                     jiffies - now);
        }
        now = b->bm_set;
 
-       if (flags == 0)
+       if ((flags & ~BM_AIO_READ) == 0)
                drbd_info(device, "%s (%lu bits) marked out-of-sync by on disk bit-map.\n",
                     ppsize(ppb, now << (BM_BLOCK_SHIFT-10)), now);
 
-       kref_put(&ctx->kref, &bm_aio_ctx_destroy);
+       kref_put(&ctx->kref, &drbd_bm_aio_ctx_destroy);
        return err;
 }
 
@@ -1175,7 +1174,7 @@ static int bm_rw(struct drbd_device *device, int rw, unsigned flags, unsigned la
  */
 int drbd_bm_read(struct drbd_device *device) __must_hold(local)
 {
-       return bm_rw(device, READ, 0, 0);
+       return bm_rw(device, BM_AIO_READ, 0);
 }
 
 /**
@@ -1186,7 +1185,7 @@ int drbd_bm_read(struct drbd_device *device) __must_hold(local)
  */
 int drbd_bm_write(struct drbd_device *device) __must_hold(local)
 {
-       return bm_rw(device, WRITE, 0, 0);
+       return bm_rw(device, 0, 0);
 }
 
 /**
@@ -1197,7 +1196,17 @@ int drbd_bm_write(struct drbd_device *device) __must_hold(local)
  */
 int drbd_bm_write_all(struct drbd_device *device) __must_hold(local)
 {
-       return bm_rw(device, WRITE, BM_WRITE_ALL_PAGES, 0);
+       return bm_rw(device, BM_AIO_WRITE_ALL_PAGES, 0);
+}
+
+/**
+ * drbd_bm_write_lazy() - Write bitmap pages 0 to @upper_idx-1, if they have changed.
+ * @device:    DRBD device.
+ * @upper_idx: 0: write all changed pages; +ve: page index to stop scanning for changed pages
+ */
+int drbd_bm_write_lazy(struct drbd_device *device, unsigned upper_idx) __must_hold(local)
+{
+       return bm_rw(device, BM_AIO_COPY_PAGES, upper_idx);
 }
 
 /**
@@ -1213,7 +1222,7 @@ int drbd_bm_write_all(struct drbd_device *device) __must_hold(local)
  */
 int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local)
 {
-       return bm_rw(device, WRITE, BM_AIO_COPY_PAGES, 0);
+       return bm_rw(device, BM_AIO_COPY_PAGES, 0);
 }
 
 /**
@@ -1222,62 +1231,7 @@ int drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local)
  */
 int drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local)
 {
-       return bm_rw(device, WRITE, BM_AIO_WRITE_HINTED | BM_AIO_COPY_PAGES, 0);
-}
-
-/**
- * drbd_bm_write_page() - Writes a PAGE_SIZE aligned piece of bitmap
- * @device:    DRBD device.
- * @idx:       bitmap page index
- *
- * We don't want to special case on logical_block_size of the backend device,
- * so we submit PAGE_SIZE aligned pieces.
- * Note that on "most" systems, PAGE_SIZE is 4k.
- *
- * In case this becomes an issue on systems with larger PAGE_SIZE,
- * we may want to change this again to write 4k aligned 4k pieces.
- */
-int drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local)
-{
-       struct bm_aio_ctx *ctx;
-       int err;
-
-       if (bm_test_page_unchanged(device->bitmap->bm_pages[idx])) {
-               dynamic_drbd_dbg(device, "skipped bm page write for idx %u\n", idx);
-               return 0;
-       }
-
-       ctx = kmalloc(sizeof(struct bm_aio_ctx), GFP_NOIO);
-       if (!ctx)
-               return -ENOMEM;
-
-       *ctx = (struct bm_aio_ctx) {
-               .device = device,
-               .in_flight = ATOMIC_INIT(1),
-               .done = 0,
-               .flags = BM_AIO_COPY_PAGES,
-               .error = 0,
-               .kref = { ATOMIC_INIT(2) },
-       };
-
-       if (!get_ldev_if_state(device, D_ATTACHING)) {  /* put is in bm_aio_ctx_destroy() */
-               drbd_err(device, "ASSERT FAILED: get_ldev_if_state() == 1 in drbd_bm_write_page()\n");
-               kfree(ctx);
-               return -ENODEV;
-       }
-
-       bm_page_io_async(ctx, idx, WRITE_SYNC);
-       wait_until_done_or_force_detached(device, device->ldev, &ctx->done);
-
-       if (ctx->error)
-               drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR);
-               /* that causes us to detach, so the in memory bitmap will be
-                * gone in a moment as well. */
-
-       device->bm_writ_cnt++;
-       err = atomic_read(&ctx->in_flight) ? -EIO : ctx->error;
-       kref_put(&ctx->kref, &bm_aio_ctx_destroy);
-       return err;
+       return bm_rw(device, BM_AIO_WRITE_HINTED | BM_AIO_COPY_PAGES, 0);
 }
 
 /* NOTE
diff --git a/drivers/block/drbd/drbd_debugfs.c b/drivers/block/drbd/drbd_debugfs.c
new file mode 100644 (file)
index 0000000..5c20b18
--- /dev/null
@@ -0,0 +1,958 @@
+#define pr_fmt(fmt) "drbd debugfs: " fmt
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/debugfs.h>
+#include <linux/seq_file.h>
+#include <linux/stat.h>
+#include <linux/jiffies.h>
+#include <linux/list.h>
+
+#include "drbd_int.h"
+#include "drbd_req.h"
+#include "drbd_debugfs.h"
+
+
+/**********************************************************************
+ * Whenever you change the file format, remember to bump the version. *
+ **********************************************************************/
+
+static struct dentry *drbd_debugfs_root;
+static struct dentry *drbd_debugfs_version;
+static struct dentry *drbd_debugfs_resources;
+static struct dentry *drbd_debugfs_minors;
+
+/*
+ * Emit one tab-prefixed column: @dt (a jiffies delta) converted to
+ * milliseconds if @valid is true, otherwise a literal "-".
+ *
+ * NOTE(review): the result of jiffies_to_msecs() is printed with "%d" here
+ * but with "%u" in other functions of this file -- confirm which specifier
+ * matches its return type.
+ */
+static void seq_print_age_or_dash(struct seq_file *m, bool valid, unsigned long dt)
+{
+       if (valid)
+               seq_printf(m, "\t%d", jiffies_to_msecs(dt));
+       else
+               seq_printf(m, "\t-");
+}
+
+/*
+ * Print the name of one flag, preceded by the current separator character.
+ *
+ * @is_set:     whether the flag is set
+ * @sep:        in/out separator; printed before the name, then set to '|'
+ * @set_name:   printed if the flag is set (may be NULL: print nothing)
+ * @unset_name: printed if the flag is not set (may be NULL: print nothing)
+ *
+ * Because *@sep is only modified when something was printed, a caller can
+ * initialize it (e.g. to ' ') and afterwards check whether it is unchanged
+ * to find out that none of a series of flags produced any output.
+ */
+static void __seq_print_rq_state_bit(struct seq_file *m,
+       bool is_set, char *sep, const char *set_name, const char *unset_name)
+{
+       if (is_set && set_name) {
+               seq_putc(m, *sep);
+               seq_puts(m, set_name);
+               *sep = '|';
+       } else if (!is_set && unset_name) {
+               seq_putc(m, *sep);
+               seq_puts(m, unset_name);
+               *sep = '|';
+       }
+}
+
+/*
+ * Convenience wrapper around __seq_print_rq_state_bit() for flags that
+ * should only show up when set: passes NULL as the "unset" name, so
+ * nothing is printed (and *@sep is left alone) if @is_set is false.
+ */
+static void seq_print_rq_state_bit(struct seq_file *m,
+       bool is_set, char *sep, const char *set_name)
+{
+       __seq_print_rq_state_bit(m, is_set, sep, set_name, NULL);
+}
+
+/*
+ * Pretty print req->rq_state (enum drbd_req_state_bits).
+ *
+ * Output, on one line terminated by '\n': the raw state as hex, whether
+ * the master bio is still pending, then three '|'-separated flag groups
+ * ("local:", "net:", and a final group after " :").  A group for which no
+ * flag was printed is shown as " -".
+ */
+static void seq_print_request_state(struct seq_file *m, struct drbd_request *req)
+{
+       unsigned int s = req->rq_state;
+       char sep = ' ';
+       seq_printf(m, "\t0x%08x", s);
+       seq_printf(m, "\tmaster: %s", req->master_bio ? "pending" : "completed");
+
+       /* RQ_WRITE ignored, already reported */
+       seq_puts(m, "\tlocal:");
+       seq_print_rq_state_bit(m, s & RQ_IN_ACT_LOG, &sep, "in-AL");
+       seq_print_rq_state_bit(m, s & RQ_POSTPONED, &sep, "postponed");
+       seq_print_rq_state_bit(m, s & RQ_COMPLETION_SUSP, &sep, "suspended");
+       /* Separator is reset here, so the " -" placeholder below depends only
+        * on the pending/completed/aborted/ok flags that follow. */
+       sep = ' ';
+       seq_print_rq_state_bit(m, s & RQ_LOCAL_PENDING, &sep, "pending");
+       seq_print_rq_state_bit(m, s & RQ_LOCAL_COMPLETED, &sep, "completed");
+       seq_print_rq_state_bit(m, s & RQ_LOCAL_ABORTED, &sep, "aborted");
+       seq_print_rq_state_bit(m, s & RQ_LOCAL_OK, &sep, "ok");
+       if (sep == ' ')
+               seq_puts(m, " -");
+
+       /* for_each_connection ... */
+       seq_printf(m, "\tnet:");
+       sep = ' ';
+       seq_print_rq_state_bit(m, s & RQ_NET_PENDING, &sep, "pending");
+       seq_print_rq_state_bit(m, s & RQ_NET_QUEUED, &sep, "queued");
+       seq_print_rq_state_bit(m, s & RQ_NET_SENT, &sep, "sent");
+       seq_print_rq_state_bit(m, s & RQ_NET_DONE, &sep, "done");
+       seq_print_rq_state_bit(m, s & RQ_NET_SIS, &sep, "sis");
+       seq_print_rq_state_bit(m, s & RQ_NET_OK, &sep, "ok");
+       if (sep == ' ')
+               seq_puts(m, " -");
+
+       /* Which acknowledgements this request expects.
+        * NOTE(review): "B" and "C" presumably name the DRBD replication
+        * protocols -- confirm. */
+       seq_printf(m, " :");
+       sep = ' ';
+       seq_print_rq_state_bit(m, s & RQ_EXP_RECEIVE_ACK, &sep, "B");
+       seq_print_rq_state_bit(m, s & RQ_EXP_WRITE_ACK, &sep, "C");
+       seq_print_rq_state_bit(m, s & RQ_EXP_BARR_ACK, &sep, "barr");
+       if (sep == ' ')
+               seq_puts(m, " -");
+       seq_printf(m, "\n");
+}
+
+/*
+ * Print one line describing @req: epoch, start sector, size in 512-byte
+ * sectors (i.size >> 9), direction, a set of ages in milliseconds relative
+ * to @now (jiffies), and finally the decoded request state.
+ *
+ * Each RQ_HDR_n macro is defined right next to the code emitting the
+ * matching columns; together they form RQ_HDR, the header line that
+ * callers print above the table.
+ */
+static void seq_print_one_request(struct seq_file *m, struct drbd_request *req, unsigned long now)
+{
+       /* change anything here, fixup header below! */
+       unsigned int s = req->rq_state;
+
+#define RQ_HDR_1 "epoch\tsector\tsize\trw"
+       seq_printf(m, "0x%x\t%llu\t%u\t%s",
+               req->epoch,
+               (unsigned long long)req->i.sector, req->i.size >> 9,
+               (s & RQ_WRITE) ? "W" : "R");
+
+#define RQ_HDR_2 "\tstart\tin AL\tsubmit"
+       seq_printf(m, "\t%d", jiffies_to_msecs(now - req->start_jif));
+       /* The remaining ages are shown as "-" unless the corresponding
+        * state bit says the timestamp is meaningful. */
+       seq_print_age_or_dash(m, s & RQ_IN_ACT_LOG, now - req->in_actlog_jif);
+       seq_print_age_or_dash(m, s & RQ_LOCAL_PENDING, now - req->pre_submit_jif);
+
+#define RQ_HDR_3 "\tsent\tacked\tdone"
+       seq_print_age_or_dash(m, s & RQ_NET_SENT, now - req->pre_send_jif);
+       seq_print_age_or_dash(m, (s & RQ_NET_SENT) && !(s & RQ_NET_PENDING), now - req->acked_jif);
+       seq_print_age_or_dash(m, s & RQ_NET_DONE, now - req->net_done_jif);
+
+#define RQ_HDR_4 "\tstate\n"
+       seq_print_request_state(m, req);
+}
+#define RQ_HDR RQ_HDR_1 RQ_HDR_2 RQ_HDR_3 RQ_HDR_4
+
+/*
+ * Print the minor and volume number of the device @req belongs to,
+ * followed by the request line produced by seq_print_one_request().
+ */
+static void seq_print_minor_vnr_req(struct seq_file *m, struct drbd_request *req, unsigned long now)
+{
+       seq_printf(m, "%u\t%u\t", req->device->minor, req->device->vnr);
+       seq_print_one_request(m, req, now);
+}
+
+/*
+ * Print a table of the meta data IO buffers currently in use, one line per
+ * device of @resource whose md_io.in_use counter is non-zero.
+ *
+ * Columns: minor, vnr, ms since start_jif, ms since submit_jif (or "-" if
+ * submit_jif is before start_jif), and the current_use string.
+ * @now is the jiffies value all ages are computed against.
+ */
+static void seq_print_resource_pending_meta_io(struct seq_file *m, struct drbd_resource *resource, unsigned long now)
+{
+       struct drbd_device *device;
+       unsigned int i;
+
+       seq_puts(m, "minor\tvnr\tstart\tsubmit\tintent\n");
+       rcu_read_lock();
+       idr_for_each_entry(&resource->devices, device, i) {
+               struct drbd_md_io tmp;
+               /* In theory this is racy,
+                * in the sense that there could have been a
+                * drbd_md_put_buffer(); drbd_md_get_buffer();
+                * between accessing these members here.
+                * Everything below works on the local copy only. */
+               tmp = device->md_io;
+               if (atomic_read(&tmp.in_use)) {
+                       seq_printf(m, "%u\t%u\t%d\t",
+                               device->minor, device->vnr,
+                               jiffies_to_msecs(now - tmp.start_jif));
+                       if (time_before(tmp.submit_jif, tmp.start_jif))
+                               seq_puts(m, "-\t");
+                       else
+                               seq_printf(m, "%d\t", jiffies_to_msecs(now - tmp.submit_jif));
+                       seq_printf(m, "%s\n", tmp.current_use);
+               }
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Print a table of devices of @resource that have a non-zero
+ * ap_actlog_cnt.
+ *
+ * Columns: minor, vnr, age in ms of the first request on
+ * pending_master_completion[1] (only if that request does not have
+ * RQ_IN_ACT_LOG set, otherwise "-"), and the ap_actlog_cnt value itself.
+ */
+static void seq_print_waiting_for_AL(struct seq_file *m, struct drbd_resource *resource, unsigned long now)
+{
+       struct drbd_device *device;
+       unsigned int i;
+
+       seq_puts(m, "minor\tvnr\tage\t#waiting\n");
+       rcu_read_lock();
+       idr_for_each_entry(&resource->devices, device, i) {
+               /* jif and req are assigned only in the first "if (n)" block
+                * and read only in the second one. */
+               unsigned long jif;
+               struct drbd_request *req;
+               int n = atomic_read(&device->ap_actlog_cnt);
+               if (n) {
+                       spin_lock_irq(&device->resource->req_lock);
+                       req = list_first_entry_or_null(&device->pending_master_completion[1],
+                               struct drbd_request, req_pending_master_completion);
+                       /* if the oldest request does not wait for the activity log
+                        * it is not interesting for us here */
+                       if (req && !(req->rq_state & RQ_IN_ACT_LOG))
+                               jif = req->start_jif;
+                       else
+                               req = NULL;
+                       spin_unlock_irq(&device->resource->req_lock);
+               }
+               if (n) {
+                       seq_printf(m, "%u\t%u\t", device->minor, device->vnr);
+                       /* req is only tested against NULL here, it is not
+                        * dereferenced after req_lock was dropped. */
+                       if (req)
+                               seq_printf(m, "%u\t", jiffies_to_msecs(now - jif));
+                       else
+                               seq_puts(m, "-\t");
+                       seq_printf(m, "%u\n", n);
+               }
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Print one line for the first entry on @device's pending_bitmap_io list,
+ * unless the list is empty or that entry is already marked done.
+ *
+ * Columns: minor, vnr, 'R' or 'W' (from BM_AIO_READ in the context flags),
+ * age in ms relative to @now, and the in_flight counter.
+ */
+static void seq_print_device_bitmap_io(struct seq_file *m, struct drbd_device *device, unsigned long now)
+{
+       struct drbd_bm_aio_ctx *ctx;
+       unsigned long start_jif;
+       unsigned int in_flight;
+       unsigned int flags;
+       /* Snapshot the interesting members while holding req_lock;
+        * after the unlock, ctx is only used as a "have data" flag. */
+       spin_lock_irq(&device->resource->req_lock);
+       ctx = list_first_entry_or_null(&device->pending_bitmap_io, struct drbd_bm_aio_ctx, list);
+       if (ctx && ctx->done)
+               ctx = NULL;
+       if (ctx) {
+               start_jif = ctx->start_jif;
+               in_flight = atomic_read(&ctx->in_flight);
+               flags = ctx->flags;
+       }
+       spin_unlock_irq(&device->resource->req_lock);
+       if (ctx) {
+               seq_printf(m, "%u\t%u\t%c\t%u\t%u\n",
+                       device->minor, device->vnr,
+                       (flags & BM_AIO_READ) ? 'R' : 'W',
+                       jiffies_to_msecs(now - start_jif),
+                       in_flight);
+       }
+}
+
+/*
+ * Print the bitmap IO table header, then call
+ * seq_print_device_bitmap_io() for every device of @resource
+ * (iterated under rcu_read_lock()).
+ */
+static void seq_print_resource_pending_bitmap_io(struct seq_file *m, struct drbd_resource *resource, unsigned long now)
+{
+       struct drbd_device *device;
+       unsigned int i;
+
+       seq_puts(m, "minor\tvnr\trw\tage\t#in-flight\n");
+       rcu_read_lock();
+       idr_for_each_entry(&resource->devices, device, i) {
+               seq_print_device_bitmap_io(m, device, now);
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Pretty print peer_req->flags as a '|'-separated list, terminated by '\n'.
+ *
+ * EE_SUBMITTED and EE_APPLICATION always produce a word (there is a name
+ * for both the set and the unset case); the remaining flags are listed
+ * only when set.
+ */
+static void seq_print_peer_request_flags(struct seq_file *m, struct drbd_peer_request *peer_req)
+{
+       unsigned long f = peer_req->flags;
+       char sep = ' ';
+
+       __seq_print_rq_state_bit(m, f & EE_SUBMITTED, &sep, "submitted", "preparing");
+       __seq_print_rq_state_bit(m, f & EE_APPLICATION, &sep, "application", "internal");
+       seq_print_rq_state_bit(m, f & EE_CALL_AL_COMPLETE_IO, &sep, "in-AL");
+       seq_print_rq_state_bit(m, f & EE_SEND_WRITE_ACK, &sep, "C");
+       seq_print_rq_state_bit(m, f & EE_MAY_SET_IN_SYNC, &sep, "set-in-sync");
+
+       /* Open coded: a trim request is reported as "zero-out" if
+        * EE_IS_TRIM_USE_ZEROOUT is set as well, as "trim" otherwise. */
+       if (f & EE_IS_TRIM) {
+               seq_putc(m, sep);
+               sep = '|';
+               if (f & EE_IS_TRIM_USE_ZEROOUT)
+                       seq_puts(m, "zero-out");
+               else
+                       seq_puts(m, "trim");
+       }
+       seq_putc(m, '\n');
+}
+
+/*
+ * Print a short summary of the peer requests on list @lh.
+ *
+ * Walking the list from its head, this prints at most one request that
+ * does not have EE_SUBMITTED set (further such requests are skipped), and
+ * stops after printing the first request that does have EE_SUBMITTED set.
+ *
+ * If @device is not NULL, each line is prefixed with its minor and vnr.
+ * Remaining columns: sector, size in 512-byte sectors, 'W' or 'R',
+ * ms since submit_jif relative to @now, and the decoded flags.
+ *
+ * NOTE(review): the visible caller holds resource->req_lock around this
+ * call; presumably that is required for walking @lh -- confirm.
+ */
+static void seq_print_peer_request(struct seq_file *m,
+       struct drbd_device *device, struct list_head *lh,
+       unsigned long now)
+{
+       bool reported_preparing = false;
+       struct drbd_peer_request *peer_req;
+       list_for_each_entry(peer_req, lh, w.list) {
+               if (reported_preparing && !(peer_req->flags & EE_SUBMITTED))
+                       continue;
+
+               if (device)
+                       seq_printf(m, "%u\t%u\t", device->minor, device->vnr);
+
+               seq_printf(m, "%llu\t%u\t%c\t%u\t",
+                       (unsigned long long)peer_req->i.sector, peer_req->i.size >> 9,
+                       (peer_req->flags & EE_WRITE) ? 'W' : 'R',
+                       jiffies_to_msecs(now - peer_req->submit_jif));
+               seq_print_peer_request_flags(m, peer_req);
+               if (peer_req->flags & EE_SUBMITTED)
+                       break;
+               else
+                       reported_preparing = true;
+       }
+}
+
+/*
+ * Print the peer request table for one device: a header line, then the
+ * summaries of the active_ee, read_ee and sync_ee lists (all three walked
+ * under resource->req_lock), then -- if FLUSH_PENDING is set in
+ * device->flags -- one extra line for the flush with its age in ms.
+ */
+static void seq_print_device_peer_requests(struct seq_file *m,
+       struct drbd_device *device, unsigned long now)
+{
+       seq_puts(m, "minor\tvnr\tsector\tsize\trw\tage\tflags\n");
+       spin_lock_irq(&device->resource->req_lock);
+       seq_print_peer_request(m, device, &device->active_ee, now);
+       seq_print_peer_request(m, device, &device->read_ee, now);
+       seq_print_peer_request(m, device, &device->sync_ee, now);
+       spin_unlock_irq(&device->resource->req_lock);
+       if (test_bit(FLUSH_PENDING, &device->flags)) {
+               seq_printf(m, "%u\t%u\t-\t-\tF\t%u\tflush\n",
+                       device->minor, device->vnr,
+                       jiffies_to_msecs(now - device->flush_jif));
+       }
+}
+
+/*
+ * Call seq_print_device_peer_requests() for every device of @resource,
+ * iterating resource->devices under rcu_read_lock().  Note that the table
+ * header is printed by the callee, i.e. once per device.
+ */
+static void seq_print_resource_pending_peer_requests(struct seq_file *m,
+       struct drbd_resource *resource, unsigned long now)
+{
+       struct drbd_device *device;
+       unsigned int i;
+
+       rcu_read_lock();
+       idr_for_each_entry(&resource->devices, device, i) {
+               seq_print_device_peer_requests(m, device, now);
+       }
+       rcu_read_unlock();
+}
+
+/*
+ * Print a summary of @connection's transfer log: walking the list from its
+ * head under resource->req_lock, a request is printed only if it shows at
+ * least one "aspect" that no previously printed request has shown.
+ *
+ * The aspects are collected as bits in a local mask:
+ *    1: master_bio is NULL
+ *    2: local IO is pending
+ *    4: not yet sent to the peer
+ *    8: network part still pending
+ *   16: network part not yet done
+ * The walk stops early once all five bits have been seen (0x1f).
+ *
+ * Each printed line starts with the position of the request in the list,
+ * followed by the output of seq_print_minor_vnr_req().
+ */
+static void seq_print_resource_transfer_log_summary(struct seq_file *m,
+       struct drbd_resource *resource,
+       struct drbd_connection *connection,
+       unsigned long now)
+{
+       struct drbd_request *req;
+       unsigned int count = 0;
+       unsigned int show_state = 0;
+
+       seq_puts(m, "n\tdevice\tvnr\t" RQ_HDR);
+       spin_lock_irq(&resource->req_lock);
+       list_for_each_entry(req, &connection->transfer_log, tl_requests) {
+               unsigned int tmp = 0;
+               unsigned int s;
+               ++count;
+
+               /* don't disable irq "forever" */
+               if (!(count & 0x1ff)) {
+                       /* Every 512 entries: pin the current request with a
+                        * reference, drop the lock to reschedule, retake it.
+                        * NOTE(review): a non-zero return of kref_put()
+                        * presumably means the request was destroyed by
+                        * this put; in that case continue with its
+                        * successor instead -- confirm. */
+                       struct drbd_request *req_next;
+                       kref_get(&req->kref);
+                       spin_unlock_irq(&resource->req_lock);
+                       cond_resched();
+                       spin_lock_irq(&resource->req_lock);
+                       req_next = list_next_entry(req, tl_requests);
+                       if (kref_put(&req->kref, drbd_req_destroy))
+                               req = req_next;
+                       if (&req->tl_requests == &connection->transfer_log)
+                               break;
+               }
+
+               s = req->rq_state;
+
+               /* This is meant to summarize timing issues, to be able to tell
+                * local disk problems from network problems.
+                * Skip requests, if we have shown an even older request with
+                * similar aspects already.  */
+               if (req->master_bio == NULL)
+                       tmp |= 1;
+               if ((s & RQ_LOCAL_MASK) && (s & RQ_LOCAL_PENDING))
+                       tmp |= 2;
+               if (s & RQ_NET_MASK) {
+                       if (!(s & RQ_NET_SENT))
+                               tmp |= 4;
+                       if (s & RQ_NET_PENDING)
+                               tmp |= 8;
+                       if (!(s & RQ_NET_DONE))
+                               tmp |= 16;
+               }
+               /* nothing new compared to what was shown already */
+               if ((tmp & show_state) == tmp)
+                       continue;
+               show_state |= tmp;
+               seq_printf(m, "%u\t", count);
+               seq_print_minor_vnr_req(m, req, now);
+               if (show_state == 0x1f)
+                       break;
+       }
+       spin_unlock_irq(&resource->req_lock);
+}
+
+/* TODO: transfer_log and friends should be moved to resource */
+/*
+ * seq_file show callback behind the per-resource "in_flight_summary" file.
+ *
+ * Emits a format version line followed by the sections titled below, using
+ * the first connection of the resource for the socket and transfer log
+ * parts.  A reference on that connection is held for the duration of the
+ * call.
+ *
+ * Returns 0, or -ESTALE if there is no connection or its refcount has
+ * already dropped to zero.
+ */
+static int in_flight_summary_show(struct seq_file *m, void *pos)
+{
+       struct drbd_resource *resource = m->private;
+       struct drbd_connection *connection;
+       unsigned long jif = jiffies;
+
+       connection = first_connection(resource);
+       /* This does not happen, actually.
+        * But be robust and prepare for future code changes. */
+       if (!connection || !kref_get_unless_zero(&connection->kref))
+               return -ESTALE;
+
+       /* BUMP me if you change the file format/content/presentation */
+       seq_printf(m, "v: %u\n\n", 0);
+
+       seq_puts(m, "oldest bitmap IO\n");
+       seq_print_resource_pending_bitmap_io(m, resource, jif);
+       seq_putc(m, '\n');
+
+       seq_puts(m, "meta data IO\n");
+       seq_print_resource_pending_meta_io(m, resource, jif);
+       seq_putc(m, '\n');
+
+       seq_puts(m, "socket buffer stats\n");
+       /* for each connection ... once we have more than one */
+       rcu_read_lock();
+       if (connection->data.socket) {
+               /* open coded SIOCINQ, the "relevant" part */
+               struct tcp_sock *tp = tcp_sk(connection->data.socket->sk);
+               /* unsigned, so that the type matches the %u conversions */
+               unsigned int answ = tp->rcv_nxt - tp->copied_seq;
+               seq_printf(m, "unread receive buffer: %u Byte\n", answ);
+               /* open coded SIOCOUTQ, the "relevant" part */
+               answ = tp->write_seq - tp->snd_una;
+               seq_printf(m, "unacked send buffer: %u Byte\n", answ);
+       }
+       rcu_read_unlock();
+       seq_putc(m, '\n');
+
+       seq_puts(m, "oldest peer requests\n");
+       seq_print_resource_pending_peer_requests(m, resource, jif);
+       seq_putc(m, '\n');
+
+       seq_puts(m, "application requests waiting for activity log\n");
+       seq_print_waiting_for_AL(m, resource, jif);
+       seq_putc(m, '\n');
+
+       seq_puts(m, "oldest application requests\n");
+       seq_print_resource_transfer_log_summary(m, resource, connection, jif);
+       seq_putc(m, '\n');
+
+       /* only mention the time it took if it is at least one jiffy */
+       jif = jiffies - jif;
+       if (jif)
+               seq_printf(m, "generated in %u ms\n", jiffies_to_msecs(jif));
+       kref_put(&connection->kref, drbd_destroy_connection);
+       return 0;
+}
+
+/* Local stand-in for simple_positive(file->f_dentry) respectively
+ * debugfs_positive(), neither of which is "reachable" from here.  :-(
+ * Non-zero iff the dentry still has an inode and is not unhashed. */
+static inline int debugfs_positive(struct dentry *dentry)
+{
+       if (!dentry->d_inode)
+               return 0;
+       return !d_unhashed(dentry);
+}
+
+/* Make sure at *open* time that the respective object won't go away.
+ *
+ * Takes a reference on @kref, but only while the dentry of @file is still
+ * positive, checked under the i_mutex of the parent directory; then hands
+ * over to single_open().  If single_open() fails, the reference is dropped
+ * again via @release.
+ *
+ * Returns 0 on success, -ESTALE if the entry or the object is already
+ * gone, or the error returned by single_open(). */
+static int drbd_single_open(struct file *file, int (*show)(struct seq_file *, void *),
+                               void *data, struct kref *kref,
+                               void (*release)(struct kref *))
+{
+       /* Are we still linked,
+        * or has debugfs_remove() already been called? */
+       struct dentry *dir = file->f_dentry->d_parent;
+       int got_ref;
+       int err;
+
+       /* not sure if this can happen: */
+       if (!dir || !dir->d_inode)
+               return -ESTALE;
+
+       /* serialize with d_delete() */
+       mutex_lock(&dir->d_inode->i_mutex);
+       /* Make sure the object is still alive */
+       got_ref = debugfs_positive(file->f_dentry) && kref_get_unless_zero(kref);
+       mutex_unlock(&dir->d_inode->i_mutex);
+       if (!got_ref)
+               return -ESTALE;
+
+       err = single_open(file, show, data);
+       if (err)
+               kref_put(kref, release);
+       return err;
+}
+
+/* ->open of "in_flight_summary": pin the resource found in i_private
+ * and set up the seq_file with in_flight_summary_show(). */
+static int in_flight_summary_open(struct inode *inode, struct file *file)
+{
+       struct drbd_resource *res = inode->i_private;
+
+       return drbd_single_open(file, in_flight_summary_show, res,
+                               &res->kref, drbd_destroy_resource);
+}
+
+/* ->release of "in_flight_summary": drop the resource reference taken at
+ * open time, then let single_release() tear down the seq_file. */
+static int in_flight_summary_release(struct inode *inode, struct file *file)
+{
+       struct drbd_resource *res = inode->i_private;
+
+       kref_put(&res->kref, drbd_destroy_resource);
+       return single_release(inode, file);
+}
+
+/* file operations of the per-resource "in_flight_summary" file */
+static const struct file_operations in_flight_summary_fops = {
+       .owner          = THIS_MODULE,
+       .open           = in_flight_summary_open,
+       .release        = in_flight_summary_release,
+       .read           = seq_read,
+       .llseek         = seq_lseek,
+};
+
+/* Create the debugfs entries of one resource below drbd_debugfs_resources:
+ *   <name>/, <name>/volumes/, <name>/connections/, <name>/in_flight_summary
+ * Does nothing if drbd_debugfs_resources is not set.  If any entry cannot
+ * be created, what was created so far is removed again and an error is
+ * logged. */
+void drbd_debugfs_resource_add(struct drbd_resource *resource)
+{
+       struct dentry *d;
+
+       if (!drbd_debugfs_resources)
+               return;
+
+       /* the directory of this resource ... */
+       d = debugfs_create_dir(resource->name, drbd_debugfs_resources);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       resource->debugfs_res = d;
+
+       /* ... and what lives directly inside of it */
+       d = debugfs_create_dir("volumes", resource->debugfs_res);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       resource->debugfs_res_volumes = d;
+
+       d = debugfs_create_dir("connections", resource->debugfs_res);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       resource->debugfs_res_connections = d;
+
+       d = debugfs_create_file("in_flight_summary", S_IRUSR|S_IRGRP,
+                       resource->debugfs_res, resource,
+                       &in_flight_summary_fops);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       resource->debugfs_res_in_flight_summary = d;
+       return;
+
+fail:
+       /* only successfully created entries have been stored above */
+       drbd_debugfs_resource_cleanup(resource);
+       drbd_err(resource, "failed to create debugfs dentry\n");
+}
+
+/* Remove the debugfs entry *dp points to, then reset *dp to NULL,
+ * so that a repeated call on the same slot passes NULL to debugfs_remove(). */
+static void drbd_debugfs_remove(struct dentry **dp)
+{
+       debugfs_remove(*dp);
+       *dp = NULL;
+}
+
+/* Remove all debugfs entries of a resource.
+ * Also used as the error path of drbd_debugfs_resource_add(), so any of
+ * the pointers may still be NULL.  The resource directory itself goes
+ * last, after the entries that were created inside of it. */
+void drbd_debugfs_resource_cleanup(struct drbd_resource *resource)
+{
+       /* it is ok to call debugfs_remove(NULL) */
+       drbd_debugfs_remove(&resource->debugfs_res_in_flight_summary);
+       drbd_debugfs_remove(&resource->debugfs_res_connections);
+       drbd_debugfs_remove(&resource->debugfs_res_volumes);
+       drbd_debugfs_remove(&resource->debugfs_res);
+}
+
+/* Print one slot of a thread's callback timing history as
+ *   <cb_nr> TAB <age in ms> TAB <caller function>:<line> TAB <callback symbol>
+ * Slots without a callback address are skipped.
+ * @now is the jiffies value the age is computed against. */
+static void seq_print_one_timing_detail(struct seq_file *m,
+       const struct drbd_thread_timing_details *tdp,
+       unsigned long now)
+{
+       struct drbd_thread_timing_details td;
+       /* No locking...
+        * use temporary assignment to get at consistent data. */
+       do {
+               td = *tdp;
+       } while (td.cb_nr != tdp->cb_nr);
+       if (!td.cb_addr)
+               return;
+       /* jiffies_to_msecs() returns unsigned int; print it as such, so an
+        * age beyond INT_MAX ms does not show up as a negative number. */
+       seq_printf(m, "%u\t%u\t%s:%u\t%ps\n",
+                       td.cb_nr,
+                       jiffies_to_msecs(now - td.start_jif),
+                       td.caller_fn, td.line,
+                       td.cb_addr);
+}
+
+/* Print @title on a line of its own, followed by all
+ * DRBD_THREAD_DETAILS_HIST slots of the history @tdp, starting with
+ * slot (@cb_nr % DRBD_THREAD_DETAILS_HIST) and wrapping around. */
+static void seq_print_timing_details(struct seq_file *m,
+               const char *title,
+               unsigned int cb_nr, struct drbd_thread_timing_details *tdp, unsigned long now)
+{
+       const unsigned int first = cb_nr % DRBD_THREAD_DETAILS_HIST;
+       unsigned int n;
+
+       seq_printf(m, "%s\n", title);
+       /* If not much is going on, this will result in natural ordering.
+        * If it is very busy, we will possibly skip events, or even see wrap
+        * arounds, which could only be avoided with locking.
+        */
+       for (n = 0; n < DRBD_THREAD_DETAILS_HIST; n++) {
+               unsigned int slot = (first + n) % DRBD_THREAD_DETAILS_HIST;
+
+               seq_print_one_timing_detail(m, tdp + slot, now);
+       }
+}
+
+/* seq_file show callback of the per-connection "callback_history" file:
+ * version line and column header, then the timing history of the worker
+ * and of the receiver.  Always returns 0. */
+static int callback_history_show(struct seq_file *m, void *ignored)
+{
+       struct drbd_connection *conn = m->private;
+       const unsigned long now = jiffies;
+
+       /* BUMP me if you change the file format/content/presentation */
+       seq_printf(m, "v: %u\n\n", 0);
+       seq_puts(m, "n\tage\tcallsite\tfn\n");
+
+       seq_print_timing_details(m, "worker",
+                       conn->w_cb_nr, conn->w_timing_details, now);
+       seq_print_timing_details(m, "receiver",
+                       conn->r_cb_nr, conn->r_timing_details, now);
+       return 0;
+}
+
+/* ->open of "callback_history": pin the connection found in i_private
+ * and set up the seq_file with callback_history_show(). */
+static int callback_history_open(struct inode *inode, struct file *file)
+{
+       struct drbd_connection *conn = inode->i_private;
+
+       return drbd_single_open(file, callback_history_show, conn,
+                               &conn->kref, drbd_destroy_connection);
+}
+
+/* ->release of "callback_history": drop the connection reference taken at
+ * open time, then let single_release() tear down the seq_file. */
+static int callback_history_release(struct inode *inode, struct file *file)
+{
+       struct drbd_connection *conn = inode->i_private;
+
+       kref_put(&conn->kref, drbd_destroy_connection);
+       return single_release(inode, file);
+}
+
+/* file operations of the per-connection "callback_history" file */
+static const struct file_operations connection_callback_history_fops = {
+       .owner          = THIS_MODULE,
+       .open           = callback_history_open,
+       .release        = callback_history_release,
+       .read           = seq_read,
+       .llseek         = seq_lseek,
+};
+
+/* seq_file show callback of the per-connection "oldest_requests" file.
+ * Under resource->req_lock, prints the cached requests req_next,
+ * req_ack_pending and req_not_net_done, in that order; a request is left
+ * out if it is NULL or identical to the one printed right before it.
+ * Always returns 0. */
+static int connection_oldest_requests_show(struct seq_file *m, void *ignored)
+{
+       struct drbd_connection *connection = m->private;
+       const unsigned long now = jiffies;
+       struct drbd_request *shown, *req;
+
+       /* BUMP me if you change the file format/content/presentation */
+       seq_printf(m, "v: %u\n\n", 0);
+
+       spin_lock_irq(&connection->resource->req_lock);
+
+       shown = connection->req_next;
+       if (shown)
+               seq_print_minor_vnr_req(m, shown, now);
+
+       req = connection->req_ack_pending;
+       if (req && req != shown) {
+               shown = req;
+               seq_print_minor_vnr_req(m, shown, now);
+       }
+
+       req = connection->req_not_net_done;
+       if (req && req != shown)
+               seq_print_minor_vnr_req(m, req, now);
+
+       spin_unlock_irq(&connection->resource->req_lock);
+       return 0;
+}
+
+/* ->open of the per-connection "oldest_requests": pin the connection found
+ * in i_private and set up the seq_file with its show callback. */
+static int connection_oldest_requests_open(struct inode *inode, struct file *file)
+{
+       struct drbd_connection *conn = inode->i_private;
+
+       return drbd_single_open(file, connection_oldest_requests_show, conn,
+                               &conn->kref, drbd_destroy_connection);
+}
+
+/* ->release of the per-connection "oldest_requests": drop the connection
+ * reference taken at open time, then single_release(). */
+static int connection_oldest_requests_release(struct inode *inode, struct file *file)
+{
+       struct drbd_connection *conn = inode->i_private;
+
+       kref_put(&conn->kref, drbd_destroy_connection);
+       return single_release(inode, file);
+}
+
+/* file operations of the per-connection "oldest_requests" file */
+static const struct file_operations connection_oldest_requests_fops = {
+       .owner          = THIS_MODULE,
+       .open           = connection_oldest_requests_open,
+       .release        = connection_oldest_requests_release,
+       .read           = seq_read,
+       .llseek         = seq_lseek,
+};
+
+/* Create the debugfs entries of one connection below the "connections"
+ * directory of its resource:
+ *   peer/, peer/callback_history, peer/oldest_requests
+ * Does nothing if that "connections" directory does not exist.  If any
+ * entry cannot be created, what was created so far is removed again and
+ * an error is logged. */
+void drbd_debugfs_connection_add(struct drbd_connection *connection)
+{
+       struct dentry *parent = connection->resource->debugfs_res_connections;
+       struct dentry *d;
+
+       if (!parent)
+               return;
+
+       /* Once we enable multiple peers,
+        * these connections will have descriptive names.
+        * For now, it is just the one connection to the (only) "peer". */
+       d = debugfs_create_dir("peer", parent);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       connection->debugfs_conn = d;
+
+       d = debugfs_create_file("callback_history", S_IRUSR|S_IRGRP,
+                       connection->debugfs_conn, connection,
+                       &connection_callback_history_fops);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       connection->debugfs_conn_callback_history = d;
+
+       d = debugfs_create_file("oldest_requests", S_IRUSR|S_IRGRP,
+                       connection->debugfs_conn, connection,
+                       &connection_oldest_requests_fops);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       connection->debugfs_conn_oldest_requests = d;
+       return;
+
+fail:
+       /* only successfully created entries have been stored above */
+       drbd_debugfs_connection_cleanup(connection);
+       drbd_err(connection, "failed to create debugfs dentry\n");
+}
+
+/* Remove all debugfs entries of a connection.
+ * Also used as the error path of drbd_debugfs_connection_add(), so any of
+ * the pointers may still be NULL.  The connection directory goes last,
+ * after the files that were created inside of it. */
+void drbd_debugfs_connection_cleanup(struct drbd_connection *connection)
+{
+       drbd_debugfs_remove(&connection->debugfs_conn_callback_history);
+       drbd_debugfs_remove(&connection->debugfs_conn_oldest_requests);
+       drbd_debugfs_remove(&connection->debugfs_conn);
+}
+
+/* Per-element detail callback handed to lc_seq_dump_details() for the
+ * resync LRU: prints rs_left and the NO_WRITES / LOCKED / PRIORITY flags
+ * of the bm_extent embedding @e; a flag that is not set shows as dashes. */
+static void resync_dump_detail(struct seq_file *m, struct lc_element *e)
+{
+       struct bm_extent *ext = lc_entry(e, struct bm_extent, lce);
+       const char *no_writes = "---------";
+       const char *locked = "------";
+       const char *priority = "--------";
+
+       if (test_bit(BME_NO_WRITES, &ext->flags))
+               no_writes = "NO_WRITES";
+       if (test_bit(BME_LOCKED, &ext->flags))
+               locked = "LOCKED";
+       if (test_bit(BME_PRIORITY, &ext->flags))
+               priority = "PRIORITY";
+
+       seq_printf(m, "%5d %s %s %s\n", ext->rs_left,
+                 no_writes, locked, priority);
+}
+
+/* seq_file show callback of the per-volume "resync_extents" file:
+ * version line, then statistics and per-element details of device->resync.
+ * Only the version line is printed if get_ldev_if_state(device, D_FAILED)
+ * fails.  Always returns 0. */
+static int device_resync_extents_show(struct seq_file *m, void *ignored)
+{
+       struct drbd_device *device = m->private;
+
+       /* BUMP me if you change the file format/content/presentation */
+       seq_printf(m, "v: %u\n\n", 0);
+
+       if (!get_ldev_if_state(device, D_FAILED))
+               return 0;
+
+       lc_seq_printf_stats(m, device->resync);
+       lc_seq_dump_details(m, device->resync, "rs_left flags", resync_dump_detail);
+       put_ldev(device);
+       return 0;
+}
+
+/* seq_file show callback of the per-volume "act_log_extents" file:
+ * version line, then statistics and elements of device->act_log (no extra
+ * per-element detail callback).  Only the version line is printed if
+ * get_ldev_if_state(device, D_FAILED) fails.  Always returns 0. */
+static int device_act_log_extents_show(struct seq_file *m, void *ignored)
+{
+       struct drbd_device *device = m->private;
+
+       /* BUMP me if you change the file format/content/presentation */
+       seq_printf(m, "v: %u\n\n", 0);
+
+       if (!get_ldev_if_state(device, D_FAILED))
+               return 0;
+
+       lc_seq_printf_stats(m, device->act_log);
+       lc_seq_dump_details(m, device->act_log, "", NULL);
+       put_ldev(device);
+       return 0;
+}
+
+/* seq_file show callback of the per-volume "oldest_requests" file.
+ * Under resource->req_lock and for each direction, prints the first entry
+ * of pending_master_completion[] and the first entry of
+ * pending_completion[], the latter only if it is a different request.
+ * Always returns 0. */
+static int device_oldest_requests_show(struct seq_file *m, void *ignored)
+{
+       struct drbd_device *device = m->private;
+       struct drbd_resource *resource = device->resource;
+       const unsigned long now = jiffies;
+       int rw = 2;
+
+       /* BUMP me if you change the file format/content/presentation */
+       seq_printf(m, "v: %u\n\n", 0);
+
+       seq_puts(m, RQ_HDR);
+       spin_lock_irq(&resource->req_lock);
+       /* WRITE (index 1), then READ (index 0) */
+       while (rw-- > 0) {
+               struct drbd_request *master, *local;
+
+               master = list_first_entry_or_null(&device->pending_master_completion[rw],
+                       struct drbd_request, req_pending_master_completion);
+               local = list_first_entry_or_null(&device->pending_completion[rw],
+                       struct drbd_request, req_pending_local);
+               if (master)
+                       seq_print_one_request(m, master, now);
+               if (local && local != master)
+                       seq_print_one_request(m, local, now);
+       }
+       spin_unlock_irq(&resource->req_lock);
+       return 0;
+}
+
+/* seq_file show callback of the per-volume "data_gen_id" file:
+ * one line per UUID slot from UI_CURRENT to UI_HISTORY_END, read under
+ * md->uuid_lock.  Returns 0, or -ENODEV if
+ * get_ldev_if_state(device, D_FAILED) fails. */
+static int device_data_gen_id_show(struct seq_file *m, void *ignored)
+{
+       struct drbd_device *device = m->private;
+       struct drbd_md *meta;
+       enum drbd_uuid_index slot;
+
+       if (!get_ldev_if_state(device, D_FAILED))
+               return -ENODEV;
+
+       meta = &device->ldev->md;
+
+       spin_lock_irq(&meta->uuid_lock);
+       for (slot = UI_CURRENT; slot <= UI_HISTORY_END; slot++)
+               seq_printf(m, "0x%016llX\n", meta->uuid[slot]);
+       spin_unlock_irq(&meta->uuid_lock);
+
+       put_ldev(device);
+       return 0;
+}
+
+/* Generate the debugfs boilerplate for one per-device attribute <name>:
+ *   device_<name>_open()    - drbd_single_open() with device_<name>_show(),
+ *                             taking a reference on device->kref
+ *   device_<name>_release() - puts device->kref, then single_release()
+ *   device_<name>_fops      - file_operations using the two, plus
+ *                             seq_read and seq_lseek
+ * The expansion refers to device_<name>_show(), which therefore has to be
+ * declared before the macro is used. */
+#define drbd_debugfs_device_attr(name)                                         \
+static int device_ ## name ## _open(struct inode *inode, struct file *file)    \
+{                                                                              \
+       struct drbd_device *device = inode->i_private;                          \
+       return drbd_single_open(file, device_ ## name ## _show, device,         \
+                               &device->kref, drbd_destroy_device);            \
+}                                                                              \
+static int device_ ## name ## _release(struct inode *inode, struct file *file) \
+{                                                                              \
+       struct drbd_device *device = inode->i_private;                          \
+       kref_put(&device->kref, drbd_destroy_device);                           \
+       return single_release(inode, file);                                     \
+}                                                                              \
+static const struct file_operations device_ ## name ## _fops = {               \
+       .owner          = THIS_MODULE,                                          \
+       .open           = device_ ## name ## _open,                             \
+       .read           = seq_read,                                             \
+       .llseek         = seq_lseek,                                            \
+       .release        = device_ ## name ## _release,                          \
+};
+
+/* one open/release/fops set per attribute file of a volume */
+drbd_debugfs_device_attr(oldest_requests)
+drbd_debugfs_device_attr(act_log_extents)
+drbd_debugfs_device_attr(resync_extents)
+drbd_debugfs_device_attr(data_gen_id)
+
+/* Create the debugfs entries of one volume:
+ *   <vnr>/ below the "volumes" directory of its resource, containing
+ *       oldest_requests, act_log_extents, resync_extents, data_gen_id
+ *   <minor> below drbd_debugfs_minors, a symlink with the target
+ *       ../resources/<resource name>/volumes/<vnr>
+ * Does nothing if either parent directory does not exist.  If any entry
+ * cannot be created, what was created so far is removed again and an
+ * error is logged. */
+void drbd_debugfs_device_add(struct drbd_device *device)
+{
+       struct dentry *vols_dir = device->resource->debugfs_res_volumes;
+       char minor_buf[8]; /* MINORMASK, MINORBITS == 20; */
+       char vnr_buf[8];   /* volume number vnr is even 16 bit only; */
+       char *link_target;
+       struct dentry *dentry;
+
+       if (!vols_dir || !drbd_debugfs_minors)
+               return;
+
+       /* the directory of this volume */
+       snprintf(vnr_buf, sizeof(vnr_buf), "%u", device->vnr);
+       dentry = debugfs_create_dir(vnr_buf, vols_dir);
+       if (IS_ERR_OR_NULL(dentry))
+               goto fail;
+       device->debugfs_vol = dentry;
+
+       /* the by-minor symlink pointing at it */
+       link_target = kasprintf(GFP_KERNEL, "../resources/%s/volumes/%u",
+                       device->resource->name, device->vnr);
+       if (!link_target)
+               goto fail;
+       snprintf(minor_buf, sizeof(minor_buf), "%u", device->minor);
+       dentry = debugfs_create_symlink(minor_buf, drbd_debugfs_minors, link_target);
+       /* our copy of the target string is not used beyond this point */
+       kfree(link_target);
+       if (IS_ERR_OR_NULL(dentry))
+               goto fail;
+       device->debugfs_minor = dentry;
+
+       /* DCF(name): create attribute file "name" inside the volume
+        * directory, bail out via fail on error */
+#define DCF(name)      do {                                    \
+       dentry = debugfs_create_file(#name, S_IRUSR|S_IRGRP,    \
+                       device->debugfs_vol, device,            \
+                       &device_ ## name ## _fops);             \
+       if (IS_ERR_OR_NULL(dentry))                             \
+               goto fail;                                      \
+       device->debugfs_vol_ ## name = dentry;                  \
+       } while (0)
+
+       DCF(oldest_requests);
+       DCF(act_log_extents);
+       DCF(resync_extents);
+       DCF(data_gen_id);
+#undef DCF
+       return;
+
+fail:
+       /* only successfully created entries have been stored above */
+       drbd_debugfs_device_cleanup(device);
+       drbd_err(device, "failed to create debugfs entries\n");
+}
+
+/* Remove all debugfs entries of a volume: the by-minor symlink, the
+ * attribute files, and last the volume directory that contains them.
+ * Also used as the error path of drbd_debugfs_device_add(), so any of the
+ * pointers may still be NULL. */
+void drbd_debugfs_device_cleanup(struct drbd_device *device)
+{
+       drbd_debugfs_remove(&device->debugfs_minor);
+       drbd_debugfs_remove(&device->debugfs_vol_oldest_requests);
+       drbd_debugfs_remove(&device->debugfs_vol_act_log_extents);
+       drbd_debugfs_remove(&device->debugfs_vol_resync_extents);
+       drbd_debugfs_remove(&device->debugfs_vol_data_gen_id);
+       drbd_debugfs_remove(&device->debugfs_vol);
+}
+
+/* Create the directory <vnr>/ of a peer device inside the debugfs
+ * directory of its connection.  Does nothing if the connection has no
+ * debugfs directory; logs an error if the directory cannot be created. */
+void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device)
+{
+       struct dentry *conn_dir = peer_device->connection->debugfs_conn;
+       struct dentry *vol_dir;
+       char vnr_buf[8];
+
+       if (!conn_dir)
+               return;
+
+       snprintf(vnr_buf, sizeof(vnr_buf), "%u", peer_device->device->vnr);
+       vol_dir = debugfs_create_dir(vnr_buf, conn_dir);
+       if (IS_ERR_OR_NULL(vol_dir)) {
+               drbd_debugfs_peer_device_cleanup(peer_device);
+               drbd_err(peer_device, "failed to create debugfs entries\n");
+               return;
+       }
+       peer_device->debugfs_peer_dev = vol_dir;
+}
+
+/* Remove the debugfs directory of a peer device, if there is one. */
+void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device)
+{
+       drbd_debugfs_remove(&peer_device->debugfs_peer_dev);
+}
+
+/* seq_file show callback of the "version" file: the build tag as a
+ * '#' comment line, followed by KEY=value lines for the release, API and
+ * protocol version range.  Always returns 0. */
+static int drbd_version_show(struct seq_file *m, void *ignored)
+{
+       seq_printf(m, "# %s\n", drbd_buildtag());
+       seq_printf(m, "VERSION=%s\n", REL_VERSION);
+       seq_printf(m, "API_VERSION=%u\n", API_VERSION);
+       seq_printf(m, "PRO_VERSION_MIN=%u\n", PRO_VERSION_MIN);
+       seq_printf(m, "PRO_VERSION_MAX=%u\n", PRO_VERSION_MAX);
+       return 0;
+}
+
+/* ->open of the "version" file.  No private data and no object to pin,
+ * so plain single_open() is used instead of drbd_single_open(). */
+static int drbd_version_open(struct inode *inode, struct file *file)
+{
+       return single_open(file, drbd_version_show, NULL);
+}
+
+/* file operations of the "version" file; never modified,
+ * so const like the other file_operations tables in this file */
+static const struct file_operations drbd_version_fops = {
+       .owner = THIS_MODULE,
+       .open = drbd_version_open,
+       .llseek = seq_lseek,
+       .read = seq_read,
+       .release = single_release,
+};
+
+/* not __exit, may be indirectly called
+ * from the module-load-failure path as well. */
+/* Remove the global entries "resources", "minors" and "version", and last
+ * the "drbd" root directory they were created in.  Any of the pointers may
+ * still be NULL when called from the error path of drbd_debugfs_init(). */
+void drbd_debugfs_cleanup(void)
+{
+       drbd_debugfs_remove(&drbd_debugfs_resources);
+       drbd_debugfs_remove(&drbd_debugfs_minors);
+       drbd_debugfs_remove(&drbd_debugfs_version);
+       drbd_debugfs_remove(&drbd_debugfs_root);
+}
+
+/* Create the global debugfs skeleton:
+ *   drbd/, drbd/version, drbd/resources/, drbd/minors/
+ * Returns 0 on success.  On failure everything created so far is removed
+ * again, and the error encoded in the failed dentry is returned, or
+ * -EINVAL if the failed call returned NULL. */
+int __init drbd_debugfs_init(void)
+{
+       struct dentry *d;
+
+       d = debugfs_create_dir("drbd", NULL);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       drbd_debugfs_root = d;
+
+       d = debugfs_create_file("version", 0444, drbd_debugfs_root, NULL, &drbd_version_fops);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       drbd_debugfs_version = d;
+
+       d = debugfs_create_dir("resources", drbd_debugfs_root);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       drbd_debugfs_resources = d;
+
+       d = debugfs_create_dir("minors", drbd_debugfs_root);
+       if (IS_ERR_OR_NULL(d))
+               goto fail;
+       drbd_debugfs_minors = d;
+       return 0;
+
+fail:
+       drbd_debugfs_cleanup();
+       /* d is either NULL or an error pointer here */
+       return d ? PTR_ERR(d) : -EINVAL;
+}
diff --git a/drivers/block/drbd/drbd_debugfs.h b/drivers/block/drbd/drbd_debugfs.h
new file mode 100644 (file)
index 0000000..8bee213
--- /dev/null
@@ -0,0 +1,39 @@
+#ifndef DRBD_DEBUGFS_H
+#define DRBD_DEBUGFS_H
+
+/*
+ * Interface to the DRBD debugfs entries.
+ *
+ * With CONFIG_DEBUG_FS the functions are only declared here.  Without it,
+ * all of them are empty inline stubs, and drbd_debugfs_init() returns
+ * -ENODEV.  The include guard keeps those inline definitions from being
+ * seen twice within one translation unit.
+ */
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/debugfs.h>
+
+#include "drbd_int.h"
+
+#ifdef CONFIG_DEBUG_FS
+int __init drbd_debugfs_init(void);
+void drbd_debugfs_cleanup(void);
+
+void drbd_debugfs_resource_add(struct drbd_resource *resource);
+void drbd_debugfs_resource_cleanup(struct drbd_resource *resource);
+
+void drbd_debugfs_connection_add(struct drbd_connection *connection);
+void drbd_debugfs_connection_cleanup(struct drbd_connection *connection);
+
+void drbd_debugfs_device_add(struct drbd_device *device);
+void drbd_debugfs_device_cleanup(struct drbd_device *device);
+
+void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device);
+void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device);
+#else /* !CONFIG_DEBUG_FS */
+
+static inline int __init drbd_debugfs_init(void) { return -ENODEV; }
+static inline void drbd_debugfs_cleanup(void) { }
+
+static inline void drbd_debugfs_resource_add(struct drbd_resource *resource) { }
+static inline void drbd_debugfs_resource_cleanup(struct drbd_resource *resource) { }
+
+static inline void drbd_debugfs_connection_add(struct drbd_connection *connection) { }
+static inline void drbd_debugfs_connection_cleanup(struct drbd_connection *connection) { }
+
+static inline void drbd_debugfs_device_add(struct drbd_device *device) { }
+static inline void drbd_debugfs_device_cleanup(struct drbd_device *device) { }
+
+static inline void drbd_debugfs_peer_device_add(struct drbd_peer_device *peer_device) { }
+static inline void drbd_debugfs_peer_device_cleanup(struct drbd_peer_device *peer_device) { }
+
+#endif /* CONFIG_DEBUG_FS */
+
+#endif /* DRBD_DEBUGFS_H */
index a76ceb344d64e9411bbfd39c4db9189e519fac72..1a000016ccdfb8bfecf6769a09bb9c48a6aab215 100644 (file)
@@ -317,7 +317,63 @@ struct drbd_request {
 
        struct list_head tl_requests; /* ring list in the transfer log */
        struct bio *master_bio;       /* master bio pointer */
-       unsigned long start_time;
+
+       /* see struct drbd_device */
+       struct list_head req_pending_master_completion;
+       struct list_head req_pending_local;
+
+       /* for generic IO accounting */
+       unsigned long start_jif;
+
+       /* for DRBD internal statistics */
+
+       /* Minimal set of time stamps to determine if we wait for activity log
+        * transactions, local disk or peer.  32 bit "jiffies" are good enough,
+        * we don't expect a DRBD request to be stalled for several months.
+        */
+
+       /* before actual request processing */
+       unsigned long in_actlog_jif;
+
+       /* local disk */
+       unsigned long pre_submit_jif;
+
+       /* per connection */
+       unsigned long pre_send_jif;
+       unsigned long acked_jif;
+       unsigned long net_done_jif;
+
+       /* Possibly even more detail to track each phase:
+        *  master_completion_jif
+        *      how long did it take to complete the master bio
+        *      (application visible latency)
+        *  allocated_jif
+        *      how long the master bio was blocked until we finally allocated
+        *      a tracking struct
+        *  in_actlog_jif
+        *      how long did we wait for activity log transactions
+        *
+        *  net_queued_jif
+        *      when did we finally queue it for sending
+        *  pre_send_jif
+        *      when did we start sending it
+        *  post_send_jif
+        *      how long did we block in the network stack trying to send it
+        *  acked_jif
+        *      when did we receive (or fake, in protocol A) a remote ACK
+        *  net_done_jif
+        *      when did we receive final acknowledgement (P_BARRIER_ACK),
+        *      or decide, e.g. on connection loss, that we do no longer expect
+        *      anything from this peer for this request.
+        *
+        *  pre_submit_jif
+        *  post_sub_jif
+        *      when did we start submitting to the lower level device,
+        *      and how long did we block in that submit function
+        *  local_completion_jif
+        *      how long did it take the lower level device to complete this request
+        */
+
 
        /* once it hits 0, we may complete the master_bio */
        atomic_t completion_ref;
@@ -366,6 +422,7 @@ struct drbd_peer_request {
        struct drbd_interval i;
        /* see comments on ee flag bits below */
        unsigned long flags;
+       unsigned long submit_jif;
        union {
                u64 block_id;
                struct digest_info *digest;
@@ -408,6 +465,17 @@ enum {
 
        /* Is set when net_conf had two_primaries set while creating this peer_req */
        __EE_IN_INTERVAL_TREE,
+
+       /* for debugfs: */
+       /* has this been submitted, or does it still wait for something else? */
+       __EE_SUBMITTED,
+
+       /* this is/was a write request */
+       __EE_WRITE,
+
+       /* this originates from application on peer
+        * (not some resync or verify or other DRBD internal request) */
+       __EE_APPLICATION,
 };
 #define EE_CALL_AL_COMPLETE_IO (1<<__EE_CALL_AL_COMPLETE_IO)
 #define EE_MAY_SET_IN_SYNC     (1<<__EE_MAY_SET_IN_SYNC)
@@ -419,6 +487,9 @@ enum {
 #define EE_RESTART_REQUESTS    (1<<__EE_RESTART_REQUESTS)
 #define EE_SEND_WRITE_ACK      (1<<__EE_SEND_WRITE_ACK)
 #define EE_IN_INTERVAL_TREE    (1<<__EE_IN_INTERVAL_TREE)
+#define EE_SUBMITTED           (1<<__EE_SUBMITTED)
+#define EE_WRITE               (1<<__EE_WRITE)
+#define EE_APPLICATION         (1<<__EE_APPLICATION)
 
 /* flag bits per device */
 enum {
@@ -433,11 +504,11 @@ enum {
        CONSIDER_RESYNC,
 
        MD_NO_FUA,              /* Users wants us to not use FUA/FLUSH on meta data dev */
+
        SUSPEND_IO,             /* suspend application io */
        BITMAP_IO,              /* suspend application io;
                                   once no more io in flight, start bitmap io */
        BITMAP_IO_QUEUED,       /* Started bitmap IO */
-       GO_DISKLESS,            /* Disk is being detached, on io-error or admin request. */
        WAS_IO_ERROR,           /* Local disk failed, returned IO error */
        WAS_READ_ERROR,         /* Local disk READ failed (set additionally to the above) */
        FORCE_DETACH,           /* Force-detach from local disk, aborting any pending local IO */
@@ -450,6 +521,20 @@ enum {
        B_RS_H_DONE,            /* Before resync handler done (already executed) */
        DISCARD_MY_DATA,        /* discard_my_data flag per volume */
        READ_BALANCE_RR,
+
+       FLUSH_PENDING,          /* if set, device->flush_jif is when we submitted that flush
+                                * from drbd_flush_after_epoch() */
+
+       /* cleared only after backing device related structures have been destroyed. */
+       GOING_DISKLESS,         /* Disk is being detached, because of io-error, or admin request. */
+
+       /* to be used in drbd_device_post_work() */
+       GO_DISKLESS,            /* tell worker to schedule cleanup before detach */
+       DESTROY_DISK,           /* tell worker to close backing devices and destroy related structures. */
+       MD_SYNC,                /* tell worker to call drbd_md_sync() */
+       RS_START,               /* tell worker to start resync/OV */
+       RS_PROGRESS,            /* tell worker that resync made significant progress */
+       RS_DONE,                /* tell worker that resync is done */
 };
 
 struct drbd_bitmap; /* opaque for drbd_device */
@@ -531,6 +616,11 @@ struct drbd_backing_dev {
 };
 
 struct drbd_md_io {
+       struct page *page;
+       unsigned long start_jif;        /* last call to drbd_md_get_buffer */
+       unsigned long submit_jif;       /* last _drbd_md_sync_page_io() submit */
+       const char *current_use;
+       atomic_t in_use;
        unsigned int done;
        int error;
 };
@@ -577,10 +667,18 @@ enum {
                                 * and potentially deadlock on, this drbd worker.
                                 */
        DISCONNECT_SENT,
+
+       DEVICE_WORK_PENDING,    /* tell worker that some device has pending work */
 };
 
 struct drbd_resource {
        char *name;
+#ifdef CONFIG_DEBUG_FS
+       struct dentry *debugfs_res;
+       struct dentry *debugfs_res_volumes;
+       struct dentry *debugfs_res_connections;
+       struct dentry *debugfs_res_in_flight_summary;
+#endif
        struct kref kref;
        struct idr devices;             /* volume number to device mapping */
        struct list_head connections;
@@ -594,12 +692,28 @@ struct drbd_resource {
        unsigned susp_nod:1;            /* IO suspended because no data */
        unsigned susp_fen:1;            /* IO suspended because fence peer handler runs */
 
+       enum write_ordering_e write_ordering;
+
        cpumask_var_t cpu_mask;
 };
 
+/* One slot of the per-thread callback history of a connection
+ * (w_timing_details[] / r_timing_details[]), as shown by the debugfs
+ * "callback_history" file. */
+struct drbd_thread_timing_details
+{
+       unsigned long start_jif;        /* jiffies; debugfs shows (now - start_jif) in ms as "age" */
+       void *cb_addr;                  /* shown via %ps; slots with NULL here are skipped */
+       const char *caller_fn;          /* shown as "callsite"; presumably the __func__ passed by the update macros */
+       unsigned int line;              /* shown as "callsite"; presumably the matching __LINE__ */
+       unsigned int cb_nr;             /* re-read by the lockless debugfs reader to detect a concurrent update */
+};
+
 struct drbd_connection {
        struct list_head connections;
        struct drbd_resource *resource;
+#ifdef CONFIG_DEBUG_FS
+       struct dentry *debugfs_conn;
+       struct dentry *debugfs_conn_callback_history;
+       struct dentry *debugfs_conn_oldest_requests;
+#endif
        struct kref kref;
        struct idr peer_devices;        /* volume number to peer device mapping */
        enum drbd_conns cstate;         /* Only C_STANDALONE to C_WF_REPORT_PARAMS */
@@ -636,7 +750,6 @@ struct drbd_connection {
        struct drbd_epoch *current_epoch;
        spinlock_t epoch_lock;
        unsigned int epochs;
-       enum write_ordering_e write_ordering;
        atomic_t current_tle_nr;        /* transfer log epoch number */
        unsigned current_tle_writes;    /* writes seen within this tl epoch */
 
@@ -645,9 +758,22 @@ struct drbd_connection {
        struct drbd_thread worker;
        struct drbd_thread asender;
 
+       /* cached pointers,
+        * so we can look up the oldest pending requests more quickly.
+        * protected by resource->req_lock */
+       struct drbd_request *req_next; /* DRBD 9: todo.req_next */
+       struct drbd_request *req_ack_pending;
+       struct drbd_request *req_not_net_done;
+
        /* sender side */
        struct drbd_work_queue sender_work;
 
+#define DRBD_THREAD_DETAILS_HIST       16
+       unsigned int w_cb_nr; /* keeps counting up */
+       unsigned int r_cb_nr; /* keeps counting up */
+       struct drbd_thread_timing_details w_timing_details[DRBD_THREAD_DETAILS_HIST];
+       struct drbd_thread_timing_details r_timing_details[DRBD_THREAD_DETAILS_HIST];
+
        struct {
                /* whether this sender thread
                 * has processed a single write yet. */
@@ -663,11 +789,22 @@ struct drbd_connection {
        } send;
 };
 
+void __update_timing_details(
+               struct drbd_thread_timing_details *tdp,
+               unsigned int *cb_nr,
+               void *cb,
+               const char *fn, const unsigned int line);
+/* Hand cb plus the call site (__func__, __LINE__) to __update_timing_details() for connection c. */
+#define update_worker_timing_details(c, cb) \
+       __update_timing_details((c)->w_timing_details, &(c)->w_cb_nr, (cb), __func__ , __LINE__ )
+#define update_receiver_timing_details(c, cb) \
+       __update_timing_details((c)->r_timing_details, &(c)->r_cb_nr, (cb), __func__ , __LINE__ )
+
 struct submit_worker {
        struct workqueue_struct *wq;
        struct work_struct worker;
 
-       spinlock_t lock;
+       /* protected by ..->resource->req_lock */
        struct list_head writes;
 };
 
@@ -675,12 +812,29 @@ struct drbd_peer_device {
        struct list_head peer_devices;
        struct drbd_device *device;
        struct drbd_connection *connection;
+#ifdef CONFIG_DEBUG_FS
+       struct dentry *debugfs_peer_dev;
+#endif
 };
 
 struct drbd_device {
        struct drbd_resource *resource;
        struct list_head peer_devices;
-       int vnr;                        /* volume number within the connection */
+       struct list_head pending_bitmap_io;
+
+       unsigned long flush_jif;
+#ifdef CONFIG_DEBUG_FS
+       struct dentry *debugfs_minor;
+       struct dentry *debugfs_vol;
+       struct dentry *debugfs_vol_oldest_requests;
+       struct dentry *debugfs_vol_act_log_extents;
+       struct dentry *debugfs_vol_resync_extents;
+       struct dentry *debugfs_vol_data_gen_id;
+#endif
+
+       unsigned int vnr;       /* volume number within the connection */
+       unsigned int minor;     /* device minor number */
+
        struct kref kref;
 
        /* things that are stored as / read from meta data on disk */
@@ -697,19 +851,10 @@ struct drbd_device {
        unsigned long last_reattach_jif;
        struct drbd_work resync_work;
        struct drbd_work unplug_work;
-       struct drbd_work go_diskless;
-       struct drbd_work md_sync_work;
-       struct drbd_work start_resync_work;
        struct timer_list resync_timer;
        struct timer_list md_sync_timer;
        struct timer_list start_resync_timer;
        struct timer_list request_timer;
-#ifdef DRBD_DEBUG_MD_SYNC
-       struct {
-               unsigned int line;
-               const char* func;
-       } last_md_mark_dirty;
-#endif
 
        /* Used after attach while negotiating new disk state. */
        union drbd_state new_state_tmp;
@@ -724,6 +869,7 @@ struct drbd_device {
        unsigned int al_writ_cnt;
        unsigned int bm_writ_cnt;
        atomic_t ap_bio_cnt;     /* Requests we need to complete */
+       atomic_t ap_actlog_cnt;  /* Requests waiting for activity log */
        atomic_t ap_pending_cnt; /* AP data packets on the wire, ack expected */
        atomic_t rs_pending_cnt; /* RS request/data packets on the wire */
        atomic_t unacked_cnt;    /* Need to send replies for */
@@ -733,6 +879,13 @@ struct drbd_device {
        struct rb_root read_requests;
        struct rb_root write_requests;
 
+       /* for statistics and timeouts */
+       /* [0] read, [1] write */
+       struct list_head pending_master_completion[2];
+       struct list_head pending_completion[2];
+
+       /* use checksums for *this* resync */
+       bool use_csums;
        /* blocks to resync in this run [unit BM_BLOCK_SIZE] */
        unsigned long rs_total;
        /* number of resync blocks that failed in this run */
@@ -788,9 +941,7 @@ struct drbd_device {
        atomic_t pp_in_use;             /* allocated from page pool */
        atomic_t pp_in_use_by_net;      /* sendpage()d, still referenced by tcp */
        wait_queue_head_t ee_wait;
-       struct page *md_io_page;        /* one page buffer for md_io */
        struct drbd_md_io md_io;
-       atomic_t md_io_in_use;          /* protects the md_io, md_io_page and md_io_tmpp */
        spinlock_t al_lock;
        wait_queue_head_t al_wait;
        struct lru_cache *act_log;      /* activity log */
@@ -800,7 +951,6 @@ struct drbd_device {
        atomic_t packet_seq;
        unsigned int peer_seq;
        spinlock_t peer_seq_lock;
-       unsigned int minor;
        unsigned long comm_bm_set; /* communicated number of set bits. */
        struct bm_io_work bm_io_work;
        u64 ed_uuid; /* UUID of the exposed data */
@@ -824,6 +974,21 @@ struct drbd_device {
        struct submit_worker submit;
 };
 
+struct drbd_bm_aio_ctx {
+       struct drbd_device *device;
+       struct list_head list; /* on device->pending_bitmap_io */
+       unsigned long start_jif; /* presumably a jiffies timestamp -- confirm */
+       atomic_t in_flight;
+       unsigned int done;
+       unsigned flags; /* BM_AIO_* values below; powers of two, presumably OR-ed */
+#define BM_AIO_COPY_PAGES      1
+#define BM_AIO_WRITE_HINTED    2
+#define BM_AIO_WRITE_ALL_PAGES 4
+#define BM_AIO_READ            8
+       int error;
+       struct kref kref;
+};
+
 struct drbd_config_context {
        /* assigned from drbd_genlmsghdr */
        unsigned int minor;
@@ -949,7 +1114,7 @@ extern int drbd_send_ov_request(struct drbd_peer_device *, sector_t sector, int
 extern int drbd_send_bitmap(struct drbd_device *device);
 extern void drbd_send_sr_reply(struct drbd_peer_device *, enum drbd_state_rv retcode);
 extern void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode);
-extern void drbd_free_bc(struct drbd_backing_dev *ldev);
+extern void drbd_free_ldev(struct drbd_backing_dev *ldev);
 extern void drbd_device_cleanup(struct drbd_device *device);
 void drbd_print_uuids(struct drbd_device *device, const char *text);
 
@@ -966,13 +1131,7 @@ extern void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must
 extern void drbd_md_set_flag(struct drbd_device *device, int flags) __must_hold(local);
 extern void drbd_md_clear_flag(struct drbd_device *device, int flags)__must_hold(local);
 extern int drbd_md_test_flag(struct drbd_backing_dev *, int);
-#ifndef DRBD_DEBUG_MD_SYNC
 extern void drbd_md_mark_dirty(struct drbd_device *device);
-#else
-#define drbd_md_mark_dirty(m)  drbd_md_mark_dirty_(m, __LINE__ , __func__ )
-extern void drbd_md_mark_dirty_(struct drbd_device *device,
-               unsigned int line, const char *func);
-#endif
 extern void drbd_queue_bitmap_io(struct drbd_device *device,
                                 int (*io_fn)(struct drbd_device *),
                                 void (*done)(struct drbd_device *, int),
@@ -983,9 +1142,8 @@ extern int drbd_bitmap_io(struct drbd_device *device,
 extern int drbd_bitmap_io_from_worker(struct drbd_device *device,
                int (*io_fn)(struct drbd_device *),
                char *why, enum bm_flag flags);
-extern int drbd_bmio_set_n_write(struct drbd_device *device);
-extern int drbd_bmio_clear_n_write(struct drbd_device *device);
-extern void drbd_ldev_destroy(struct drbd_device *device);
+extern int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local);
+extern int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local);
 
 /* Meta data layout
  *
@@ -1105,17 +1263,21 @@ struct bm_extent {
 /* in which _bitmap_ extent (resp. sector) the bit for a certain
  * _storage_ sector is located in */
 #define BM_SECT_TO_EXT(x)   ((x)>>(BM_EXT_SHIFT-9))
+#define BM_BIT_TO_EXT(x)    ((x) >> (BM_EXT_SHIFT - BM_BLOCK_SHIFT))
 
-/* how much _storage_ sectors we have per bitmap sector */
+/* first storage sector a bitmap extent corresponds to */
 #define BM_EXT_TO_SECT(x)   ((sector_t)(x) << (BM_EXT_SHIFT-9))
+/* how many _storage_ sectors we have per bitmap extent */
 #define BM_SECT_PER_EXT     BM_EXT_TO_SECT(1)
+/* how many bits are covered by one bitmap extent (resync extent) */
+#define BM_BITS_PER_EXT     (1UL << (BM_EXT_SHIFT - BM_BLOCK_SHIFT))
+
+#define BM_BLOCKS_PER_BM_EXT_MASK  (BM_BITS_PER_EXT - 1)
+
 
 /* in one sector of the bitmap, we have this many activity_log extents. */
 #define AL_EXT_PER_BM_SECT  (1 << (BM_EXT_SHIFT - AL_EXTENT_SHIFT))
 
-#define BM_BLOCKS_PER_BM_EXT_B (BM_EXT_SHIFT - BM_BLOCK_SHIFT)
-#define BM_BLOCKS_PER_BM_EXT_MASK  ((1<<BM_BLOCKS_PER_BM_EXT_B) - 1)
-
 /* the extent in "PER_EXTENT" below is an activity log extent
  * we need that many (long words/bytes) to store the bitmap
  *                  of one AL_EXTENT_SIZE chunk of storage.
@@ -1195,11 +1357,11 @@ extern void _drbd_bm_set_bits(struct drbd_device *device,
                const unsigned long s, const unsigned long e);
 extern int  drbd_bm_test_bit(struct drbd_device *device, unsigned long bitnr);
 extern int  drbd_bm_e_weight(struct drbd_device *device, unsigned long enr);
-extern int  drbd_bm_write_page(struct drbd_device *device, unsigned int idx) __must_hold(local);
 extern int  drbd_bm_read(struct drbd_device *device) __must_hold(local);
 extern void drbd_bm_mark_for_writeout(struct drbd_device *device, int page_nr);
 extern int  drbd_bm_write(struct drbd_device *device) __must_hold(local);
 extern int  drbd_bm_write_hinted(struct drbd_device *device) __must_hold(local);
+extern int  drbd_bm_write_lazy(struct drbd_device *device, unsigned upper_idx) __must_hold(local);
 extern int drbd_bm_write_all(struct drbd_device *device) __must_hold(local);
 extern int  drbd_bm_write_copy_pages(struct drbd_device *device) __must_hold(local);
 extern size_t       drbd_bm_words(struct drbd_device *device);
@@ -1213,7 +1375,6 @@ extern unsigned long _drbd_bm_find_next(struct drbd_device *device, unsigned lon
 extern unsigned long _drbd_bm_find_next_zero(struct drbd_device *device, unsigned long bm_fo);
 extern unsigned long _drbd_bm_total_weight(struct drbd_device *device);
 extern unsigned long drbd_bm_total_weight(struct drbd_device *device);
-extern int drbd_bm_rs_done(struct drbd_device *device);
 /* for receive_bitmap */
 extern void drbd_bm_merge_lel(struct drbd_device *device, size_t offset,
                size_t number, unsigned long *buffer);
@@ -1312,7 +1473,7 @@ enum determine_dev_size {
 extern enum determine_dev_size
 drbd_determine_dev_size(struct drbd_device *, enum dds_flags, struct resize_parms *) __must_hold(local);
 extern void resync_after_online_grow(struct drbd_device *);
-extern void drbd_reconsider_max_bio_size(struct drbd_device *device);
+extern void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev);
 extern enum drbd_state_rv drbd_set_role(struct drbd_device *device,
                                        enum drbd_role new_role,
                                        int force);
@@ -1333,7 +1494,7 @@ extern void resume_next_sg(struct drbd_device *device);
 extern void suspend_other_sg(struct drbd_device *device);
 extern int drbd_resync_finished(struct drbd_device *device);
 /* maybe rather drbd_main.c ? */
-extern void *drbd_md_get_buffer(struct drbd_device *device);
+extern void *drbd_md_get_buffer(struct drbd_device *device, const char *intent);
 extern void drbd_md_put_buffer(struct drbd_device *device);
 extern int drbd_md_sync_page_io(struct drbd_device *device,
                struct drbd_backing_dev *bdev, sector_t sector, int rw);
@@ -1380,7 +1541,8 @@ extern void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req);
 extern int drbd_receiver(struct drbd_thread *thi);
 extern int drbd_asender(struct drbd_thread *thi);
 extern bool drbd_rs_c_min_rate_throttle(struct drbd_device *device);
-extern bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector);
+extern bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
+               bool throttle_if_app_is_waiting);
 extern int drbd_submit_peer_request(struct drbd_device *,
                                    struct drbd_peer_request *, const unsigned,
                                    const int);
@@ -1464,10 +1626,7 @@ static inline void drbd_generic_make_request(struct drbd_device *device,
 {
        __release(local);
        if (!bio->bi_bdev) {
-               printk(KERN_ERR "drbd%d: drbd_generic_make_request: "
-                               "bio->bi_bdev == NULL\n",
-                      device_to_minor(device));
-               dump_stack();
+               drbd_err(device, "drbd_generic_make_request: bio->bi_bdev == NULL\n");
                bio_endio(bio, -ENODEV);
                return;
        }
@@ -1478,7 +1637,8 @@ static inline void drbd_generic_make_request(struct drbd_device *device,
                generic_make_request(bio);
 }
 
-void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo);
+void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
+                             enum write_ordering_e wo);
 
 /* drbd_proc.c */
 extern struct proc_dir_entry *drbd_proc;
@@ -1489,9 +1649,9 @@ extern const char *drbd_role_str(enum drbd_role s);
 /* drbd_actlog.c */
 extern bool drbd_al_begin_io_prepare(struct drbd_device *device, struct drbd_interval *i);
 extern int drbd_al_begin_io_nonblock(struct drbd_device *device, struct drbd_interval *i);
-extern void drbd_al_begin_io_commit(struct drbd_device *device, bool delegate);
+extern void drbd_al_begin_io_commit(struct drbd_device *device);
 extern bool drbd_al_begin_io_fastpath(struct drbd_device *device, struct drbd_interval *i);
-extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i, bool delegate);
+extern void drbd_al_begin_io(struct drbd_device *device, struct drbd_interval *i);
 extern void drbd_al_complete_io(struct drbd_device *device, struct drbd_interval *i);
 extern void drbd_rs_complete_io(struct drbd_device *device, sector_t sector);
 extern int drbd_rs_begin_io(struct drbd_device *device, sector_t sector);
@@ -1501,14 +1661,17 @@ extern int drbd_rs_del_all(struct drbd_device *device);
 extern void drbd_rs_failed_io(struct drbd_device *device,
                sector_t sector, int size);
 extern void drbd_advance_rs_marks(struct drbd_device *device, unsigned long still_to_go);
-extern void __drbd_set_in_sync(struct drbd_device *device, sector_t sector,
-               int size, const char *file, const unsigned int line);
+
+enum update_sync_bits_mode { RECORD_RS_FAILED, SET_OUT_OF_SYNC, SET_IN_SYNC };
+extern int __drbd_change_sync(struct drbd_device *device, sector_t sector, int size,
+               enum update_sync_bits_mode mode,
+               const char *file, const unsigned int line);
 #define drbd_set_in_sync(device, sector, size) \
-       __drbd_set_in_sync(device, sector, size, __FILE__, __LINE__)
-extern int __drbd_set_out_of_sync(struct drbd_device *device, sector_t sector,
-               int size, const char *file, const unsigned int line);
+       __drbd_change_sync(device, sector, size, SET_IN_SYNC, __FILE__, __LINE__)
 #define drbd_set_out_of_sync(device, sector, size) \
-       __drbd_set_out_of_sync(device, sector, size, __FILE__, __LINE__)
+       __drbd_change_sync(device, sector, size, SET_OUT_OF_SYNC, __FILE__, __LINE__)
+#define drbd_rs_failed_io(device, sector, size) \
+       __drbd_change_sync(device, sector, size, RECORD_RS_FAILED, __FILE__, __LINE__)
 extern void drbd_al_shrink(struct drbd_device *device);
 extern int drbd_initialize_al(struct drbd_device *, void *);
 
@@ -1764,25 +1927,38 @@ static inline sector_t drbd_md_ss(struct drbd_backing_dev *bdev)
 }
 
 static inline void
-drbd_queue_work_front(struct drbd_work_queue *q, struct drbd_work *w)
+drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
 {
        unsigned long flags;
        spin_lock_irqsave(&q->q_lock, flags);
-       list_add(&w->list, &q->q);
+       list_add_tail(&w->list, &q->q);
        spin_unlock_irqrestore(&q->q_lock, flags);
        wake_up(&q->q_wait);
 }
 
 static inline void
-drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
+drbd_queue_work_if_unqueued(struct drbd_work_queue *q, struct drbd_work *w)
 {
        unsigned long flags;
        spin_lock_irqsave(&q->q_lock, flags);
-       list_add_tail(&w->list, &q->q);
+       if (list_empty_careful(&w->list))
+               list_add_tail(&w->list, &q->q);
        spin_unlock_irqrestore(&q->q_lock, flags);
        wake_up(&q->q_wait);
 }
 
+/* Set work_bit on device; if newly set, flag DEVICE_WORK_PENDING and, if that was clear, wake sender_work. */
+static inline void drbd_device_post_work(struct drbd_device *device, int work_bit)
+{
+       struct drbd_connection *connection;
+
+       if (test_and_set_bit(work_bit, &device->flags))
+               return;
+       connection = first_peer_device(device)->connection;
+       if (!test_and_set_bit(DEVICE_WORK_PENDING, &connection->flags))
+               wake_up(&connection->sender_work.q_wait);
+}
+
 extern void drbd_flush_workqueue(struct drbd_work_queue *work_queue);
 
 static inline void wake_asender(struct drbd_connection *connection)
@@ -1859,7 +2035,7 @@ static inline void inc_ap_pending(struct drbd_device *device)
                        func, line,                                     \
                        atomic_read(&device->which))
 
-#define dec_ap_pending(device) _dec_ap_pending(device, __FUNCTION__, __LINE__)
+#define dec_ap_pending(device) _dec_ap_pending(device, __func__, __LINE__)
 static inline void _dec_ap_pending(struct drbd_device *device, const char *func, int line)
 {
        if (atomic_dec_and_test(&device->ap_pending_cnt))
@@ -1878,7 +2054,7 @@ static inline void inc_rs_pending(struct drbd_device *device)
        atomic_inc(&device->rs_pending_cnt);
 }
 
-#define dec_rs_pending(device) _dec_rs_pending(device, __FUNCTION__, __LINE__)
+#define dec_rs_pending(device) _dec_rs_pending(device, __func__, __LINE__)
 static inline void _dec_rs_pending(struct drbd_device *device, const char *func, int line)
 {
        atomic_dec(&device->rs_pending_cnt);
@@ -1899,20 +2075,29 @@ static inline void inc_unacked(struct drbd_device *device)
        atomic_inc(&device->unacked_cnt);
 }
 
-#define dec_unacked(device) _dec_unacked(device, __FUNCTION__, __LINE__)
+#define dec_unacked(device) _dec_unacked(device, __func__, __LINE__)
 static inline void _dec_unacked(struct drbd_device *device, const char *func, int line)
 {
        atomic_dec(&device->unacked_cnt);
        ERR_IF_CNT_IS_NEGATIVE(unacked_cnt, func, line);
 }
 
-#define sub_unacked(device, n) _sub_unacked(device, n, __FUNCTION__, __LINE__)
+#define sub_unacked(device, n) _sub_unacked(device, n, __func__, __LINE__)
 static inline void _sub_unacked(struct drbd_device *device, int n, const char *func, int line)
 {
        atomic_sub(n, &device->unacked_cnt);
        ERR_IF_CNT_IS_NEGATIVE(unacked_cnt, func, line);
 }
 
+static inline bool is_sync_state(enum drbd_conns connection_state)
+{
+       /* true only for C_SYNC_SOURCE, C_SYNC_TARGET and their C_PAUSED_SYNC_* variants */
+       return connection_state == C_SYNC_SOURCE ||
+              connection_state == C_SYNC_TARGET ||
+              connection_state == C_PAUSED_SYNC_S ||
+              connection_state == C_PAUSED_SYNC_T;
+}
+
 /**
  * get_ldev() - Increase the ref count on device->ldev. Returns 0 if there is no ldev
  * @M:         DRBD device.
@@ -1924,6 +2109,11 @@ static inline void _sub_unacked(struct drbd_device *device, int n, const char *f
 
 static inline void put_ldev(struct drbd_device *device)
 {
+       enum drbd_disk_state ds = device->state.disk;
+       /* We must sample the state *before* the atomic_dec becomes visible.
+        * Otherwise there is a theoretical race: someone who hits zero while
+        * the state is still D_FAILED could then see D_DISKLESS in the
+        * condition below and call into destroy, which he must not do yet. */
        int i = atomic_dec_return(&device->local_cnt);
 
        /* This may be called from some endio handler,
@@ -1932,15 +2122,13 @@ static inline void put_ldev(struct drbd_device *device)
        __release(local);
        D_ASSERT(device, i >= 0);
        if (i == 0) {
-               if (device->state.disk == D_DISKLESS)
+               if (ds == D_DISKLESS)
                        /* even internal references gone, safe to destroy */
-                       drbd_ldev_destroy(device);
-               if (device->state.disk == D_FAILED) {
+                       drbd_device_post_work(device, DESTROY_DISK);
+               if (ds == D_FAILED)
                        /* all application IO references gone. */
-                       if (!test_and_set_bit(GO_DISKLESS, &device->flags))
-                               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
-                                               &device->go_diskless);
-               }
+                       if (!test_and_set_bit(GOING_DISKLESS, &device->flags))
+                               drbd_device_post_work(device, GO_DISKLESS);
                wake_up(&device->misc_wait);
        }
 }
@@ -1964,54 +2152,6 @@ static inline int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_
 extern int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins);
 #endif
 
-/* you must have an "get_ldev" reference */
-static inline void drbd_get_syncer_progress(struct drbd_device *device,
-               unsigned long *bits_left, unsigned int *per_mil_done)
-{
-       /* this is to break it at compile time when we change that, in case we
-        * want to support more than (1<<32) bits on a 32bit arch. */
-       typecheck(unsigned long, device->rs_total);
-
-       /* note: both rs_total and rs_left are in bits, i.e. in
-        * units of BM_BLOCK_SIZE.
-        * for the percentage, we don't care. */
-
-       if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
-               *bits_left = device->ov_left;
-       else
-               *bits_left = drbd_bm_total_weight(device) - device->rs_failed;
-       /* >> 10 to prevent overflow,
-        * +1 to prevent division by zero */
-       if (*bits_left > device->rs_total) {
-               /* doh. maybe a logic bug somewhere.
-                * may also be just a race condition
-                * between this and a disconnect during sync.
-                * for now, just prevent in-kernel buffer overflow.
-                */
-               smp_rmb();
-               drbd_warn(device, "cs:%s rs_left=%lu > rs_total=%lu (rs_failed %lu)\n",
-                               drbd_conn_str(device->state.conn),
-                               *bits_left, device->rs_total, device->rs_failed);
-               *per_mil_done = 0;
-       } else {
-               /* Make sure the division happens in long context.
-                * We allow up to one petabyte storage right now,
-                * at a granularity of 4k per bit that is 2**38 bits.
-                * After shift right and multiplication by 1000,
-                * this should still fit easily into a 32bit long,
-                * so we don't need a 64bit division on 32bit arch.
-                * Note: currently we don't support such large bitmaps on 32bit
-                * arch anyways, but no harm done to be prepared for it here.
-                */
-               unsigned int shift = device->rs_total > UINT_MAX ? 16 : 10;
-               unsigned long left = *bits_left >> shift;
-               unsigned long total = 1UL + (device->rs_total >> shift);
-               unsigned long tmp = 1000UL - left * 1000UL/total;
-               *per_mil_done = tmp;
-       }
-}
-
-
 /* this throttles on-the-fly application requests
  * according to max_buffers settings;
  * maybe re-implement using semaphores? */
@@ -2201,25 +2341,6 @@ static inline int drbd_queue_order_type(struct drbd_device *device)
        return QUEUE_ORDERED_NONE;
 }
 
-static inline void drbd_md_flush(struct drbd_device *device)
-{
-       int r;
-
-       if (device->ldev == NULL) {
-               drbd_warn(device, "device->ldev == NULL in drbd_md_flush\n");
-               return;
-       }
-
-       if (test_bit(MD_NO_FUA, &device->flags))
-               return;
-
-       r = blkdev_issue_flush(device->ldev->md_bdev, GFP_NOIO, NULL);
-       if (r) {
-               set_bit(MD_NO_FUA, &device->flags);
-               drbd_err(device, "meta data flush failed with status %d, disabling md-flushes\n", r);
-       }
-}
-
 static inline struct drbd_connection *first_connection(struct drbd_resource *resource)
 {
        return list_first_entry_or_null(&resource->connections,
index f38fcb00c10d6c39b09c334483d80296b463e608..f210543f05f4782674de8abb0b0053e0e4833bfb 100644 (file)
@@ -10,7 +10,9 @@ struct drbd_interval {
        unsigned int size;      /* size in bytes */
        sector_t end;           /* highest interval end in subtree */
        int local:1             /* local or remote request? */;
-       int waiting:1;
+       int waiting:1;          /* someone is waiting for this to complete */
+       int completed:1;        /* this has been completed already;
+                                * ignore for conflict detection */
 };
 
 static inline void drbd_clear_interval(struct drbd_interval *i)
index 960645c26e6fc1b107e0db1ae016fd15800d3514..9b465bb68487b5c0e5a51f72d161ae7ade749453 100644 (file)
 
  */
 
+#define pr_fmt(fmt)    KBUILD_MODNAME ": " fmt
+
 #include <linux/module.h>
+#include <linux/jiffies.h>
 #include <linux/drbd.h>
 #include <asm/uaccess.h>
 #include <asm/types.h>
 #include "drbd_int.h"
 #include "drbd_protocol.h"
 #include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
-
 #include "drbd_vli.h"
+#include "drbd_debugfs.h"
 
 static DEFINE_MUTEX(drbd_main_mutex);
 static int drbd_open(struct block_device *bdev, fmode_t mode);
 static void drbd_release(struct gendisk *gd, fmode_t mode);
-static int w_md_sync(struct drbd_work *w, int unused);
 static void md_sync_timer_fn(unsigned long data);
 static int w_bitmap_io(struct drbd_work *w, int unused);
-static int w_go_diskless(struct drbd_work *w, int unused);
 
 MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
              "Lars Ellenberg <lars@linbit.com>");
@@ -264,7 +265,7 @@ bail:
 
 /**
  * _tl_restart() - Walks the transfer log, and applies an action to all requests
- * @device:    DRBD device.
+ * @connection:        DRBD connection to operate on.
  * @what:       The action/event to perform with all request objects
  *
  * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
@@ -662,6 +663,11 @@ static int __send_command(struct drbd_connection *connection, int vnr,
                            msg_flags);
        if (data && !err)
                err = drbd_send_all(connection, sock->socket, data, size, 0);
+       /* DRBD protocol "pings" are latency critical.
+        * This is supposed to trigger tcp_push_pending_frames() */
+       if (!err && (cmd == P_PING || cmd == P_PING_ACK))
+               drbd_tcp_nodelay(sock->socket);
+
        return err;
 }
 
@@ -1636,7 +1642,10 @@ int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *
        if (peer_device->connection->agreed_pro_version >= 100) {
                if (req->rq_state & RQ_EXP_RECEIVE_ACK)
                        dp_flags |= DP_SEND_RECEIVE_ACK;
-               if (req->rq_state & RQ_EXP_WRITE_ACK)
+               /* During resync, request an explicit write ack,
+                * even in protocol != C */
+               if (req->rq_state & RQ_EXP_WRITE_ACK
+               || (dp_flags & DP_MAY_SET_IN_SYNC))
                        dp_flags |= DP_SEND_WRITE_ACK;
        }
        p->dp_flags = cpu_to_be32(dp_flags);
@@ -1900,6 +1909,7 @@ void drbd_init_set_defaults(struct drbd_device *device)
        drbd_set_defaults(device);
 
        atomic_set(&device->ap_bio_cnt, 0);
+       atomic_set(&device->ap_actlog_cnt, 0);
        atomic_set(&device->ap_pending_cnt, 0);
        atomic_set(&device->rs_pending_cnt, 0);
        atomic_set(&device->unacked_cnt, 0);
@@ -1908,7 +1918,7 @@ void drbd_init_set_defaults(struct drbd_device *device)
        atomic_set(&device->rs_sect_in, 0);
        atomic_set(&device->rs_sect_ev, 0);
        atomic_set(&device->ap_in_flight, 0);
-       atomic_set(&device->md_io_in_use, 0);
+       atomic_set(&device->md_io.in_use, 0);
 
        mutex_init(&device->own_state_mutex);
        device->state_mutex = &device->own_state_mutex;
@@ -1924,17 +1934,15 @@ void drbd_init_set_defaults(struct drbd_device *device)
        INIT_LIST_HEAD(&device->resync_reads);
        INIT_LIST_HEAD(&device->resync_work.list);
        INIT_LIST_HEAD(&device->unplug_work.list);
-       INIT_LIST_HEAD(&device->go_diskless.list);
-       INIT_LIST_HEAD(&device->md_sync_work.list);
-       INIT_LIST_HEAD(&device->start_resync_work.list);
        INIT_LIST_HEAD(&device->bm_io_work.w.list);
+       INIT_LIST_HEAD(&device->pending_master_completion[0]);
+       INIT_LIST_HEAD(&device->pending_master_completion[1]);
+       INIT_LIST_HEAD(&device->pending_completion[0]);
+       INIT_LIST_HEAD(&device->pending_completion[1]);
 
        device->resync_work.cb  = w_resync_timer;
        device->unplug_work.cb  = w_send_write_hint;
-       device->go_diskless.cb  = w_go_diskless;
-       device->md_sync_work.cb = w_md_sync;
        device->bm_io_work.w.cb = w_bitmap_io;
-       device->start_resync_work.cb = w_start_resync;
 
        init_timer(&device->resync_timer);
        init_timer(&device->md_sync_timer);
@@ -1992,7 +2000,7 @@ void drbd_device_cleanup(struct drbd_device *device)
                drbd_bm_cleanup(device);
        }
 
-       drbd_free_bc(device->ldev);
+       drbd_free_ldev(device->ldev);
        device->ldev = NULL;
 
        clear_bit(AL_SUSPENDED, &device->flags);
@@ -2006,7 +2014,6 @@ void drbd_device_cleanup(struct drbd_device *device)
        D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
        D_ASSERT(device, list_empty(&device->resync_work.list));
        D_ASSERT(device, list_empty(&device->unplug_work.list));
-       D_ASSERT(device, list_empty(&device->go_diskless.list));
 
        drbd_set_defaults(device);
 }
@@ -2129,20 +2136,6 @@ Enomem:
        return -ENOMEM;
 }
 
-static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
-       void *unused)
-{
-       /* just so we have it.  you never know what interesting things we
-        * might want to do here some day...
-        */
-
-       return NOTIFY_DONE;
-}
-
-static struct notifier_block drbd_notifier = {
-       .notifier_call = drbd_notify_sys,
-};
-
 static void drbd_release_all_peer_reqs(struct drbd_device *device)
 {
        int rr;
@@ -2173,7 +2166,7 @@ void drbd_destroy_device(struct kref *kref)
 {
        struct drbd_device *device = container_of(kref, struct drbd_device, kref);
        struct drbd_resource *resource = device->resource;
-       struct drbd_connection *connection;
+       struct drbd_peer_device *peer_device, *tmp_peer_device;
 
        del_timer_sync(&device->request_timer);
 
@@ -2187,7 +2180,7 @@ void drbd_destroy_device(struct kref *kref)
        if (device->this_bdev)
                bdput(device->this_bdev);
 
-       drbd_free_bc(device->ldev);
+       drbd_free_ldev(device->ldev);
        device->ldev = NULL;
 
        drbd_release_all_peer_reqs(device);
@@ -2200,15 +2193,20 @@ void drbd_destroy_device(struct kref *kref)
 
        if (device->bitmap) /* should no longer be there. */
                drbd_bm_cleanup(device);
-       __free_page(device->md_io_page);
+       __free_page(device->md_io.page);
        put_disk(device->vdisk);
        blk_cleanup_queue(device->rq_queue);
        kfree(device->rs_plan_s);
-       kfree(first_peer_device(device));
-       kfree(device);
 
-       for_each_connection(connection, resource)
-               kref_put(&connection->kref, drbd_destroy_connection);
+       /* not for_each_connection(connection, resource):
+        * those may have been cleaned up and disassociated already.
+        */
+       for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
+               kref_put(&peer_device->connection->kref, drbd_destroy_connection);
+               kfree(peer_device);
+       }
+       memset(device, 0xfd, sizeof(*device));
+       kfree(device);
        kref_put(&resource->kref, drbd_destroy_resource);
 }
 
@@ -2236,7 +2234,7 @@ static void do_retry(struct work_struct *ws)
        list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
                struct drbd_device *device = req->device;
                struct bio *bio = req->master_bio;
-               unsigned long start_time = req->start_time;
+               unsigned long start_jif = req->start_jif;
                bool expected;
 
                expected =
@@ -2271,10 +2269,12 @@ static void do_retry(struct work_struct *ws)
                /* We are not just doing generic_make_request(),
                 * as we want to keep the start_time information. */
                inc_ap_bio(device);
-               __drbd_make_request(device, bio, start_time);
+               __drbd_make_request(device, bio, start_jif);
        }
 }
 
+/* called via drbd_req_put_completion_ref(),
+ * holds resource->req_lock */
 void drbd_restart_request(struct drbd_request *req)
 {
        unsigned long flags;
@@ -2298,6 +2298,7 @@ void drbd_destroy_resource(struct kref *kref)
        idr_destroy(&resource->devices);
        free_cpumask_var(resource->cpu_mask);
        kfree(resource->name);
+       memset(resource, 0xf2, sizeof(*resource));
        kfree(resource);
 }
 
@@ -2307,8 +2308,10 @@ void drbd_free_resource(struct drbd_resource *resource)
 
        for_each_connection_safe(connection, tmp, resource) {
                list_del(&connection->connections);
+               drbd_debugfs_connection_cleanup(connection);
                kref_put(&connection->kref, drbd_destroy_connection);
        }
+       drbd_debugfs_resource_cleanup(resource);
        kref_put(&resource->kref, drbd_destroy_resource);
 }
 
@@ -2318,8 +2321,6 @@ static void drbd_cleanup(void)
        struct drbd_device *device;
        struct drbd_resource *resource, *tmp;
 
-       unregister_reboot_notifier(&drbd_notifier);
-
        /* first remove proc,
         * drbdsetup uses it's presence to detect
         * whether DRBD is loaded.
@@ -2335,6 +2336,7 @@ static void drbd_cleanup(void)
                destroy_workqueue(retry.wq);
 
        drbd_genl_unregister();
+       drbd_debugfs_cleanup();
 
        idr_for_each_entry(&drbd_devices, device, i)
                drbd_delete_device(device);
@@ -2350,7 +2352,7 @@ static void drbd_cleanup(void)
 
        idr_destroy(&drbd_devices);
 
-       printk(KERN_INFO "drbd: module cleanup done.\n");
+       pr_info("module cleanup done.\n");
 }
 
 /**
@@ -2539,6 +2541,20 @@ int set_resource_options(struct drbd_resource *resource, struct res_opts *res_op
        if (nr_cpu_ids > 1 && res_opts->cpu_mask[0] != 0) {
                err = bitmap_parse(res_opts->cpu_mask, DRBD_CPU_MASK_SIZE,
                                   cpumask_bits(new_cpu_mask), nr_cpu_ids);
+               if (err == -EOVERFLOW) {
+                       /* So what. mask it out. */
+                       cpumask_var_t tmp_cpu_mask;
+                       if (zalloc_cpumask_var(&tmp_cpu_mask, GFP_KERNEL)) {
+                               cpumask_setall(tmp_cpu_mask);
+                               cpumask_and(new_cpu_mask, new_cpu_mask, tmp_cpu_mask);
+                               drbd_warn(resource, "Overflow in bitmap_parse(%.12s%s), truncating to %u bits\n",
+                                       res_opts->cpu_mask,
+                                       strlen(res_opts->cpu_mask) > 12 ? "..." : "",
+                                       nr_cpu_ids);
+                               free_cpumask_var(tmp_cpu_mask);
+                               err = 0;
+                       }
+               }
                if (err) {
                        drbd_warn(resource, "bitmap_parse() failed with %d\n", err);
                        /* retcode = ERR_CPU_MASK_PARSE; */
@@ -2579,10 +2595,12 @@ struct drbd_resource *drbd_create_resource(const char *name)
        kref_init(&resource->kref);
        idr_init(&resource->devices);
        INIT_LIST_HEAD(&resource->connections);
+       resource->write_ordering = WO_bdev_flush;
        list_add_tail_rcu(&resource->resources, &drbd_resources);
        mutex_init(&resource->conf_update);
        mutex_init(&resource->adm_mutex);
        spin_lock_init(&resource->req_lock);
+       drbd_debugfs_resource_add(resource);
        return resource;
 
 fail_free_name:
@@ -2593,7 +2611,7 @@ fail:
        return NULL;
 }
 
-/* caller must be under genl_lock() */
+/* caller must be under adm_mutex */
 struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
 {
        struct drbd_resource *resource;
@@ -2617,7 +2635,6 @@ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
        INIT_LIST_HEAD(&connection->current_epoch->list);
        connection->epochs = 1;
        spin_lock_init(&connection->epoch_lock);
-       connection->write_ordering = WO_bdev_flush;
 
        connection->send.seen_any_write_yet = false;
        connection->send.current_epoch_nr = 0;
@@ -2652,6 +2669,7 @@ struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
 
        kref_get(&resource->kref);
        list_add_tail_rcu(&connection->connections, &resource->connections);
+       drbd_debugfs_connection_add(connection);
        return connection;
 
 fail_resource:
@@ -2680,6 +2698,7 @@ void drbd_destroy_connection(struct kref *kref)
        drbd_free_socket(&connection->data);
        kfree(connection->int_dig_in);
        kfree(connection->int_dig_vv);
+       memset(connection, 0xfc, sizeof(*connection));
        kfree(connection);
        kref_put(&resource->kref, drbd_destroy_resource);
 }
@@ -2694,7 +2713,6 @@ static int init_submitter(struct drbd_device *device)
                return -ENOMEM;
 
        INIT_WORK(&device->submit.worker, do_submit);
-       spin_lock_init(&device->submit.lock);
        INIT_LIST_HEAD(&device->submit.writes);
        return 0;
 }
@@ -2764,8 +2782,8 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
        blk_queue_merge_bvec(q, drbd_merge_bvec);
        q->queue_lock = &resource->req_lock;
 
-       device->md_io_page = alloc_page(GFP_KERNEL);
-       if (!device->md_io_page)
+       device->md_io.page = alloc_page(GFP_KERNEL);
+       if (!device->md_io.page)
                goto out_no_io_page;
 
        if (drbd_bm_init(device))
@@ -2794,6 +2812,7 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
        kref_get(&device->kref);
 
        INIT_LIST_HEAD(&device->peer_devices);
+       INIT_LIST_HEAD(&device->pending_bitmap_io);
        for_each_connection(connection, resource) {
                peer_device = kzalloc(sizeof(struct drbd_peer_device), GFP_KERNEL);
                if (!peer_device)
@@ -2829,7 +2848,10 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
                for_each_peer_device(peer_device, device)
                        drbd_connected(peer_device);
        }
-
+       /* TODO: move to create_peer_device() */
+       for_each_peer_device(peer_device, device)
+               drbd_debugfs_peer_device_add(peer_device);
+       drbd_debugfs_device_add(device);
        return NO_ERROR;
 
 out_idr_remove_vol:
@@ -2853,7 +2875,7 @@ out_idr_remove_minor:
 out_no_minor_idr:
        drbd_bm_cleanup(device);
 out_no_bitmap:
-       __free_page(device->md_io_page);
+       __free_page(device->md_io.page);
 out_no_io_page:
        put_disk(disk);
 out_no_disk:
@@ -2868,8 +2890,13 @@ void drbd_delete_device(struct drbd_device *device)
 {
        struct drbd_resource *resource = device->resource;
        struct drbd_connection *connection;
+       struct drbd_peer_device *peer_device;
        int refs = 3;
 
+       /* TODO: move to free_peer_device() */
+       for_each_peer_device(peer_device, device)
+               drbd_debugfs_peer_device_cleanup(peer_device);
+       drbd_debugfs_device_cleanup(device);
        for_each_connection(connection, resource) {
                idr_remove(&connection->peer_devices, device->vnr);
                refs++;
@@ -2881,13 +2908,12 @@ void drbd_delete_device(struct drbd_device *device)
        kref_sub(&device->kref, refs, drbd_destroy_device);
 }
 
-int __init drbd_init(void)
+static int __init drbd_init(void)
 {
        int err;
 
        if (minor_count < DRBD_MINOR_COUNT_MIN || minor_count > DRBD_MINOR_COUNT_MAX) {
-               printk(KERN_ERR
-                      "drbd: invalid minor_count (%d)\n", minor_count);
+               pr_err("invalid minor_count (%d)\n", minor_count);
 #ifdef MODULE
                return -EINVAL;
 #else
@@ -2897,14 +2923,11 @@ int __init drbd_init(void)
 
        err = register_blkdev(DRBD_MAJOR, "drbd");
        if (err) {
-               printk(KERN_ERR
-                      "drbd: unable to register block device major %d\n",
+               pr_err("unable to register block device major %d\n",
                       DRBD_MAJOR);
                return err;
        }
 
-       register_reboot_notifier(&drbd_notifier);
-
        /*
         * allocate all necessary structs
         */
@@ -2918,7 +2941,7 @@ int __init drbd_init(void)
 
        err = drbd_genl_register();
        if (err) {
-               printk(KERN_ERR "drbd: unable to register generic netlink family\n");
+               pr_err("unable to register generic netlink family\n");
                goto fail;
        }
 
@@ -2929,38 +2952,39 @@ int __init drbd_init(void)
        err = -ENOMEM;
        drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
        if (!drbd_proc) {
-               printk(KERN_ERR "drbd: unable to register proc file\n");
+               pr_err("unable to register proc file\n");
                goto fail;
        }
 
        retry.wq = create_singlethread_workqueue("drbd-reissue");
        if (!retry.wq) {
-               printk(KERN_ERR "drbd: unable to create retry workqueue\n");
+               pr_err("unable to create retry workqueue\n");
                goto fail;
        }
        INIT_WORK(&retry.worker, do_retry);
        spin_lock_init(&retry.lock);
        INIT_LIST_HEAD(&retry.writes);
 
-       printk(KERN_INFO "drbd: initialized. "
+       if (drbd_debugfs_init())
+               pr_notice("failed to initialize debugfs -- will not be available\n");
+
+       pr_info("initialized. "
               "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
               API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
-       printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
-       printk(KERN_INFO "drbd: registered as block device major %d\n",
-               DRBD_MAJOR);
-
+       pr_info("%s\n", drbd_buildtag());
+       pr_info("registered as block device major %d\n", DRBD_MAJOR);
        return 0; /* Success! */
 
 fail:
        drbd_cleanup();
        if (err == -ENOMEM)
-               printk(KERN_ERR "drbd: ran out of memory\n");
+               pr_err("ran out of memory\n");
        else
-               printk(KERN_ERR "drbd: initialization failure\n");
+               pr_err("initialization failure\n");
        return err;
 }
 
-void drbd_free_bc(struct drbd_backing_dev *ldev)
+void drbd_free_ldev(struct drbd_backing_dev *ldev)
 {
        if (ldev == NULL)
                return;
@@ -2972,24 +2996,29 @@ void drbd_free_bc(struct drbd_backing_dev *ldev)
        kfree(ldev);
 }
 
-void drbd_free_sock(struct drbd_connection *connection)
+static void drbd_free_one_sock(struct drbd_socket *ds)
 {
-       if (connection->data.socket) {
-               mutex_lock(&connection->data.mutex);
-               kernel_sock_shutdown(connection->data.socket, SHUT_RDWR);
-               sock_release(connection->data.socket);
-               connection->data.socket = NULL;
-               mutex_unlock(&connection->data.mutex);
-       }
-       if (connection->meta.socket) {
-               mutex_lock(&connection->meta.mutex);
-               kernel_sock_shutdown(connection->meta.socket, SHUT_RDWR);
-               sock_release(connection->meta.socket);
-               connection->meta.socket = NULL;
-               mutex_unlock(&connection->meta.mutex);
+       struct socket *s;
+       mutex_lock(&ds->mutex);
+       s = ds->socket;
+       ds->socket = NULL;
+       mutex_unlock(&ds->mutex);
+       if (s) {
+               /* so debugfs does not need to mutex_lock() */
+               synchronize_rcu();
+               kernel_sock_shutdown(s, SHUT_RDWR);
+               sock_release(s);
        }
 }
 
+void drbd_free_sock(struct drbd_connection *connection)
+{
+       if (connection->data.socket)
+               drbd_free_one_sock(&connection->data);
+       if (connection->meta.socket)
+               drbd_free_one_sock(&connection->meta);
+}
+
 /* meta data management */
 
 void conn_md_sync(struct drbd_connection *connection)
@@ -3093,7 +3122,7 @@ void drbd_md_sync(struct drbd_device *device)
        if (!get_ldev_if_state(device, D_FAILED))
                return;
 
-       buffer = drbd_md_get_buffer(device);
+       buffer = drbd_md_get_buffer(device, __func__);
        if (!buffer)
                goto out;
 
@@ -3253,7 +3282,7 @@ int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev)
        if (device->state.disk != D_DISKLESS)
                return ERR_DISK_CONFIGURED;
 
-       buffer = drbd_md_get_buffer(device);
+       buffer = drbd_md_get_buffer(device, __func__);
        if (!buffer)
                return ERR_NOMEM;
 
@@ -3466,23 +3495,19 @@ void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
  *
  * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
  */
-int drbd_bmio_set_n_write(struct drbd_device *device)
+int drbd_bmio_set_n_write(struct drbd_device *device) __must_hold(local)
 {
        int rv = -EIO;
 
-       if (get_ldev_if_state(device, D_ATTACHING)) {
-               drbd_md_set_flag(device, MDF_FULL_SYNC);
-               drbd_md_sync(device);
-               drbd_bm_set_all(device);
-
-               rv = drbd_bm_write(device);
+       drbd_md_set_flag(device, MDF_FULL_SYNC);
+       drbd_md_sync(device);
+       drbd_bm_set_all(device);
 
-               if (!rv) {
-                       drbd_md_clear_flag(device, MDF_FULL_SYNC);
-                       drbd_md_sync(device);
-               }
+       rv = drbd_bm_write(device);
 
-               put_ldev(device);
+       if (!rv) {
+               drbd_md_clear_flag(device, MDF_FULL_SYNC);
+               drbd_md_sync(device);
        }
 
        return rv;
@@ -3494,18 +3519,11 @@ int drbd_bmio_set_n_write(struct drbd_device *device)
  *
  * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
  */
-int drbd_bmio_clear_n_write(struct drbd_device *device)
+int drbd_bmio_clear_n_write(struct drbd_device *device) __must_hold(local)
 {
-       int rv = -EIO;
-
        drbd_resume_al(device);
-       if (get_ldev_if_state(device, D_ATTACHING)) {
-               drbd_bm_clear_all(device);
-               rv = drbd_bm_write(device);
-               put_ldev(device);
-       }
-
-       return rv;
+       drbd_bm_clear_all(device);
+       return drbd_bm_write(device);
 }
 
 static int w_bitmap_io(struct drbd_work *w, int unused)
@@ -3537,61 +3555,6 @@ static int w_bitmap_io(struct drbd_work *w, int unused)
        return 0;
 }
 
-void drbd_ldev_destroy(struct drbd_device *device)
-{
-       lc_destroy(device->resync);
-       device->resync = NULL;
-       lc_destroy(device->act_log);
-       device->act_log = NULL;
-       __no_warn(local,
-               drbd_free_bc(device->ldev);
-               device->ldev = NULL;);
-
-       clear_bit(GO_DISKLESS, &device->flags);
-}
-
-static int w_go_diskless(struct drbd_work *w, int unused)
-{
-       struct drbd_device *device =
-               container_of(w, struct drbd_device, go_diskless);
-
-       D_ASSERT(device, device->state.disk == D_FAILED);
-       /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
-        * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
-        * the protected members anymore, though, so once put_ldev reaches zero
-        * again, it will be safe to free them. */
-
-       /* Try to write changed bitmap pages, read errors may have just
-        * set some bits outside the area covered by the activity log.
-        *
-        * If we have an IO error during the bitmap writeout,
-        * we will want a full sync next time, just in case.
-        * (Do we want a specific meta data flag for this?)
-        *
-        * If that does not make it to stable storage either,
-        * we cannot do anything about that anymore.
-        *
-        * We still need to check if both bitmap and ldev are present, we may
-        * end up here after a failed attach, before ldev was even assigned.
-        */
-       if (device->bitmap && device->ldev) {
-               /* An interrupted resync or similar is allowed to recounts bits
-                * while we detach.
-                * Any modifications would not be expected anymore, though.
-                */
-               if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
-                                       "detach", BM_LOCKED_TEST_ALLOWED)) {
-                       if (test_bit(WAS_READ_ERROR, &device->flags)) {
-                               drbd_md_set_flag(device, MDF_FULL_SYNC);
-                               drbd_md_sync(device);
-                       }
-               }
-       }
-
-       drbd_force_state(device, NS(disk, D_DISKLESS));
-       return 0;
-}
-
 /**
  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
  * @device:    DRBD device.
@@ -3603,6 +3566,9 @@ static int w_go_diskless(struct drbd_work *w, int unused)
  * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
  * called from worker context. It MUST NOT be used while a previous such
  * work is still pending!
+ *
+ * Its worker function wraps the call to io_fn() in get_ldev() and
+ * put_ldev().
  */
 void drbd_queue_bitmap_io(struct drbd_device *device,
                          int (*io_fn)(struct drbd_device *),
@@ -3685,25 +3651,7 @@ int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
 static void md_sync_timer_fn(unsigned long data)
 {
        struct drbd_device *device = (struct drbd_device *) data;
-
-       /* must not double-queue! */
-       if (list_empty(&device->md_sync_work.list))
-               drbd_queue_work_front(&first_peer_device(device)->connection->sender_work,
-                                     &device->md_sync_work);
-}
-
-static int w_md_sync(struct drbd_work *w, int unused)
-{
-       struct drbd_device *device =
-               container_of(w, struct drbd_device, md_sync_work);
-
-       drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
-#ifdef DEBUG
-       drbd_warn(device, "last md_mark_dirty: %s:%u\n",
-               device->last_md_mark_dirty.func, device->last_md_mark_dirty.line);
-#endif
-       drbd_md_sync(device);
-       return 0;
+       drbd_device_post_work(device, MD_SYNC);
 }
 
 const char *cmdname(enum drbd_packet cmd)
index 3f2e1673808053a4de70077735a67723f9955b64..1cd47df44bdaf57d74c1ff3adede40835f1b1f29 100644 (file)
@@ -23,6 +23,8 @@
 
  */
 
+#define pr_fmt(fmt)    KBUILD_MODNAME ": " fmt
+
 #include <linux/module.h>
 #include <linux/drbd.h>
 #include <linux/in.h>
@@ -85,7 +87,7 @@ static void drbd_adm_send_reply(struct sk_buff *skb, struct genl_info *info)
 {
        genlmsg_end(skb, genlmsg_data(nlmsg_data(nlmsg_hdr(skb))));
        if (genlmsg_reply(skb, info))
-               printk(KERN_ERR "drbd: error sending genl reply\n");
+               pr_err("error sending genl reply\n");
 }
 
 /* Used on a fresh "drbd_adm_prepare"d reply_skb, this cannot fail: The only
@@ -558,8 +560,10 @@ void conn_try_outdate_peer_async(struct drbd_connection *connection)
 }
 
 enum drbd_state_rv
-drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
+drbd_set_role(struct drbd_device *const device, enum drbd_role new_role, int force)
 {
+       struct drbd_peer_device *const peer_device = first_peer_device(device);
+       struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        const int max_tries = 4;
        enum drbd_state_rv rv = SS_UNKNOWN_ERROR;
        struct net_conf *nc;
@@ -607,7 +611,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
                    device->state.disk == D_CONSISTENT && mask.pdsk == 0) {
                        D_ASSERT(device, device->state.pdsk == D_UNKNOWN);
 
-                       if (conn_try_outdate_peer(first_peer_device(device)->connection)) {
+                       if (conn_try_outdate_peer(connection)) {
                                val.disk = D_UP_TO_DATE;
                                mask.disk = D_MASK;
                        }
@@ -617,7 +621,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
                if (rv == SS_NOTHING_TO_DO)
                        goto out;
                if (rv == SS_PRIMARY_NOP && mask.pdsk == 0) {
-                       if (!conn_try_outdate_peer(first_peer_device(device)->connection) && force) {
+                       if (!conn_try_outdate_peer(connection) && force) {
                                drbd_warn(device, "Forced into split brain situation!\n");
                                mask.pdsk = D_MASK;
                                val.pdsk  = D_OUTDATED;
@@ -630,7 +634,7 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
                           retry at most once more in this case. */
                        int timeo;
                        rcu_read_lock();
-                       nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+                       nc = rcu_dereference(connection->net_conf);
                        timeo = nc ? (nc->ping_timeo + 1) * HZ / 10 : 1;
                        rcu_read_unlock();
                        schedule_timeout_interruptible(timeo);
@@ -659,19 +663,17 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
        /* FIXME also wait for all pending P_BARRIER_ACK? */
 
        if (new_role == R_SECONDARY) {
-               set_disk_ro(device->vdisk, true);
                if (get_ldev(device)) {
                        device->ldev->md.uuid[UI_CURRENT] &= ~(u64)1;
                        put_ldev(device);
                }
        } else {
-               /* Called from drbd_adm_set_role only.
-                * We are still holding the conf_update mutex. */
-               nc = first_peer_device(device)->connection->net_conf;
+               mutex_lock(&device->resource->conf_update);
+               nc = connection->net_conf;
                if (nc)
                        nc->discard_my_data = 0; /* without copy; single bit op is atomic */
+               mutex_unlock(&device->resource->conf_update);
 
-               set_disk_ro(device->vdisk, false);
                if (get_ldev(device)) {
                        if (((device->state.conn < C_CONNECTED ||
                               device->state.pdsk <= D_FAILED)
@@ -689,12 +691,12 @@ drbd_set_role(struct drbd_device *device, enum drbd_role new_role, int force)
        if (device->state.conn >= C_WF_REPORT_PARAMS) {
                /* if this was forced, we should consider sync */
                if (forced)
-                       drbd_send_uuids(first_peer_device(device));
-               drbd_send_current_state(first_peer_device(device));
+                       drbd_send_uuids(peer_device);
+               drbd_send_current_state(peer_device);
        }
 
        drbd_md_sync(device);
-
+       set_disk_ro(device->vdisk, new_role == R_SECONDARY);
        kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
 out:
        mutex_unlock(device->state_mutex);
@@ -891,7 +893,7 @@ drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct
         * still lock the act_log to not trigger ASSERTs there.
         */
        drbd_suspend_io(device);
-       buffer = drbd_md_get_buffer(device); /* Lock meta-data IO */
+       buffer = drbd_md_get_buffer(device, __func__); /* Lock meta-data IO */
        if (!buffer) {
                drbd_resume_io(device);
                return DS_ERROR;
@@ -971,6 +973,10 @@ drbd_determine_dev_size(struct drbd_device *device, enum dds_flags flags, struct
        if (la_size_changed || md_moved || rs) {
                u32 prev_flags;
 
+               /* We do some synchronous IO below, which may take some time.
+                * Clear the timer to avoid scary "timer expired!" messages;
+                * the "Superblock" is written out at least twice below anyway. */
+               del_timer(&device->md_sync_timer);
                drbd_al_shrink(device); /* All extents inactive. */
 
                prev_flags = md->flags;
@@ -1116,15 +1122,16 @@ static int drbd_check_al_size(struct drbd_device *device, struct disk_conf *dc)
        return 0;
 }
 
-static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_bio_size)
+static void drbd_setup_queue_param(struct drbd_device *device, struct drbd_backing_dev *bdev,
+                                  unsigned int max_bio_size)
 {
        struct request_queue * const q = device->rq_queue;
        unsigned int max_hw_sectors = max_bio_size >> 9;
        unsigned int max_segments = 0;
        struct request_queue *b = NULL;
 
-       if (get_ldev_if_state(device, D_ATTACHING)) {
-               b = device->ldev->backing_bdev->bd_disk->queue;
+       if (bdev) {
+               b = bdev->backing_bdev->bd_disk->queue;
 
                max_hw_sectors = min(queue_max_hw_sectors(b), max_bio_size >> 9);
                rcu_read_lock();
@@ -1169,11 +1176,10 @@ static void drbd_setup_queue_param(struct drbd_device *device, unsigned int max_
                                 b->backing_dev_info.ra_pages);
                        q->backing_dev_info.ra_pages = b->backing_dev_info.ra_pages;
                }
-               put_ldev(device);
        }
 }
 
-void drbd_reconsider_max_bio_size(struct drbd_device *device)
+void drbd_reconsider_max_bio_size(struct drbd_device *device, struct drbd_backing_dev *bdev)
 {
        unsigned int now, new, local, peer;
 
@@ -1181,10 +1187,9 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device)
        local = device->local_max_bio_size; /* Eventually last known value, from volatile memory */
        peer = device->peer_max_bio_size; /* Eventually last known value, from meta data */
 
-       if (get_ldev_if_state(device, D_ATTACHING)) {
-               local = queue_max_hw_sectors(device->ldev->backing_bdev->bd_disk->queue) << 9;
+       if (bdev) {
+               local = queue_max_hw_sectors(bdev->backing_bdev->bd_disk->queue) << 9;
                device->local_max_bio_size = local;
-               put_ldev(device);
        }
        local = min(local, DRBD_MAX_BIO_SIZE);
 
@@ -1217,7 +1222,7 @@ void drbd_reconsider_max_bio_size(struct drbd_device *device)
        if (new != now)
                drbd_info(device, "max BIO size = %u\n", new);
 
-       drbd_setup_queue_param(device, new);
+       drbd_setup_queue_param(device, bdev, new);
 }
 
 /* Starts the worker thread */
@@ -1299,6 +1304,13 @@ static unsigned int drbd_al_extents_max(struct drbd_backing_dev *bdev)
        return (al_size_4k - 1) * AL_CONTEXT_PER_TRANSACTION;
 }
 
+static bool write_ordering_changed(struct disk_conf *a, struct disk_conf *b)
+{
+       return  a->disk_barrier != b->disk_barrier ||
+               a->disk_flushes != b->disk_flushes ||
+               a->disk_drain != b->disk_drain;
+}
+
 int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
 {
        struct drbd_config_context adm_ctx;
@@ -1405,7 +1417,8 @@ int drbd_adm_disk_opts(struct sk_buff *skb, struct genl_info *info)
        else
                set_bit(MD_NO_FUA, &device->flags);
 
-       drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush);
+       if (write_ordering_changed(old_disk_conf, new_disk_conf))
+               drbd_bump_write_ordering(device->resource, NULL, WO_bdev_flush);
 
        drbd_md_sync(device);
 
@@ -1440,6 +1453,8 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
 {
        struct drbd_config_context adm_ctx;
        struct drbd_device *device;
+       struct drbd_peer_device *peer_device;
+       struct drbd_connection *connection;
        int err;
        enum drbd_ret_code retcode;
        enum determine_dev_size dd;
@@ -1462,7 +1477,9 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
 
        device = adm_ctx.device;
        mutex_lock(&adm_ctx.resource->adm_mutex);
-       conn_reconfig_start(first_peer_device(device)->connection);
+       peer_device = first_peer_device(device);
+       connection = peer_device ? peer_device->connection : NULL;
+       conn_reconfig_start(connection);
 
        /* if you want to reconfigure, please tear down first */
        if (device->state.disk > D_DISKLESS) {
@@ -1473,7 +1490,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
         * drbd_ldev_destroy is done already, we may end up here very fast,
         * e.g. if someone calls attach from the on-io-error handler,
         * to realize a "hot spare" feature (not that I'd recommend that) */
-       wait_event(device->misc_wait, !atomic_read(&device->local_cnt));
+       wait_event(device->misc_wait, !test_bit(GOING_DISKLESS, &device->flags));
 
        /* make sure there is no leftover from previous force-detach attempts */
        clear_bit(FORCE_DETACH, &device->flags);
@@ -1529,7 +1546,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
                goto fail;
 
        rcu_read_lock();
-       nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+       nc = rcu_dereference(connection->net_conf);
        if (nc) {
                if (new_disk_conf->fencing == FP_STONITH && nc->wire_protocol == DRBD_PROT_A) {
                        rcu_read_unlock();
@@ -1649,7 +1666,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
         */
        wait_event(device->misc_wait, !atomic_read(&device->ap_pending_cnt) || drbd_suspended(device));
        /* and for any other previously queued work */
-       drbd_flush_workqueue(&first_peer_device(device)->connection->sender_work);
+       drbd_flush_workqueue(&connection->sender_work);
 
        rv = _drbd_request_state(device, NS(disk, D_ATTACHING), CS_VERBOSE);
        retcode = rv;  /* FIXME: Type mismatch. */
@@ -1710,7 +1727,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
        new_disk_conf = NULL;
        new_plan = NULL;
 
-       drbd_bump_write_ordering(first_peer_device(device)->connection, WO_bdev_flush);
+       drbd_bump_write_ordering(device->resource, device->ldev, WO_bdev_flush);
 
        if (drbd_md_test_flag(device->ldev, MDF_CRASHED_PRIMARY))
                set_bit(CRASHED_PRIMARY, &device->flags);
@@ -1726,7 +1743,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
        device->read_cnt = 0;
        device->writ_cnt = 0;
 
-       drbd_reconsider_max_bio_size(device);
+       drbd_reconsider_max_bio_size(device, device->ldev);
 
        /* If I am currently not R_PRIMARY,
         * but meta data primary indicator is set,
@@ -1845,7 +1862,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
 
        kobject_uevent(&disk_to_dev(device->vdisk)->kobj, KOBJ_CHANGE);
        put_ldev(device);
-       conn_reconfig_done(first_peer_device(device)->connection);
+       conn_reconfig_done(connection);
        mutex_unlock(&adm_ctx.resource->adm_mutex);
        drbd_adm_finish(&adm_ctx, info, retcode);
        return 0;
@@ -1856,7 +1873,7 @@ int drbd_adm_attach(struct sk_buff *skb, struct genl_info *info)
        drbd_force_state(device, NS(disk, D_DISKLESS));
        drbd_md_sync(device);
  fail:
-       conn_reconfig_done(first_peer_device(device)->connection);
+       conn_reconfig_done(connection);
        if (nbc) {
                if (nbc->backing_bdev)
                        blkdev_put(nbc->backing_bdev,
@@ -1888,7 +1905,7 @@ static int adm_detach(struct drbd_device *device, int force)
        }
 
        drbd_suspend_io(device); /* so no-one is stuck in drbd_al_begin_io */
-       drbd_md_get_buffer(device); /* make sure there is no in-flight meta-data IO */
+       drbd_md_get_buffer(device, __func__); /* make sure there is no in-flight meta-data IO */
        retcode = drbd_request_state(device, NS(disk, D_FAILED));
        drbd_md_put_buffer(device);
        /* D_FAILED will transition to DISKLESS. */
@@ -2654,8 +2671,13 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info)
        if (retcode != NO_ERROR)
                goto out;
 
-       mutex_lock(&adm_ctx.resource->adm_mutex);
        device = adm_ctx.device;
+       if (!get_ldev(device)) {
+               retcode = ERR_NO_DISK;
+               goto out;
+       }
+
+       mutex_lock(&adm_ctx.resource->adm_mutex);
 
        /* If there is still bitmap IO pending, probably because of a previous
         * resync just being finished, wait for it before requesting a new resync.
@@ -2679,6 +2701,7 @@ int drbd_adm_invalidate(struct sk_buff *skb, struct genl_info *info)
                retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_T));
        drbd_resume_io(device);
        mutex_unlock(&adm_ctx.resource->adm_mutex);
+       put_ldev(device);
 out:
        drbd_adm_finish(&adm_ctx, info, retcode);
        return 0;
@@ -2704,7 +2727,7 @@ out:
        return 0;
 }
 
-static int drbd_bmio_set_susp_al(struct drbd_device *device)
+static int drbd_bmio_set_susp_al(struct drbd_device *device) __must_hold(local)
 {
        int rv;
 
@@ -2725,8 +2748,13 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info)
        if (retcode != NO_ERROR)
                goto out;
 
-       mutex_lock(&adm_ctx.resource->adm_mutex);
        device = adm_ctx.device;
+       if (!get_ldev(device)) {
+               retcode = ERR_NO_DISK;
+               goto out;
+       }
+
+       mutex_lock(&adm_ctx.resource->adm_mutex);
 
        /* If there is still bitmap IO pending, probably because of a previous
         * resync just being finished, wait for it before requesting a new resync.
@@ -2753,6 +2781,7 @@ int drbd_adm_invalidate_peer(struct sk_buff *skb, struct genl_info *info)
                retcode = drbd_request_state(device, NS(conn, C_STARTING_SYNC_S));
        drbd_resume_io(device);
        mutex_unlock(&adm_ctx.resource->adm_mutex);
+       put_ldev(device);
 out:
        drbd_adm_finish(&adm_ctx, info, retcode);
        return 0;
@@ -2892,7 +2921,7 @@ static struct drbd_connection *the_only_connection(struct drbd_resource *resourc
        return list_first_entry(&resource->connections, struct drbd_connection, connections);
 }
 
-int nla_put_status_info(struct sk_buff *skb, struct drbd_device *device,
+static int nla_put_status_info(struct sk_buff *skb, struct drbd_device *device,
                const struct sib_info *sib)
 {
        struct drbd_resource *resource = device->resource;
@@ -3622,13 +3651,6 @@ void drbd_bcast_event(struct drbd_device *device, const struct sib_info *sib)
        unsigned seq;
        int err = -ENOMEM;
 
-       if (sib->sib_reason == SIB_SYNC_PROGRESS) {
-               if (time_after(jiffies, device->rs_last_bcast + HZ))
-                       device->rs_last_bcast = jiffies;
-               else
-                       return;
-       }
-
        seq = atomic_inc_return(&drbd_genl_seq);
        msg = genlmsg_new(NLMSG_GOODSIZE, GFP_NOIO);
        if (!msg)
index 89736bdbbc7044aedaaacbd5dc9c858ca3933596..06e6147c76013602d2591cab96bb239afb073d17 100644 (file)
@@ -60,20 +60,65 @@ static void seq_printf_with_thousands_grouping(struct seq_file *seq, long v)
                seq_printf(seq, "%ld", v);
 }
 
+static void drbd_get_syncer_progress(struct drbd_device *device,
+               union drbd_dev_state state, unsigned long *rs_total,
+               unsigned long *bits_left, unsigned int *per_mil_done)
+{
+       /* Compute progress of the current resync or online verify.
+        * @state is the caller's snapshot of the device state; results are
+        * returned via *rs_total, *bits_left and *per_mil_done (0..1000).
+        * The typecheck is to break it at compile time when we change that,
+        * in case we want to support more than (1<<32) bits on a 32bit arch. */
+       typecheck(unsigned long, device->rs_total);
+       *rs_total = device->rs_total;
+
+       /* note: both rs_total and rs_left are in bits, i.e. in
+        * units of BM_BLOCK_SIZE.
+        * for the percentage, we don't care. */
+       if (state.conn == C_VERIFY_S || state.conn == C_VERIFY_T)
+               *bits_left = device->ov_left;
+       else
+               *bits_left = drbd_bm_total_weight(device) - device->rs_failed;
+       if (*bits_left > *rs_total) {
+               /* D'oh. Maybe a logic bug somewhere.  More likely just a race
+                * between state change and reset of rs_total.
+                * Clamp, and report 0 done unless rs_total itself is 0. */
+               *bits_left = *rs_total;
+               *per_mil_done = *rs_total ? 0 : 1000;
+       } else {
+               /* Make sure the division happens in long context.
+                * We allow up to one petabyte storage right now,
+                * at a granularity of 4k per bit that is 2**38 bits.
+                * After shift right and multiplication by 1000,
+                * this should still fit easily into a 32bit long,
+                * so we don't need a 64bit division on 32bit arch.
+                * Note: currently we don't support such large bitmaps on 32bit
+                * arch anyways, but no harm done to be prepared for it here.
+                * >> shift (10, or 16 if rs_total > UINT_MAX) to prevent overflow,
+                * +1 on the total to prevent division by zero. */
+               unsigned int shift = *rs_total > UINT_MAX ? 16 : 10;
+               unsigned long left = *bits_left >> shift;
+               unsigned long total = 1UL + (*rs_total >> shift);
+               unsigned long tmp = 1000UL - left * 1000UL/total;
+               *per_mil_done = tmp;
+       }
+}
+
+
 /*lge
  * progress bars shamelessly adapted from driver/md/md.c
  * output looks like
  *     [=====>..............] 33.5% (23456/123456)
  *     finish: 2:20:20 speed: 6,345 (6,456) K/sec
  */
-static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *seq)
+static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *seq,
+               union drbd_dev_state state)
 {
-       unsigned long db, dt, dbdt, rt, rs_left;
+       unsigned long db, dt, dbdt, rt, rs_total, rs_left;
        unsigned int res;
        int i, x, y;
        int stalled = 0;
 
-       drbd_get_syncer_progress(device, &rs_left, &res);
+       drbd_get_syncer_progress(device, state, &rs_total, &rs_left, &res);
 
        x = res/50;
        y = 20-x;
@@ -85,21 +130,21 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
                seq_printf(seq, ".");
        seq_printf(seq, "] ");
 
-       if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
+       if (state.conn == C_VERIFY_S || state.conn == C_VERIFY_T)
                seq_printf(seq, "verified:");
        else
                seq_printf(seq, "sync'ed:");
        seq_printf(seq, "%3u.%u%% ", res / 10, res % 10);
 
        /* if more than a few GB, display in MB */
-       if (device->rs_total > (4UL << (30 - BM_BLOCK_SHIFT)))
+       if (rs_total > (4UL << (30 - BM_BLOCK_SHIFT)))
                seq_printf(seq, "(%lu/%lu)M",
                            (unsigned long) Bit2KB(rs_left >> 10),
-                           (unsigned long) Bit2KB(device->rs_total >> 10));
+                           (unsigned long) Bit2KB(rs_total >> 10));
        else
                seq_printf(seq, "(%lu/%lu)K\n\t",
                            (unsigned long) Bit2KB(rs_left),
-                           (unsigned long) Bit2KB(device->rs_total));
+                           (unsigned long) Bit2KB(rs_total));
 
        /* see drivers/md/md.c
         * We do not want to overflow, so the order of operands and
@@ -150,13 +195,13 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
        if (dt == 0)
                dt = 1;
-       db = device->rs_total - rs_left;
+       db = rs_total - rs_left;
        dbdt = Bit2KB(db/dt);
        seq_printf_with_thousands_grouping(seq, dbdt);
        seq_printf(seq, ")");
 
-       if (device->state.conn == C_SYNC_TARGET ||
-           device->state.conn == C_VERIFY_S) {
+       if (state.conn == C_SYNC_TARGET ||
+           state.conn == C_VERIFY_S) {
                seq_printf(seq, " want: ");
                seq_printf_with_thousands_grouping(seq, device->c_sync_rate);
        }
@@ -168,8 +213,8 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
                unsigned long bm_bits = drbd_bm_bits(device);
                unsigned long bit_pos;
                unsigned long long stop_sector = 0;
-               if (device->state.conn == C_VERIFY_S ||
-                   device->state.conn == C_VERIFY_T) {
+               if (state.conn == C_VERIFY_S ||
+                   state.conn == C_VERIFY_T) {
                        bit_pos = bm_bits - device->ov_left;
                        if (verify_can_do_stop_sector(device))
                                stop_sector = device->ov_stop_sector;
@@ -188,22 +233,13 @@ static void drbd_syncer_progress(struct drbd_device *device, struct seq_file *se
        }
 }
 
-static void resync_dump_detail(struct seq_file *seq, struct lc_element *e)
-{
-       struct bm_extent *bme = lc_entry(e, struct bm_extent, lce);
-
-       seq_printf(seq, "%5d %s %s\n", bme->rs_left,
-                  bme->flags & BME_NO_WRITES ? "NO_WRITES" : "---------",
-                  bme->flags & BME_LOCKED ? "LOCKED" : "------"
-                  );
-}
-
 static int drbd_seq_show(struct seq_file *seq, void *v)
 {
        int i, prev_i = -1;
        const char *sn;
        struct drbd_device *device;
        struct net_conf *nc;
+       union drbd_dev_state state;
        char wp;
 
        static char write_ordering_chars[] = {
@@ -241,11 +277,12 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
                        seq_printf(seq, "\n");
                prev_i = i;
 
-               sn = drbd_conn_str(device->state.conn);
+               state = device->state;
+               sn = drbd_conn_str(state.conn);
 
-               if (device->state.conn == C_STANDALONE &&
-                   device->state.disk == D_DISKLESS &&
-                   device->state.role == R_SECONDARY) {
+               if (state.conn == C_STANDALONE &&
+                   state.disk == D_DISKLESS &&
+                   state.role == R_SECONDARY) {
                        seq_printf(seq, "%2d: cs:Unconfigured\n", i);
                } else {
                        /* reset device->congestion_reason */
@@ -258,15 +295,15 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
                           "    ns:%u nr:%u dw:%u dr:%u al:%u bm:%u "
                           "lo:%d pe:%d ua:%d ap:%d ep:%d wo:%c",
                           i, sn,
-                          drbd_role_str(device->state.role),
-                          drbd_role_str(device->state.peer),
-                          drbd_disk_str(device->state.disk),
-                          drbd_disk_str(device->state.pdsk),
+                          drbd_role_str(state.role),
+                          drbd_role_str(state.peer),
+                          drbd_disk_str(state.disk),
+                          drbd_disk_str(state.pdsk),
                           wp,
                           drbd_suspended(device) ? 's' : 'r',
-                          device->state.aftr_isp ? 'a' : '-',
-                          device->state.peer_isp ? 'p' : '-',
-                          device->state.user_isp ? 'u' : '-',
+                          state.aftr_isp ? 'a' : '-',
+                          state.peer_isp ? 'p' : '-',
+                          state.user_isp ? 'u' : '-',
                           device->congestion_reason ?: '-',
                           test_bit(AL_SUSPENDED, &device->flags) ? 's' : '-',
                           device->send_cnt/2,
@@ -281,17 +318,17 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
                           atomic_read(&device->unacked_cnt),
                           atomic_read(&device->ap_bio_cnt),
                           first_peer_device(device)->connection->epochs,
-                          write_ordering_chars[first_peer_device(device)->connection->write_ordering]
+                          write_ordering_chars[device->resource->write_ordering]
                        );
                        seq_printf(seq, " oos:%llu\n",
                                   Bit2KB((unsigned long long)
                                           drbd_bm_total_weight(device)));
                }
-               if (device->state.conn == C_SYNC_SOURCE ||
-                   device->state.conn == C_SYNC_TARGET ||
-                   device->state.conn == C_VERIFY_S ||
-                   device->state.conn == C_VERIFY_T)
-                       drbd_syncer_progress(device, seq);
+               if (state.conn == C_SYNC_SOURCE ||
+                   state.conn == C_SYNC_TARGET ||
+                   state.conn == C_VERIFY_S ||
+                   state.conn == C_VERIFY_T)
+                       drbd_syncer_progress(device, seq, state);
 
                if (proc_details >= 1 && get_ldev_if_state(device, D_FAILED)) {
                        lc_seq_printf_stats(seq, device->resync);
@@ -299,12 +336,8 @@ static int drbd_seq_show(struct seq_file *seq, void *v)
                        put_ldev(device);
                }
 
-               if (proc_details >= 2) {
-                       if (device->resync) {
-                               lc_seq_dump_details(seq, device->resync, "rs_left",
-                                       resync_dump_detail);
-                       }
-               }
+               if (proc_details >= 2)
+                       seq_printf(seq, "\tblocked on activity log: %d\n", atomic_read(&device->ap_actlog_cnt));
        }
        rcu_read_unlock();
 
@@ -316,7 +349,7 @@ static int drbd_proc_open(struct inode *inode, struct file *file)
        int err;
 
        if (try_module_get(THIS_MODULE)) {
-               err = single_open(file, drbd_seq_show, PDE_DATA(inode));
+               err = single_open(file, drbd_seq_show, NULL);
                if (err)
                        module_put(THIS_MODULE);
                return err;
index 5b17ec88ea058e766071e66eeadf3d8fca3f4940..9342b8da73ab517620dda3b38f9852e6c1219853 100644 (file)
@@ -362,17 +362,14 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto
                        goto fail;
        }
 
+       memset(peer_req, 0, sizeof(*peer_req));
+       INIT_LIST_HEAD(&peer_req->w.list);
        drbd_clear_interval(&peer_req->i);
        peer_req->i.size = data_size;
        peer_req->i.sector = sector;
-       peer_req->i.local = false;
-       peer_req->i.waiting = false;
-
-       peer_req->epoch = NULL;
+       peer_req->submit_jif = jiffies;
        peer_req->peer_device = peer_device;
        peer_req->pages = page;
-       atomic_set(&peer_req->pending_bios, 0);
-       peer_req->flags = 0;
        /*
         * The block_id is opaque to the receiver.  It is not endianness
         * converted, and sent back to the sender unchanged.
@@ -389,11 +386,16 @@ drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t secto
 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
 {
+       might_sleep(); /* annotation: this function may sleep */
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
+       if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) { /* flag unexpectedly still set */
+               peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
+               drbd_al_complete_io(device, &peer_req->i); /* complete it now, before the request is freed */
+       }
        mempool_free(peer_req, drbd_ee_mempool);
 }
 
@@ -791,8 +793,18 @@ static int receive_first_packet(struct drbd_connection *connection, struct socke
 {
        unsigned int header_size = drbd_header_size(connection);
        struct packet_info pi;
+       struct net_conf *nc;
        int err;
 
+       rcu_read_lock();
+       nc = rcu_dereference(connection->net_conf);
+       if (!nc) {
+               rcu_read_unlock();
+               return -EIO;
+       }
+       sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
+       rcu_read_unlock();
+
        err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
        if (err != header_size) {
                if (err >= 0)
@@ -809,7 +821,7 @@ static int receive_first_packet(struct drbd_connection *connection, struct socke
  * drbd_socket_okay() - Free the socket if its connection is not okay
  * @sock:      pointer to the pointer to the socket.
  */
-static int drbd_socket_okay(struct socket **sock)
+static bool drbd_socket_okay(struct socket **sock)
 {
        int rr;
        char tb[4];
@@ -827,6 +839,30 @@ static int drbd_socket_okay(struct socket **sock)
                return false;
        }
 }
+
+/* Sleep (interruptibly) for up to sock_check_timeo (or ping_timeo if that is 0)
+ * tenths of a second, then return true only if both sockets are still okay. */
+static bool connection_established(struct drbd_connection *connection,
+                                  struct socket **sock1,
+                                  struct socket **sock2)
+{
+       struct net_conf *nc;
+       int timeout;
+       bool ok;
+
+       if (!*sock1 || !*sock2) /* need both sockets before checking anything */
+               return false;
+
+       rcu_read_lock();
+       nc = rcu_dereference(connection->net_conf); /* NOTE(review): no NULL check here, unlike receive_first_packet() - confirm */
+       timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
+       rcu_read_unlock();
+       schedule_timeout_interruptible(timeout);
+       ok = drbd_socket_okay(sock1);
+       ok = drbd_socket_okay(sock2) && ok; /* sock2 is checked even if sock1 already failed */
+       return ok;
+}
+
 /* Gets called if a connection is established, or if a new minor gets created
    in a connection */
 int drbd_connected(struct drbd_peer_device *peer_device)
@@ -868,8 +904,8 @@ static int conn_connect(struct drbd_connection *connection)
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
-       int vnr, timeout, h, ok;
-       bool discard_my_data;
+       int vnr, timeout, h;
+       bool discard_my_data, ok;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
@@ -913,17 +949,8 @@ static int conn_connect(struct drbd_connection *connection)
                        }
                }
 
-               if (sock.socket && msock.socket) {
-                       rcu_read_lock();
-                       nc = rcu_dereference(connection->net_conf);
-                       timeout = nc->ping_timeo * HZ / 10;
-                       rcu_read_unlock();
-                       schedule_timeout_interruptible(timeout);
-                       ok = drbd_socket_okay(&sock.socket);
-                       ok = drbd_socket_okay(&msock.socket) && ok;
-                       if (ok)
-                               break;
-               }
+               if (connection_established(connection, &sock.socket, &msock.socket))
+                       break;
 
 retry:
                s = drbd_wait_for_connect(connection, &ad);
@@ -969,8 +996,7 @@ randomize:
                                goto out_release_sockets;
                }
 
-               ok = drbd_socket_okay(&sock.socket);
-               ok = drbd_socket_okay(&msock.socket) && ok;
+               ok = connection_established(connection, &sock.socket, &msock.socket);
        } while (!ok);
 
        if (ad.s_listen)
@@ -1151,7 +1177,7 @@ static void drbd_flush(struct drbd_connection *connection)
        struct drbd_peer_device *peer_device;
        int vnr;
 
-       if (connection->write_ordering >= WO_bdev_flush) {
+       if (connection->resource->write_ordering >= WO_bdev_flush) {
                rcu_read_lock();
                idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                        struct drbd_device *device = peer_device->device;
@@ -1161,14 +1187,22 @@ static void drbd_flush(struct drbd_connection *connection)
                        kref_get(&device->kref);
                        rcu_read_unlock();
 
+                       /* Right now, we have only this one synchronous code path
+                        * for flushes between request epochs.
+                        * We may want to make those asynchronous,
+                        * or at least parallelize the flushes to the volume devices.
+                        */
+                       device->flush_jif = jiffies;
+                       set_bit(FLUSH_PENDING, &device->flags);
                        rv = blkdev_issue_flush(device->ldev->backing_bdev,
                                        GFP_NOIO, NULL);
+                       clear_bit(FLUSH_PENDING, &device->flags);
                        if (rv) {
                                drbd_info(device, "local disk flush failed with status %d\n", rv);
                                /* would rather check on EOPNOTSUPP, but that is not reliable.
                                 * don't try again for ANY return value != 0
                                 * if (rv == -EOPNOTSUPP) */
-                               drbd_bump_write_ordering(connection, WO_drain_io);
+                               drbd_bump_write_ordering(connection->resource, NULL, WO_drain_io);
                        }
                        put_ldev(device);
                        kref_put(&device->kref, drbd_destroy_device);
@@ -1257,15 +1291,30 @@ static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connectio
        return rv;
 }
 
+/* Downgrade @wo as far as @bdev's disk_conf requires: flush -> drain if disk_flushes
+ * is off, drain -> none if disk_drain is off. Called under rcu_read_lock() in this file. */
+static enum write_ordering_e
+max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
+{
+       struct disk_conf *dc;
+
+       dc = rcu_dereference(bdev->disk_conf);
+       if (wo == WO_bdev_flush && !dc->disk_flushes)
+               wo = WO_drain_io;
+       if (wo == WO_drain_io && !dc->disk_drain) /* may chain from the downgrade above */
+               wo = WO_none;
+       return wo;
+}
+
 /**
  * drbd_bump_write_ordering() - Fall back to an other write ordering method
  * @connection:        DRBD connection.
  * @wo:                Write ordering method to try.
  */
-void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ordering_e wo)
+void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
+                             enum write_ordering_e wo)
 {
-       struct disk_conf *dc;
-       struct drbd_peer_device *peer_device;
+       struct drbd_device *device;
        enum write_ordering_e pwo;
        int vnr;
        static char *write_ordering_str[] = {
@@ -1274,26 +1323,27 @@ void drbd_bump_write_ordering(struct drbd_connection *connection, enum write_ord
                [WO_bdev_flush] = "flush",
        };
 
-       pwo = connection->write_ordering;
-       wo = min(pwo, wo);
+       pwo = resource->write_ordering;
+       if (wo != WO_bdev_flush)
+               wo = min(pwo, wo);
        rcu_read_lock();
-       idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
-               struct drbd_device *device = peer_device->device;
+       idr_for_each_entry(&resource->devices, device, vnr) {
+               if (get_ldev(device)) {
+                       wo = max_allowed_wo(device->ldev, wo);
+                       if (device->ldev == bdev)
+                               bdev = NULL;
+                       put_ldev(device);
+               }
+       }
 
-               if (!get_ldev_if_state(device, D_ATTACHING))
-                       continue;
-               dc = rcu_dereference(device->ldev->disk_conf);
+       if (bdev)
+               wo = max_allowed_wo(bdev, wo);
 
-               if (wo == WO_bdev_flush && !dc->disk_flushes)
-                       wo = WO_drain_io;
-               if (wo == WO_drain_io && !dc->disk_drain)
-                       wo = WO_none;
-               put_ldev(device);
-       }
        rcu_read_unlock();
-       connection->write_ordering = wo;
-       if (pwo != connection->write_ordering || wo == WO_bdev_flush)
-               drbd_info(connection, "Method to ensure write ordering: %s\n", write_ordering_str[connection->write_ordering]);
+
+       resource->write_ordering = wo;
+       if (pwo != resource->write_ordering || wo == WO_bdev_flush)
+               drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
 }
 
 /**
@@ -1330,6 +1380,13 @@ int drbd_submit_peer_request(struct drbd_device *device,
                /* wait for all pending IO completions, before we start
                 * zeroing things out. */
                conn_wait_active_ee_empty(first_peer_device(device)->connection);
+               /* add it to the active list now,
+                * so we can find it to present it in debugfs */
+               peer_req->submit_jif = jiffies;
+               peer_req->flags |= EE_SUBMITTED;
+               spin_lock_irq(&device->resource->req_lock);
+               list_add_tail(&peer_req->w.list, &device->active_ee);
+               spin_unlock_irq(&device->resource->req_lock);
                if (blkdev_issue_zeroout(device->ldev->backing_bdev,
                        sector, ds >> 9, GFP_NOIO))
                        peer_req->flags |= EE_WAS_ERROR;
@@ -1398,6 +1455,9 @@ submit:
        D_ASSERT(device, page == NULL);
 
        atomic_set(&peer_req->pending_bios, n_bios);
+       /* for debugfs: update timestamp, mark as submitted */
+       peer_req->submit_jif = jiffies;
+       peer_req->flags |= EE_SUBMITTED;
        do {
                bio = bios;
                bios = bios->bi_next;
@@ -1471,7 +1531,7 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
-       switch (connection->write_ordering) {
+       switch (connection->resource->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return 0;
@@ -1498,7 +1558,8 @@ static int receive_Barrier(struct drbd_connection *connection, struct packet_inf
 
                return 0;
        default:
-               drbd_err(connection, "Strangeness in connection->write_ordering %d\n", connection->write_ordering);
+               drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
+                        connection->resource->write_ordering);
                return -EIO;
        }
 
@@ -1531,7 +1592,7 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
        struct drbd_peer_request *peer_req;
        struct page *page;
        int dgs, ds, err;
-       int data_size = pi->size;
+       unsigned int data_size = pi->size;
        void *dig_in = peer_device->connection->int_dig_in;
        void *dig_vv = peer_device->connection->int_dig_vv;
        unsigned long *data;
@@ -1578,6 +1639,7 @@ read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
        if (!peer_req)
                return NULL;
 
+       peer_req->flags |= EE_WRITE;
        if (trim)
                return peer_req;
 
@@ -1734,9 +1796,10 @@ static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t secto
         * respective _drbd_clear_done_ee */
 
        peer_req->w.cb = e_end_resync_block;
+       peer_req->submit_jif = jiffies;
 
        spin_lock_irq(&device->resource->req_lock);
-       list_add(&peer_req->w.list, &device->sync_ee);
+       list_add_tail(&peer_req->w.list, &device->sync_ee);
        spin_unlock_irq(&device->resource->req_lock);
 
        atomic_add(pi->size >> 9, &device->rs_sect_ev);
@@ -1889,6 +1952,7 @@ static int e_end_block(struct drbd_work *w, int cancel)
                }
                dec_unacked(device);
        }
+
        /* we delete from the conflict detection hash _after_ we sent out the
         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
        if (peer_req->flags & EE_IN_INTERVAL_TREE) {
@@ -2115,6 +2179,8 @@ static int handle_write_conflicts(struct drbd_device *device,
        drbd_for_each_overlap(i, &device->write_requests, sector, size) {
                if (i == &peer_req->i)
                        continue;
+               if (i->completed)
+                       continue;
 
                if (!i->local) {
                        /*
@@ -2147,7 +2213,6 @@ static int handle_write_conflicts(struct drbd_device *device,
                                          (unsigned long long)sector, size,
                                          superseded ? "local" : "remote");
 
-                       inc_unacked(device);
                        peer_req->w.cb = superseded ? e_send_superseded :
                                                   e_send_retry_write;
                        list_add_tail(&peer_req->w.list, &device->done_ee);
@@ -2206,6 +2271,7 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
 {
        struct drbd_peer_device *peer_device;
        struct drbd_device *device;
+       struct net_conf *nc;
        sector_t sector;
        struct drbd_peer_request *peer_req;
        struct p_data *p = pi->data;
@@ -2245,6 +2311,8 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
        }
 
        peer_req->w.cb = e_end_block;
+       peer_req->submit_jif = jiffies;
+       peer_req->flags |= EE_APPLICATION;
 
        dp_flags = be32_to_cpu(p->dp_flags);
        rw |= wire_flags_to_bio(dp_flags);
@@ -2271,9 +2339,36 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
        spin_unlock(&connection->epoch_lock);
 
        rcu_read_lock();
-       tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
+       nc = rcu_dereference(peer_device->connection->net_conf);
+       tp = nc->two_primaries;
+       if (peer_device->connection->agreed_pro_version < 100) {
+               switch (nc->wire_protocol) {
+               case DRBD_PROT_C:
+                       dp_flags |= DP_SEND_WRITE_ACK;
+                       break;
+               case DRBD_PROT_B:
+                       dp_flags |= DP_SEND_RECEIVE_ACK;
+                       break;
+               }
+       }
        rcu_read_unlock();
+
+       if (dp_flags & DP_SEND_WRITE_ACK) {
+               peer_req->flags |= EE_SEND_WRITE_ACK;
+               inc_unacked(device);
+               /* corresponding dec_unacked() in e_end_block()
+                * respective _drbd_clear_done_ee */
+       }
+
+       if (dp_flags & DP_SEND_RECEIVE_ACK) {
+               /* I really don't like it that the receiver thread
+                * sends on the msock, but anyways */
+               drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
+       }
+
        if (tp) {
+               /* two primaries implies protocol C */
+               D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
                peer_req->flags |= EE_IN_INTERVAL_TREE;
                err = wait_for_and_update_peer_seq(peer_device, peer_seq);
                if (err)
@@ -2297,44 +2392,18 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
         * active_ee to become empty in drbd_submit_peer_request();
         * better not add ourselves here. */
        if ((peer_req->flags & EE_IS_TRIM_USE_ZEROOUT) == 0)
-               list_add(&peer_req->w.list, &device->active_ee);
+               list_add_tail(&peer_req->w.list, &device->active_ee);
        spin_unlock_irq(&device->resource->req_lock);
 
        if (device->state.conn == C_SYNC_TARGET)
                wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
 
-       if (peer_device->connection->agreed_pro_version < 100) {
-               rcu_read_lock();
-               switch (rcu_dereference(peer_device->connection->net_conf)->wire_protocol) {
-               case DRBD_PROT_C:
-                       dp_flags |= DP_SEND_WRITE_ACK;
-                       break;
-               case DRBD_PROT_B:
-                       dp_flags |= DP_SEND_RECEIVE_ACK;
-                       break;
-               }
-               rcu_read_unlock();
-       }
-
-       if (dp_flags & DP_SEND_WRITE_ACK) {
-               peer_req->flags |= EE_SEND_WRITE_ACK;
-               inc_unacked(device);
-               /* corresponding dec_unacked() in e_end_block()
-                * respective _drbd_clear_done_ee */
-       }
-
-       if (dp_flags & DP_SEND_RECEIVE_ACK) {
-               /* I really don't like it that the receiver thread
-                * sends on the msock, but anyways */
-               drbd_send_ack(first_peer_device(device), P_RECV_ACK, peer_req);
-       }
-
        if (device->state.pdsk < D_INCONSISTENT) {
                /* In case we have the only disk of the cluster, */
                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
-               peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
-               drbd_al_begin_io(device, &peer_req->i, true);
+               drbd_al_begin_io(device, &peer_req->i);
+               peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
        }
 
        err = drbd_submit_peer_request(device, peer_req, rw, DRBD_FAULT_DT_WR);
@@ -2347,8 +2416,10 @@ static int receive_Data(struct drbd_connection *connection, struct packet_info *
        list_del(&peer_req->w.list);
        drbd_remove_epoch_entry_interval(device, peer_req);
        spin_unlock_irq(&device->resource->req_lock);
-       if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
+       if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
+               peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
                drbd_al_complete_io(device, &peer_req->i);
+       }
 
 out_interrupted:
        drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT + EV_CLEANUP);
@@ -2368,13 +2439,14 @@ out_interrupted:
  * The current sync rate used here uses only the most recent two step marks,
  * to have a short time average so we can react faster.
  */
-bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
+bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
+               bool throttle_if_app_is_waiting)
 {
        struct lc_element *tmp;
-       bool throttle = true;
+       bool throttle = drbd_rs_c_min_rate_throttle(device);
 
-       if (!drbd_rs_c_min_rate_throttle(device))
-               return false;
+       if (!throttle || throttle_if_app_is_waiting)
+               return throttle;
 
        spin_lock_irq(&device->al_lock);
        tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
@@ -2382,7 +2454,8 @@ bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector)
                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
                if (test_bit(BME_PRIORITY, &bm_ext->flags))
                        throttle = false;
-               /* Do not slow down if app IO is already waiting for this extent */
+               /* Do not slow down if app IO is already waiting for this extent,
+                * and our progress is necessary for application IO to complete. */
        }
        spin_unlock_irq(&device->al_lock);
 
@@ -2407,7 +2480,9 @@ bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
                      (int)part_stat_read(&disk->part0, sectors[1]) -
                        atomic_read(&device->rs_sect_ev);
-       if (!device->rs_last_events || curr_events - device->rs_last_events > 64) {
+
+       if (atomic_read(&device->ap_actlog_cnt)
+           || !device->rs_last_events || curr_events - device->rs_last_events > 64) {
                unsigned long rs_left;
                int i;
 
@@ -2508,6 +2583,7 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet
                peer_req->w.cb = w_e_end_data_req;
                fault_type = DRBD_FAULT_DT_RD;
                /* application IO, don't drbd_rs_begin_io */
+               peer_req->flags |= EE_APPLICATION;
                goto submit;
 
        case P_RS_DATA_REQUEST:
@@ -2538,6 +2614,8 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet
                        peer_req->w.cb = w_e_end_csum_rs_req;
                        /* used in the sector offset progress display */
                        device->bm_resync_fo = BM_SECT_TO_BIT(sector);
+                       /* remember to report stats in drbd_resync_finished */
+                       device->use_csums = true;
                } else if (pi->cmd == P_OV_REPLY) {
                        /* track progress, we may need to throttle */
                        atomic_add(size >> 9, &device->rs_sect_in);
@@ -2595,8 +2673,20 @@ static int receive_DataRequest(struct drbd_connection *connection, struct packet
         * we would also throttle its application reads.
         * In that case, throttling is done on the SyncTarget only.
         */
-       if (device->state.peer != R_PRIMARY && drbd_rs_should_slow_down(device, sector))
+
+       /* Even though this may be a resync request, we do add to "read_ee";
+        * "sync_ee" is only used for resync WRITEs.
+        * Add to list early, so debugfs can find this request
+        * even if we have to sleep below. */
+       spin_lock_irq(&device->resource->req_lock);
+       list_add_tail(&peer_req->w.list, &device->read_ee);
+       spin_unlock_irq(&device->resource->req_lock);
+
+       update_receiver_timing_details(connection, drbd_rs_should_slow_down);
+       if (device->state.peer != R_PRIMARY
+       && drbd_rs_should_slow_down(device, sector, false))
                schedule_timeout_uninterruptible(HZ/10);
+       update_receiver_timing_details(connection, drbd_rs_begin_io);
        if (drbd_rs_begin_io(device, sector))
                goto out_free_e;
 
@@ -2604,22 +2694,20 @@ submit_for_resync:
        atomic_add(size >> 9, &device->rs_sect_ev);
 
 submit:
+       update_receiver_timing_details(connection, drbd_submit_peer_request);
        inc_unacked(device);
-       spin_lock_irq(&device->resource->req_lock);
-       list_add_tail(&peer_req->w.list, &device->read_ee);
-       spin_unlock_irq(&device->resource->req_lock);
-
        if (drbd_submit_peer_request(device, peer_req, READ, fault_type) == 0)
                return 0;
 
        /* don't care for the reason here */
        drbd_err(device, "submit failed, triggering re-connect\n");
+
+out_free_e:
        spin_lock_irq(&device->resource->req_lock);
        list_del(&peer_req->w.list);
        spin_unlock_irq(&device->resource->req_lock);
        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
 
-out_free_e:
        put_ldev(device);
        drbd_free_peer_req(device, peer_req);
        return -EIO;
@@ -2842,8 +2930,10 @@ static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
 -1091   requires proto 91
 -1096   requires proto 96
  */
-static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_hold(local)
+static int drbd_uuid_compare(struct drbd_device *const device, int *rule_nr) __must_hold(local)
 {
+       struct drbd_peer_device *const peer_device = first_peer_device(device);
+       struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        u64 self, peer;
        int i, j;
 
@@ -2869,7 +2959,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
 
                if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
 
-                       if (first_peer_device(device)->connection->agreed_pro_version < 91)
+                       if (connection->agreed_pro_version < 91)
                                return -1091;
 
                        if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
@@ -2892,7 +2982,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
 
                if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
 
-                       if (first_peer_device(device)->connection->agreed_pro_version < 91)
+                       if (connection->agreed_pro_version < 91)
                                return -1091;
 
                        if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
@@ -2925,7 +3015,7 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
                case 1: /*  self_pri && !peer_pri */ return 1;
                case 2: /* !self_pri &&  peer_pri */ return -1;
                case 3: /*  self_pri &&  peer_pri */
-                       dc = test_bit(RESOLVE_CONFLICTS, &first_peer_device(device)->connection->flags);
+                       dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
                        return dc ? -1 : 1;
                }
        }
@@ -2938,14 +3028,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
        *rule_nr = 51;
        peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
        if (self == peer) {
-               if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
+               if (connection->agreed_pro_version < 96 ?
                    (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
                    (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
                    peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
                        /* The last P_SYNC_UUID did not get though. Undo the last start of
                           resync as sync source modifications of the peer's UUIDs. */
 
-                       if (first_peer_device(device)->connection->agreed_pro_version < 91)
+                       if (connection->agreed_pro_version < 91)
                                return -1091;
 
                        device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
@@ -2975,14 +3065,14 @@ static int drbd_uuid_compare(struct drbd_device *device, int *rule_nr) __must_ho
        *rule_nr = 71;
        self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
        if (self == peer) {
-               if (first_peer_device(device)->connection->agreed_pro_version < 96 ?
+               if (connection->agreed_pro_version < 96 ?
                    (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
                    (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
                    self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
                        /* The last P_SYNC_UUID did not get though. Undo the last start of
                           resync as sync source modifications of our UUIDs. */
 
-                       if (first_peer_device(device)->connection->agreed_pro_version < 91)
+                       if (connection->agreed_pro_version < 91)
                                return -1091;
 
                        __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
@@ -3352,8 +3442,7 @@ disconnect:
  * return: NULL (alg name was "")
  *         ERR_PTR(error) if something goes wrong
  *         or the crypto hash ptr, if it worked out ok. */
-static
-struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
+static struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
                const char *alg, const char *name)
 {
        struct crypto_hash *tfm;
@@ -3639,7 +3728,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
        struct drbd_device *device;
        struct p_sizes *p = pi->data;
        enum determine_dev_size dd = DS_UNCHANGED;
-       sector_t p_size, p_usize, my_usize;
+       sector_t p_size, p_usize, p_csize, my_usize;
        int ldsc = 0; /* local disk size changed */
        enum dds_flags ddsf;
 
@@ -3650,6 +3739,7 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
 
        p_size = be64_to_cpu(p->d_size);
        p_usize = be64_to_cpu(p->u_size);
+       p_csize = be64_to_cpu(p->c_size);
 
        /* just store the peer's disk size for now.
         * we still need to figure out whether we accept that. */
@@ -3710,7 +3800,6 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
        }
 
        device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
-       drbd_reconsider_max_bio_size(device);
        /* Leave drbd_reconsider_max_bio_size() before drbd_determine_dev_size().
           In case we cleared the QUEUE_FLAG_DISCARD from our queue in
           drbd_reconsider_max_bio_size(), we can be sure that after
@@ -3718,14 +3807,28 @@ static int receive_sizes(struct drbd_connection *connection, struct packet_info
 
        ddsf = be16_to_cpu(p->dds_flags);
        if (get_ldev(device)) {
+               drbd_reconsider_max_bio_size(device, device->ldev);
                dd = drbd_determine_dev_size(device, ddsf, NULL);
                put_ldev(device);
                if (dd == DS_ERROR)
                        return -EIO;
                drbd_md_sync(device);
        } else {
-               /* I am diskless, need to accept the peer's size. */
-               drbd_set_my_capacity(device, p_size);
+               /*
+                * I am diskless, need to accept the peer's *current* size.
+                * I must NOT accept the peer's backing disk size,
+                * it may have been larger than mine all along...
+                *
+                * At this point, the peer knows more about my disk, or at
+                * least about what we last agreed upon, than myself.
+                * So if his c_size is less than his d_size, the most likely
+                * reason is that *my* d_size was smaller last time we checked.
+                *
+                * However, if he sends a zero current size,
+                * take his (user-capped or) backing disk size anyways.
+                */
+               drbd_reconsider_max_bio_size(device, NULL);
+               drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
        }
 
        if (get_ldev(device)) {
@@ -4501,6 +4604,7 @@ static void drbdd(struct drbd_connection *connection)
                struct data_cmd *cmd;
 
                drbd_thread_current_set_cpu(&connection->receiver);
+               update_receiver_timing_details(connection, drbd_recv_header);
                if (drbd_recv_header(connection, &pi))
                        goto err_out;
 
@@ -4519,12 +4623,14 @@ static void drbdd(struct drbd_connection *connection)
                }
 
                if (shs) {
+                       update_receiver_timing_details(connection, drbd_recv_all_warn);
                        err = drbd_recv_all_warn(connection, pi.data, shs);
                        if (err)
                                goto err_out;
                        pi.size -= shs;
                }
 
+               update_receiver_timing_details(connection, cmd->fn);
                err = cmd->fn(connection, &pi);
                if (err) {
                        drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
index 09803d0d5207ce7fccffc5c4a3cb0229071566cd..c67717d572d16c89b1a4f74701b25c0a7ffeafbe 100644 (file)
@@ -52,7 +52,7 @@ static void _drbd_start_io_acct(struct drbd_device *device, struct drbd_request
 static void _drbd_end_io_acct(struct drbd_device *device, struct drbd_request *req)
 {
        int rw = bio_data_dir(req->master_bio);
-       unsigned long duration = jiffies - req->start_time;
+       unsigned long duration = jiffies - req->start_jif;
        int cpu;
        cpu = part_stat_lock();
        part_stat_add(cpu, &device->vdisk->part0, ticks[rw], duration);
@@ -66,7 +66,7 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device,
 {
        struct drbd_request *req;
 
-       req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
+       req = mempool_alloc(drbd_request_mempool, GFP_NOIO | __GFP_ZERO);
        if (!req)
                return NULL;
 
@@ -84,6 +84,8 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device,
 
        INIT_LIST_HEAD(&req->tl_requests);
        INIT_LIST_HEAD(&req->w.list);
+       INIT_LIST_HEAD(&req->req_pending_master_completion);
+       INIT_LIST_HEAD(&req->req_pending_local);
 
        /* one reference to be put by __drbd_make_request */
        atomic_set(&req->completion_ref, 1);
@@ -92,6 +94,19 @@ static struct drbd_request *drbd_req_new(struct drbd_device *device,
        return req;
 }
 
+static void drbd_remove_request_interval(struct rb_root *root,
+                                        struct drbd_request *req)
+{
+       struct drbd_device *device = req->device;
+       struct drbd_interval *i = &req->i;
+
+       drbd_remove_interval(root, i);
+
+       /* Wake up any processes waiting for this request to complete.  */
+       if (i->waiting)
+               wake_up(&device->misc_wait);
+}
+
 void drbd_req_destroy(struct kref *kref)
 {
        struct drbd_request *req = container_of(kref, struct drbd_request, kref);
@@ -107,14 +122,30 @@ void drbd_req_destroy(struct kref *kref)
                return;
        }
 
-       /* remove it from the transfer log.
-        * well, only if it had been there in the first
-        * place... if it had not (local only or conflicting
-        * and never sent), it should still be "empty" as
-        * initialized in drbd_req_new(), so we can list_del() it
-        * here unconditionally */
+       /* If called from mod_rq_state (expected normal case) or
+        * drbd_send_and_submit (the less likely normal path), this holds the
+        * req_lock, and req->tl_requests will typically be on ->transfer_log,
+        * though it may be still empty (never added to the transfer log).
+        *
+        * If called from do_retry(), we do NOT hold the req_lock, but we are
+        * still allowed to unconditionally list_del(&req->tl_requests),
+        * because it will be on a local on-stack list only. */
        list_del_init(&req->tl_requests);
 
+       /* finally remove the request from the conflict detection
+        * respective block_id verification interval tree. */
+       if (!drbd_interval_empty(&req->i)) {
+               struct rb_root *root;
+
+               if (s & RQ_WRITE)
+                       root = &device->write_requests;
+               else
+                       root = &device->read_requests;
+               drbd_remove_request_interval(root, req);
+       } else if (s & (RQ_NET_MASK & ~RQ_NET_DONE) && req->i.size != 0)
+               drbd_err(device, "drbd_req_destroy: Logic BUG: interval empty, but: rq_state=0x%x, sect=%llu, size=%u\n",
+                       s, (unsigned long long)req->i.sector, req->i.size);
+
        /* if it was a write, we may have to set the corresponding
         * bit(s) out-of-sync first. If it had a local part, we need to
         * release the reference to the activity log. */
@@ -188,19 +219,6 @@ void complete_master_bio(struct drbd_device *device,
 }
 
 
-static void drbd_remove_request_interval(struct rb_root *root,
-                                        struct drbd_request *req)
-{
-       struct drbd_device *device = req->device;
-       struct drbd_interval *i = &req->i;
-
-       drbd_remove_interval(root, i);
-
-       /* Wake up any processes waiting for this request to complete.  */
-       if (i->waiting)
-               wake_up(&device->misc_wait);
-}
-
 /* Helper for __req_mod().
  * Set m->bio to the master bio, if it is fit to be completed,
  * or leave it alone (it is initialized to NULL in __req_mod),
@@ -254,18 +272,6 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
        ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
        error = PTR_ERR(req->private_bio);
 
-       /* remove the request from the conflict detection
-        * respective block_id verification hash */
-       if (!drbd_interval_empty(&req->i)) {
-               struct rb_root *root;
-
-               if (rw == WRITE)
-                       root = &device->write_requests;
-               else
-                       root = &device->read_requests;
-               drbd_remove_request_interval(root, req);
-       }
-
        /* Before we can signal completion to the upper layers,
         * we may need to close the current transfer log epoch.
         * We are within the request lock, so we can simply compare
@@ -301,9 +307,24 @@ void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
                m->error = ok ? 0 : (error ?: -EIO);
                m->bio = req->master_bio;
                req->master_bio = NULL;
+               /* We leave it in the tree, to be able to verify later
+                * write-acks in protocol != C during resync.
+                * But we mark it as "complete", so it won't be counted as
+                * conflict in a multi-primary setup. */
+               req->i.completed = true;
        }
+
+       if (req->i.waiting)
+               wake_up(&device->misc_wait);
+
+       /* Either we are about to complete to upper layers,
+        * or we will restart this request.
+        * In either case, the request object will be destroyed soon,
+        * so better remove it from all lists. */
+       list_del_init(&req->req_pending_master_completion);
 }
 
+/* still holds resource->req_lock */
 static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
 {
        struct drbd_device *device = req->device;
@@ -324,12 +345,91 @@ static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_
        return 1;
 }
 
+static void set_if_null_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+       if (!connection)
+               return;
+       if (connection->req_next == NULL)
+               connection->req_next = req;
+}
+
+static void advance_conn_req_next(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+       if (!connection)
+               return;
+       if (connection->req_next != req)
+               return;
+       list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
+               const unsigned s = req->rq_state;
+               if (s & RQ_NET_QUEUED)
+                       break;
+       }
+       if (&req->tl_requests == &connection->transfer_log)
+               req = NULL;
+       connection->req_next = req;
+}
+
+static void set_if_null_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+       if (!connection)
+               return;
+       if (connection->req_ack_pending == NULL)
+               connection->req_ack_pending = req;
+}
+
+static void advance_conn_req_ack_pending(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+       if (!connection)
+               return;
+       if (connection->req_ack_pending != req)
+               return;
+       list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
+               const unsigned s = req->rq_state;
+               if ((s & RQ_NET_SENT) && (s & RQ_NET_PENDING))
+                       break;
+       }
+       if (&req->tl_requests == &connection->transfer_log)
+               req = NULL;
+       connection->req_ack_pending = req;
+}
+
+static void set_if_null_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+       if (!connection)
+               return;
+       if (connection->req_not_net_done == NULL)
+               connection->req_not_net_done = req;
+}
+
+static void advance_conn_req_not_net_done(struct drbd_peer_device *peer_device, struct drbd_request *req)
+{
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
+       if (!connection)
+               return;
+       if (connection->req_not_net_done != req)
+               return;
+       list_for_each_entry_continue(req, &connection->transfer_log, tl_requests) {
+               const unsigned s = req->rq_state;
+               if ((s & RQ_NET_SENT) && !(s & RQ_NET_DONE))
+                       break;
+       }
+       if (&req->tl_requests == &connection->transfer_log)
+               req = NULL;
+       connection->req_not_net_done = req;
+}
+
 /* I'd like this to be the only place that manipulates
  * req->completion_ref and req->kref. */
 static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
                int clear, int set)
 {
        struct drbd_device *device = req->device;
+       struct drbd_peer_device *peer_device = first_peer_device(device);
        unsigned s = req->rq_state;
        int c_put = 0;
        int k_put = 0;
@@ -356,14 +456,23 @@ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
                atomic_inc(&req->completion_ref);
        }
 
-       if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED))
+       if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED)) {
                atomic_inc(&req->completion_ref);
+               set_if_null_req_next(peer_device, req);
+       }
 
        if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
                kref_get(&req->kref); /* wait for the DONE */
 
-       if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT))
-               atomic_add(req->i.size >> 9, &device->ap_in_flight);
+       if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT)) {
+               /* potentially already completed in the asender thread */
+               if (!(s & RQ_NET_DONE)) {
+                       atomic_add(req->i.size >> 9, &device->ap_in_flight);
+                       set_if_null_req_not_net_done(peer_device, req);
+               }
+               if (s & RQ_NET_PENDING)
+                       set_if_null_req_ack_pending(peer_device, req);
+       }
 
        if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
                atomic_inc(&req->completion_ref);
@@ -386,20 +495,34 @@ static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
                        ++k_put;
                else
                        ++c_put;
+               list_del_init(&req->req_pending_local);
        }
 
        if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
                dec_ap_pending(device);
                ++c_put;
+               req->acked_jif = jiffies;
+               advance_conn_req_ack_pending(peer_device, req);
        }
 
-       if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED))
+       if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED)) {
                ++c_put;
+               advance_conn_req_next(peer_device, req);
+       }
 
-       if ((s & RQ_EXP_BARR_ACK) && !(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
-               if (req->rq_state & RQ_NET_SENT)
+       if (!(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
+               if (s & RQ_NET_SENT)
                        atomic_sub(req->i.size >> 9, &device->ap_in_flight);
-               ++k_put;
+               if (s & RQ_EXP_BARR_ACK)
+                       ++k_put;
+               req->net_done_jif = jiffies;
+
+               /* in ahead/behind mode, or just in case,
+                * before we finally destroy this request,
+                * the caching pointers must not reference it anymore */
+               advance_conn_req_next(peer_device, req);
+               advance_conn_req_ack_pending(peer_device, req);
+               advance_conn_req_not_net_done(peer_device, req);
        }
 
        /* potentially complete and destroy */
@@ -439,6 +562,19 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request
                        bdevname(device->ldev->backing_bdev, b));
 }
 
+/* Helper for HANDED_OVER_TO_NETWORK.
+ * Is this a protocol A write (neither WRITE_ACK nor RECEIVE_ACK expected)?
+ * Is it also still "PENDING"?
+ * --> If so, clear PENDING and set NET_OK below.
+ * If it is a protocol A write, but not RQ_PENDING anymore, neg-ack was faster
+ * (and we must not set RQ_NET_OK) */
+static inline bool is_pending_write_protocol_A(struct drbd_request *req)
+{
+       return (req->rq_state &
+                  (RQ_WRITE|RQ_NET_PENDING|RQ_EXP_WRITE_ACK|RQ_EXP_RECEIVE_ACK))
+               == (RQ_WRITE|RQ_NET_PENDING);
+}
+
 /* obviously this could be coded as many single functions
  * instead of one huge switch,
  * or by putting the code directly in the respective locations
@@ -454,7 +590,9 @@ static void drbd_report_io_error(struct drbd_device *device, struct drbd_request
 int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                struct bio_and_error *m)
 {
-       struct drbd_device *device = req->device;
+       struct drbd_device *const device = req->device;
+       struct drbd_peer_device *const peer_device = first_peer_device(device);
+       struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        struct net_conf *nc;
        int p, rv = 0;
 
@@ -477,7 +615,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                 * and from w_read_retry_remote */
                D_ASSERT(device, !(req->rq_state & RQ_NET_MASK));
                rcu_read_lock();
-               nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+               nc = rcu_dereference(connection->net_conf);
                p = nc->wire_protocol;
                rcu_read_unlock();
                req->rq_state |=
@@ -549,7 +687,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                D_ASSERT(device, (req->rq_state & RQ_LOCAL_MASK) == 0);
                mod_rq_state(req, m, 0, RQ_NET_QUEUED);
                req->w.cb = w_send_read_req;
-               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+               drbd_queue_work(&connection->sender_work,
                                &req->w);
                break;
 
@@ -585,23 +723,23 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
                mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
                req->w.cb =  w_send_dblock;
-               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+               drbd_queue_work(&connection->sender_work,
                                &req->w);
 
                /* close the epoch, in case it outgrew the limit */
                rcu_read_lock();
-               nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
+               nc = rcu_dereference(connection->net_conf);
                p = nc->max_epoch_size;
                rcu_read_unlock();
-               if (first_peer_device(device)->connection->current_tle_writes >= p)
-                       start_new_tl_epoch(first_peer_device(device)->connection);
+               if (connection->current_tle_writes >= p)
+                       start_new_tl_epoch(connection);
 
                break;
 
        case QUEUE_FOR_SEND_OOS:
                mod_rq_state(req, m, 0, RQ_NET_QUEUED);
                req->w.cb =  w_send_out_of_sync;
-               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+               drbd_queue_work(&connection->sender_work,
                                &req->w);
                break;
 
@@ -615,18 +753,16 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 
        case HANDED_OVER_TO_NETWORK:
                /* assert something? */
-               if (bio_data_dir(req->master_bio) == WRITE &&
-                   !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
+               if (is_pending_write_protocol_A(req))
                        /* this is what is dangerous about protocol A:
                         * pretend it was successfully written on the peer. */
-                       if (req->rq_state & RQ_NET_PENDING)
-                               mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
-                       /* else: neg-ack was faster... */
-                       /* it is still not yet RQ_NET_DONE until the
-                        * corresponding epoch barrier got acked as well,
-                        * so we know what to dirty on connection loss */
-               }
-               mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
+                       mod_rq_state(req, m, RQ_NET_QUEUED|RQ_NET_PENDING,
+                                               RQ_NET_SENT|RQ_NET_OK);
+               else
+                       mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
+               /* It is still not yet RQ_NET_DONE until the
+                * corresponding epoch barrier got acked as well,
+                * so we know what to dirty on connection loss. */
                break;
 
        case OOS_HANDED_TO_NETWORK:
@@ -658,12 +794,13 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
        case WRITE_ACKED_BY_PEER_AND_SIS:
                req->rq_state |= RQ_NET_SIS;
        case WRITE_ACKED_BY_PEER:
-               D_ASSERT(device, req->rq_state & RQ_EXP_WRITE_ACK);
-               /* protocol C; successfully written on peer.
+               /* Normal operation protocol C: successfully written on peer.
+                * During resync, even in protocol != C,
+                * we requested an explicit write ack anyway,
+                * which means we cannot even assert anything here.
                 * Nothing more to do here.
                 * We want to keep the tl in place for all protocols, to cater
                 * for volatile write-back caches on lower level devices. */
-
                goto ack_common;
        case RECV_ACKED_BY_PEER:
                D_ASSERT(device, req->rq_state & RQ_EXP_RECEIVE_ACK);
@@ -671,7 +808,6 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                 * see also notes above in HANDED_OVER_TO_NETWORK about
                 * protocol != C */
        ack_common:
-               D_ASSERT(device, req->rq_state & RQ_NET_PENDING);
                mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
                break;
 
@@ -714,7 +850,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 
                get_ldev(device); /* always succeeds in this call path */
                req->w.cb = w_restart_disk_io;
-               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+               drbd_queue_work(&connection->sender_work,
                                &req->w);
                break;
 
@@ -736,7 +872,8 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 
                        mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
                        if (req->w.cb) {
-                               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+                               /* w.cb expected to be w_send_dblock, or w_send_read_req */
+                               drbd_queue_work(&connection->sender_work,
                                                &req->w);
                                rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
                        } /* else: FIXME can this happen? */
@@ -769,7 +906,7 @@ int __req_mod(struct drbd_request *req, enum drbd_req_event what,
                break;
 
        case QUEUE_AS_DRBD_BARRIER:
-               start_new_tl_epoch(first_peer_device(device)->connection);
+               start_new_tl_epoch(connection);
                mod_rq_state(req, m, 0, RQ_NET_OK|RQ_NET_DONE);
                break;
        };
@@ -886,6 +1023,9 @@ static void maybe_pull_ahead(struct drbd_device *device)
            connection->agreed_pro_version < 96)
                return;
 
+       if (on_congestion == OC_PULL_AHEAD && device->state.conn == C_AHEAD)
+               return; /* nothing to do ... */
+
        /* If I don't even have good local storage, we can not reasonably try
         * to pull ahead of the peer. We also need the local reference to make
         * sure device->act_log is there.
@@ -1021,6 +1161,7 @@ drbd_submit_req_private_bio(struct drbd_request *req)
         * stable storage, and this is a WRITE, we may not even submit
         * this bio. */
        if (get_ldev(device)) {
+               req->pre_submit_jif = jiffies;
                if (drbd_insert_fault(device,
                                      rw == WRITE ? DRBD_FAULT_DT_WR
                                    : rw == READ  ? DRBD_FAULT_DT_RD
@@ -1035,10 +1176,14 @@ drbd_submit_req_private_bio(struct drbd_request *req)
 
 static void drbd_queue_write(struct drbd_device *device, struct drbd_request *req)
 {
-       spin_lock(&device->submit.lock);
+       spin_lock_irq(&device->resource->req_lock);
        list_add_tail(&req->tl_requests, &device->submit.writes);
-       spin_unlock(&device->submit.lock);
+       list_add_tail(&req->req_pending_master_completion,
+                       &device->pending_master_completion[1 /* WRITE */]);
+       spin_unlock_irq(&device->resource->req_lock);
        queue_work(device->submit.wq, &device->submit.worker);
+       /* do_submit() may sleep internally on al_wait, too */
+       wake_up(&device->al_wait);
 }
 
 /* returns the new drbd_request pointer, if the caller is expected to
@@ -1047,7 +1192,7 @@ static void drbd_queue_write(struct drbd_device *device, struct drbd_request *re
  * Returns ERR_PTR(-ENOMEM) if we cannot allocate a drbd_request.
  */
 static struct drbd_request *
-drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long start_time)
+drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long start_jif)
 {
        const int rw = bio_data_dir(bio);
        struct drbd_request *req;
@@ -1062,7 +1207,7 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long
                bio_endio(bio, -ENOMEM);
                return ERR_PTR(-ENOMEM);
        }
-       req->start_time = start_time;
+       req->start_jif = start_jif;
 
        if (!get_ldev(device)) {
                bio_put(req->private_bio);
@@ -1075,10 +1220,12 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long
        if (rw == WRITE && req->private_bio && req->i.size
        && !test_bit(AL_SUSPENDED, &device->flags)) {
                if (!drbd_al_begin_io_fastpath(device, &req->i)) {
+                       atomic_inc(&device->ap_actlog_cnt);
                        drbd_queue_write(device, req);
                        return NULL;
                }
                req->rq_state |= RQ_IN_ACT_LOG;
+               req->in_actlog_jif = jiffies;
        }
 
        return req;
@@ -1086,11 +1233,13 @@ drbd_request_prepare(struct drbd_device *device, struct bio *bio, unsigned long
 
 static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request *req)
 {
+       struct drbd_resource *resource = device->resource;
        const int rw = bio_rw(req->master_bio);
        struct bio_and_error m = { NULL, };
        bool no_remote = false;
+       bool submit_private_bio = false;
 
-       spin_lock_irq(&device->resource->req_lock);
+       spin_lock_irq(&resource->req_lock);
        if (rw == WRITE) {
                /* This may temporarily give up the req_lock,
                 * but will re-aquire it before it returns here.
@@ -1148,13 +1297,18 @@ static void drbd_send_and_submit(struct drbd_device *device, struct drbd_request
                        no_remote = true;
        }
 
+       /* If it took the fast path in drbd_request_prepare, add it here.
+        * The slow path has added it already. */
+       if (list_empty(&req->req_pending_master_completion))
+               list_add_tail(&req->req_pending_master_completion,
+                       &device->pending_master_completion[rw == WRITE]);
        if (req->private_bio) {
                /* needs to be marked within the same spinlock */
+               list_add_tail(&req->req_pending_local,
+                       &device->pending_completion[rw == WRITE]);
                _req_mod(req, TO_BE_SUBMITTED);
                /* but we need to give up the spinlock to submit */
-               spin_unlock_irq(&device->resource->req_lock);
-               drbd_submit_req_private_bio(req);
-               spin_lock_irq(&device->resource->req_lock);
+               submit_private_bio = true;
        } else if (no_remote) {
 nodata:
                if (__ratelimit(&drbd_ratelimit_state))
@@ -1167,15 +1321,23 @@ nodata:
 out:
        if (drbd_req_put_completion_ref(req, &m, 1))
                kref_put(&req->kref, drbd_req_destroy);
-       spin_unlock_irq(&device->resource->req_lock);
-
+       spin_unlock_irq(&resource->req_lock);
+
+       /* Even though above is a kref_put(), this is safe.
+        * As long as we still need to submit our private bio,
+        * we hold a completion ref, and the request cannot disappear.
+        * If however this request did not even have a private bio to submit
+        * (e.g. remote read), req may already be invalid now.
+        * That's why we cannot check on req->private_bio. */
+       if (submit_private_bio)
+               drbd_submit_req_private_bio(req);
        if (m.bio)
                complete_master_bio(device, &m);
 }
 
-void __drbd_make_request(struct drbd_device *device, struct bio *bio, unsigned long start_time)
+void __drbd_make_request(struct drbd_device *device, struct bio *bio, unsigned long start_jif)
 {
-       struct drbd_request *req = drbd_request_prepare(device, bio, start_time);
+       struct drbd_request *req = drbd_request_prepare(device, bio, start_jif);
        if (IS_ERR_OR_NULL(req))
                return;
        drbd_send_and_submit(device, req);
@@ -1194,6 +1356,8 @@ static void submit_fast_path(struct drbd_device *device, struct list_head *incom
                                continue;
 
                        req->rq_state |= RQ_IN_ACT_LOG;
+                       req->in_actlog_jif = jiffies;
+                       atomic_dec(&device->ap_actlog_cnt);
                }
 
                list_del_init(&req->tl_requests);
@@ -1203,7 +1367,8 @@ static void submit_fast_path(struct drbd_device *device, struct list_head *incom
 
 static bool prepare_al_transaction_nonblock(struct drbd_device *device,
                                            struct list_head *incoming,
-                                           struct list_head *pending)
+                                           struct list_head *pending,
+                                           struct list_head *later)
 {
        struct drbd_request *req, *tmp;
        int wake = 0;
@@ -1212,45 +1377,105 @@ static bool prepare_al_transaction_nonblock(struct drbd_device *device,
        spin_lock_irq(&device->al_lock);
        list_for_each_entry_safe(req, tmp, incoming, tl_requests) {
                err = drbd_al_begin_io_nonblock(device, &req->i);
+               if (err == -ENOBUFS)
+                       break;
                if (err == -EBUSY)
                        wake = 1;
                if (err)
-                       continue;
-               req->rq_state |= RQ_IN_ACT_LOG;
-               list_move_tail(&req->tl_requests, pending);
+                       list_move_tail(&req->tl_requests, later);
+               else
+                       list_move_tail(&req->tl_requests, pending);
        }
        spin_unlock_irq(&device->al_lock);
        if (wake)
                wake_up(&device->al_wait);
-
        return !list_empty(pending);
 }
 
+void send_and_submit_pending(struct drbd_device *device, struct list_head *pending)
+{
+       struct drbd_request *req, *tmp;
+
+       list_for_each_entry_safe(req, tmp, pending, tl_requests) {
+               req->rq_state |= RQ_IN_ACT_LOG;
+               req->in_actlog_jif = jiffies;
+               atomic_dec(&device->ap_actlog_cnt);
+               list_del_init(&req->tl_requests);
+               drbd_send_and_submit(device, req);
+       }
+}
+
 void do_submit(struct work_struct *ws)
 {
        struct drbd_device *device = container_of(ws, struct drbd_device, submit.worker);
-       LIST_HEAD(incoming);
-       LIST_HEAD(pending);
-       struct drbd_request *req, *tmp;
+       LIST_HEAD(incoming);    /* from drbd_make_request() */
+       LIST_HEAD(pending);     /* to be submitted after next AL-transaction commit */
+       LIST_HEAD(busy);        /* blocked by resync requests */
+
+       /* grab new incoming requests */
+       spin_lock_irq(&device->resource->req_lock);
+       list_splice_tail_init(&device->submit.writes, &incoming);
+       spin_unlock_irq(&device->resource->req_lock);
 
        for (;;) {
-               spin_lock(&device->submit.lock);
-               list_splice_tail_init(&device->submit.writes, &incoming);
-               spin_unlock(&device->submit.lock);
+               DEFINE_WAIT(wait);
 
+               /* move used-to-be-busy back to front of incoming */
+               list_splice_init(&busy, &incoming);
                submit_fast_path(device, &incoming);
                if (list_empty(&incoming))
                        break;
 
-skip_fast_path:
-               wait_event(device->al_wait, prepare_al_transaction_nonblock(device, &incoming, &pending));
-               /* Maybe more was queued, while we prepared the transaction?
-                * Try to stuff them into this transaction as well.
-                * Be strictly non-blocking here, no wait_event, we already
-                * have something to commit.
-                * Stop if we don't make any more progres.
-                */
                for (;;) {
+                       prepare_to_wait(&device->al_wait, &wait, TASK_UNINTERRUPTIBLE);
+
+                       list_splice_init(&busy, &incoming);
+                       prepare_al_transaction_nonblock(device, &incoming, &pending, &busy);
+                       if (!list_empty(&pending))
+                               break;
+
+                       schedule();
+
+                       /* If all currently "hot" activity log extents are kept busy by
+                        * incoming requests, we still must not totally starve new
+                        * requests to "cold" extents.
+                        * Something left on &incoming means there had not been
+                        * enough update slots available, and the activity log
+                        * has been marked as "starving".
+                        *
+                        * Try again now, without looking for new requests,
+                        * effectively blocking all new requests until we made
+                        * at least _some_ progress with what we currently have.
+                        */
+                       if (!list_empty(&incoming))
+                               continue;
+
+                       /* Nothing moved to pending, but nothing left
+                        * on incoming: all moved to busy!
+                        * Grab new and iterate. */
+                       spin_lock_irq(&device->resource->req_lock);
+                       list_splice_tail_init(&device->submit.writes, &incoming);
+                       spin_unlock_irq(&device->resource->req_lock);
+               }
+               finish_wait(&device->al_wait, &wait);
+
+               /* If the transaction was full, before all incoming requests
+                * had been processed, skip ahead to commit, and iterate
+                * without splicing in more incoming requests from upper layers.
+                *
+                * Else, if all incoming have been processed,
+                * they have become either "pending" (to be submitted after
+                * next transaction commit) or "busy" (blocked by resync).
+                *
+                * Maybe more was queued, while we prepared the transaction?
+                * Try to stuff those into this transaction as well.
+                * Be strictly non-blocking here,
+                * we already have something to commit.
+                *
+                * Commit if we don't make any more progress.
+                */
+
+               while (list_empty(&incoming)) {
                        LIST_HEAD(more_pending);
                        LIST_HEAD(more_incoming);
                        bool made_progress;
@@ -1260,55 +1485,32 @@ skip_fast_path:
                        if (list_empty(&device->submit.writes))
                                break;
 
-                       spin_lock(&device->submit.lock);
+                       spin_lock_irq(&device->resource->req_lock);
                        list_splice_tail_init(&device->submit.writes, &more_incoming);
-                       spin_unlock(&device->submit.lock);
+                       spin_unlock_irq(&device->resource->req_lock);
 
                        if (list_empty(&more_incoming))
                                break;
 
-                       made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending);
+                       made_progress = prepare_al_transaction_nonblock(device, &more_incoming, &more_pending, &busy);
 
                        list_splice_tail_init(&more_pending, &pending);
                        list_splice_tail_init(&more_incoming, &incoming);
-
                        if (!made_progress)
                                break;
                }
-               drbd_al_begin_io_commit(device, false);
-
-               list_for_each_entry_safe(req, tmp, &pending, tl_requests) {
-                       list_del_init(&req->tl_requests);
-                       drbd_send_and_submit(device, req);
-               }
 
-               /* If all currently hot activity log extents are kept busy by
-                * incoming requests, we still must not totally starve new
-                * requests to cold extents. In that case, prepare one request
-                * in blocking mode. */
-               list_for_each_entry_safe(req, tmp, &incoming, tl_requests) {
-                       list_del_init(&req->tl_requests);
-                       req->rq_state |= RQ_IN_ACT_LOG;
-                       if (!drbd_al_begin_io_prepare(device, &req->i)) {
-                               /* Corresponding extent was hot after all? */
-                               drbd_send_and_submit(device, req);
-                       } else {
-                               /* Found a request to a cold extent.
-                                * Put on "pending" list,
-                                * and try to cumulate with more. */
-                               list_add(&req->tl_requests, &pending);
-                               goto skip_fast_path;
-                       }
-               }
+               drbd_al_begin_io_commit(device);
+               send_and_submit_pending(device, &pending);
        }
 }
 
 void drbd_make_request(struct request_queue *q, struct bio *bio)
 {
        struct drbd_device *device = (struct drbd_device *) q->queuedata;
-       unsigned long start_time;
+       unsigned long start_jif;
 
-       start_time = jiffies;
+       start_jif = jiffies;
 
        /*
         * what we "blindly" assume:
@@ -1316,7 +1518,7 @@ void drbd_make_request(struct request_queue *q, struct bio *bio)
        D_ASSERT(device, IS_ALIGNED(bio->bi_iter.bi_size, 512));
 
        inc_ap_bio(device);
-       __drbd_make_request(device, bio, start_time);
+       __drbd_make_request(device, bio, start_jif);
 }
 
 /* This is called by bio_add_page().
@@ -1353,36 +1555,13 @@ int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct
        return limit;
 }
 
-static void find_oldest_requests(
-               struct drbd_connection *connection,
-               struct drbd_device *device,
-               struct drbd_request **oldest_req_waiting_for_peer,
-               struct drbd_request **oldest_req_waiting_for_disk)
-{
-       struct drbd_request *r;
-       *oldest_req_waiting_for_peer = NULL;
-       *oldest_req_waiting_for_disk = NULL;
-       list_for_each_entry(r, &connection->transfer_log, tl_requests) {
-               const unsigned s = r->rq_state;
-               if (!*oldest_req_waiting_for_peer
-               && ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE)))
-                       *oldest_req_waiting_for_peer = r;
-
-               if (!*oldest_req_waiting_for_disk
-               && (s & RQ_LOCAL_PENDING) && r->device == device)
-                       *oldest_req_waiting_for_disk = r;
-
-               if (*oldest_req_waiting_for_peer && *oldest_req_waiting_for_disk)
-                       break;
-       }
-}
-
 void request_timer_fn(unsigned long data)
 {
        struct drbd_device *device = (struct drbd_device *) data;
        struct drbd_connection *connection = first_peer_device(device)->connection;
-       struct drbd_request *req_disk, *req_peer; /* oldest request */
+       struct drbd_request *req_read, *req_write, *req_peer; /* oldest request */
        struct net_conf *nc;
+       unsigned long oldest_submit_jif;
        unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
        unsigned long now;
 
@@ -1403,14 +1582,31 @@ void request_timer_fn(unsigned long data)
                return; /* Recurring timer stopped */
 
        now = jiffies;
+       nt = now + et;
 
        spin_lock_irq(&device->resource->req_lock);
-       find_oldest_requests(connection, device, &req_peer, &req_disk);
-       if (req_peer == NULL && req_disk == NULL) {
-               spin_unlock_irq(&device->resource->req_lock);
-               mod_timer(&device->request_timer, now + et);
-               return;
-       }
+       req_read = list_first_entry_or_null(&device->pending_completion[0], struct drbd_request, req_pending_local);
+       req_write = list_first_entry_or_null(&device->pending_completion[1], struct drbd_request, req_pending_local);
+       req_peer = connection->req_not_net_done;
+       /* maybe the oldest request waiting for the peer is in fact still
+        * blocking in tcp sendmsg */
+       if (!req_peer && connection->req_next && connection->req_next->pre_send_jif)
+               req_peer = connection->req_next;
+
+       /* evaluate the oldest peer request only in one timer! */
+       if (req_peer && req_peer->device != device)
+               req_peer = NULL;
+
+       /* do we have something to evaluate? */
+       if (req_peer == NULL && req_write == NULL && req_read == NULL)
+               goto out;
+
+       oldest_submit_jif =
+               (req_write && req_read)
+               ? ( time_before(req_write->pre_submit_jif, req_read->pre_submit_jif)
+                 ? req_write->pre_submit_jif : req_read->pre_submit_jif )
+               : req_write ? req_write->pre_submit_jif
+               : req_read ? req_read->pre_submit_jif : now;
 
        /* The request is considered timed out, if
         * - we have some effective timeout from the configuration,
@@ -1429,13 +1625,13 @@ void request_timer_fn(unsigned long data)
         * to expire twice (worst case) to become effective. Good enough.
         */
        if (ent && req_peer &&
-                time_after(now, req_peer->start_time + ent) &&
+                time_after(now, req_peer->pre_send_jif + ent) &&
                !time_in_range(now, connection->last_reconnect_jif, connection->last_reconnect_jif + ent)) {
                drbd_warn(device, "Remote failed to finish a request within ko-count * timeout\n");
                _drbd_set_state(_NS(device, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
        }
-       if (dt && req_disk &&
-                time_after(now, req_disk->start_time + dt) &&
+       if (dt && oldest_submit_jif != now &&
+                time_after(now, oldest_submit_jif + dt) &&
                !time_in_range(now, device->last_reattach_jif, device->last_reattach_jif + dt)) {
                drbd_warn(device, "Local backing device failed to meet the disk-timeout\n");
                __drbd_chk_io_error(device, DRBD_FORCE_DETACH);
@@ -1443,11 +1639,12 @@ void request_timer_fn(unsigned long data)
 
        /* Reschedule timer for the nearest not already expired timeout.
         * Fallback to now + min(effective network timeout, disk timeout). */
-       ent = (ent && req_peer && time_before(now, req_peer->start_time + ent))
-               ? req_peer->start_time + ent : now + et;
-       dt = (dt && req_disk && time_before(now, req_disk->start_time + dt))
-               ? req_disk->start_time + dt : now + et;
+       ent = (ent && req_peer && time_before(now, req_peer->pre_send_jif + ent))
+               ? req_peer->pre_send_jif + ent : now + et;
+       dt = (dt && oldest_submit_jif != now && time_before(now, oldest_submit_jif + dt))
+               ? oldest_submit_jif + dt : now + et;
        nt = time_before(ent, dt) ? ent : dt;
+out:
        spin_unlock_irq(&connection->resource->req_lock);
        mod_timer(&device->request_timer, nt);
 }
index 8566cd5866b4e2388cdb441439f25eecf6071443..9f6a04080e9f76aadfdfedf8d0e1cb408dbcba2a 100644 (file)
@@ -288,6 +288,7 @@ extern void complete_master_bio(struct drbd_device *device,
 extern void request_timer_fn(unsigned long data);
 extern void tl_restart(struct drbd_connection *connection, enum drbd_req_event what);
 extern void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what);
+extern void tl_abort_disk_io(struct drbd_device *device);
 
 /* this is in drbd_main.c */
 extern void drbd_restart_request(struct drbd_request *req);
index a5d8aae00e04c9515d4a684caaeabb079ada6dc5..c35c0f001bb74333887d0e47c23a0598cd203bfe 100644 (file)
@@ -410,7 +410,7 @@ _drbd_request_state(struct drbd_device *device, union drbd_state mask,
        return rv;
 }
 
-static void print_st(struct drbd_device *device, char *name, union drbd_state ns)
+static void print_st(struct drbd_device *device, const char *name, union drbd_state ns)
 {
        drbd_err(device, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c%c%c }\n",
            name,
@@ -952,11 +952,12 @@ enum drbd_state_rv
 __drbd_set_state(struct drbd_device *device, union drbd_state ns,
                 enum chg_state_flags flags, struct completion *done)
 {
+       struct drbd_peer_device *peer_device = first_peer_device(device);
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
        union drbd_state os;
        enum drbd_state_rv rv = SS_SUCCESS;
        enum sanitize_state_warnings ssw;
        struct after_state_chg_work *ascw;
-       bool did_remote, should_do_remote;
 
        os = drbd_read_state(device);
 
@@ -978,9 +979,9 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
                           this happen...*/
 
                        if (is_valid_state(device, os) == rv)
-                               rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection);
+                               rv = is_valid_soft_transition(os, ns, connection);
                } else
-                       rv = is_valid_soft_transition(os, ns, first_peer_device(device)->connection);
+                       rv = is_valid_soft_transition(os, ns, connection);
        }
 
        if (rv < SS_SUCCESS) {
@@ -997,7 +998,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
           sanitize_state(). Only display it here if we where not called from
           _conn_request_state() */
        if (!(flags & CS_DC_SUSP))
-               conn_pr_state_change(first_peer_device(device)->connection, os, ns,
+               conn_pr_state_change(connection, os, ns,
                                     (flags & ~CS_DC_MASK) | CS_DC_SUSP);
 
        /* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
@@ -1008,28 +1009,35 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
            (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
                atomic_inc(&device->local_cnt);
 
-       did_remote = drbd_should_do_remote(device->state);
+       if (!is_sync_state(os.conn) && is_sync_state(ns.conn))
+               clear_bit(RS_DONE, &device->flags);
+
+       /* changes to local_cnt and device flags should be visible before
+        * changes to state, which again should be visible before anything else
+        * depending on that change happens. */
+       smp_wmb();
        device->state.i = ns.i;
-       should_do_remote = drbd_should_do_remote(device->state);
        device->resource->susp = ns.susp;
        device->resource->susp_nod = ns.susp_nod;
        device->resource->susp_fen = ns.susp_fen;
+       smp_wmb();
 
        /* put replicated vs not-replicated requests in seperate epochs */
-       if (did_remote != should_do_remote)
-               start_new_tl_epoch(first_peer_device(device)->connection);
+       if (drbd_should_do_remote((union drbd_dev_state)os.i) !=
+           drbd_should_do_remote((union drbd_dev_state)ns.i))
+               start_new_tl_epoch(connection);
 
        if (os.disk == D_ATTACHING && ns.disk >= D_NEGOTIATING)
                drbd_print_uuids(device, "attached to UUIDs");
 
        /* Wake up role changes, that were delayed because of connection establishing */
        if (os.conn == C_WF_REPORT_PARAMS && ns.conn != C_WF_REPORT_PARAMS &&
-           no_peer_wf_report_params(first_peer_device(device)->connection))
-               clear_bit(STATE_SENT, &first_peer_device(device)->connection->flags);
+           no_peer_wf_report_params(connection))
+               clear_bit(STATE_SENT, &connection->flags);
 
        wake_up(&device->misc_wait);
        wake_up(&device->state_wait);
-       wake_up(&first_peer_device(device)->connection->ping_wait);
+       wake_up(&connection->ping_wait);
 
        /* Aborted verify run, or we reached the stop sector.
         * Log the last position, unless end-of-device. */
@@ -1118,21 +1126,21 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
 
        /* Receiver should clean up itself */
        if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
-               drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver);
+               drbd_thread_stop_nowait(&connection->receiver);
 
        /* Now the receiver finished cleaning up itself, it should die */
        if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
-               drbd_thread_stop_nowait(&first_peer_device(device)->connection->receiver);
+               drbd_thread_stop_nowait(&connection->receiver);
 
        /* Upon network failure, we need to restart the receiver. */
        if (os.conn > C_WF_CONNECTION &&
            ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
-               drbd_thread_restart_nowait(&first_peer_device(device)->connection->receiver);
+               drbd_thread_restart_nowait(&connection->receiver);
 
        /* Resume AL writing if we get a connection */
        if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
                drbd_resume_al(device);
-               first_peer_device(device)->connection->connect_cnt++;
+               connection->connect_cnt++;
        }
 
        /* remember last attach time so request_timer_fn() won't
@@ -1150,7 +1158,7 @@ __drbd_set_state(struct drbd_device *device, union drbd_state ns,
                ascw->w.cb = w_after_state_ch;
                ascw->device = device;
                ascw->done = done;
-               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
+               drbd_queue_work(&connection->sender_work,
                                &ascw->w);
        } else {
                drbd_err(device, "Could not kmalloc an ascw\n");
@@ -1222,13 +1230,16 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
                           union drbd_state ns, enum chg_state_flags flags)
 {
        struct drbd_resource *resource = device->resource;
+       struct drbd_peer_device *peer_device = first_peer_device(device);
+       struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
        struct sib_info sib;
 
        sib.sib_reason = SIB_STATE_CHANGE;
        sib.os = os;
        sib.ns = ns;
 
-       if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
+       if ((os.disk != D_UP_TO_DATE || os.pdsk != D_UP_TO_DATE)
+       &&  (ns.disk == D_UP_TO_DATE && ns.pdsk == D_UP_TO_DATE)) {
                clear_bit(CRASHED_PRIMARY, &device->flags);
                if (device->p_uuid)
                        device->p_uuid[UI_FLAGS] &= ~((u64)2);
@@ -1245,7 +1256,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
           state change. This function might sleep */
 
        if (ns.susp_nod) {
-               struct drbd_connection *connection = first_peer_device(device)->connection;
                enum drbd_req_event what = NOTHING;
 
                spin_lock_irq(&device->resource->req_lock);
@@ -1267,8 +1277,6 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
        }
 
        if (ns.susp_fen) {
-               struct drbd_connection *connection = first_peer_device(device)->connection;
-
                spin_lock_irq(&device->resource->req_lock);
                if (resource->susp_fen && conn_lowest_conn(connection) >= C_CONNECTED) {
                        /* case2: The connection was established again: */
@@ -1294,8 +1302,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
         * which is unexpected. */
        if ((os.conn != C_SYNC_SOURCE && os.conn != C_PAUSED_SYNC_S) &&
            (ns.conn == C_SYNC_SOURCE || ns.conn == C_PAUSED_SYNC_S) &&
-           first_peer_device(device)->connection->agreed_pro_version >= 96 && get_ldev(device)) {
-               drbd_gen_and_send_sync_uuid(first_peer_device(device));
+           connection->agreed_pro_version >= 96 && get_ldev(device)) {
+               drbd_gen_and_send_sync_uuid(peer_device);
                put_ldev(device);
        }
 
@@ -1309,8 +1317,8 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
                atomic_set(&device->rs_pending_cnt, 0);
                drbd_rs_cancel_all(device);
 
-               drbd_send_uuids(first_peer_device(device));
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_uuids(peer_device);
+               drbd_send_state(peer_device, ns);
        }
        /* No point in queuing send_bitmap if we don't have a connection
         * anymore, so check also the _current_ state, not only the new state
@@ -1335,7 +1343,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
                                        set_bit(NEW_CUR_UUID, &device->flags);
                                } else {
                                        drbd_uuid_new_current(device);
-                                       drbd_send_uuids(first_peer_device(device));
+                                       drbd_send_uuids(peer_device);
                                }
                        }
                        put_ldev(device);
@@ -1346,7 +1354,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
                if (os.peer == R_SECONDARY && ns.peer == R_PRIMARY &&
                    device->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
                        drbd_uuid_new_current(device);
-                       drbd_send_uuids(first_peer_device(device));
+                       drbd_send_uuids(peer_device);
                }
                /* D_DISKLESS Peer becomes secondary */
                if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
@@ -1373,16 +1381,16 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
        /* Last part of the attaching process ... */
        if (ns.conn >= C_CONNECTED &&
            os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
-               drbd_send_sizes(first_peer_device(device), 0, 0);  /* to start sync... */
-               drbd_send_uuids(first_peer_device(device));
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_sizes(peer_device, 0, 0);  /* to start sync... */
+               drbd_send_uuids(peer_device);
+               drbd_send_state(peer_device, ns);
        }
 
        /* We want to pause/continue resync, tell peer. */
        if (ns.conn >= C_CONNECTED &&
             ((os.aftr_isp != ns.aftr_isp) ||
              (os.user_isp != ns.user_isp)))
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_state(peer_device, ns);
 
        /* In case one of the isp bits got set, suspend other devices. */
        if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
@@ -1392,10 +1400,10 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
        /* Make sure the peer gets informed about eventual state
           changes (ISP bits) while we were in WFReportParams. */
        if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_state(peer_device, ns);
 
        if (os.conn != C_AHEAD && ns.conn == C_AHEAD)
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_state(peer_device, ns);
 
        /* We are in the progress to start a full sync... */
        if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
@@ -1449,7 +1457,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
                                        drbd_disk_str(device->state.disk));
 
                        if (ns.conn >= C_CONNECTED)
-                               drbd_send_state(first_peer_device(device), ns);
+                               drbd_send_state(peer_device, ns);
 
                        drbd_rs_cancel_all(device);
 
@@ -1473,7 +1481,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
                                 drbd_disk_str(device->state.disk));
 
                if (ns.conn >= C_CONNECTED)
-                       drbd_send_state(first_peer_device(device), ns);
+                       drbd_send_state(peer_device, ns);
                /* corresponding get_ldev in __drbd_set_state
                 * this may finally trigger drbd_ldev_destroy. */
                put_ldev(device);
@@ -1481,7 +1489,7 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
 
        /* Notify peer that I had a local IO error, and did not detached.. */
        if (os.disk == D_UP_TO_DATE && ns.disk == D_INCONSISTENT && ns.conn >= C_CONNECTED)
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_state(peer_device, ns);
 
        /* Disks got bigger while they were detached */
        if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
@@ -1499,14 +1507,14 @@ static void after_state_ch(struct drbd_device *device, union drbd_state os,
        /* sync target done with resync.  Explicitly notify peer, even though
         * it should (at least for non-empty resyncs) already know itself. */
        if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_state(peer_device, ns);
 
        /* Verify finished, or reached stop sector.  Peer did not know about
         * the stop sector, and we may even have changed the stop sector during
         * verify to interrupt/stop early.  Send the new state. */
        if (os.conn == C_VERIFY_S && ns.conn == C_CONNECTED
        && verify_can_do_stop_sector(device))
-               drbd_send_state(first_peer_device(device), ns);
+               drbd_send_state(peer_device, ns);
 
        /* This triggers bitmap writeout of potentially still unwritten pages
         * if the resync finished cleanly, or aborted because of peer disk
@@ -1563,7 +1571,7 @@ static int w_after_conn_state_ch(struct drbd_work *w, int unused)
                old_conf = connection->net_conf;
                connection->my_addr_len = 0;
                connection->peer_addr_len = 0;
-               rcu_assign_pointer(connection->net_conf, NULL);
+               RCU_INIT_POINTER(connection->net_conf, NULL);
                conn_free_crypto(connection);
                mutex_unlock(&connection->resource->conf_update);
 
@@ -1599,7 +1607,7 @@ static int w_after_conn_state_ch(struct drbd_work *w, int unused)
        return 0;
 }
 
-void conn_old_common_state(struct drbd_connection *connection, union drbd_state *pcs, enum chg_state_flags *pf)
+static void conn_old_common_state(struct drbd_connection *connection, union drbd_state *pcs, enum chg_state_flags *pf)
 {
        enum chg_state_flags flags = ~0;
        struct drbd_peer_device *peer_device;
@@ -1688,7 +1696,7 @@ conn_is_valid_transition(struct drbd_connection *connection, union drbd_state ma
        return rv;
 }
 
-void
+static void
 conn_set_state(struct drbd_connection *connection, union drbd_state mask, union drbd_state val,
               union drbd_state *pns_min, union drbd_state *pns_max, enum chg_state_flags flags)
 {
index d8f57b6305cd6f84ec0be24309512fe5b25a8e92..50776b36282868415d7b48ebed66fbf1f5dec09f 100644 (file)
@@ -67,13 +67,10 @@ rwlock_t global_state_lock;
  */
 void drbd_md_io_complete(struct bio *bio, int error)
 {
-       struct drbd_md_io *md_io;
        struct drbd_device *device;
 
-       md_io = (struct drbd_md_io *)bio->bi_private;
-       device = container_of(md_io, struct drbd_device, md_io);
-
-       md_io->error = error;
+       device = bio->bi_private;
+       device->md_io.error = error;
 
        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
         * to timeout on the lower level device, and eventually detach from it.
@@ -87,7 +84,7 @@ void drbd_md_io_complete(struct bio *bio, int error)
         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
         */
        drbd_md_put_buffer(device);
-       md_io->done = 1;
+       device->md_io.done = 1;
        wake_up(&device->misc_wait);
        bio_put(bio);
        if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
@@ -135,6 +132,7 @@ void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(l
        i = peer_req->i;
        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
        block_id = peer_req->block_id;
+       peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 
        spin_lock_irqsave(&device->resource->req_lock, flags);
        device->writ_cnt += peer_req->i.size >> 9;
@@ -398,9 +396,6 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector,
        if (!get_ldev(device))
                return -EIO;
 
-       if (drbd_rs_should_slow_down(device, sector))
-               goto defer;
-
        /* GFP_TRY, because if there is no memory available right now, this may
         * be rescheduled for later. It is "only" background resync, after all. */
        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
@@ -410,7 +405,7 @@ static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector,
 
        peer_req->w.cb = w_e_send_csum;
        spin_lock_irq(&device->resource->req_lock);
-       list_add(&peer_req->w.list, &device->read_ee);
+       list_add_tail(&peer_req->w.list, &device->read_ee);
        spin_unlock_irq(&device->resource->req_lock);
 
        atomic_add(size >> 9, &device->rs_sect_ev);
@@ -452,9 +447,9 @@ void resync_timer_fn(unsigned long data)
 {
        struct drbd_device *device = (struct drbd_device *) data;
 
-       if (list_empty(&device->resync_work.list))
-               drbd_queue_work(&first_peer_device(device)->connection->sender_work,
-                               &device->resync_work);
+       drbd_queue_work_if_unqueued(
+               &first_peer_device(device)->connection->sender_work,
+               &device->resync_work);
 }
 
 static void fifo_set(struct fifo_buffer *fb, int value)
@@ -504,9 +499,9 @@ struct fifo_buffer *fifo_alloc(int fifo_size)
 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 {
        struct disk_conf *dc;
-       unsigned int want;     /* The number of sectors we want in the proxy */
+       unsigned int want;     /* The number of sectors we want in-flight */
        int req_sect; /* Number of sectors to request in this turn */
-       int correction; /* Number of sectors more we need in the proxy*/
+       int correction; /* Number of sectors more we need in-flight */
        int cps; /* correction per invocation of drbd_rs_controller() */
        int steps; /* Number of time steps to plan ahead */
        int curr_corr;
@@ -577,20 +572,27 @@ static int drbd_rs_number_requests(struct drbd_device *device)
         * potentially causing a distributed deadlock on congestion during
         * online-verify or (checksum-based) resync, if max-buffers,
         * socket buffer sizes and resync rate settings are mis-configured. */
-       if (mxb - device->rs_in_flight < number)
-               number = mxb - device->rs_in_flight;
+
+       /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
+        * mxb (as used here, and in drbd_alloc_pages on the peer) is
+        * "number of pages" (typically also 4k),
+        * but "rs_in_flight" is in "sectors" (512 Byte). */
+       if (mxb - device->rs_in_flight/8 < number)
+               number = mxb - device->rs_in_flight/8;
 
        return number;
 }
 
-static int make_resync_request(struct drbd_device *device, int cancel)
+static int make_resync_request(struct drbd_device *const device, int cancel)
 {
+       struct drbd_peer_device *const peer_device = first_peer_device(device);
+       struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        unsigned long bit;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        int max_bio_size;
        int number, rollback_i, size;
-       int align, queued, sndbuf;
+       int align, requeue = 0;
        int i = 0;
 
        if (unlikely(cancel))
@@ -617,17 +619,22 @@ static int make_resync_request(struct drbd_device *device, int cancel)
                goto requeue;
 
        for (i = 0; i < number; i++) {
-               /* Stop generating RS requests, when half of the send buffer is filled */
-               mutex_lock(&first_peer_device(device)->connection->data.mutex);
-               if (first_peer_device(device)->connection->data.socket) {
-                       queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued;
-                       sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf;
-               } else {
-                       queued = 1;
-                       sndbuf = 0;
-               }
-               mutex_unlock(&first_peer_device(device)->connection->data.mutex);
-               if (queued > sndbuf / 2)
+               /* Stop generating RS requests when half of the send buffer is filled,
+                * but notify TCP that we'd like to have more space. */
+               mutex_lock(&connection->data.mutex);
+               if (connection->data.socket) {
+                       struct sock *sk = connection->data.socket->sk;
+                       int queued = sk->sk_wmem_queued;
+                       int sndbuf = sk->sk_sndbuf;
+                       if (queued > sndbuf / 2) {
+                               requeue = 1;
+                               if (sk->sk_socket)
+                                       set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
+                       }
+               } else
+                       requeue = 1;
+               mutex_unlock(&connection->data.mutex);
+               if (requeue)
                        goto requeue;
 
 next_sector:
@@ -642,8 +649,7 @@ next_sector:
 
                sector = BM_BIT_TO_SECT(bit);
 
-               if (drbd_rs_should_slow_down(device, sector) ||
-                   drbd_try_rs_begin_io(device, sector)) {
+               if (drbd_try_rs_begin_io(device, sector)) {
                        device->bm_resync_fo = bit;
                        goto requeue;
                }
@@ -696,9 +702,9 @@ next_sector:
                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;
-               if (first_peer_device(device)->connection->agreed_pro_version >= 89 &&
-                   first_peer_device(device)->connection->csums_tfm) {
-                       switch (read_for_csum(first_peer_device(device), sector, size)) {
+
+               if (device->use_csums) {
+                       switch (read_for_csum(peer_device, sector, size)) {
                        case -EIO: /* Disk failure */
                                put_ldev(device);
                                return -EIO;
@@ -717,7 +723,7 @@ next_sector:
                        int err;
 
                        inc_rs_pending(device);
-                       err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST,
+                       err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
                                                 sector, size, ID_SYNCER);
                        if (err) {
                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
@@ -774,8 +780,7 @@ static int make_ov_request(struct drbd_device *device, int cancel)
 
                size = BM_BLOCK_SIZE;
 
-               if (drbd_rs_should_slow_down(device, sector) ||
-                   drbd_try_rs_begin_io(device, sector)) {
+               if (drbd_try_rs_begin_io(device, sector)) {
                        device->ov_position = sector;
                        goto requeue;
                }
@@ -911,7 +916,7 @@ int drbd_resync_finished(struct drbd_device *device)
                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
                        khelper_cmd = "after-resync-target";
 
-               if (first_peer_device(device)->connection->csums_tfm && device->rs_total) {
+               if (device->use_csums && device->rs_total) {
                        const unsigned long s = device->rs_same_csum;
                        const unsigned long t = device->rs_total;
                        const int ratio =
@@ -1351,13 +1356,15 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
 {
        struct drbd_request *req = container_of(w, struct drbd_request, w);
        struct drbd_device *device = req->device;
-       struct drbd_connection *connection = first_peer_device(device)->connection;
+       struct drbd_peer_device *const peer_device = first_peer_device(device);
+       struct drbd_connection *const connection = peer_device->connection;
        int err;
 
        if (unlikely(cancel)) {
                req_mod(req, SEND_CANCELED);
                return 0;
        }
+       req->pre_send_jif = jiffies;
 
        /* this time, no connection->send.current_epoch_writes++;
         * If it was sent, it was the closing barrier for the last
@@ -1365,7 +1372,7 @@ int w_send_out_of_sync(struct drbd_work *w, int cancel)
         * No more barriers will be sent, until we leave AHEAD mode again. */
        maybe_send_barrier(connection, req->epoch);
 
-       err = drbd_send_out_of_sync(first_peer_device(device), req);
+       err = drbd_send_out_of_sync(peer_device, req);
        req_mod(req, OOS_HANDED_TO_NETWORK);
 
        return err;
@@ -1380,19 +1387,21 @@ int w_send_dblock(struct drbd_work *w, int cancel)
 {
        struct drbd_request *req = container_of(w, struct drbd_request, w);
        struct drbd_device *device = req->device;
-       struct drbd_connection *connection = first_peer_device(device)->connection;
+       struct drbd_peer_device *const peer_device = first_peer_device(device);
+       struct drbd_connection *connection = peer_device->connection;
        int err;
 
        if (unlikely(cancel)) {
                req_mod(req, SEND_CANCELED);
                return 0;
        }
+       req->pre_send_jif = jiffies;
 
        re_init_if_first_write(connection, req->epoch);
        maybe_send_barrier(connection, req->epoch);
        connection->send.current_epoch_writes++;
 
-       err = drbd_send_dblock(first_peer_device(device), req);
+       err = drbd_send_dblock(peer_device, req);
        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
 
        return err;
@@ -1407,19 +1416,21 @@ int w_send_read_req(struct drbd_work *w, int cancel)
 {
        struct drbd_request *req = container_of(w, struct drbd_request, w);
        struct drbd_device *device = req->device;
-       struct drbd_connection *connection = first_peer_device(device)->connection;
+       struct drbd_peer_device *const peer_device = first_peer_device(device);
+       struct drbd_connection *connection = peer_device->connection;
        int err;
 
        if (unlikely(cancel)) {
                req_mod(req, SEND_CANCELED);
                return 0;
        }
+       req->pre_send_jif = jiffies;
 
        /* Even read requests may close a write epoch,
         * if there was any yet. */
        maybe_send_barrier(connection, req->epoch);
 
-       err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size,
+       err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
                                 (unsigned long)req);
 
        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
@@ -1433,7 +1444,7 @@ int w_restart_disk_io(struct drbd_work *w, int cancel)
        struct drbd_device *device = req->device;
 
        if (bio_data_dir(req->master_bio) == WRITE && req-