Merge tag 'locks-v4.19-1' of git://git.kernel.org/pub/scm/linux/kernel/git/jlayton...
[sfrench/cifs-2.6.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24 */
25
#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched/signal.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/overflow.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"
41
42 static int make_ov_request(struct drbd_device *, int);
43 static int make_resync_request(struct drbd_device *, int);
44
45 /* endio handlers:
46  *   drbd_md_endio (defined here)
47  *   drbd_request_endio (defined here)
48  *   drbd_peer_request_endio (defined here)
49  *   drbd_bm_endio (defined in drbd_bitmap.c)
50  *
51  * For all these callbacks, note the following:
52  * The callbacks will be called in irq context by the IDE drivers,
53  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
54  * Try to get the locking right :)
55  *
56  */
57
58 /* used for synchronous meta data and bitmap IO
59  * submitted by drbd_md_sync_page_io()
60  */
61 void drbd_md_endio(struct bio *bio)
62 {
63         struct drbd_device *device;
64
65         device = bio->bi_private;
66         device->md_io.error = blk_status_to_errno(bio->bi_status);
67
68         /* special case: drbd_md_read() during drbd_adm_attach() */
69         if (device->ldev)
70                 put_ldev(device);
71         bio_put(bio);
72
73         /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
74          * to timeout on the lower level device, and eventually detach from it.
75          * If this io completion runs after that timeout expired, this
76          * drbd_md_put_buffer() may allow us to finally try and re-attach.
77          * During normal operation, this only puts that extra reference
78          * down to 1 again.
79          * Make sure we first drop the reference, and only then signal
80          * completion, or we may (in drbd_al_read_log()) cycle so fast into the
81          * next drbd_md_sync_page_io(), that we trigger the
82          * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
83          */
84         drbd_md_put_buffer(device);
85         device->md_io.done = 1;
86         wake_up(&device->misc_wait);
87 }
88
89 /* reads on behalf of the partner,
90  * "submitted" by the receiver
91  */
92 static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
93 {
94         unsigned long flags = 0;
95         struct drbd_peer_device *peer_device = peer_req->peer_device;
96         struct drbd_device *device = peer_device->device;
97
98         spin_lock_irqsave(&device->resource->req_lock, flags);
99         device->read_cnt += peer_req->i.size >> 9;
100         list_del(&peer_req->w.list);
101         if (list_empty(&device->read_ee))
102                 wake_up(&device->ee_wait);
103         if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
104                 __drbd_chk_io_error(device, DRBD_READ_ERROR);
105         spin_unlock_irqrestore(&device->resource->req_lock, flags);
106
107         drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
108         put_ldev(device);
109 }
110
/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.
 *
 * Sequence, as implemented below:
 *  - snapshot everything still needed from peer_req (interval, block_id,
 *    whether activity log completion is due) into locals;
 *  - on a write error, make sure a (negative) ack will be sent and mark the
 *    area out of sync;
 *  - under req_lock: account the written sectors, move peer_req to done_ee,
 *    check for IO errors, and kick the ack sender work if the connection
 *    state is at least C_WF_REPORT_PARAMS;
 *  - after dropping the lock, work only on the snapshotted locals:
 *    complete resync IO, wake ee_wait waiters, complete activity log IO,
 *    and drop the ldev reference.
 */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
        unsigned long flags = 0;
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        struct drbd_connection *connection = peer_device->connection;
        struct drbd_interval i;
        int do_wake;
        u64 block_id;
        int do_al_complete_io;

        /* after we moved peer_req to done_ee,
         * we may no longer access it,
         * it may be freed/reused already!
         * (as soon as we release the req_lock) */
        i = peer_req->i;
        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
        block_id = peer_req->block_id;
        /* the flag was latched into do_al_complete_io above; clear it on the
         * request itself */
        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

        if (peer_req->flags & EE_WAS_ERROR) {
                /* In protocol != C, we usually do not send write acks.
                 * In case of a write error, send the neg ack anyways. */
                if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
                        inc_unacked(device);
                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
        }

        spin_lock_irqsave(&device->resource->req_lock, flags);
        device->writ_cnt += peer_req->i.size >> 9;
        list_move_tail(&peer_req->w.list, &device->done_ee);

        /*
         * Do not remove from the write_requests tree here: we did not send the
         * Ack yet and did not wake possibly waiting conflicting requests.
         * Removed from the tree from "drbd_process_done_ee" within the
         * appropriate dw.cb (e_end_block/e_end_resync_block) or from
         * _drbd_clear_done_ee.
         */

        /* evaluated after the list_move_tail() above: wake waiters only if
         * the list this request came from (sync_ee for resync writes,
         * active_ee otherwise) is now empty */
        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

        /* FIXME do we want to detach for failed REQ_DISCARD?
         * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
        if (peer_req->flags & EE_WAS_ERROR)
                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);

        if (connection->cstate >= C_WF_REPORT_PARAMS) {
                kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
                /* queue_work() returned 0: give the reference back right away */
                if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
                        kref_put(&device->kref, drbd_destroy_device);
        }
        spin_unlock_irqrestore(&device->resource->req_lock, flags);

        /* from here on only the local snapshots are used, not peer_req */
        if (block_id == ID_SYNCER)
                drbd_rs_complete_io(device, i.sector);

        if (do_wake)
                wake_up(&device->ee_wait);

        if (do_al_complete_io)
                drbd_al_complete_io(device, &i);

        put_ldev(device);
}
178
179 /* writes on behalf of the partner, or resync writes,
180  * "submitted" by the receiver.
181  */
182 void drbd_peer_request_endio(struct bio *bio)
183 {
184         struct drbd_peer_request *peer_req = bio->bi_private;
185         struct drbd_device *device = peer_req->peer_device->device;
186         bool is_write = bio_data_dir(bio) == WRITE;
187         bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
188                           bio_op(bio) == REQ_OP_DISCARD;
189
190         if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
191                 drbd_warn(device, "%s: error=%d s=%llus\n",
192                                 is_write ? (is_discard ? "discard" : "write")
193                                         : "read", bio->bi_status,
194                                 (unsigned long long)peer_req->i.sector);
195
196         if (bio->bi_status)
197                 set_bit(__EE_WAS_ERROR, &peer_req->flags);
198
199         bio_put(bio); /* no need for the bio anymore */
200         if (atomic_dec_and_test(&peer_req->pending_bios)) {
201                 if (is_write)
202                         drbd_endio_write_sec_final(peer_req);
203                 else
204                         drbd_endio_read_sec_final(peer_req);
205         }
206 }
207
208 static void
209 drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
210 {
211         panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
212                 device->minor, device->resource->name, device->vnr);
213 }
214
215 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
216  */
217 void drbd_request_endio(struct bio *bio)
218 {
219         unsigned long flags;
220         struct drbd_request *req = bio->bi_private;
221         struct drbd_device *device = req->device;
222         struct bio_and_error m;
223         enum drbd_req_event what;
224
225         /* If this request was aborted locally before,
226          * but now was completed "successfully",
227          * chances are that this caused arbitrary data corruption.
228          *
229          * "aborting" requests, or force-detaching the disk, is intended for
230          * completely blocked/hung local backing devices which do no longer
231          * complete requests at all, not even do error completions.  In this
232          * situation, usually a hard-reset and failover is the only way out.
233          *
234          * By "aborting", basically faking a local error-completion,
235          * we allow for a more graceful swichover by cleanly migrating services.
236          * Still the affected node has to be rebooted "soon".
237          *
238          * By completing these requests, we allow the upper layers to re-use
239          * the associated data pages.
240          *
241          * If later the local backing device "recovers", and now DMAs some data
242          * from disk into the original request pages, in the best case it will
243          * just put random data into unused pages; but typically it will corrupt
244          * meanwhile completely unrelated data, causing all sorts of damage.
245          *
246          * Which means delayed successful completion,
247          * especially for READ requests,
248          * is a reason to panic().
249          *
250          * We assume that a delayed *error* completion is OK,
251          * though we still will complain noisily about it.
252          */
253         if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
254                 if (__ratelimit(&drbd_ratelimit_state))
255                         drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
256
257                 if (!bio->bi_status)
258                         drbd_panic_after_delayed_completion_of_aborted_request(device);
259         }
260
261         /* to avoid recursion in __req_mod */
262         if (unlikely(bio->bi_status)) {
263                 switch (bio_op(bio)) {
264                 case REQ_OP_WRITE_ZEROES:
265                 case REQ_OP_DISCARD:
266                         if (bio->bi_status == BLK_STS_NOTSUPP)
267                                 what = DISCARD_COMPLETED_NOTSUPP;
268                         else
269                                 what = DISCARD_COMPLETED_WITH_ERROR;
270                         break;
271                 case REQ_OP_READ:
272                         if (bio->bi_opf & REQ_RAHEAD)
273                                 what = READ_AHEAD_COMPLETED_WITH_ERROR;
274                         else
275                                 what = READ_COMPLETED_WITH_ERROR;
276                         break;
277                 default:
278                         what = WRITE_COMPLETED_WITH_ERROR;
279                         break;
280                 }
281         } else {
282                 what = COMPLETED_OK;
283         }
284
285         req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
286         bio_put(bio);
287
288         /* not req_mod(), we need irqsave here! */
289         spin_lock_irqsave(&device->resource->req_lock, flags);
290         __req_mod(req, what, &m);
291         spin_unlock_irqrestore(&device->resource->req_lock, flags);
292         put_ldev(device);
293
294         if (m.bio)
295                 complete_master_bio(device, &m);
296 }
297
298 void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
299 {
300         AHASH_REQUEST_ON_STACK(req, tfm);
301         struct scatterlist sg;
302         struct page *page = peer_req->pages;
303         struct page *tmp;
304         unsigned len;
305
306         ahash_request_set_tfm(req, tfm);
307         ahash_request_set_callback(req, 0, NULL, NULL);
308
309         sg_init_table(&sg, 1);
310         crypto_ahash_init(req);
311
312         while ((tmp = page_chain_next(page))) {
313                 /* all but the last page will be fully used */
314                 sg_set_page(&sg, page, PAGE_SIZE, 0);
315                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
316                 crypto_ahash_update(req);
317                 page = tmp;
318         }
319         /* and now the last, possibly only partially used page */
320         len = peer_req->i.size & (PAGE_SIZE - 1);
321         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
322         ahash_request_set_crypt(req, &sg, digest, sg.length);
323         crypto_ahash_finup(req);
324         ahash_request_zero(req);
325 }
326
327 void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
328 {
329         AHASH_REQUEST_ON_STACK(req, tfm);
330         struct scatterlist sg;
331         struct bio_vec bvec;
332         struct bvec_iter iter;
333
334         ahash_request_set_tfm(req, tfm);
335         ahash_request_set_callback(req, 0, NULL, NULL);
336
337         sg_init_table(&sg, 1);
338         crypto_ahash_init(req);
339
340         bio_for_each_segment(bvec, bio, iter) {
341                 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
342                 ahash_request_set_crypt(req, &sg, NULL, sg.length);
343                 crypto_ahash_update(req);
344                 /* REQ_OP_WRITE_SAME has only one segment,
345                  * checksum the payload only once. */
346                 if (bio_op(bio) == REQ_OP_WRITE_SAME)
347                         break;
348         }
349         ahash_request_set_crypt(req, NULL, digest, 0);
350         crypto_ahash_final(req);
351         ahash_request_zero(req);
352 }
353
354 /* MAYBE merge common code with w_e_end_ov_req */
355 static int w_e_send_csum(struct drbd_work *w, int cancel)
356 {
357         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
358         struct drbd_peer_device *peer_device = peer_req->peer_device;
359         struct drbd_device *device = peer_device->device;
360         int digest_size;
361         void *digest;
362         int err = 0;
363
364         if (unlikely(cancel))
365                 goto out;
366
367         if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
368                 goto out;
369
370         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
371         digest = kmalloc(digest_size, GFP_NOIO);
372         if (digest) {
373                 sector_t sector = peer_req->i.sector;
374                 unsigned int size = peer_req->i.size;
375                 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
376                 /* Free peer_req and pages before send.
377                  * In case we block on congestion, we could otherwise run into
378                  * some distributed deadlock, if the other side blocks on
379                  * congestion as well, because our receiver blocks in
380                  * drbd_alloc_pages due to pp_in_use > max_buffers. */
381                 drbd_free_peer_req(device, peer_req);
382                 peer_req = NULL;
383                 inc_rs_pending(device);
384                 err = drbd_send_drequest_csum(peer_device, sector, size,
385                                               digest, digest_size,
386                                               P_CSUM_RS_REQUEST);
387                 kfree(digest);
388         } else {
389                 drbd_err(device, "kmalloc() of digest failed.\n");
390                 err = -ENOMEM;
391         }
392
393 out:
394         if (peer_req)
395                 drbd_free_peer_req(device, peer_req);
396
397         if (unlikely(err))
398                 drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
399         return err;
400 }
401
402 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
403
404 static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
405 {
406         struct drbd_device *device = peer_device->device;
407         struct drbd_peer_request *peer_req;
408
409         if (!get_ldev(device))
410                 return -EIO;
411
412         /* GFP_TRY, because if there is no memory available right now, this may
413          * be rescheduled for later. It is "only" background resync, after all. */
414         peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
415                                        size, size, GFP_TRY);
416         if (!peer_req)
417                 goto defer;
418
419         peer_req->w.cb = w_e_send_csum;
420         spin_lock_irq(&device->resource->req_lock);
421         list_add_tail(&peer_req->w.list, &device->read_ee);
422         spin_unlock_irq(&device->resource->req_lock);
423
424         atomic_add(size >> 9, &device->rs_sect_ev);
425         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
426                                      DRBD_FAULT_RS_RD) == 0)
427                 return 0;
428
429         /* If it failed because of ENOMEM, retry should help.  If it failed
430          * because bio_add_page failed (probably broken lower level driver),
431          * retry may or may not help.
432          * If it does not, you may need to force disconnect. */
433         spin_lock_irq(&device->resource->req_lock);
434         list_del(&peer_req->w.list);
435         spin_unlock_irq(&device->resource->req_lock);
436
437         drbd_free_peer_req(device, peer_req);
438 defer:
439         put_ldev(device);
440         return -EAGAIN;
441 }
442
443 int w_resync_timer(struct drbd_work *w, int cancel)
444 {
445         struct drbd_device *device =
446                 container_of(w, struct drbd_device, resync_work);
447
448         switch (device->state.conn) {
449         case C_VERIFY_S:
450                 make_ov_request(device, cancel);
451                 break;
452         case C_SYNC_TARGET:
453                 make_resync_request(device, cancel);
454                 break;
455         }
456
457         return 0;
458 }
459
460 void resync_timer_fn(struct timer_list *t)
461 {
462         struct drbd_device *device = from_timer(device, t, resync_timer);
463
464         drbd_queue_work_if_unqueued(
465                 &first_peer_device(device)->connection->sender_work,
466                 &device->resync_work);
467 }
468
469 static void fifo_set(struct fifo_buffer *fb, int value)
470 {
471         int i;
472
473         for (i = 0; i < fb->size; i++)
474                 fb->values[i] = value;
475 }
476
477 static int fifo_push(struct fifo_buffer *fb, int value)
478 {
479         int ov;
480
481         ov = fb->values[fb->head_index];
482         fb->values[fb->head_index++] = value;
483
484         if (fb->head_index >= fb->size)
485                 fb->head_index = 0;
486
487         return ov;
488 }
489
490 static void fifo_add_val(struct fifo_buffer *fb, int value)
491 {
492         int i;
493
494         for (i = 0; i < fb->size; i++)
495                 fb->values[i] += value;
496 }
497
498 struct fifo_buffer *fifo_alloc(int fifo_size)
499 {
500         struct fifo_buffer *fb;
501
502         fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
503         if (!fb)
504                 return NULL;
505
506         fb->head_index = 0;
507         fb->size = fifo_size;
508         fb->total = 0;
509
510         return fb;
511 }
512
513 static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
514 {
515         struct disk_conf *dc;
516         unsigned int want;     /* The number of sectors we want in-flight */
517         int req_sect; /* Number of sectors to request in this turn */
518         int correction; /* Number of sectors more we need in-flight */
519         int cps; /* correction per invocation of drbd_rs_controller() */
520         int steps; /* Number of time steps to plan ahead */
521         int curr_corr;
522         int max_sect;
523         struct fifo_buffer *plan;
524
525         dc = rcu_dereference(device->ldev->disk_conf);
526         plan = rcu_dereference(device->rs_plan_s);
527
528         steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
529
530         if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
531                 want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
532         } else { /* normal path */
533                 want = dc->c_fill_target ? dc->c_fill_target :
534                         sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
535         }
536
537         correction = want - device->rs_in_flight - plan->total;
538
539         /* Plan ahead */
540         cps = correction / steps;
541         fifo_add_val(plan, cps);
542         plan->total += cps * steps;
543
544         /* What we do in this step */
545         curr_corr = fifo_push(plan, 0);
546         plan->total -= curr_corr;
547
548         req_sect = sect_in + curr_corr;
549         if (req_sect < 0)
550                 req_sect = 0;
551
552         max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
553         if (req_sect > max_sect)
554                 req_sect = max_sect;
555
556         /*
557         drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
558                  sect_in, device->rs_in_flight, want, correction,
559                  steps, cps, device->rs_planed, curr_corr, req_sect);
560         */
561
562         return req_sect;
563 }
564
565 static int drbd_rs_number_requests(struct drbd_device *device)
566 {
567         unsigned int sect_in;  /* Number of sectors that came in since the last turn */
568         int number, mxb;
569
570         sect_in = atomic_xchg(&device->rs_sect_in, 0);
571         device->rs_in_flight -= sect_in;
572
573         rcu_read_lock();
574         mxb = drbd_get_max_buffers(device) / 2;
575         if (rcu_dereference(device->rs_plan_s)->size) {
576                 number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
577                 device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
578         } else {
579                 device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
580                 number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
581         }
582         rcu_read_unlock();
583
584         /* Don't have more than "max-buffers"/2 in-flight.
585          * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
586          * potentially causing a distributed deadlock on congestion during
587          * online-verify or (checksum-based) resync, if max-buffers,
588          * socket buffer sizes and resync rate settings are mis-configured. */
589
590         /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
591          * mxb (as used here, and in drbd_alloc_pages on the peer) is
592          * "number of pages" (typically also 4k),
593          * but "rs_in_flight" is in "sectors" (512 Byte). */
594         if (mxb - device->rs_in_flight/8 < number)
595                 number = mxb - device->rs_in_flight/8;
596
597         return number;
598 }
599
/* Generate the next batch of resync requests while being sync target.
 *
 * Walks the bitmap starting at device->bm_resync_fo, merges adjacent dirty
 * bits into larger, aligned requests where allowed, and for each resulting
 * area either reads the local block for checksum comparison
 * (device->use_csums) or sends a data request to the peer.
 *
 * Returns 0 in most cases (cancelled, nothing to do, end of bitmap reached,
 * requeued via the resync timer); returns -EIO if read_for_csum() reports
 * -EIO, or the error from drbd_send_drequest().
 */
static int make_resync_request(struct drbd_device *const device, int cancel)
{
        struct drbd_peer_device *const peer_device = first_peer_device(device);
        /* NOTE(review): connection is NULL if there is no peer device, yet it
         * is dereferenced unconditionally below; presumably callers guarantee
         * a peer device exists -- confirm. */
        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        unsigned long bit;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        int max_bio_size;
        int number, rollback_i, size;
        int align, requeue = 0;
        int i = 0;
        int discard_granularity = 0;

        if (unlikely(cancel))
                return 0;

        if (device->rs_total == 0) {
                /* empty resync? */
                drbd_resync_finished(device);
                return 0;
        }

        if (!get_ldev(device)) {
                /* Since we only need to access device->rsync a
                   get_ldev_if_state(device,D_FAILED) would be sufficient, but
                   to continue resync with a broken disk makes no sense at
                   all */
                drbd_err(device, "Disk broke down during resync!\n");
                return 0;
        }

        /* discard_granularity stays 0 unless the peer agreed on
         * DRBD_FF_THIN_RESYNC */
        if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
                rcu_read_lock();
                discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
                rcu_read_unlock();
        }

        /* queue limit is in sectors, max_bio_size is in bytes */
        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
        number = drbd_rs_number_requests(device);
        if (number <= 0)
                goto requeue;

        for (i = 0; i < number; i++) {
                /* Stop generating RS requests when half of the send buffer is filled,
                 * but notify TCP that we'd like to have more space. */
                mutex_lock(&connection->data.mutex);
                if (connection->data.socket) {
                        struct sock *sk = connection->data.socket->sk;
                        int queued = sk->sk_wmem_queued;
                        int sndbuf = sk->sk_sndbuf;
                        if (queued > sndbuf / 2) {
                                requeue = 1;
                                if (sk->sk_socket)
                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        }
                } else
                        requeue = 1;
                mutex_unlock(&connection->data.mutex);
                if (requeue)
                        goto requeue;

next_sector:
                size = BM_BLOCK_SIZE;
                bit  = drbd_bm_find_next(device, device->bm_resync_fo);

                if (bit == DRBD_END_OF_BITMAP) {
                        /* nothing left to request; note that this path
                         * returns without re-arming the resync timer */
                        device->bm_resync_fo = drbd_bm_bits(device);
                        put_ldev(device);
                        return 0;
                }

                sector = BM_BIT_TO_SECT(bit);

                if (drbd_try_rs_begin_io(device, sector)) {
                        /* could not start resync IO on this sector now:
                         * remember where to resume and try again later */
                        device->bm_resync_fo = bit;
                        goto requeue;
                }
                device->bm_resync_fo = bit + 1;

                /* the bit is no longer set: undo the begin_io and look for
                 * the next one */
                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
                        drbd_rs_complete_io(device, sector);
                        goto next_sector;
                }

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
                /* try to find some adjacent bits.
                 * we stop if we have already the maximum req size.
                 *
                 * Additionally always align bigger requests, in order to
                 * be prepared for all stripe sizes of software RAIDs.
                 */
                align = 1;
                /* remember i, so a failed read_for_csum() below can take back
                 * the increments done while merging */
                rollback_i = i;
                while (i < number) {
                        if (size + BM_BLOCK_SIZE > max_bio_size)
                                break;

                        /* Be always aligned */
                        if (sector & ((1<<(align+3))-1))
                                break;

                        if (discard_granularity && size == discard_granularity)
                                break;

                        /* do not cross extent boundaries */
                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
                                break;
                        /* now, is it actually dirty, after all?
                         * caution, drbd_bm_test_bit is tri-state for some
                         * obscure reason; ( b == 0 ) would get the out-of-band
                         * only accidentally right because of the "oddly sized"
                         * adjustment below */
                        if (drbd_bm_test_bit(device, bit+1) != 1)
                                break;
                        bit++;
                        size += BM_BLOCK_SIZE;
                        if ((BM_BLOCK_SIZE << align) <= size)
                                align++;
                        i++;
                }
                /* if we merged some,
                 * reset the offset to start the next drbd_bm_find_next from */
                if (size > BM_BLOCK_SIZE)
                        device->bm_resync_fo = bit + 1;
#endif

                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                if (device->use_csums) {
                        switch (read_for_csum(peer_device, sector, size)) {
                        case -EIO: /* Disk failure */
                                put_ldev(device);
                                return -EIO;
                        case -EAGAIN: /* allocation failed, or ldev busy */
                                /* undo: release the resync IO, restart from
                                 * this sector next time, and do not count the
                                 * merged blocks as in flight */
                                drbd_rs_complete_io(device, sector);
                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
                                i = rollback_i;
                                goto requeue;
                        case 0:
                                /* everything ok */
                                break;
                        default:
                                BUG();
                        }
                } else {
                        int err;

                        inc_rs_pending(device);
                        /* a request of exactly discard_granularity bytes is
                         * sent as P_RS_THIN_REQ, anything else as
                         * P_RS_DATA_REQUEST */
                        err = drbd_send_drequest(peer_device,
                                                 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
                                                 sector, size, ID_SYNCER);
                        if (err) {
                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
                                dec_rs_pending(device);
                                put_ldev(device);
                                return err;
                        }
                }
        }

        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
                /* last syncer _request_ was sent,
                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
                 * next sync group will resume), as soon as we receive the last
                 * resync data block, and the last bit is cleared.
                 * until then resync "work" is "inactive" ...
                 */
                put_ldev(device);
                return 0;
        }

 requeue:
        /* account the i blocks handled in this turn as in flight (converted
         * to sectors) and have the resync timer fire again after SLEEP_TIME */
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        put_ldev(device);
        return 0;
}
779
780 static int make_ov_request(struct drbd_device *device, int cancel)
781 {
782         int number, i, size;
783         sector_t sector;
784         const sector_t capacity = drbd_get_capacity(device->this_bdev);
785         bool stop_sector_reached = false;
786
787         if (unlikely(cancel))
788                 return 1;
789
790         number = drbd_rs_number_requests(device);
791
792         sector = device->ov_position;
793         for (i = 0; i < number; i++) {
794                 if (sector >= capacity)
795                         return 1;
796
797                 /* We check for "finished" only in the reply path:
798                  * w_e_end_ov_reply().
799                  * We need to send at least one request out. */
800                 stop_sector_reached = i > 0
801                         && verify_can_do_stop_sector(device)
802                         && sector >= device->ov_stop_sector;
803                 if (stop_sector_reached)
804                         break;
805
806                 size = BM_BLOCK_SIZE;
807
808                 if (drbd_try_rs_begin_io(device, sector)) {
809                         device->ov_position = sector;
810                         goto requeue;
811                 }
812
813                 if (sector + (size>>9) > capacity)
814                         size = (capacity-sector)<<9;
815
816                 inc_rs_pending(device);
817                 if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
818                         dec_rs_pending(device);
819                         return 0;
820                 }
821                 sector += BM_SECT_PER_BIT;
822         }
823         device->ov_position = sector;
824
825  requeue:
826         device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
827         if (i == 0 || !stop_sector_reached)
828                 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
829         return 1;
830 }
831
832 int w_ov_finished(struct drbd_work *w, int cancel)
833 {
834         struct drbd_device_work *dw =
835                 container_of(w, struct drbd_device_work, w);
836         struct drbd_device *device = dw->device;
837         kfree(dw);
838         ov_out_of_sync_print(device);
839         drbd_resync_finished(device);
840
841         return 0;
842 }
843
844 static int w_resync_finished(struct drbd_work *w, int cancel)
845 {
846         struct drbd_device_work *dw =
847                 container_of(w, struct drbd_device_work, w);
848         struct drbd_device *device = dw->device;
849         kfree(dw);
850
851         drbd_resync_finished(device);
852
853         return 0;
854 }
855
856 static void ping_peer(struct drbd_device *device)
857 {
858         struct drbd_connection *connection = first_peer_device(device)->connection;
859
860         clear_bit(GOT_PING_ACK, &connection->flags);
861         request_ping(connection);
862         wait_event(connection->ping_wait,
863                    test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
864 }
865
866 int drbd_resync_finished(struct drbd_device *device)
867 {
868         struct drbd_connection *connection = first_peer_device(device)->connection;
869         unsigned long db, dt, dbdt;
870         unsigned long n_oos;
871         union drbd_state os, ns;
872         struct drbd_device_work *dw;
873         char *khelper_cmd = NULL;
874         int verify_done = 0;
875
876         /* Remove all elements from the resync LRU. Since future actions
877          * might set bits in the (main) bitmap, then the entries in the
878          * resync LRU would be wrong. */
879         if (drbd_rs_del_all(device)) {
880                 /* In case this is not possible now, most probably because
881                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
882                  * queue (or even the read operations for those packets
883                  * is not finished by now).   Retry in 100ms. */
884
885                 schedule_timeout_interruptible(HZ / 10);
886                 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
887                 if (dw) {
888                         dw->w.cb = w_resync_finished;
889                         dw->device = device;
890                         drbd_queue_work(&connection->sender_work, &dw->w);
891                         return 1;
892                 }
893                 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
894         }
895
896         dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
897         if (dt <= 0)
898                 dt = 1;
899
900         db = device->rs_total;
901         /* adjust for verify start and stop sectors, respective reached position */
902         if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
903                 db -= device->ov_left;
904
905         dbdt = Bit2KB(db/dt);
906         device->rs_paused /= HZ;
907
908         if (!get_ldev(device))
909                 goto out;
910
911         ping_peer(device);
912
913         spin_lock_irq(&device->resource->req_lock);
914         os = drbd_read_state(device);
915
916         verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
917
918         /* This protects us against multiple calls (that can happen in the presence
919            of application IO), and against connectivity loss just before we arrive here. */
920         if (os.conn <= C_CONNECTED)
921                 goto out_unlock;
922
923         ns = os;
924         ns.conn = C_CONNECTED;
925
926         drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
927              verify_done ? "Online verify" : "Resync",
928              dt + device->rs_paused, device->rs_paused, dbdt);
929
930         n_oos = drbd_bm_total_weight(device);
931
932         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
933                 if (n_oos) {
934                         drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
935                               n_oos, Bit2KB(1));
936                         khelper_cmd = "out-of-sync";
937                 }
938         } else {
939                 D_ASSERT(device, (n_oos - device->rs_failed) == 0);
940
941                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
942                         khelper_cmd = "after-resync-target";
943
944                 if (device->use_csums && device->rs_total) {
945                         const unsigned long s = device->rs_same_csum;
946                         const unsigned long t = device->rs_total;
947                         const int ratio =
948                                 (t == 0)     ? 0 :
949                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
950                         drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
951                              "transferred %luK total %luK\n",
952                              ratio,
953                              Bit2KB(device->rs_same_csum),
954                              Bit2KB(device->rs_total - device->rs_same_csum),
955                              Bit2KB(device->rs_total));
956                 }
957         }
958
959         if (device->rs_failed) {
960                 drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
961
962                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
963                         ns.disk = D_INCONSISTENT;
964                         ns.pdsk = D_UP_TO_DATE;
965                 } else {
966                         ns.disk = D_UP_TO_DATE;
967                         ns.pdsk = D_INCONSISTENT;
968                 }
969         } else {
970                 ns.disk = D_UP_TO_DATE;
971                 ns.pdsk = D_UP_TO_DATE;
972
973                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
974                         if (device->p_uuid) {
975                                 int i;
976                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
977                                         _drbd_uuid_set(device, i, device->p_uuid[i]);
978                                 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
979                                 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
980                         } else {
981                                 drbd_err(device, "device->p_uuid is NULL! BUG\n");
982                         }
983                 }
984
985                 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
986                         /* for verify runs, we don't update uuids here,
987                          * so there would be nothing to report. */
988                         drbd_uuid_set_bm(device, 0UL);
989                         drbd_print_uuids(device, "updated UUIDs");
990                         if (device->p_uuid) {
991                                 /* Now the two UUID sets are equal, update what we
992                                  * know of the peer. */
993                                 int i;
994                                 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
995                                         device->p_uuid[i] = device->ldev->md.uuid[i];
996                         }
997                 }
998         }
999
1000         _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1001 out_unlock:
1002         spin_unlock_irq(&device->resource->req_lock);
1003
1004         /* If we have been sync source, and have an effective fencing-policy,
1005          * once *all* volumes are back in sync, call "unfence". */
1006         if (os.conn == C_SYNC_SOURCE) {
1007                 enum drbd_disk_state disk_state = D_MASK;
1008                 enum drbd_disk_state pdsk_state = D_MASK;
1009                 enum drbd_fencing_p fp = FP_DONT_CARE;
1010
1011                 rcu_read_lock();
1012                 fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1013                 if (fp != FP_DONT_CARE) {
1014                         struct drbd_peer_device *peer_device;
1015                         int vnr;
1016                         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1017                                 struct drbd_device *device = peer_device->device;
1018                                 disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1019                                 pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1020                         }
1021                 }
1022                 rcu_read_unlock();
1023                 if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1024                         conn_khelper(connection, "unfence-peer");
1025         }
1026
1027         put_ldev(device);
1028 out:
1029         device->rs_total  = 0;
1030         device->rs_failed = 0;
1031         device->rs_paused = 0;
1032
1033         /* reset start sector, if we reached end of device */
1034         if (verify_done && device->ov_left == 0)
1035                 device->ov_start_sector = 0;
1036
1037         drbd_md_sync(device);
1038
1039         if (khelper_cmd)
1040                 drbd_khelper(device, khelper_cmd);
1041
1042         return 1;
1043 }
1044
1045 /* helper */
1046 static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1047 {
1048         if (drbd_peer_req_has_active_page(peer_req)) {
1049                 /* This might happen if sendpage() has not finished */
1050                 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1051                 atomic_add(i, &device->pp_in_use_by_net);
1052                 atomic_sub(i, &device->pp_in_use);
1053                 spin_lock_irq(&device->resource->req_lock);
1054                 list_add_tail(&peer_req->w.list, &device->net_ee);
1055                 spin_unlock_irq(&device->resource->req_lock);
1056                 wake_up(&drbd_pp_wait);
1057         } else
1058                 drbd_free_peer_req(device, peer_req);
1059 }
1060
1061 /**
1062  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1063  * @w:          work object.
1064  * @cancel:     The connection will be closed anyways
1065  */
1066 int w_e_end_data_req(struct drbd_work *w, int cancel)
1067 {
1068         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1069         struct drbd_peer_device *peer_device = peer_req->peer_device;
1070         struct drbd_device *device = peer_device->device;
1071         int err;
1072
1073         if (unlikely(cancel)) {
1074                 drbd_free_peer_req(device, peer_req);
1075                 dec_unacked(device);
1076                 return 0;
1077         }
1078
1079         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1080                 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1081         } else {
1082                 if (__ratelimit(&drbd_ratelimit_state))
1083                         drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1084                             (unsigned long long)peer_req->i.sector);
1085
1086                 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1087         }
1088
1089         dec_unacked(device);
1090
1091         move_to_net_ee_or_free(device, peer_req);
1092
1093         if (unlikely(err))
1094                 drbd_err(device, "drbd_send_block() failed\n");
1095         return err;
1096 }
1097
1098 static bool all_zero(struct drbd_peer_request *peer_req)
1099 {
1100         struct page *page = peer_req->pages;
1101         unsigned int len = peer_req->i.size;
1102
1103         page_chain_for_each(page) {
1104                 unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1105                 unsigned int i, words = l / sizeof(long);
1106                 unsigned long *d;
1107
1108                 d = kmap_atomic(page);
1109                 for (i = 0; i < words; i++) {
1110                         if (d[i]) {
1111                                 kunmap_atomic(d);
1112                                 return false;
1113                         }
1114                 }
1115                 kunmap_atomic(d);
1116                 len -= l;
1117         }
1118
1119         return true;
1120 }
1121
1122 /**
1123  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1124  * @w:          work object.
1125  * @cancel:     The connection will be closed anyways
1126  */
1127 int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1128 {
1129         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1130         struct drbd_peer_device *peer_device = peer_req->peer_device;
1131         struct drbd_device *device = peer_device->device;
1132         int err;
1133
1134         if (unlikely(cancel)) {
1135                 drbd_free_peer_req(device, peer_req);
1136                 dec_unacked(device);
1137                 return 0;
1138         }
1139
1140         if (get_ldev_if_state(device, D_FAILED)) {
1141                 drbd_rs_complete_io(device, peer_req->i.sector);
1142                 put_ldev(device);
1143         }
1144
1145         if (device->state.conn == C_AHEAD) {
1146                 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1147         } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1148                 if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1149                         inc_rs_pending(device);
1150                         if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1151                                 err = drbd_send_rs_deallocated(peer_device, peer_req);
1152                         else
1153                                 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154                 } else {
1155                         if (__ratelimit(&drbd_ratelimit_state))
1156                                 drbd_err(device, "Not sending RSDataReply, "
1157                                     "partner DISKLESS!\n");
1158                         err = 0;
1159                 }
1160         } else {
1161                 if (__ratelimit(&drbd_ratelimit_state))
1162                         drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1163                             (unsigned long long)peer_req->i.sector);
1164
1165                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1166
1167                 /* update resync data with failure */
1168                 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1169         }
1170
1171         dec_unacked(device);
1172
1173         move_to_net_ee_or_free(device, peer_req);
1174
1175         if (unlikely(err))
1176                 drbd_err(device, "drbd_send_block() failed\n");
1177         return err;
1178 }
1179
1180 int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1181 {
1182         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1183         struct drbd_peer_device *peer_device = peer_req->peer_device;
1184         struct drbd_device *device = peer_device->device;
1185         struct digest_info *di;
1186         int digest_size;
1187         void *digest = NULL;
1188         int err, eq = 0;
1189
1190         if (unlikely(cancel)) {
1191                 drbd_free_peer_req(device, peer_req);
1192                 dec_unacked(device);
1193                 return 0;
1194         }
1195
1196         if (get_ldev(device)) {
1197                 drbd_rs_complete_io(device, peer_req->i.sector);
1198                 put_ldev(device);
1199         }
1200
1201         di = peer_req->digest;
1202
1203         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1204                 /* quick hack to try to avoid a race against reconfiguration.
1205                  * a real fix would be much more involved,
1206                  * introducing more locking mechanisms */
1207                 if (peer_device->connection->csums_tfm) {
1208                         digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1209                         D_ASSERT(device, digest_size == di->digest_size);
1210                         digest = kmalloc(digest_size, GFP_NOIO);
1211                 }
1212                 if (digest) {
1213                         drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1214                         eq = !memcmp(digest, di->digest, digest_size);
1215                         kfree(digest);
1216                 }
1217
1218                 if (eq) {
1219                         drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1220                         /* rs_same_csums unit is BM_BLOCK_SIZE */
1221                         device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1222                         err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1223                 } else {
1224                         inc_rs_pending(device);
1225                         peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1226                         peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1227                         kfree(di);
1228                         err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1229                 }
1230         } else {
1231                 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1232                 if (__ratelimit(&drbd_ratelimit_state))
1233                         drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1234         }
1235
1236         dec_unacked(device);
1237         move_to_net_ee_or_free(device, peer_req);
1238
1239         if (unlikely(err))
1240                 drbd_err(device, "drbd_send_block/ack() failed\n");
1241         return err;
1242 }
1243
1244 int w_e_end_ov_req(struct drbd_work *w, int cancel)
1245 {
1246         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1247         struct drbd_peer_device *peer_device = peer_req->peer_device;
1248         struct drbd_device *device = peer_device->device;
1249         sector_t sector = peer_req->i.sector;
1250         unsigned int size = peer_req->i.size;
1251         int digest_size;
1252         void *digest;
1253         int err = 0;
1254
1255         if (unlikely(cancel))
1256                 goto out;
1257
1258         digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1259         digest = kmalloc(digest_size, GFP_NOIO);
1260         if (!digest) {
1261                 err = 1;        /* terminate the connection in case the allocation failed */
1262                 goto out;
1263         }
1264
1265         if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1266                 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1267         else
1268                 memset(digest, 0, digest_size);
1269
1270         /* Free e and pages before send.
1271          * In case we block on congestion, we could otherwise run into
1272          * some distributed deadlock, if the other side blocks on
1273          * congestion as well, because our receiver blocks in
1274          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275         drbd_free_peer_req(device, peer_req);
1276         peer_req = NULL;
1277         inc_rs_pending(device);
1278         err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1279         if (err)
1280                 dec_rs_pending(device);
1281         kfree(digest);
1282
1283 out:
1284         if (peer_req)
1285                 drbd_free_peer_req(device, peer_req);
1286         dec_unacked(device);
1287         return err;
1288 }
1289
1290 void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1291 {
1292         if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1293                 device->ov_last_oos_size += size>>9;
1294         } else {
1295                 device->ov_last_oos_start = sector;
1296                 device->ov_last_oos_size = size>>9;
1297         }
1298         drbd_set_out_of_sync(device, sector, size);
1299 }
1300
1301 int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1302 {
1303         struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1304         struct drbd_peer_device *peer_device = peer_req->peer_device;
1305         struct drbd_device *device = peer_device->device;
1306         struct digest_info *di;
1307         void *digest;
1308         sector_t sector = peer_req->i.sector;
1309         unsigned int size = peer_req->i.size;
1310         int digest_size;
1311         int err, eq = 0;
1312         bool stop_sector_reached = false;
1313
1314         if (unlikely(cancel)) {
1315                 drbd_free_peer_req(device, peer_req);
1316                 dec_unacked(device);
1317                 return 0;
1318         }
1319
1320         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1321          * the resync lru has been cleaned up already */
1322         if (get_ldev(device)) {
1323                 drbd_rs_complete_io(device, peer_req->i.sector);
1324                 put_ldev(device);
1325         }
1326
1327         di = peer_req->digest;
1328
1329         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1330                 digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1331                 digest = kmalloc(digest_size, GFP_NOIO);
1332                 if (digest) {
1333                         drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1334
1335                         D_ASSERT(device, digest_size == di->digest_size);
1336                         eq = !memcmp(digest, di->digest, digest_size);
1337                         kfree(digest);
1338                 }
1339         }
1340
1341         /* Free peer_req and pages before send.
1342          * In case we block on congestion, we could otherwise run into
1343          * some distributed deadlock, if the other side blocks on
1344          * congestion as well, because our receiver blocks in
1345          * drbd_alloc_pages due to pp_in_use > max_buffers. */
1346         drbd_free_peer_req(device, peer_req);
1347         if (!eq)
1348                 drbd_ov_out_of_sync_found(device, sector, size);
1349         else
1350                 ov_out_of_sync_print(device);
1351
1352         err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1353                                eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1354
1355         dec_unacked(device);
1356
1357         --device->ov_left;
1358
1359         /* let's advance progress step marks only for every other megabyte */
1360         if ((device->ov_left & 0x200) == 0x200)
1361                 drbd_advance_rs_marks(device, device->ov_left);
1362
1363         stop_sector_reached = verify_can_do_stop_sector(device) &&
1364                 (sector + (size>>9)) >= device->ov_stop_sector;
1365
1366         if (device->ov_left == 0 || stop_sector_reached) {
1367                 ov_out_of_sync_print(device);
1368                 drbd_resync_finished(device);
1369         }
1370
1371         return err;
1372 }
1373
1374 /* FIXME
1375  * We need to track the number of pending barrier acks,
1376  * and to be able to wait for them.
1377  * See also comment in drbd_adm_attach before drbd_suspend_io.
1378  */
1379 static int drbd_send_barrier(struct drbd_connection *connection)
1380 {
1381         struct p_barrier *p;
1382         struct drbd_socket *sock;
1383
1384         sock = &connection->data;
1385         p = conn_prepare_command(connection, sock);
1386         if (!p)
1387                 return -EIO;
1388         p->barrier = connection->send.current_epoch_nr;
1389         p->pad = 0;
1390         connection->send.current_epoch_writes = 0;
1391         connection->send.last_sent_barrier_jif = jiffies;
1392
1393         return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1394 }
1395
1396 static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1397 {
1398         struct drbd_socket *sock = &pd->connection->data;
1399         if (!drbd_prepare_command(pd, sock))
1400                 return -EIO;
1401         return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1402 }
1403
1404 int w_send_write_hint(struct drbd_work *w, int cancel)
1405 {
1406         struct drbd_device *device =
1407                 container_of(w, struct drbd_device, unplug_work);
1408
1409         if (cancel)
1410                 return 0;
1411         return pd_send_unplug_remote(first_peer_device(device));
1412 }
1413
1414 static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1415 {
1416         if (!connection->send.seen_any_write_yet) {
1417                 connection->send.seen_any_write_yet = true;
1418                 connection->send.current_epoch_nr = epoch;
1419                 connection->send.current_epoch_writes = 0;
1420                 connection->send.last_sent_barrier_jif = jiffies;
1421         }
1422 }
1423
1424 static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1425 {
1426         /* re-init if first write on this connection */
1427         if (!connection->send.seen_any_write_yet)
1428                 return;
1429         if (connection->send.current_epoch_nr != epoch) {
1430                 if (connection->send.current_epoch_writes)
1431                         drbd_send_barrier(connection);
1432                 connection->send.current_epoch_nr = epoch;
1433         }
1434 }
1435
1436 int w_send_out_of_sync(struct drbd_work *w, int cancel)
1437 {
1438         struct drbd_request *req = container_of(w, struct drbd_request, w);
1439         struct drbd_device *device = req->device;
1440         struct drbd_peer_device *const peer_device = first_peer_device(device);
1441         struct drbd_connection *const connection = peer_device->connection;
1442         int err;
1443
1444         if (unlikely(cancel)) {
1445                 req_mod(req, SEND_CANCELED);
1446                 return 0;
1447         }
1448         req->pre_send_jif = jiffies;
1449
1450         /* this time, no connection->send.current_epoch_writes++;
1451          * If it was sent, it was the closing barrier for the last
1452          * replicated epoch, before we went into AHEAD mode.
1453          * No more barriers will be sent, until we leave AHEAD mode again. */
1454         maybe_send_barrier(connection, req->epoch);
1455
1456         err = drbd_send_out_of_sync(peer_device, req);
1457         req_mod(req, OOS_HANDED_TO_NETWORK);
1458
1459         return err;
1460 }
1461
1462 /**
1463  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1464  * @w:          work object.
1465  * @cancel:     The connection will be closed anyways
1466  */
1467 int w_send_dblock(struct drbd_work *w, int cancel)
1468 {
1469         struct drbd_request *req = container_of(w, struct drbd_request, w);
1470         struct drbd_device *device = req->device;
1471         struct drbd_peer_device *const peer_device = first_peer_device(device);
1472         struct drbd_connection *connection = peer_device->connection;
1473         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1474         int err;
1475
1476         if (unlikely(cancel)) {
1477                 req_mod(req, SEND_CANCELED);
1478                 return 0;
1479         }
1480         req->pre_send_jif = jiffies;
1481
1482         re_init_if_first_write(connection, req->epoch);
1483         maybe_send_barrier(connection, req->epoch);
1484         connection->send.current_epoch_writes++;
1485
1486         err = drbd_send_dblock(peer_device, req);
1487         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1488
1489         if (do_send_unplug && !err)
1490                 pd_send_unplug_remote(peer_device);
1491
1492         return err;
1493 }
1494
1495 /**
1496  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1497  * @w:          work object.
1498  * @cancel:     The connection will be closed anyways
1499  */
1500 int w_send_read_req(struct drbd_work *w, int cancel)
1501 {
1502         struct drbd_request *req = container_of(w, struct drbd_request, w);
1503         struct drbd_device *device = req->device;
1504         struct drbd_peer_device *const peer_device = first_peer_device(device);
1505         struct drbd_connection *connection = peer_device->connection;
1506         bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1507         int err;
1508
1509         if (unlikely(cancel)) {
1510                 req_mod(req, SEND_CANCELED);
1511                 return 0;
1512         }
1513         req->pre_send_jif = jiffies;
1514
1515         /* Even read requests may close a write epoch,
1516          * if there was any yet. */
1517         maybe_send_barrier(connection, req->epoch);
1518
1519         err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1520                                  (unsigned long)req);
1521
1522         req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1523
1524         if (do_send_unplug && !err)
1525                 pd_send_unplug_remote(peer_device);
1526
1527         return err;
1528 }
1529
1530 int w_restart_disk_io(struct drbd_work *w, int cancel)
1531 {
1532         struct drbd_request *req = container_of(w, struct drbd_request, w);
1533         struct drbd_device *device = req->device;
1534
1535         if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1536                 drbd_al_begin_io(device, &req->i);
1537
1538         drbd_req_make_private_bio(req, req->master_bio);
1539         bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1540         generic_make_request(req->private_bio);
1541
1542         return 0;
1543 }
1544
1545 static int _drbd_may_sync_now(struct drbd_device *device)
1546 {
1547         struct drbd_device *odev = device;
1548         int resync_after;
1549
1550         while (1) {
1551                 if (!odev->ldev || odev->state.disk == D_DISKLESS)
1552                         return 1;
1553                 rcu_read_lock();
1554                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1555                 rcu_read_unlock();
1556                 if (resync_after == -1)
1557                         return 1;
1558                 odev = minor_to_device(resync_after);
1559                 if (!odev)
1560                         return 1;
1561                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1562                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1563                     odev->state.aftr_isp || odev->state.peer_isp ||
1564                     odev->state.user_isp)
1565                         return 0;
1566         }
1567 }
1568
1569 /**
1570  * drbd_pause_after() - Pause resync on all devices that may not resync now
1571  * @device:     DRBD device.
1572  *
1573  * Called from process context only (admin command and after_state_ch).
1574  */
1575 static bool drbd_pause_after(struct drbd_device *device)
1576 {
1577         bool changed = false;
1578         struct drbd_device *odev;
1579         int i;
1580
1581         rcu_read_lock();
1582         idr_for_each_entry(&drbd_devices, odev, i) {
1583                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1584                         continue;
1585                 if (!_drbd_may_sync_now(odev) &&
1586                     _drbd_set_state(_NS(odev, aftr_isp, 1),
1587                                     CS_HARD, NULL) != SS_NOTHING_TO_DO)
1588                         changed = true;
1589         }
1590         rcu_read_unlock();
1591
1592         return changed;
1593 }
1594
1595 /**
1596  * drbd_resume_next() - Resume resync on all devices that may resync now
1597  * @device:     DRBD device.
1598  *
1599  * Called from process context only (admin command and worker).
1600  */
1601 static bool drbd_resume_next(struct drbd_device *device)
1602 {
1603         bool changed = false;
1604         struct drbd_device *odev;
1605         int i;
1606
1607         rcu_read_lock();
1608         idr_for_each_entry(&drbd_devices, odev, i) {
1609                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1610                         continue;
1611                 if (odev->state.aftr_isp) {
1612                         if (_drbd_may_sync_now(odev) &&
1613                             _drbd_set_state(_NS(odev, aftr_isp, 0),
1614                                             CS_HARD, NULL) != SS_NOTHING_TO_DO)
1615                                 changed = true;
1616                 }
1617         }
1618         rcu_read_unlock();
1619         return changed;
1620 }
1621
/* Run drbd_resume_next() with all resources locked; its result is dropped. */
void resume_next_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_resume_next(device);
	unlock_all_resources();
}
1628
/* Run drbd_pause_after() with all resources locked; its result is dropped. */
void suspend_other_sg(struct drbd_device *device)
{
	lock_all_resources();
	(void)drbd_pause_after(device);
	unlock_all_resources();
}
1635
1636 /* caller must lock_all_resources() */
1637 enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1638 {
1639         struct drbd_device *odev;
1640         int resync_after;
1641
1642         if (o_minor == -1)
1643                 return NO_ERROR;
1644         if (o_minor < -1 || o_minor > MINORMASK)
1645                 return ERR_RESYNC_AFTER;
1646
1647         /* check for loops */
1648         odev = minor_to_device(o_minor);
1649         while (1) {
1650                 if (odev == device)
1651                         return ERR_RESYNC_AFTER_CYCLE;
1652
1653                 /* You are free to depend on diskless, non-existing,
1654                  * or not yet/no longer existing minors.
1655                  * We only reject dependency loops.
1656                  * We cannot follow the dependency chain beyond a detached or
1657                  * missing minor.
1658                  */
1659                 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1660                         return NO_ERROR;
1661
1662                 rcu_read_lock();
1663                 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1664                 rcu_read_unlock();
1665                 /* dependency chain ends here, no cycles. */
1666                 if (resync_after == -1)
1667                         return NO_ERROR;
1668
1669                 /* follow the dependency chain */
1670                 odev = minor_to_device(resync_after);
1671         }
1672 }
1673
1674 /* caller must lock_all_resources() */
1675 void drbd_resync_after_changed(struct drbd_device *device)
1676 {
1677         int changed;
1678
1679         do {
1680                 changed  = drbd_pause_after(device);
1681                 changed |= drbd_resume_next(device);
1682         } while (changed);
1683 }
1684
1685 void drbd_rs_controller_reset(struct drbd_device *device)
1686 {
1687         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1688         struct fifo_buffer *plan;
1689
1690         atomic_set(&device->rs_sect_in, 0);
1691         atomic_set(&device->rs_sect_ev, 0);
1692         device->rs_in_flight = 0;
1693         device->rs_last_events =
1694                 (int)part_stat_read(&disk->part0, sectors[0]) +
1695                 (int)part_stat_read(&disk->part0, sectors[1]);
1696
1697         /* Updating the RCU protected object in place is necessary since
1698            this function gets called from atomic context.
1699            It is valid since all other updates also lead to an completely
1700            empty fifo */
1701         rcu_read_lock();
1702         plan = rcu_dereference(device->rs_plan_s);
1703         plan->total = 0;
1704         fifo_set(plan, 0);
1705         rcu_read_unlock();
1706 }
1707
1708 void start_resync_timer_fn(struct timer_list *t)
1709 {
1710         struct drbd_device *device = from_timer(device, t, start_resync_timer);
1711         drbd_device_post_work(device, RS_START);
1712 }
1713
1714 static void do_start_resync(struct drbd_device *device)
1715 {
1716         if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1717                 drbd_warn(device, "postponing start_resync ...\n");
1718                 device->start_resync_timer.expires = jiffies + HZ/10;
1719                 add_timer(&device->start_resync_timer);
1720                 return;
1721         }
1722
1723         drbd_start_resync(device, C_SYNC_SOURCE);
1724         clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1725 }
1726
1727 static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1728 {
1729         bool csums_after_crash_only;
1730         rcu_read_lock();
1731         csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1732         rcu_read_unlock();
1733         return connection->agreed_pro_version >= 89 &&          /* supported? */
1734                 connection->csums_tfm &&                        /* configured? */
1735                 (csums_after_crash_only == false                /* use for each resync? */
1736                  || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1737 }
1738
1739 /**
1740  * drbd_start_resync() - Start the resync process
1741  * @device:     DRBD device.
1742  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1743  *
1744  * This function might bring you directly into one of the
1745  * C_PAUSED_SYNC_* states.
1746  */
1747 void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1748 {
1749         struct drbd_peer_device *peer_device = first_peer_device(device);
1750         struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1751         union drbd_state ns;
1752         int r;
1753
1754         if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1755                 drbd_err(device, "Resync already running!\n");
1756                 return;
1757         }
1758
1759         if (!connection) {
1760                 drbd_err(device, "No connection to peer, aborting!\n");
1761                 return;
1762         }
1763
1764         if (!test_bit(B_RS_H_DONE, &device->flags)) {
1765                 if (side == C_SYNC_TARGET) {
1766                         /* Since application IO was locked out during C_WF_BITMAP_T and
1767                            C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1768                            we check that we might make the data inconsistent. */
1769                         r = drbd_khelper(device, "before-resync-target");
1770                         r = (r >> 8) & 0xff;
1771                         if (r > 0) {
1772                                 drbd_info(device, "before-resync-target handler returned %d, "
1773                                          "dropping connection.\n", r);
1774                                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1775                                 return;
1776                         }
1777                 } else /* C_SYNC_SOURCE */ {
1778                         r = drbd_khelper(device, "before-resync-source");
1779                         r = (r >> 8) & 0xff;
1780                         if (r > 0) {
1781                                 if (r == 3) {
1782                                         drbd_info(device, "before-resync-source handler returned %d, "
1783                                                  "ignoring. Old userland tools?", r);
1784                                 } else {
1785                                         drbd_info(device, "before-resync-source handler returned %d, "
1786                                                  "dropping connection.\n", r);
1787                                         conn_request_state(connection,
1788                                                            NS(conn, C_DISCONNECTING), CS_HARD);
1789                                         return;
1790                                 }
1791                         }
1792                 }
1793         }
1794
1795         if (current == connection->worker.task) {
1796                 /* The worker should not sleep waiting for state_mutex,
1797                    that can take long */
1798                 if (!mutex_trylock(device->state_mutex)) {
1799                         set_bit(B_RS_H_DONE, &device->flags);
1800                         device->start_resync_timer.expires = jiffies + HZ/5;
1801                         add_timer(&device->start_resync_timer);
1802                         return;
1803                 }
1804         } else {
1805                 mutex_lock(device->state_mutex);
1806         }
1807
1808         lock_all_resources();
1809         clear_bit(B_RS_H_DONE, &device->flags);
1810         /* Did some connection breakage or IO error race with us? */
1811         if (device->state.conn < C_CONNECTED
1812         || !get_ldev_if_state(device, D_NEGOTIATING)) {
1813                 unlock_all_resources();
1814                 goto out;
1815         }
1816
1817         ns = drbd_read_state(device);
1818
1819         ns.aftr_isp = !_drbd_may_sync_now(device);
1820
1821         ns.conn = side;
1822
1823         if (side == C_SYNC_TARGET)
1824                 ns.disk = D_INCONSISTENT;
1825         else /* side == C_SYNC_SOURCE */
1826                 ns.pdsk = D_INCONSISTENT;
1827
1828         r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1829         ns = drbd_read_state(device);
1830
1831         if (ns.conn < C_CONNECTED)
1832                 r = SS_UNKNOWN_ERROR;
1833
1834         if (r == SS_SUCCESS) {
1835                 unsigned long tw = drbd_bm_total_weight(device);
1836                 unsigned long now = jiffies;
1837                 int i;
1838
1839                 device->rs_failed    = 0;
1840                 device->rs_paused    = 0;
1841                 device->rs_same_csum = 0;
1842                 device->rs_last_sect_ev = 0;
1843                 device->rs_total     = tw;
1844                 device->rs_start     = now;
1845                 for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1846                         device->rs_mark_left[i] = tw;
1847                         device->rs_mark_time[i] = now;
1848                 }
1849                 drbd_pause_after(device);
1850                 /* Forget potentially stale cached per resync extent bit-counts.
1851                  * Open coded drbd_rs_cancel_all(device), we already have IRQs
1852                  * disabled, and know the disk state is ok. */
1853                 spin_lock(&device->al_lock);
1854                 lc_reset(device->resync);
1855                 device->resync_locked = 0;
1856                 device->resync_wenr = LC_FREE;
1857                 spin_unlock(&device->al_lock);
1858         }
1859         unlock_all_resources();
1860
1861         if (r == SS_SUCCESS) {
1862                 wake_up(&device->al_wait); /* for lc_reset() above */
1863                 /* reset rs_last_bcast when a resync or verify is started,
1864                  * to deal with potential jiffies wrap. */
1865                 device->rs_last_bcast = jiffies - HZ;
1866
1867                 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1868                      drbd_conn_str(ns.conn),
1869                      (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1870                      (unsigned long) device->rs_total);
1871                 if (side == C_SYNC_TARGET) {
1872                         device->bm_resync_fo = 0;
1873                         device->use_csums = use_checksum_based_resync(connection, device);
1874                 } else {
1875                         device->use_csums = false;
1876                 }
1877
1878                 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1879                  * with w_send_oos, or the sync target will get confused as to
1880                  * how much bits to resync.  We cannot do that always, because for an
1881                  * empty resync and protocol < 95, we need to do it here, as we call
1882                  * drbd_resync_finished from here in that case.
1883                  * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1884                  * and from after_state_ch otherwise. */
1885                 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1886                         drbd_gen_and_send_sync_uuid(peer_device);
1887
1888                 if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1889                         /* This still has a race (about when exactly the peers
1890                          * detect connection loss) that can lead to a full sync
1891                          * on next handshake. In 8.3.9 we fixed this with explicit
1892                          * resync-finished notifications, but the fix
1893                          * introduces a protocol change.  Sleeping for some
1894                          * time longer than the ping interval + timeout on the
1895                          * SyncSource, to give the SyncTarget the chance to
1896                          * detect connection loss, then waiting for a ping
1897                          * response (implicit in drbd_resync_finished) reduces
1898                          * the race considerably, but does not solve it. */
1899                         if (side == C_SYNC_SOURCE) {
1900                                 struct net_conf *nc;
1901                                 int timeo;
1902
1903                                 rcu_read_lock();
1904                                 nc = rcu_dereference(connection->net_conf);
1905                                 timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1906                                 rcu_read_unlock();
1907                                 schedule_timeout_interruptible(timeo);
1908                         }
1909                         drbd_resync_finished(device);
1910                 }
1911
1912                 drbd_rs_controller_reset(device);
1913                 /* ns.conn may already be != device->state.conn,
1914                  * we may have been paused in between, or become paused until
1915                  * the timer triggers.
1916                  * No matter, that is handled in resync_timer_fn() */
1917                 if (ns.conn == C_SYNC_TARGET)
1918                         mod_timer(&device->resync_timer, jiffies);
1919
1920                 drbd_md_sync(device);
1921         }
1922         put_ldev(device);
1923 out:
1924         mutex_unlock(device->state_mutex);
1925 }
1926
1927 static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1928 {
1929         struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1930         device->rs_last_bcast = jiffies;
1931
1932         if (!get_ldev(device))
1933                 return;
1934
1935         drbd_bm_write_lazy(device, 0);
1936         if (resync_done && is_sync_state(device->state.conn))
1937                 drbd_resync_finished(device);
1938
1939         drbd_bcast_event(device, &sib);
1940         /* update timestamp, in case it took a while to write out stuff */
1941         device->rs_last_bcast = jiffies;
1942         put_ldev(device);
1943 }
1944
1945 static void drbd_ldev_destroy(struct drbd_device *device)
1946 {
1947         lc_destroy(device->resync);
1948         device->resync = NULL;
1949         lc_destroy(device->act_log);
1950         device->act_log = NULL;
1951
1952         __acquire(local);
1953         drbd_backing_dev_free(device, device->ldev);
1954         device->ldev = NULL;
1955         __release(local);
1956
1957         clear_bit(GOING_DISKLESS, &device->flags);
1958         wake_up(&device->misc_wait);
1959 }
1960
1961 static void go_diskless(struct drbd_device *device)
1962 {
1963         D_ASSERT(device, device->state.disk == D_FAILED);
1964         /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1965          * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1966          * the protected members anymore, though, so once put_ldev reaches zero
1967          * again, it will be safe to free them. */
1968
1969         /* Try to write changed bitmap pages, read errors may have just
1970          * set some bits outside the area covered by the activity log.
1971          *
1972          * If we have an IO error during the bitmap writeout,
1973          * we will want a full sync next time, just in case.
1974          * (Do we want a specific meta data flag for this?)
1975          *
1976          * If that does not make it to stable storage either,
1977          * we cannot do anything about that anymore.
1978          *
1979          * We still need to check if both bitmap and ldev are present, we may
1980          * end up here after a failed attach, before ldev was even assigned.
1981          */
1982         if (device->bitmap && device->ldev) {
1983                 /* An interrupted resync or similar is allowed to recounts bits
1984                  * while we detach.
1985                  * Any modifications would not be expected anymore, though.
1986                  */
1987                 if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1988                                         "detach", BM_LOCKED_TEST_ALLOWED)) {
1989                         if (test_bit(WAS_READ_ERROR, &device->flags)) {
1990                                 drbd_md_set_flag(device, MDF_FULL_SYNC);
1991                                 drbd_md_sync(device);
1992                         }
1993                 }
1994         }
1995
1996         drbd_force_state(device, NS(disk, D_DISKLESS));
1997 }
1998
/* Device work for MD_SYNC: log a warning, then call drbd_md_sync(). Returns 0. */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");

	drbd_md_sync(device);

	return 0;
}
2005
2006 /* only called from drbd_worker thread, no locking */
2007 void __update_timing_details(
2008                 struct drbd_thread_timing_details *tdp,
2009                 unsigned int *cb_nr,
2010                 void *cb,
2011                 const char *fn, const unsigned int line)
2012 {
2013         unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2014         struct drbd_thread_timing_details *td = tdp + i;
2015
2016         td->start_jif = jiffies;
2017         td->cb_addr = cb;
2018         td->caller_fn = fn;
2019         td->line = line;
2020         td->cb_nr = *cb_nr;
2021
2022         i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2023         td = tdp + i;
2024         memset(td, 0, sizeof(*td));
2025
2026         ++(*cb_nr);
2027 }
2028
2029 static void do_device_work(struct drbd_device *device, const unsigned long todo)
2030 {
2031         if (test_bit(MD_SYNC, &todo))
2032                 do_md_sync(device);
2033         if (test_bit(RS_DONE, &todo) ||
2034             test_bit(RS_PROGRESS, &todo))
2035                 update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2036         if (test_bit(GO_DISKLESS, &todo))
2037                 go_diskless(device);
2038         if (test_bit(DESTROY_DISK, &todo))
2039                 drbd_ldev_destroy(device);
2040         if (test_bit(RS_START, &todo))
2041                 do_start_resync(device);
2042 }
2043
/* Mask of the six flag bits that do_device_work() acts upon. */
#define DRBD_DEVICE_WORK_MASK		\
	((1UL << GO_DISKLESS) |		\
	 (1UL << DESTROY_DISK) |	\
	 (1UL << MD_SYNC) |		\
	 (1UL << RS_START) |		\
	 (1UL << RS_PROGRESS) |		\
	 (1UL << RS_DONE))
2052
2053 static unsigned long get_work_bits(unsigned long *flags)
2054 {
2055         unsigned long old, new;
2056         do {
2057                 old = *flags;
2058                 new = old & ~DRBD_DEVICE_WORK_MASK;
2059         } while (cmpxchg(flags, old, new) != old);
2060         return old & DRBD_DEVICE_WORK_MASK;
2061 }
2062
2063 static void do_unqueued_work(struct drbd_connection *connection)
2064 {
2065         struct drbd_peer_device *peer_device;
2066         int vnr;
2067
2068         rcu_read_lock();
2069         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2070                 struct drbd_device *device = peer_device->device;
2071                 unsigned long todo = get_work_bits(&device->flags);
2072                 if (!todo)
2073                         continue;
2074
2075                 kref_get(&device->kref);
2076                 rcu_read_unlock();
2077                 do_device_work(device, todo);
2078                 kref_put(&device->kref, drbd_destroy_device);
2079                 rcu_read_lock();
2080         }
2081         rcu_read_unlock();
2082 }
2083
2084 static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2085 {
2086         spin_lock_irq(&queue->q_lock);
2087         list_splice_tail_init(&queue->q, work_list);
2088         spin_unlock_irq(&queue->q_lock);
2089         return !list_empty(work_list);
2090 }
2091
2092 static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2093 {
2094         DEFINE_WAIT(wait);
2095         struct net_conf *nc;
2096         int uncork, cork;
2097
2098         dequeue_work_batch(&connection->sender_work, work_list);
2099         if (!list_empty(work_list))
2100                 return;
2101
2102         /* Still nothing to do?
2103          * Maybe we still need to close the current epoch,
2104          * even if no new requests are queued yet.
2105          *
2106          * Also, poke TCP, just in case.
2107          * Then wait for new work (or signal). */
2108         rcu_read_lock();
2109         nc = rcu_dereference(connection->net_conf);
2110         uncork = nc ? nc->tcp_cork : 0;
2111         rcu_read_unlock();
2112         if (uncork) {
2113                 mutex_lock(&connection->data.mutex);
2114                 if (connection->data.socket)
2115                         drbd_tcp_uncork(connection->data.socket);
2116                 mutex_unlock(&connection->data.mutex);
2117         }
2118
2119         for (;;) {
2120                 int send_barrier;
2121                 prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2122                 spin_lock_irq(&connection->resource->req_lock);
2123                 spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2124                 if (!list_empty(&connection->sender_work.q))
2125                         list_splice_tail_init(&connection->sender_work.q, work_list);
2126                 spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2127                 if (!list_empty(work_list) || signal_pending(current)) {
2128                         spin_unlock_irq(&connection->resource->req_lock);
2129                         break;
2130                 }
2131
2132                 /* We found nothing new to do, no to-be-communicated request,
2133                  * no other work item.  We may still need to close the last
2134                  * epoch.  Next incoming request epoch will be connection ->
2135                  * current transfer log epoch number.  If that is different
2136                  * from the epoch of the last request we communicated, it is
2137                  * safe to send the epoch separating barrier now.
2138                  */
2139                 send_barrier =
2140                         atomic_read(&connection->current_tle_nr) !=
2141                         connection->send.current_epoch_nr;
2142                 spin_unlock_irq(&connection->resource->req_lock);
2143
2144                 if (send_barrier)
2145                         maybe_send_barrier(connection,
2146                                         connection->send.current_epoch_nr + 1);
2147
2148                 if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2149                         break;
2150
2151                 /* drbd_send() may have called flush_signals() */
2152                 if (get_t_state(&connection->worker) != RUNNING)
2153                         break;
2154
2155                 schedule();
2156                 /* may be woken up for other things but new work, too,
2157                  * e.g. if the current epoch got closed.
2158                  * In which case we send the barrier above. */
2159         }
2160         finish_wait(&connection->sender_work.q_wait, &wait);
2161
2162         /* someone may have changed the config while we have been waiting above. */
2163         rcu_read_lock();
2164         nc = rcu_dereference(connection->net_conf);
2165         cork = nc ? nc->tcp_cork : 0;
2166         rcu_read_unlock();
2167         mutex_lock(&connection->data.mutex);
2168         if (connection->data.socket) {
2169                 if (cork)
2170                         drbd_tcp_cork(connection->data.socket);
2171                 else if (!uncork)
2172                         drbd_tcp_uncork(connection->data.socket);
2173         }
2174         mutex_unlock(&connection->data.mutex);
2175 }
2176
2177 int drbd_worker(struct drbd_thread *thi)
2178 {
2179         struct drbd_connection *connection = thi->connection;
2180         struct drbd_work *w = NULL;
2181         struct drbd_peer_device *peer_device;
2182         LIST_HEAD(work_list);
2183         int vnr;
2184
2185         while (get_t_state(thi) == RUNNING) {
2186                 drbd_thread_current_set_cpu(thi);
2187
2188                 if (list_empty(&work_list)) {
2189                         update_worker_timing_details(connection, wait_for_work);
2190                         wait_for_work(connection, &work_list);
2191                 }
2192
2193                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2194                         update_worker_timing_details(connection, do_unqueued_work);
2195                         do_unqueued_work(connection);
2196                 }
2197
2198                 if (signal_pending(current)) {
2199                         flush_signals(current);
2200                         if (get_t_state(thi) == RUNNING) {
2201                                 drbd_warn(connection, "Worker got an unexpected signal\n");
2202                                 continue;
2203                         }
2204                         break;
2205                 }
2206
2207                 if (get_t_state(thi) != RUNNING)
2208                         break;
2209
2210                 if (!list_empty(&work_list)) {
2211                         w = list_first_entry(&work_list, struct drbd_work, list);
2212                         list_del_init(&w->list);
2213                         update_worker_timing_details(connection, w->cb);
2214                         if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2215                                 continue;
2216                         if (connection->cstate >= C_WF_REPORT_PARAMS)
2217                                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2218                 }
2219         }
2220
2221         do {
2222                 if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2223                         update_worker_timing_details(connection, do_unqueued_work);
2224                         do_unqueued_work(connection);
2225                 }
2226                 if (!list_empty(&work_list)) {
2227                         w = list_first_entry(&work_list, struct drbd_work, list);
2228                         list_del_init(&w->list);
2229                         update_worker_timing_details(connection, w->cb);
2230                         w->cb(w, 1);
2231                 } else
2232                         dequeue_work_batch(&connection->sender_work, &work_list);
2233         } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2234
2235         rcu_read_lock();
2236         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2237                 struct drbd_device *device = peer_device->device;
2238                 D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2239                 kref_get(&device->kref);
2240                 rcu_read_unlock();
2241                 drbd_device_cleanup(device);
2242                 kref_put(&device->kref, drbd_destroy_device);
2243                 rcu_read_lock();
2244         }
2245         rcu_read_unlock();
2246
2247         return 0;
2248 }