Merge branch 'drm-core-next' of git://git.kernel.org/pub/scm/linux/kernel/git/airlied...
[sfrench/cifs-2.6.git] / drivers / block / drbd / drbd_worker.c
1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 #define SLEEP_TIME (HZ/10)
43
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45
46
47
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_sec
51    drbd_endio_pri
52
53  * more endio handlers:
54    atodb_endio in drbd_actlog.c
55    drbd_bm_async_io_complete in drbd_bitmap.c
56
57  * For all these callbacks, note the following:
58  * The callbacks will be called in irq context by the IDE drivers,
59  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60  * Try to get the locking right :)
61  *
62  */
63
64
/* About the global_state_lock:
   Each state transition on a device holds a read lock.  In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
69 rwlock_t global_state_lock;
70
71 /* used for synchronous meta data and bitmap IO
72  * submitted by drbd_md_sync_page_io()
73  */
74 void drbd_md_io_complete(struct bio *bio, int error)
75 {
76         struct drbd_md_io *md_io;
77
78         md_io = (struct drbd_md_io *)bio->bi_private;
79         md_io->error = error;
80
81         complete(&md_io->event);
82 }
83
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
88 {
89         unsigned long flags = 0;
90         struct drbd_conf *mdev = e->mdev;
91
92         D_ASSERT(e->block_id != ID_VACANT);
93
94         spin_lock_irqsave(&mdev->req_lock, flags);
95         mdev->read_cnt += e->size >> 9;
96         list_del(&e->w.list);
97         if (list_empty(&mdev->read_ee))
98                 wake_up(&mdev->ee_wait);
99         if (test_bit(__EE_WAS_ERROR, &e->flags))
100                 __drbd_chk_io_error(mdev, FALSE);
101         spin_unlock_irqrestore(&mdev->req_lock, flags);
102
103         drbd_queue_work(&mdev->data.work, &e->w);
104         put_ldev(mdev);
105 }
106
107 static int is_failed_barrier(int ee_flags)
108 {
109         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110                         == (EE_IS_BARRIER|EE_WAS_ERROR);
111 }
112
113 /* writes on behalf of the partner, or resync writes,
114  * "submitted" by the receiver, final stage.  */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
116 {
117         unsigned long flags = 0;
118         struct drbd_conf *mdev = e->mdev;
119         sector_t e_sector;
120         int do_wake;
121         int is_syncer_req;
122         int do_al_complete_io;
123
124         /* if this is a failed barrier request, disable use of barriers,
125          * and schedule for resubmission */
126         if (is_failed_barrier(e->flags)) {
127                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
128                 spin_lock_irqsave(&mdev->req_lock, flags);
129                 list_del(&e->w.list);
130                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131                 e->w.cb = w_e_reissue;
132                 /* put_ldev actually happens below, once we come here again. */
133                 __release(local);
134                 spin_unlock_irqrestore(&mdev->req_lock, flags);
135                 drbd_queue_work(&mdev->data.work, &e->w);
136                 return;
137         }
138
139         D_ASSERT(e->block_id != ID_VACANT);
140
141         /* after we moved e to done_ee,
142          * we may no longer access it,
143          * it may be freed/reused already!
144          * (as soon as we release the req_lock) */
145         e_sector = e->sector;
146         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147         is_syncer_req = is_syncer_block_id(e->block_id);
148
149         spin_lock_irqsave(&mdev->req_lock, flags);
150         mdev->writ_cnt += e->size >> 9;
151         list_del(&e->w.list); /* has been on active_ee or sync_ee */
152         list_add_tail(&e->w.list, &mdev->done_ee);
153
154         /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
155          * neither did we wake possibly waiting conflicting requests.
156          * done from "drbd_process_done_ee" within the appropriate w.cb
157          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
158
159         do_wake = is_syncer_req
160                 ? list_empty(&mdev->sync_ee)
161                 : list_empty(&mdev->active_ee);
162
163         if (test_bit(__EE_WAS_ERROR, &e->flags))
164                 __drbd_chk_io_error(mdev, FALSE);
165         spin_unlock_irqrestore(&mdev->req_lock, flags);
166
167         if (is_syncer_req)
168                 drbd_rs_complete_io(mdev, e_sector);
169
170         if (do_wake)
171                 wake_up(&mdev->ee_wait);
172
173         if (do_al_complete_io)
174                 drbd_al_complete_io(mdev, e_sector);
175
176         wake_asender(mdev);
177         put_ldev(mdev);
178 }
179
180 /* writes on behalf of the partner, or resync writes,
181  * "submitted" by the receiver.
182  */
183 void drbd_endio_sec(struct bio *bio, int error)
184 {
185         struct drbd_epoch_entry *e = bio->bi_private;
186         struct drbd_conf *mdev = e->mdev;
187         int uptodate = bio_flagged(bio, BIO_UPTODATE);
188         int is_write = bio_data_dir(bio) == WRITE;
189
190         if (error)
191                 dev_warn(DEV, "%s: error=%d s=%llus\n",
192                                 is_write ? "write" : "read", error,
193                                 (unsigned long long)e->sector);
194         if (!error && !uptodate) {
195                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196                                 is_write ? "write" : "read",
197                                 (unsigned long long)e->sector);
198                 /* strange behavior of some lower level drivers...
199                  * fail the request by clearing the uptodate flag,
200                  * but do not return any error?! */
201                 error = -EIO;
202         }
203
204         if (error)
205                 set_bit(__EE_WAS_ERROR, &e->flags);
206
207         bio_put(bio); /* no need for the bio anymore */
208         if (atomic_dec_and_test(&e->pending_bios)) {
209                 if (is_write)
210                         drbd_endio_write_sec_final(e);
211                 else
212                         drbd_endio_read_sec_final(e);
213         }
214 }
215
216 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217  */
218 void drbd_endio_pri(struct bio *bio, int error)
219 {
220         unsigned long flags;
221         struct drbd_request *req = bio->bi_private;
222         struct drbd_conf *mdev = req->mdev;
223         struct bio_and_error m;
224         enum drbd_req_event what;
225         int uptodate = bio_flagged(bio, BIO_UPTODATE);
226
227         if (!error && !uptodate) {
228                 dev_warn(DEV, "p %s: setting error to -EIO\n",
229                          bio_data_dir(bio) == WRITE ? "write" : "read");
230                 /* strange behavior of some lower level drivers...
231                  * fail the request by clearing the uptodate flag,
232                  * but do not return any error?! */
233                 error = -EIO;
234         }
235
236         /* to avoid recursion in __req_mod */
237         if (unlikely(error)) {
238                 what = (bio_data_dir(bio) == WRITE)
239                         ? write_completed_with_error
240                         : (bio_rw(bio) == READ)
241                           ? read_completed_with_error
242                           : read_ahead_completed_with_error;
243         } else
244                 what = completed_ok;
245
246         bio_put(req->private_bio);
247         req->private_bio = ERR_PTR(error);
248
249         spin_lock_irqsave(&mdev->req_lock, flags);
250         __req_mod(req, what, &m);
251         spin_unlock_irqrestore(&mdev->req_lock, flags);
252
253         if (m.bio)
254                 complete_master_bio(mdev, &m);
255 }
256
257 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
258 {
259         struct drbd_request *req = container_of(w, struct drbd_request, w);
260
261         /* We should not detach for read io-error,
262          * but try to WRITE the P_DATA_REPLY to the failed location,
263          * to give the disk the chance to relocate that block */
264
265         spin_lock_irq(&mdev->req_lock);
266         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
267                 _req_mod(req, read_retry_remote_canceled);
268                 spin_unlock_irq(&mdev->req_lock);
269                 return 1;
270         }
271         spin_unlock_irq(&mdev->req_lock);
272
273         return w_send_read_req(mdev, w, 0);
274 }
275
276 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
277 {
278         ERR_IF(cancel) return 1;
279         dev_err(DEV, "resync inactive, but callback triggered??\n");
280         return 1; /* Simply ignore this! */
281 }
282
283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
284 {
285         struct hash_desc desc;
286         struct scatterlist sg;
287         struct page *page = e->pages;
288         struct page *tmp;
289         unsigned len;
290
291         desc.tfm = tfm;
292         desc.flags = 0;
293
294         sg_init_table(&sg, 1);
295         crypto_hash_init(&desc);
296
297         while ((tmp = page_chain_next(page))) {
298                 /* all but the last page will be fully used */
299                 sg_set_page(&sg, page, PAGE_SIZE, 0);
300                 crypto_hash_update(&desc, &sg, sg.length);
301                 page = tmp;
302         }
303         /* and now the last, possibly only partially used page */
304         len = e->size & (PAGE_SIZE - 1);
305         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
306         crypto_hash_update(&desc, &sg, sg.length);
307         crypto_hash_final(&desc, digest);
308 }
309
310 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
311 {
312         struct hash_desc desc;
313         struct scatterlist sg;
314         struct bio_vec *bvec;
315         int i;
316
317         desc.tfm = tfm;
318         desc.flags = 0;
319
320         sg_init_table(&sg, 1);
321         crypto_hash_init(&desc);
322
323         __bio_for_each_segment(bvec, bio, i, 0) {
324                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
325                 crypto_hash_update(&desc, &sg, sg.length);
326         }
327         crypto_hash_final(&desc, digest);
328 }
329
330 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
331 {
332         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
333         int digest_size;
334         void *digest;
335         int ok;
336
337         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
338
339         if (unlikely(cancel)) {
340                 drbd_free_ee(mdev, e);
341                 return 1;
342         }
343
344         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
345                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
346                 digest = kmalloc(digest_size, GFP_NOIO);
347                 if (digest) {
348                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
349
350                         inc_rs_pending(mdev);
351                         ok = drbd_send_drequest_csum(mdev,
352                                                      e->sector,
353                                                      e->size,
354                                                      digest,
355                                                      digest_size,
356                                                      P_CSUM_RS_REQUEST);
357                         kfree(digest);
358                 } else {
359                         dev_err(DEV, "kmalloc() of digest failed.\n");
360                         ok = 0;
361                 }
362         } else
363                 ok = 1;
364
365         drbd_free_ee(mdev, e);
366
367         if (unlikely(!ok))
368                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
369         return ok;
370 }
371
372 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
373
374 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
375 {
376         struct drbd_epoch_entry *e;
377
378         if (!get_ldev(mdev))
379                 return 0;
380
381         /* GFP_TRY, because if there is no memory available right now, this may
382          * be rescheduled for later. It is "only" background resync, after all. */
383         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
384         if (!e)
385                 goto fail;
386
387         spin_lock_irq(&mdev->req_lock);
388         list_add(&e->w.list, &mdev->read_ee);
389         spin_unlock_irq(&mdev->req_lock);
390
391         e->w.cb = w_e_send_csum;
392         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
393                 return 1;
394
395         drbd_free_ee(mdev, e);
396 fail:
397         put_ldev(mdev);
398         return 2;
399 }
400
401 void resync_timer_fn(unsigned long data)
402 {
403         unsigned long flags;
404         struct drbd_conf *mdev = (struct drbd_conf *) data;
405         int queue;
406
407         spin_lock_irqsave(&mdev->req_lock, flags);
408
409         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
410                 queue = 1;
411                 if (mdev->state.conn == C_VERIFY_S)
412                         mdev->resync_work.cb = w_make_ov_request;
413                 else
414                         mdev->resync_work.cb = w_make_resync_request;
415         } else {
416                 queue = 0;
417                 mdev->resync_work.cb = w_resync_inactive;
418         }
419
420         spin_unlock_irqrestore(&mdev->req_lock, flags);
421
422         /* harmless race: list_empty outside data.work.q_lock */
423         if (list_empty(&mdev->resync_work.list) && queue)
424                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
425 }
426
427 int w_make_resync_request(struct drbd_conf *mdev,
428                 struct drbd_work *w, int cancel)
429 {
430         unsigned long bit;
431         sector_t sector;
432         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
433         int max_segment_size;
434         int number, i, size, pe, mx;
435         int align, queued, sndbuf;
436
437         if (unlikely(cancel))
438                 return 1;
439
440         if (unlikely(mdev->state.conn < C_CONNECTED)) {
441                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
442                 return 0;
443         }
444
445         if (mdev->state.conn != C_SYNC_TARGET)
446                 dev_err(DEV, "%s in w_make_resync_request\n",
447                         drbd_conn_str(mdev->state.conn));
448
449         if (!get_ldev(mdev)) {
450                 /* Since we only need to access mdev->rsync a
451                    get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
452                    to continue resync with a broken disk makes no sense at
453                    all */
454                 dev_err(DEV, "Disk broke down during resync!\n");
455                 mdev->resync_work.cb = w_resync_inactive;
456                 return 1;
457         }
458
459         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
460          * if it should be necessary */
461         max_segment_size = mdev->agreed_pro_version < 94 ?
462                 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
463
464         number = SLEEP_TIME * mdev->sync_conf.rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
465         pe = atomic_read(&mdev->rs_pending_cnt);
466
467         mutex_lock(&mdev->data.mutex);
468         if (mdev->data.socket)
469                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
470         else
471                 mx = 1;
472         mutex_unlock(&mdev->data.mutex);
473
474         /* For resync rates >160MB/sec, allow more pending RS requests */
475         if (number > mx)
476                 mx = number;
477
478         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
479         if ((pe + number) > mx) {
480                 number = mx - pe;
481         }
482
483         for (i = 0; i < number; i++) {
484                 /* Stop generating RS requests, when half of the send buffer is filled */
485                 mutex_lock(&mdev->data.mutex);
486                 if (mdev->data.socket) {
487                         queued = mdev->data.socket->sk->sk_wmem_queued;
488                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
489                 } else {
490                         queued = 1;
491                         sndbuf = 0;
492                 }
493                 mutex_unlock(&mdev->data.mutex);
494                 if (queued > sndbuf / 2)
495                         goto requeue;
496
497 next_sector:
498                 size = BM_BLOCK_SIZE;
499                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
500
501                 if (bit == -1UL) {
502                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
503                         mdev->resync_work.cb = w_resync_inactive;
504                         put_ldev(mdev);
505                         return 1;
506                 }
507
508                 sector = BM_BIT_TO_SECT(bit);
509
510                 if (drbd_try_rs_begin_io(mdev, sector)) {
511                         mdev->bm_resync_fo = bit;
512                         goto requeue;
513                 }
514                 mdev->bm_resync_fo = bit + 1;
515
516                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
517                         drbd_rs_complete_io(mdev, sector);
518                         goto next_sector;
519                 }
520
521 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
522                 /* try to find some adjacent bits.
523                  * we stop if we have already the maximum req size.
524                  *
525                  * Additionally always align bigger requests, in order to
526                  * be prepared for all stripe sizes of software RAIDs.
527                  */
528                 align = 1;
529                 for (;;) {
530                         if (size + BM_BLOCK_SIZE > max_segment_size)
531                                 break;
532
533                         /* Be always aligned */
534                         if (sector & ((1<<(align+3))-1))
535                                 break;
536
537                         /* do not cross extent boundaries */
538                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
539                                 break;
540                         /* now, is it actually dirty, after all?
541                          * caution, drbd_bm_test_bit is tri-state for some
542                          * obscure reason; ( b == 0 ) would get the out-of-band
543                          * only accidentally right because of the "oddly sized"
544                          * adjustment below */
545                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
546                                 break;
547                         bit++;
548                         size += BM_BLOCK_SIZE;
549                         if ((BM_BLOCK_SIZE << align) <= size)
550                                 align++;
551                         i++;
552                 }
553                 /* if we merged some,
554                  * reset the offset to start the next drbd_bm_find_next from */
555                 if (size > BM_BLOCK_SIZE)
556                         mdev->bm_resync_fo = bit + 1;
557 #endif
558
559                 /* adjust very last sectors, in case we are oddly sized */
560                 if (sector + (size>>9) > capacity)
561                         size = (capacity-sector)<<9;
562                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
563                         switch (read_for_csum(mdev, sector, size)) {
564                         case 0: /* Disk failure*/
565                                 put_ldev(mdev);
566                                 return 0;
567                         case 2: /* Allocation failed */
568                                 drbd_rs_complete_io(mdev, sector);
569                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
570                                 goto requeue;
571                         /* case 1: everything ok */
572                         }
573                 } else {
574                         inc_rs_pending(mdev);
575                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
576                                                sector, size, ID_SYNCER)) {
577                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
578                                 dec_rs_pending(mdev);
579                                 put_ldev(mdev);
580                                 return 0;
581                         }
582                 }
583         }
584
585         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
586                 /* last syncer _request_ was sent,
587                  * but the P_RS_DATA_REPLY not yet received.  sync will end (and
588                  * next sync group will resume), as soon as we receive the last
589                  * resync data block, and the last bit is cleared.
590                  * until then resync "work" is "inactive" ...
591                  */
592                 mdev->resync_work.cb = w_resync_inactive;
593                 put_ldev(mdev);
594                 return 1;
595         }
596
597  requeue:
598         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
599         put_ldev(mdev);
600         return 1;
601 }
602
603 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
604 {
605         int number, i, size;
606         sector_t sector;
607         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
608
609         if (unlikely(cancel))
610                 return 1;
611
612         if (unlikely(mdev->state.conn < C_CONNECTED)) {
613                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
614                 return 0;
615         }
616
617         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
618         if (atomic_read(&mdev->rs_pending_cnt) > number)
619                 goto requeue;
620
621         number -= atomic_read(&mdev->rs_pending_cnt);
622
623         sector = mdev->ov_position;
624         for (i = 0; i < number; i++) {
625                 if (sector >= capacity) {
626                         mdev->resync_work.cb = w_resync_inactive;
627                         return 1;
628                 }
629
630                 size = BM_BLOCK_SIZE;
631
632                 if (drbd_try_rs_begin_io(mdev, sector)) {
633                         mdev->ov_position = sector;
634                         goto requeue;
635                 }
636
637                 if (sector + (size>>9) > capacity)
638                         size = (capacity-sector)<<9;
639
640                 inc_rs_pending(mdev);
641                 if (!drbd_send_ov_request(mdev, sector, size)) {
642                         dec_rs_pending(mdev);
643                         return 0;
644                 }
645                 sector += BM_SECT_PER_BIT;
646         }
647         mdev->ov_position = sector;
648
649  requeue:
650         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
651         return 1;
652 }
653
654
/*
 * Worker callback run when an online verify has finished.
 * Frees the work object @w, calls ov_oos_print(), and then finishes the
 * run via drbd_resync_finished().  Always returns 1.
 */
int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* the work object is released by this callback */
        kfree(w);

        ov_oos_print(mdev);
        drbd_resync_finished(mdev);

        return 1;
}
663
/*
 * Worker callback used by drbd_resync_finished() to retry itself later.
 * Frees the work object @w (kmalloc()ed by drbd_resync_finished()) and
 * calls drbd_resync_finished() again.  Always returns 1.
 */
static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        /* the work object is released by this callback */
        kfree(w);
        drbd_resync_finished(mdev);
        return 1;
}
672
673 int drbd_resync_finished(struct drbd_conf *mdev)
674 {
675         unsigned long db, dt, dbdt;
676         unsigned long n_oos;
677         union drbd_state os, ns;
678         struct drbd_work *w;
679         char *khelper_cmd = NULL;
680
681         /* Remove all elements from the resync LRU. Since future actions
682          * might set bits in the (main) bitmap, then the entries in the
683          * resync LRU would be wrong. */
684         if (drbd_rs_del_all(mdev)) {
685                 /* In case this is not possible now, most probably because
686                  * there are P_RS_DATA_REPLY Packets lingering on the worker's
687                  * queue (or even the read operations for those packets
688                  * is not finished by now).   Retry in 100ms. */
689
690                 drbd_kick_lo(mdev);
691                 __set_current_state(TASK_INTERRUPTIBLE);
692                 schedule_timeout(HZ / 10);
693                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
694                 if (w) {
695                         w->cb = w_resync_finished;
696                         drbd_queue_work(&mdev->data.work, w);
697                         return 1;
698                 }
699                 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
700         }
701
702         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
703         if (dt <= 0)
704                 dt = 1;
705         db = mdev->rs_total;
706         dbdt = Bit2KB(db/dt);
707         mdev->rs_paused /= HZ;
708
709         if (!get_ldev(mdev))
710                 goto out;
711
712         spin_lock_irq(&mdev->req_lock);
713         os = mdev->state;
714
715         /* This protects us against multiple calls (that can happen in the presence
716            of application IO), and against connectivity loss just before we arrive here. */
717         if (os.conn <= C_CONNECTED)
718                 goto out_unlock;
719
720         ns = os;
721         ns.conn = C_CONNECTED;
722
723         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
724              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
725              "Online verify " : "Resync",
726              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
727
728         n_oos = drbd_bm_total_weight(mdev);
729
730         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
731                 if (n_oos) {
732                         dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
733                               n_oos, Bit2KB(1));
734                         khelper_cmd = "out-of-sync";
735                 }
736         } else {
737                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
738
739                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
740                         khelper_cmd = "after-resync-target";
741
742                 if (mdev->csums_tfm && mdev->rs_total) {
743                         const unsigned long s = mdev->rs_same_csum;
744                         const unsigned long t = mdev->rs_total;
745                         const int ratio =
746                                 (t == 0)     ? 0 :
747                         (t < 100000) ? ((s*100)/t) : (s/(t/100));
748                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
749                              "transferred %luK total %luK\n",
750                              ratio,
751                              Bit2KB(mdev->rs_same_csum),
752                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
753                              Bit2KB(mdev->rs_total));
754                 }
755         }
756
757         if (mdev->rs_failed) {
758                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
759
760                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
761                         ns.disk = D_INCONSISTENT;
762                         ns.pdsk = D_UP_TO_DATE;
763                 } else {
764                         ns.disk = D_UP_TO_DATE;
765                         ns.pdsk = D_INCONSISTENT;
766                 }
767         } else {
768                 ns.disk = D_UP_TO_DATE;
769                 ns.pdsk = D_UP_TO_DATE;
770
771                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
772                         if (mdev->p_uuid) {
773                                 int i;
774                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
775                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
776                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
777                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
778                         } else {
779                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
780                         }
781                 }
782
783                 drbd_uuid_set_bm(mdev, 0UL);
784
785                 if (mdev->p_uuid) {
786                         /* Now the two UUID sets are equal, update what we
787                          * know of the peer. */
788                         int i;
789                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
790                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
791                 }
792         }
793
794         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
795 out_unlock:
796         spin_unlock_irq(&mdev->req_lock);
797         put_ldev(mdev);
798 out:
799         mdev->rs_total  = 0;
800         mdev->rs_failed = 0;
801         mdev->rs_paused = 0;
802         mdev->ov_start_sector = 0;
803
804         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
805                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
806                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
807         }
808
809         if (khelper_cmd)
810                 drbd_khelper(mdev, khelper_cmd);
811
812         return 1;
813 }
814
815 /* helper */
816 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
817 {
818         if (drbd_ee_has_active_page(e)) {
819                 /* This might happen if sendpage() has not finished */
820                 spin_lock_irq(&mdev->req_lock);
821                 list_add_tail(&e->w.list, &mdev->net_ee);
822                 spin_unlock_irq(&mdev->req_lock);
823         } else
824                 drbd_free_ee(mdev, e);
825 }
826
827 /**
828  * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
829  * @mdev:       DRBD device.
830  * @w:          work object.
831  * @cancel:     The connection will be closed anyways
832  */
833 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
834 {
835         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
836         int ok;
837
838         if (unlikely(cancel)) {
839                 drbd_free_ee(mdev, e);
840                 dec_unacked(mdev);
841                 return 1;
842         }
843
844         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
845                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
846         } else {
847                 if (__ratelimit(&drbd_ratelimit_state))
848                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
849                             (unsigned long long)e->sector);
850
851                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
852         }
853
854         dec_unacked(mdev);
855
856         move_to_net_ee_or_free(mdev, e);
857
858         if (unlikely(!ok))
859                 dev_err(DEV, "drbd_send_block() failed\n");
860         return ok;
861 }
862
863 /**
864  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
865  * @mdev:       DRBD device.
866  * @w:          work object.
867  * @cancel:     The connection will be closed anyways
868  */
869 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
870 {
871         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
872         int ok;
873
874         if (unlikely(cancel)) {
875                 drbd_free_ee(mdev, e);
876                 dec_unacked(mdev);
877                 return 1;
878         }
879
880         if (get_ldev_if_state(mdev, D_FAILED)) {
881                 drbd_rs_complete_io(mdev, e->sector);
882                 put_ldev(mdev);
883         }
884
885         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
886                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
887                         inc_rs_pending(mdev);
888                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
889                 } else {
890                         if (__ratelimit(&drbd_ratelimit_state))
891                                 dev_err(DEV, "Not sending RSDataReply, "
892                                     "partner DISKLESS!\n");
893                         ok = 1;
894                 }
895         } else {
896                 if (__ratelimit(&drbd_ratelimit_state))
897                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
898                             (unsigned long long)e->sector);
899
900                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
901
902                 /* update resync data with failure */
903                 drbd_rs_failed_io(mdev, e->sector, e->size);
904         }
905
906         dec_unacked(mdev);
907
908         move_to_net_ee_or_free(mdev, e);
909
910         if (unlikely(!ok))
911                 dev_err(DEV, "drbd_send_block() failed\n");
912         return ok;
913 }
914
915 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
916 {
917         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
918         struct digest_info *di;
919         int digest_size;
920         void *digest = NULL;
921         int ok, eq = 0;
922
923         if (unlikely(cancel)) {
924                 drbd_free_ee(mdev, e);
925                 dec_unacked(mdev);
926                 return 1;
927         }
928
929         drbd_rs_complete_io(mdev, e->sector);
930
931         di = (struct digest_info *)(unsigned long)e->block_id;
932
933         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
934                 /* quick hack to try to avoid a race against reconfiguration.
935                  * a real fix would be much more involved,
936                  * introducing more locking mechanisms */
937                 if (mdev->csums_tfm) {
938                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
939                         D_ASSERT(digest_size == di->digest_size);
940                         digest = kmalloc(digest_size, GFP_NOIO);
941                 }
942                 if (digest) {
943                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
944                         eq = !memcmp(digest, di->digest, digest_size);
945                         kfree(digest);
946                 }
947
948                 if (eq) {
949                         drbd_set_in_sync(mdev, e->sector, e->size);
950                         /* rs_same_csums unit is BM_BLOCK_SIZE */
951                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
952                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
953                 } else {
954                         inc_rs_pending(mdev);
955                         e->block_id = ID_SYNCER;
956                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
957                 }
958         } else {
959                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
960                 if (__ratelimit(&drbd_ratelimit_state))
961                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
962         }
963
964         dec_unacked(mdev);
965
966         kfree(di);
967
968         move_to_net_ee_or_free(mdev, e);
969
970         if (unlikely(!ok))
971                 dev_err(DEV, "drbd_send_block/ack() failed\n");
972         return ok;
973 }
974
975 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
976 {
977         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
978         int digest_size;
979         void *digest;
980         int ok = 1;
981
982         if (unlikely(cancel))
983                 goto out;
984
985         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
986                 goto out;
987
988         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
989         /* FIXME if this allocation fails, online verify will not terminate! */
990         digest = kmalloc(digest_size, GFP_NOIO);
991         if (digest) {
992                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
993                 inc_rs_pending(mdev);
994                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
995                                              digest, digest_size, P_OV_REPLY);
996                 if (!ok)
997                         dec_rs_pending(mdev);
998                 kfree(digest);
999         }
1000
1001 out:
1002         drbd_free_ee(mdev, e);
1003
1004         dec_unacked(mdev);
1005
1006         return ok;
1007 }
1008
1009 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1010 {
1011         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1012                 mdev->ov_last_oos_size += size>>9;
1013         } else {
1014                 mdev->ov_last_oos_start = sector;
1015                 mdev->ov_last_oos_size = size>>9;
1016         }
1017         drbd_set_out_of_sync(mdev, sector, size);
1018         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1019 }
1020
1021 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1022 {
1023         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1024         struct digest_info *di;
1025         int digest_size;
1026         void *digest;
1027         int ok, eq = 0;
1028
1029         if (unlikely(cancel)) {
1030                 drbd_free_ee(mdev, e);
1031                 dec_unacked(mdev);
1032                 return 1;
1033         }
1034
1035         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1036          * the resync lru has been cleaned up already */
1037         drbd_rs_complete_io(mdev, e->sector);
1038
1039         di = (struct digest_info *)(unsigned long)e->block_id;
1040
1041         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1042                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1043                 digest = kmalloc(digest_size, GFP_NOIO);
1044                 if (digest) {
1045                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1046
1047                         D_ASSERT(digest_size == di->digest_size);
1048                         eq = !memcmp(digest, di->digest, digest_size);
1049                         kfree(digest);
1050                 }
1051         } else {
1052                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1053                 if (__ratelimit(&drbd_ratelimit_state))
1054                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1055         }
1056
1057         dec_unacked(mdev);
1058
1059         kfree(di);
1060
1061         if (!eq)
1062                 drbd_ov_oos_found(mdev, e->sector, e->size);
1063         else
1064                 ov_oos_print(mdev);
1065
1066         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1067                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1068
1069         drbd_free_ee(mdev, e);
1070
1071         if (--mdev->ov_left == 0) {
1072                 ov_oos_print(mdev);
1073                 drbd_resync_finished(mdev);
1074         }
1075
1076         return ok;
1077 }
1078
1079 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1080 {
1081         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1082         complete(&b->done);
1083         return 1;
1084 }
1085
1086 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1087 {
1088         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1089         struct p_barrier *p = &mdev->data.sbuf.barrier;
1090         int ok = 1;
1091
1092         /* really avoid racing with tl_clear.  w.cb may have been referenced
1093          * just before it was reassigned and re-queued, so double check that.
1094          * actually, this race was harmless, since we only try to send the
1095          * barrier packet here, and otherwise do nothing with the object.
1096          * but compare with the head of w_clear_epoch */
1097         spin_lock_irq(&mdev->req_lock);
1098         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1099                 cancel = 1;
1100         spin_unlock_irq(&mdev->req_lock);
1101         if (cancel)
1102                 return 1;
1103
1104         if (!drbd_get_data_sock(mdev))
1105                 return 0;
1106         p->barrier = b->br_number;
1107         /* inc_ap_pending was done where this was queued.
1108          * dec_ap_pending will be done in got_BarrierAck
1109          * or (on connection loss) in w_clear_epoch.  */
1110         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1111                                 (struct p_header *)p, sizeof(*p), 0);
1112         drbd_put_data_sock(mdev);
1113
1114         return ok;
1115 }
1116
1117 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1118 {
1119         if (cancel)
1120                 return 1;
1121         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1122 }
1123
1124 /**
1125  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1126  * @mdev:       DRBD device.
1127  * @w:          work object.
1128  * @cancel:     The connection will be closed anyways
1129  */
1130 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1131 {
1132         struct drbd_request *req = container_of(w, struct drbd_request, w);
1133         int ok;
1134
1135         if (unlikely(cancel)) {
1136                 req_mod(req, send_canceled);
1137                 return 1;
1138         }
1139
1140         ok = drbd_send_dblock(mdev, req);
1141         req_mod(req, ok ? handed_over_to_network : send_failed);
1142
1143         return ok;
1144 }
1145
1146 /**
1147  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1148  * @mdev:       DRBD device.
1149  * @w:          work object.
1150  * @cancel:     The connection will be closed anyways
1151  */
1152 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1153 {
1154         struct drbd_request *req = container_of(w, struct drbd_request, w);
1155         int ok;
1156
1157         if (unlikely(cancel)) {
1158                 req_mod(req, send_canceled);
1159                 return 1;
1160         }
1161
1162         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1163                                 (unsigned long)req);
1164
1165         if (!ok) {
1166                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1167                  * so this is probably redundant */
1168                 if (mdev->state.conn >= C_CONNECTED)
1169                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1170         }
1171         req_mod(req, ok ? handed_over_to_network : send_failed);
1172
1173         return ok;
1174 }
1175
1176 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1177 {
1178         struct drbd_conf *odev = mdev;
1179
1180         while (1) {
1181                 if (odev->sync_conf.after == -1)
1182                         return 1;
1183                 odev = minor_to_mdev(odev->sync_conf.after);
1184                 ERR_IF(!odev) return 1;
1185                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1186                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1187                     odev->state.aftr_isp || odev->state.peer_isp ||
1188                     odev->state.user_isp)
1189                         return 0;
1190         }
1191 }
1192
1193 /**
1194  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1195  * @mdev:       DRBD device.
1196  *
1197  * Called from process context only (admin command and after_state_ch).
1198  */
1199 static int _drbd_pause_after(struct drbd_conf *mdev)
1200 {
1201         struct drbd_conf *odev;
1202         int i, rv = 0;
1203
1204         for (i = 0; i < minor_count; i++) {
1205                 odev = minor_to_mdev(i);
1206                 if (!odev)
1207                         continue;
1208                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1209                         continue;
1210                 if (!_drbd_may_sync_now(odev))
1211                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1212                                != SS_NOTHING_TO_DO);
1213         }
1214
1215         return rv;
1216 }
1217
1218 /**
1219  * _drbd_resume_next() - Resume resync on all devices that may resync now
1220  * @mdev:       DRBD device.
1221  *
1222  * Called from process context only (admin command and worker).
1223  */
1224 static int _drbd_resume_next(struct drbd_conf *mdev)
1225 {
1226         struct drbd_conf *odev;
1227         int i, rv = 0;
1228
1229         for (i = 0; i < minor_count; i++) {
1230                 odev = minor_to_mdev(i);
1231                 if (!odev)
1232                         continue;
1233                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1234                         continue;
1235                 if (odev->state.aftr_isp) {
1236                         if (_drbd_may_sync_now(odev))
1237                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1238                                                         CS_HARD, NULL)
1239                                        != SS_NOTHING_TO_DO) ;
1240                 }
1241         }
1242         return rv;
1243 }
1244
1245 void resume_next_sg(struct drbd_conf *mdev)
1246 {
1247         write_lock_irq(&global_state_lock);
1248         _drbd_resume_next(mdev);
1249         write_unlock_irq(&global_state_lock);
1250 }
1251
1252 void suspend_other_sg(struct drbd_conf *mdev)
1253 {
1254         write_lock_irq(&global_state_lock);
1255         _drbd_pause_after(mdev);
1256         write_unlock_irq(&global_state_lock);
1257 }
1258
1259 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1260 {
1261         struct drbd_conf *odev;
1262
1263         if (o_minor == -1)
1264                 return NO_ERROR;
1265         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1266                 return ERR_SYNC_AFTER;
1267
1268         /* check for loops */
1269         odev = minor_to_mdev(o_minor);
1270         while (1) {
1271                 if (odev == mdev)
1272                         return ERR_SYNC_AFTER_CYCLE;
1273
1274                 /* dependency chain ends here, no cycles. */
1275                 if (odev->sync_conf.after == -1)
1276                         return NO_ERROR;
1277
1278                 /* follow the dependency chain */
1279                 odev = minor_to_mdev(odev->sync_conf.after);
1280         }
1281 }
1282
1283 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1284 {
1285         int changes;
1286         int retcode;
1287
1288         write_lock_irq(&global_state_lock);
1289         retcode = sync_after_error(mdev, na);
1290         if (retcode == NO_ERROR) {
1291                 mdev->sync_conf.after = na;
1292                 do {
1293                         changes  = _drbd_pause_after(mdev);
1294                         changes |= _drbd_resume_next(mdev);
1295                 } while (changes);
1296         }
1297         write_unlock_irq(&global_state_lock);
1298         return retcode;
1299 }
1300
1301 static void ping_peer(struct drbd_conf *mdev)
1302 {
1303         clear_bit(GOT_PING_ACK, &mdev->flags);
1304         request_ping(mdev);
1305         wait_event(mdev->misc_wait,
1306                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1307 }
1308
1309 /**
1310  * drbd_start_resync() - Start the resync process
1311  * @mdev:       DRBD device.
1312  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1313  *
1314  * This function might bring you directly into one of the
1315  * C_PAUSED_SYNC_* states.
1316  */
1317 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1318 {
1319         union drbd_state ns;
1320         int r;
1321
1322         if (mdev->state.conn >= C_SYNC_SOURCE) {
1323                 dev_err(DEV, "Resync already running!\n");
1324                 return;
1325         }
1326
1327         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1328         drbd_rs_cancel_all(mdev);
1329
1330         if (side == C_SYNC_TARGET) {
1331                 /* Since application IO was locked out during C_WF_BITMAP_T and
1332                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1333                    we check that we might make the data inconsistent. */
1334                 r = drbd_khelper(mdev, "before-resync-target");
1335                 r = (r >> 8) & 0xff;
1336                 if (r > 0) {
1337                         dev_info(DEV, "before-resync-target handler returned %d, "
1338                              "dropping connection.\n", r);
1339                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1340                         return;
1341                 }
1342         }
1343
1344         drbd_state_lock(mdev);
1345
1346         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1347                 drbd_state_unlock(mdev);
1348                 return;
1349         }
1350
1351         if (side == C_SYNC_TARGET) {
1352                 mdev->bm_resync_fo = 0;
1353         } else /* side == C_SYNC_SOURCE */ {
1354                 u64 uuid;
1355
1356                 get_random_bytes(&uuid, sizeof(u64));
1357                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1358                 drbd_send_sync_uuid(mdev, uuid);
1359
1360                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1361         }
1362
1363         write_lock_irq(&global_state_lock);
1364         ns = mdev->state;
1365
1366         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1367
1368         ns.conn = side;
1369
1370         if (side == C_SYNC_TARGET)
1371                 ns.disk = D_INCONSISTENT;
1372         else /* side == C_SYNC_SOURCE */
1373                 ns.pdsk = D_INCONSISTENT;
1374
1375         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1376         ns = mdev->state;
1377
1378         if (ns.conn < C_CONNECTED)
1379                 r = SS_UNKNOWN_ERROR;
1380
1381         if (r == SS_SUCCESS) {
1382                 mdev->rs_total     =
1383                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1384                 mdev->rs_failed    = 0;
1385                 mdev->rs_paused    = 0;
1386                 mdev->rs_start     =
1387                 mdev->rs_mark_time = jiffies;
1388                 mdev->rs_same_csum = 0;
1389                 _drbd_pause_after(mdev);
1390         }
1391         write_unlock_irq(&global_state_lock);
1392         put_ldev(mdev);
1393
1394         if (r == SS_SUCCESS) {
1395                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1396                      drbd_conn_str(ns.conn),
1397                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1398                      (unsigned long) mdev->rs_total);
1399
1400                 if (mdev->rs_total == 0) {
1401                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1402                         ping_peer(mdev);
1403                         drbd_resync_finished(mdev);
1404                 }
1405
1406                 /* ns.conn may already be != mdev->state.conn,
1407                  * we may have been paused in between, or become paused until
1408                  * the timer triggers.
1409                  * No matter, that is handled in resync_timer_fn() */
1410                 if (ns.conn == C_SYNC_TARGET)
1411                         mod_timer(&mdev->resync_timer, jiffies);
1412
1413                 drbd_md_sync(mdev);
1414         }
1415         drbd_state_unlock(mdev);
1416 }
1417
1418 int drbd_worker(struct drbd_thread *thi)
1419 {
1420         struct drbd_conf *mdev = thi->mdev;
1421         struct drbd_work *w = NULL;
1422         LIST_HEAD(work_list);
1423         int intr = 0, i;
1424
1425         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1426
1427         while (get_t_state(thi) == Running) {
1428                 drbd_thread_current_set_cpu(mdev);
1429
1430                 if (down_trylock(&mdev->data.work.s)) {
1431                         mutex_lock(&mdev->data.mutex);
1432                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1433                                 drbd_tcp_uncork(mdev->data.socket);
1434                         mutex_unlock(&mdev->data.mutex);
1435
1436                         intr = down_interruptible(&mdev->data.work.s);
1437
1438                         mutex_lock(&mdev->data.mutex);
1439                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1440                                 drbd_tcp_cork(mdev->data.socket);
1441                         mutex_unlock(&mdev->data.mutex);
1442                 }
1443
1444                 if (intr) {
1445                         D_ASSERT(intr == -EINTR);
1446                         flush_signals(current);
1447                         ERR_IF (get_t_state(thi) == Running)
1448                                 continue;
1449                         break;
1450                 }
1451
1452                 if (get_t_state(thi) != Running)
1453                         break;
1454                 /* With this break, we have done a down() but not consumed
1455                    the entry from the list. The cleanup code takes care of
1456                    this...   */
1457
1458                 w = NULL;
1459                 spin_lock_irq(&mdev->data.work.q_lock);
1460                 ERR_IF(list_empty(&mdev->data.work.q)) {
1461                         /* something terribly wrong in our logic.
1462                          * we were able to down() the semaphore,
1463                          * but the list is empty... doh.
1464                          *
1465                          * what is the best thing to do now?
1466                          * try again from scratch, restarting the receiver,
1467                          * asender, whatnot? could break even more ugly,
1468                          * e.g. when we are primary, but no good local data.
1469                          *
1470                          * I'll try to get away just starting over this loop.
1471                          */
1472                         spin_unlock_irq(&mdev->data.work.q_lock);
1473                         continue;
1474                 }
1475                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1476                 list_del_init(&w->list);
1477                 spin_unlock_irq(&mdev->data.work.q_lock);
1478
1479                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1480                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1481                         if (mdev->state.conn >= C_CONNECTED)
1482                                 drbd_force_state(mdev,
1483                                                 NS(conn, C_NETWORK_FAILURE));
1484                 }
1485         }
1486         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1487         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1488
1489         spin_lock_irq(&mdev->data.work.q_lock);
1490         i = 0;
1491         while (!list_empty(&mdev->data.work.q)) {
1492                 list_splice_init(&mdev->data.work.q, &work_list);
1493                 spin_unlock_irq(&mdev->data.work.q_lock);
1494
1495                 while (!list_empty(&work_list)) {
1496                         w = list_entry(work_list.next, struct drbd_work, list);
1497                         list_del_init(&w->list);
1498                         w->cb(mdev, w, 1);
1499                         i++; /* dead debugging code */
1500                 }
1501
1502                 spin_lock_irq(&mdev->data.work.q_lock);
1503         }
1504         sema_init(&mdev->data.work.s, 0);
1505         /* DANGEROUS race: if someone did queue his work within the spinlock,
1506          * but up() ed outside the spinlock, we could get an up() on the
1507          * semaphore without corresponding list entry.
1508          * So don't do that.
1509          */
1510         spin_unlock_irq(&mdev->data.work.q_lock);
1511
1512         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1513         /* _drbd_set_state only uses stop_nowait.
1514          * wait here for the Exiting receiver. */
1515         drbd_thread_stop(&mdev->receiver);
1516         drbd_mdev_cleanup(mdev);
1517
1518         dev_info(DEV, "worker terminated\n");
1519
1520         clear_bit(DEVICE_DYING, &mdev->flags);
1521         clear_bit(CONFIG_PENDING, &mdev->flags);
1522         wake_up(&mdev->state_wait);
1523
1524         return 0;
1525 }