1 /*
2    drbd_worker.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23
24  */
25
26 #include <linux/module.h>
27 #include <linux/drbd.h>
28 #include <linux/sched.h>
29 #include <linux/smp_lock.h>
30 #include <linux/wait.h>
31 #include <linux/mm.h>
32 #include <linux/memcontrol.h>
33 #include <linux/mm_inline.h>
34 #include <linux/slab.h>
35 #include <linux/random.h>
36 #include <linux/string.h>
37 #include <linux/scatterlist.h>
38
39 #include "drbd_int.h"
40 #include "drbd_req.h"
41
42 #define SLEEP_TIME (HZ/10)
43
44 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
45
46
47
48 /* defined here:
49    drbd_md_io_complete
50    drbd_endio_sec
51    drbd_endio_pri
52
53  * more endio handlers:
54    atodb_endio in drbd_actlog.c
55    drbd_bm_async_io_complete in drbd_bitmap.c
56
57  * For all these callbacks, note the following:
58  * The callbacks will be called in irq context by the IDE drivers,
59  * and in Softirqs/Tasklets/BH context by the SCSI drivers.
60  * Try to get the locking right :)
61  *
62  */
63
64
65 /* About the global_state_lock
66    Each state transition on a device holds a read lock. In case we have
67    to evaluate the sync-after dependencies, we grab a write lock, because
68    we need stable states on all devices for that.  */
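   Note (orientation only): in this file the write side is taken by
   resume_next_sg(), suspend_other_sg(), drbd_alter_sa() and
   drbd_start_resync(); the per-device state transitions elsewhere are
   assumed to take the read side, as described above.  */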
69 rwlock_t global_state_lock;
70
71 /* used for synchronous meta data and bitmap IO
72  * submitted by drbd_md_sync_page_io()
73  */
74 void drbd_md_io_complete(struct bio *bio, int error)
75 {
76         struct drbd_md_io *md_io;
77
78         md_io = (struct drbd_md_io *)bio->bi_private;
79         md_io->error = error;
80
81         complete(&md_io->event);
82 }
83
84 /* reads on behalf of the partner,
85  * "submitted" by the receiver
86  */
87 void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
88 {
89         unsigned long flags = 0;
90         struct drbd_conf *mdev = e->mdev;
91
92         D_ASSERT(e->block_id != ID_VACANT);
93
94         spin_lock_irqsave(&mdev->req_lock, flags);
95         mdev->read_cnt += e->size >> 9;
96         list_del(&e->w.list);
97         if (list_empty(&mdev->read_ee))
98                 wake_up(&mdev->ee_wait);
99         if (test_bit(__EE_WAS_ERROR, &e->flags))
100                 __drbd_chk_io_error(mdev, FALSE);
101         spin_unlock_irqrestore(&mdev->req_lock, flags);
102
103         drbd_queue_work(&mdev->data.work, &e->w);
104         put_ldev(mdev);
105 }
106
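/* An epoch entry counts as a "failed barrier" if it was submitted as a
 * barrier (EE_IS_BARRIER), completed with an error (EE_WAS_ERROR), and has
 * not been resubmitted yet (EE_RESUBMITTED clear), so the resubmission
 * below happens exactly once per entry. */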
107 static int is_failed_barrier(int ee_flags)
108 {
109         return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED))
110                         == (EE_IS_BARRIER|EE_WAS_ERROR);
111 }
112
113 /* writes on behalf of the partner, or resync writes,
114  * "submitted" by the receiver, final stage.  */
115 static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
116 {
117         unsigned long flags = 0;
118         struct drbd_conf *mdev = e->mdev;
119         sector_t e_sector;
120         int do_wake;
121         int is_syncer_req;
122         int do_al_complete_io;
123
124         /* if this is a failed barrier request, disable use of barriers,
125          * and schedule for resubmission */
126         if (is_failed_barrier(e->flags)) {
127                 drbd_bump_write_ordering(mdev, WO_bdev_flush);
128                 spin_lock_irqsave(&mdev->req_lock, flags);
129                 list_del(&e->w.list);
130                 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED;
131                 e->w.cb = w_e_reissue;
132                 /* put_ldev actually happens below, once we come here again. */
133                 __release(local);
134                 spin_unlock_irqrestore(&mdev->req_lock, flags);
135                 drbd_queue_work(&mdev->data.work, &e->w);
136                 return;
137         }
138
139         D_ASSERT(e->block_id != ID_VACANT);
140
141         /* after we moved e to done_ee,
142          * we may no longer access it,
143          * it may be freed/reused already!
144          * (as soon as we release the req_lock) */
145         e_sector = e->sector;
146         do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
147         is_syncer_req = is_syncer_block_id(e->block_id);
148
149         spin_lock_irqsave(&mdev->req_lock, flags);
150         mdev->writ_cnt += e->size >> 9;
151         list_del(&e->w.list); /* has been on active_ee or sync_ee */
152         list_add_tail(&e->w.list, &mdev->done_ee);
153
154         /* No hlist_del_init(&e->colision) here: we did not send the Ack yet,
155          * nor did we wake possibly waiting conflicting requests.
156          * That is done from "drbd_process_done_ee" within the appropriate w.cb
157          * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
158
159         do_wake = is_syncer_req
160                 ? list_empty(&mdev->sync_ee)
161                 : list_empty(&mdev->active_ee);
162
163         if (test_bit(__EE_WAS_ERROR, &e->flags))
164                 __drbd_chk_io_error(mdev, FALSE);
165         spin_unlock_irqrestore(&mdev->req_lock, flags);
166
167         if (is_syncer_req)
168                 drbd_rs_complete_io(mdev, e_sector);
169
170         if (do_wake)
171                 wake_up(&mdev->ee_wait);
172
173         if (do_al_complete_io)
174                 drbd_al_complete_io(mdev, e_sector);
175
176         wake_asender(mdev);
177         put_ldev(mdev);
178 }
179
180 /* writes on behalf of the partner, or resync writes,
181  * "submitted" by the receiver.
182  */
183 void drbd_endio_sec(struct bio *bio, int error)
184 {
185         struct drbd_epoch_entry *e = bio->bi_private;
186         struct drbd_conf *mdev = e->mdev;
187         int uptodate = bio_flagged(bio, BIO_UPTODATE);
188         int is_write = bio_data_dir(bio) == WRITE;
189
190         if (error)
191                 dev_warn(DEV, "%s: error=%d s=%llus\n",
192                                 is_write ? "write" : "read", error,
193                                 (unsigned long long)e->sector);
194         if (!error && !uptodate) {
195                 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
196                                 is_write ? "write" : "read",
197                                 (unsigned long long)e->sector);
198                 /* strange behavior of some lower level drivers...
199                  * fail the request by clearing the uptodate flag,
200                  * but do not return any error?! */
201                 error = -EIO;
202         }
203
204         if (error)
205                 set_bit(__EE_WAS_ERROR, &e->flags);
206
207         bio_put(bio); /* no need for the bio anymore */
208         if (atomic_dec_and_test(&e->pending_bios)) {
209                 if (is_write)
210                         drbd_endio_write_sec_final(e);
211                 else
212                         drbd_endio_read_sec_final(e);
213         }
214 }
215
216 /* read, readA or write requests on R_PRIMARY coming from drbd_make_request
217  */
218 void drbd_endio_pri(struct bio *bio, int error)
219 {
220         unsigned long flags;
221         struct drbd_request *req = bio->bi_private;
222         struct drbd_conf *mdev = req->mdev;
223         struct bio_and_error m;
224         enum drbd_req_event what;
225         int uptodate = bio_flagged(bio, BIO_UPTODATE);
226
227         if (!error && !uptodate) {
228                 dev_warn(DEV, "p %s: setting error to -EIO\n",
229                          bio_data_dir(bio) == WRITE ? "write" : "read");
230                 /* strange behavior of some lower level drivers...
231                  * fail the request by clearing the uptodate flag,
232                  * but do not return any error?! */
233                 error = -EIO;
234         }
235
236         /* to avoid recursion in __req_mod */
237         if (unlikely(error)) {
238                 what = (bio_data_dir(bio) == WRITE)
239                         ? write_completed_with_error
240                         : (bio_rw(bio) == READ)
241                           ? read_completed_with_error
242                           : read_ahead_completed_with_error;
243         } else
244                 what = completed_ok;
245
246         bio_put(req->private_bio);
247         req->private_bio = ERR_PTR(error);
248
249         spin_lock_irqsave(&mdev->req_lock, flags);
250         __req_mod(req, what, &m);
251         spin_unlock_irqrestore(&mdev->req_lock, flags);
252
253         if (m.bio)
254                 complete_master_bio(mdev, &m);
255 }
256
257 int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
258 {
259         struct drbd_request *req = container_of(w, struct drbd_request, w);
260
261         /* We should not detach for read io-error,
262          * but try to WRITE the P_DATA_REPLY to the failed location,
263          * to give the disk the chance to relocate that block */
264
265         spin_lock_irq(&mdev->req_lock);
266         if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
267                 _req_mod(req, read_retry_remote_canceled);
268                 spin_unlock_irq(&mdev->req_lock);
269                 return 1;
270         }
271         spin_unlock_irq(&mdev->req_lock);
272
273         return w_send_read_req(mdev, w, 0);
274 }
275
276 int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
277 {
278         ERR_IF(cancel) return 1;
279         dev_err(DEV, "resync inactive, but callback triggered??\n");
280         return 1; /* Simply ignore this! */
281 }
282
283 void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
284 {
285         struct hash_desc desc;
286         struct scatterlist sg;
287         struct page *page = e->pages;
288         struct page *tmp;
289         unsigned len;
290
291         desc.tfm = tfm;
292         desc.flags = 0;
293
294         sg_init_table(&sg, 1);
295         crypto_hash_init(&desc);
296
297         while ((tmp = page_chain_next(page))) {
298                 /* all but the last page will be fully used */
299                 sg_set_page(&sg, page, PAGE_SIZE, 0);
300                 crypto_hash_update(&desc, &sg, sg.length);
301                 page = tmp;
302         }
303         /* and now the last, possibly only partially used page */
304         len = e->size & (PAGE_SIZE - 1);
305         sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
306         crypto_hash_update(&desc, &sg, sg.length);
307         crypto_hash_final(&desc, digest);
308 }
309
310 void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
311 {
312         struct hash_desc desc;
313         struct scatterlist sg;
314         struct bio_vec *bvec;
315         int i;
316
317         desc.tfm = tfm;
318         desc.flags = 0;
319
320         sg_init_table(&sg, 1);
321         crypto_hash_init(&desc);
322
323         __bio_for_each_segment(bvec, bio, i, 0) {
324                 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
325                 crypto_hash_update(&desc, &sg, sg.length);
326         }
327         crypto_hash_final(&desc, digest);
328 }
329
330 static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
331 {
332         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
333         int digest_size;
334         void *digest;
335         int ok;
336
337         D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
338
339         if (unlikely(cancel)) {
340                 drbd_free_ee(mdev, e);
341                 return 1;
342         }
343
344         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
345                 digest_size = crypto_hash_digestsize(mdev->csums_tfm);
346                 digest = kmalloc(digest_size, GFP_NOIO);
347                 if (digest) {
348                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
349
350                         inc_rs_pending(mdev);
351                         ok = drbd_send_drequest_csum(mdev,
352                                                      e->sector,
353                                                      e->size,
354                                                      digest,
355                                                      digest_size,
356                                                      P_CSUM_RS_REQUEST);
357                         kfree(digest);
358                 } else {
359                         dev_err(DEV, "kmalloc() of digest failed.\n");
360                         ok = 0;
361                 }
362         } else
363                 ok = 1;
364
365         drbd_free_ee(mdev, e);
366
367         if (unlikely(!ok))
368                 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
369         return ok;
370 }
371
372 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
373
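/* Return values, as consumed by the switch in w_make_resync_request():
 * 0 - local disk is not available (get_ldev() failed),
 * 2 - could not allocate or submit the epoch entry; caller should retry later,
 * 1 - request submitted successfully. */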
374 static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
375 {
376         struct drbd_epoch_entry *e;
377
378         if (!get_ldev(mdev))
379                 return 0;
380
381         /* GFP_TRY, because if there is no memory available right now, this may
382          * be rescheduled for later. It is "only" background resync, after all. */
383         e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
384         if (!e)
385                 goto fail;
386
387         spin_lock_irq(&mdev->req_lock);
388         list_add(&e->w.list, &mdev->read_ee);
389         spin_unlock_irq(&mdev->req_lock);
390
391         e->w.cb = w_e_send_csum;
392         if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
393                 return 1;
394
395         drbd_free_ee(mdev, e);
396 fail:
397         put_ldev(mdev);
398         return 2;
399 }
400
401 void resync_timer_fn(unsigned long data)
402 {
403         unsigned long flags;
404         struct drbd_conf *mdev = (struct drbd_conf *) data;
405         int queue;
406
407         spin_lock_irqsave(&mdev->req_lock, flags);
408
409         if (likely(!test_and_clear_bit(STOP_SYNC_TIMER, &mdev->flags))) {
410                 queue = 1;
411                 if (mdev->state.conn == C_VERIFY_S)
412                         mdev->resync_work.cb = w_make_ov_request;
413                 else
414                         mdev->resync_work.cb = w_make_resync_request;
415         } else {
416                 queue = 0;
417                 mdev->resync_work.cb = w_resync_inactive;
418         }
419
420         spin_unlock_irqrestore(&mdev->req_lock, flags);
421
422         /* harmless race: list_empty outside data.work.q_lock */
423         if (list_empty(&mdev->resync_work.list) && queue)
424                 drbd_queue_work(&mdev->data.work, &mdev->resync_work);
425 }
426
427 static int calc_resync_rate(struct drbd_conf *mdev)
428 {
429         int d = mdev->data_delay / 1000; /* us -> ms */
430         int td = mdev->sync_conf.throttle_th * 100;  /* 0.1s -> ms */
431         int hd = mdev->sync_conf.hold_off_th * 100;  /* 0.1s -> ms */
432         int cr = mdev->sync_conf.rate;
433
434         return d <= td ? cr :
435                 d >= hd ? 0 :
436                 cr + (cr * (td - d) / (hd - td));
437 }
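/* Illustrative numbers (not from the source, just to show the shape of the
 * curve above): with throttle_th = 5 (500 ms), hold_off_th = 10 (1000 ms)
 * and rate = 10000 KB/s, a measured data_delay of 500 ms or less yields the
 * full 10000 KB/s, 750 ms yields 5000 KB/s, and 1000 ms or more yields 0. */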
438
439 int w_make_resync_request(struct drbd_conf *mdev,
440                 struct drbd_work *w, int cancel)
441 {
442         unsigned long bit;
443         sector_t sector;
444         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
445         int max_segment_size;
446         int number, i, size, pe, mx;
447         int align, queued, sndbuf;
448
449         if (unlikely(cancel))
450                 return 1;
451
452         if (unlikely(mdev->state.conn < C_CONNECTED)) {
453                 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected\n");
454                 return 0;
455         }
456
457         if (mdev->state.conn != C_SYNC_TARGET)
458                 dev_err(DEV, "%s in w_make_resync_request\n",
459                         drbd_conn_str(mdev->state.conn));
460
461         if (!get_ldev(mdev)) {
462                 /* Since we only need to access mdev->rsync, a
463                    get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
464                    continuing the resync with a broken disk makes no sense at
465                    all */
466                 dev_err(DEV, "Disk broke down during resync!\n");
467                 mdev->resync_work.cb = w_resync_inactive;
468                 return 1;
469         }
470
471         /* starting with drbd 8.3.8, we can handle multi-bio EEs,
472          * if it should be necessary */
473         max_segment_size = mdev->agreed_pro_version < 94 ?
474                 queue_max_segment_size(mdev->rq_queue) : DRBD_MAX_SEGMENT_SIZE;
475
476         mdev->c_sync_rate = calc_resync_rate(mdev);
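        /* number of BM_BLOCK_SIZE resync requests to generate per SLEEP_TIME
         * (100 ms) interval; e.g. a c_sync_rate of 10000 KB/s gives
         * 10000 / (4 * 10) = 250 requests per wakeup, assuming the usual
         * BM_BLOCK_SIZE of 4 KiB. */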
477         number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
478         pe = atomic_read(&mdev->rs_pending_cnt);
479
480         mutex_lock(&mdev->data.mutex);
481         if (mdev->data.socket)
482                 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
483         else
484                 mx = 1;
485         mutex_unlock(&mdev->data.mutex);
486
487         /* For resync rates >160MB/sec, allow more pending RS requests */
488         if (number > mx)
489                 mx = number;
490
491         /* Limit the number of pending RS requests to no more than the peer's receive buffer */
492         if ((pe + number) > mx) {
493                 number = mx - pe;
494         }
495
496         for (i = 0; i < number; i++) {
497                 /* Stop generating RS requests, when half of the send buffer is filled */
498                 mutex_lock(&mdev->data.mutex);
499                 if (mdev->data.socket) {
500                         queued = mdev->data.socket->sk->sk_wmem_queued;
501                         sndbuf = mdev->data.socket->sk->sk_sndbuf;
502                 } else {
503                         queued = 1;
504                         sndbuf = 0;
505                 }
506                 mutex_unlock(&mdev->data.mutex);
507                 if (queued > sndbuf / 2)
508                         goto requeue;
509
510 next_sector:
511                 size = BM_BLOCK_SIZE;
512                 bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
513
514                 if (bit == -1UL) {
515                         mdev->bm_resync_fo = drbd_bm_bits(mdev);
516                         mdev->resync_work.cb = w_resync_inactive;
517                         put_ldev(mdev);
518                         return 1;
519                 }
520
521                 sector = BM_BIT_TO_SECT(bit);
522
523                 if (drbd_try_rs_begin_io(mdev, sector)) {
524                         mdev->bm_resync_fo = bit;
525                         goto requeue;
526                 }
527                 mdev->bm_resync_fo = bit + 1;
528
529                 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
530                         drbd_rs_complete_io(mdev, sector);
531                         goto next_sector;
532                 }
533
534 #if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
535                 /* try to find some adjacent bits.
536                  * we stop if we have already the maximum req size.
537                  *
538                  * Additionally always align bigger requests, in order to
539                  * be prepared for all stripe sizes of software RAIDs.
540                  */
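                /* note: align counts in BM_BLOCK_SIZE units; the sector check
                 * below uses 1<<(align+3) because one bitmap block spans
                 * 8 sectors of 512 bytes (assuming the usual BM_BLOCK_SIZE
                 * of 4 KiB). */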
541                 align = 1;
542                 for (;;) {
543                         if (size + BM_BLOCK_SIZE > max_segment_size)
544                                 break;
545
546                         /* Be always aligned */
547                         if (sector & ((1<<(align+3))-1))
548                                 break;
549
550                         /* do not cross extent boundaries */
551                         if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
552                                 break;
553                         /* now, is it actually dirty, after all?
554                          * caution, drbd_bm_test_bit is tri-state for some
555                          * obscure reason; ( b == 0 ) would get the out-of-band
556                          * only accidentally right because of the "oddly sized"
557                          * adjustment below */
558                         if (drbd_bm_test_bit(mdev, bit+1) != 1)
559                                 break;
560                         bit++;
561                         size += BM_BLOCK_SIZE;
562                         if ((BM_BLOCK_SIZE << align) <= size)
563                                 align++;
564                         i++;
565                 }
566                 /* if we merged some,
567                  * reset the offset to start the next drbd_bm_find_next from */
568                 if (size > BM_BLOCK_SIZE)
569                         mdev->bm_resync_fo = bit + 1;
570 #endif
571
572                 /* adjust very last sectors, in case we are oddly sized */
573                 if (sector + (size>>9) > capacity)
574                         size = (capacity-sector)<<9;
575                 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
576                         switch (read_for_csum(mdev, sector, size)) {
577                         case 0: /* Disk failure*/
578                                 put_ldev(mdev);
579                                 return 0;
580                         case 2: /* Allocation failed */
581                                 drbd_rs_complete_io(mdev, sector);
582                                 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
583                                 goto requeue;
584                         /* case 1: everything ok */
585                         }
586                 } else {
587                         inc_rs_pending(mdev);
588                         if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
589                                                sector, size, ID_SYNCER)) {
590                                 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
591                                 dec_rs_pending(mdev);
592                                 put_ldev(mdev);
593                                 return 0;
594                         }
595                 }
596         }
597
598         if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
599                 /* last syncer _request_ was sent,
600                  * but the P_RS_DATA_REPLY has not yet been received.  sync will end
601                  * (and the next sync group will resume) as soon as we receive the last
602                  * resync data block, and the last bit is cleared.
603                  * until then resync "work" is "inactive" ...
604                  */
605                 mdev->resync_work.cb = w_resync_inactive;
606                 put_ldev(mdev);
607                 return 1;
608         }
609
610  requeue:
611         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
612         put_ldev(mdev);
613         return 1;
614 }
615
616 static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
617 {
618         int number, i, size;
619         sector_t sector;
620         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
621
622         if (unlikely(cancel))
623                 return 1;
624
625         if (unlikely(mdev->state.conn < C_CONNECTED)) {
626                 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected\n");
627                 return 0;
628         }
629
630         number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
631         if (atomic_read(&mdev->rs_pending_cnt) > number)
632                 goto requeue;
633
634         number -= atomic_read(&mdev->rs_pending_cnt);
635
636         sector = mdev->ov_position;
637         for (i = 0; i < number; i++) {
638                 if (sector >= capacity) {
639                         mdev->resync_work.cb = w_resync_inactive;
640                         return 1;
641                 }
642
643                 size = BM_BLOCK_SIZE;
644
645                 if (drbd_try_rs_begin_io(mdev, sector)) {
646                         mdev->ov_position = sector;
647                         goto requeue;
648                 }
649
650                 if (sector + (size>>9) > capacity)
651                         size = (capacity-sector)<<9;
652
653                 inc_rs_pending(mdev);
654                 if (!drbd_send_ov_request(mdev, sector, size)) {
655                         dec_rs_pending(mdev);
656                         return 0;
657                 }
658                 sector += BM_SECT_PER_BIT;
659         }
660         mdev->ov_position = sector;
661
662  requeue:
663         mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
664         return 1;
665 }
666
667
668 int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
669 {
670         kfree(w);
671         ov_oos_print(mdev);
672         drbd_resync_finished(mdev);
673
674         return 1;
675 }
676
677 static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
678 {
679         kfree(w);
680
681         drbd_resync_finished(mdev);
682
683         return 1;
684 }
685
686 int drbd_resync_finished(struct drbd_conf *mdev)
687 {
688         unsigned long db, dt, dbdt;
689         unsigned long n_oos;
690         union drbd_state os, ns;
691         struct drbd_work *w;
692         char *khelper_cmd = NULL;
693
694         /* Remove all elements from the resync LRU. Future actions
695          * might set bits in the (main) bitmap, which would make the
696          * entries in the resync LRU wrong. */
697         if (drbd_rs_del_all(mdev)) {
698                 /* This may not be possible right now, most probably because
699                  * there are P_RS_DATA_REPLY packets lingering on the worker's
700                  * queue (or the read operations for those packets
701                  * have not finished yet).  Retry in 100ms. */
702
703                 drbd_kick_lo(mdev);
704                 __set_current_state(TASK_INTERRUPTIBLE);
705                 schedule_timeout(HZ / 10);
706                 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
707                 if (w) {
708                         w->cb = w_resync_finished;
709                         drbd_queue_work(&mdev->data.work, w);
710                         return 1;
711                 }
712                 dev_err(DEV, "Failed to drbd_rs_del_all() and to kmalloc(w).\n");
713         }
714
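        /* dt: resync duration in seconds (excluding paused time),
         * db: number of bitmap bits covered (one bit per BM_BLOCK_SIZE block),
         * dbdt: average throughput in KB/s (Bit2KB converts bits to KiB). */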
715         dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
716         if (dt <= 0)
717                 dt = 1;
718         db = mdev->rs_total;
719         dbdt = Bit2KB(db/dt);
720         mdev->rs_paused /= HZ;
721
722         if (!get_ldev(mdev))
723                 goto out;
724
725         spin_lock_irq(&mdev->req_lock);
726         os = mdev->state;
727
728         /* This protects us against multiple calls (that can happen in the presence
729            of application IO), and against connectivity loss just before we arrive here. */
730         if (os.conn <= C_CONNECTED)
731                 goto out_unlock;
732
733         ns = os;
734         ns.conn = C_CONNECTED;
735
736         dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
737              (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
738              "Online verify " : "Resync",
739              dt + mdev->rs_paused, mdev->rs_paused, dbdt);
740
741         n_oos = drbd_bm_total_weight(mdev);
742
743         if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
744                 if (n_oos) {
745                         dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
746                               n_oos, Bit2KB(1));
747                         khelper_cmd = "out-of-sync";
748                 }
749         } else {
750                 D_ASSERT((n_oos - mdev->rs_failed) == 0);
751
752                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
753                         khelper_cmd = "after-resync-target";
754
755                 if (mdev->csums_tfm && mdev->rs_total) {
756                         const unsigned long s = mdev->rs_same_csum;
757                         const unsigned long t = mdev->rs_total;
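                        /* two branches, presumably to avoid 32-bit overflow of
                         * s*100 for very large resyncs; for t >= 100000 the
                         * ratio is computed as s / (t/100) instead. */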
758                         const int ratio =
759                                 (t == 0)     ? 0 :
760                                 (t < 100000) ? ((s*100)/t) : (s/(t/100));
761                         dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
762                              "transferred %luK total %luK\n",
763                              ratio,
764                              Bit2KB(mdev->rs_same_csum),
765                              Bit2KB(mdev->rs_total - mdev->rs_same_csum),
766                              Bit2KB(mdev->rs_total));
767                 }
768         }
769
770         if (mdev->rs_failed) {
771                 dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
772
773                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
774                         ns.disk = D_INCONSISTENT;
775                         ns.pdsk = D_UP_TO_DATE;
776                 } else {
777                         ns.disk = D_UP_TO_DATE;
778                         ns.pdsk = D_INCONSISTENT;
779                 }
780         } else {
781                 ns.disk = D_UP_TO_DATE;
782                 ns.pdsk = D_UP_TO_DATE;
783
784                 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
785                         if (mdev->p_uuid) {
786                                 int i;
787                                 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
788                                         _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
789                                 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
790                                 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
791                         } else {
792                                 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
793                         }
794                 }
795
796                 drbd_uuid_set_bm(mdev, 0UL);
797
798                 if (mdev->p_uuid) {
799                         /* Now the two UUID sets are equal, update what we
800                          * know of the peer. */
801                         int i;
802                         for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
803                                 mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
804                 }
805         }
806
807         _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
808 out_unlock:
809         spin_unlock_irq(&mdev->req_lock);
810         put_ldev(mdev);
811 out:
812         mdev->rs_total  = 0;
813         mdev->rs_failed = 0;
814         mdev->rs_paused = 0;
815         mdev->ov_start_sector = 0;
816
817         if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
818                 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n");
819                 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
820         }
821
822         if (khelper_cmd)
823                 drbd_khelper(mdev, khelper_cmd);
824
825         return 1;
826 }
827
828 /* helper */
829 static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
830 {
831         if (drbd_ee_has_active_page(e)) {
832                 /* This might happen if sendpage() has not finished */
833                 spin_lock_irq(&mdev->req_lock);
834                 list_add_tail(&e->w.list, &mdev->net_ee);
835                 spin_unlock_irq(&mdev->req_lock);
836         } else
837                 drbd_free_ee(mdev, e);
838 }
839
840 /**
841  * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
842  * @mdev:       DRBD device.
843  * @w:          work object.
844  * @cancel:     The connection will be closed anyway
845  */
846 int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
847 {
848         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
849         int ok;
850
851         if (unlikely(cancel)) {
852                 drbd_free_ee(mdev, e);
853                 dec_unacked(mdev);
854                 return 1;
855         }
856
857         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
858                 ok = drbd_send_block(mdev, P_DATA_REPLY, e);
859         } else {
860                 if (__ratelimit(&drbd_ratelimit_state))
861                         dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
862                             (unsigned long long)e->sector);
863
864                 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
865         }
866
867         dec_unacked(mdev);
868
869         move_to_net_ee_or_free(mdev, e);
870
871         if (unlikely(!ok))
872                 dev_err(DEV, "drbd_send_block() failed\n");
873         return ok;
874 }
875
876 /**
877  * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
878  * @mdev:       DRBD device.
879  * @w:          work object.
880  * @cancel:     The connection will be closed anyway
881  */
882 int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
883 {
884         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
885         int ok;
886
887         if (unlikely(cancel)) {
888                 drbd_free_ee(mdev, e);
889                 dec_unacked(mdev);
890                 return 1;
891         }
892
893         if (get_ldev_if_state(mdev, D_FAILED)) {
894                 drbd_rs_complete_io(mdev, e->sector);
895                 put_ldev(mdev);
896         }
897
898         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
899                 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
900                         inc_rs_pending(mdev);
901                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
902                 } else {
903                         if (__ratelimit(&drbd_ratelimit_state))
904                                 dev_err(DEV, "Not sending RSDataReply, "
905                                     "partner DISKLESS!\n");
906                         ok = 1;
907                 }
908         } else {
909                 if (__ratelimit(&drbd_ratelimit_state))
910                         dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
911                             (unsigned long long)e->sector);
912
913                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
914
915                 /* update resync data with failure */
916                 drbd_rs_failed_io(mdev, e->sector, e->size);
917         }
918
919         dec_unacked(mdev);
920
921         move_to_net_ee_or_free(mdev, e);
922
923         if (unlikely(!ok))
924                 dev_err(DEV, "drbd_send_block() failed\n");
925         return ok;
926 }
927
928 int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
929 {
930         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
931         struct digest_info *di;
932         int digest_size;
933         void *digest = NULL;
934         int ok, eq = 0;
935
936         if (unlikely(cancel)) {
937                 drbd_free_ee(mdev, e);
938                 dec_unacked(mdev);
939                 return 1;
940         }
941
942         drbd_rs_complete_io(mdev, e->sector);
943
944         di = (struct digest_info *)(unsigned long)e->block_id;
945
946         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
947                 /* quick hack to try to avoid a race against reconfiguration.
948                  * a real fix would be much more involved,
949                  * introducing more locking mechanisms */
950                 if (mdev->csums_tfm) {
951                         digest_size = crypto_hash_digestsize(mdev->csums_tfm);
952                         D_ASSERT(digest_size == di->digest_size);
953                         digest = kmalloc(digest_size, GFP_NOIO);
954                 }
955                 if (digest) {
956                         drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
957                         eq = !memcmp(digest, di->digest, digest_size);
958                         kfree(digest);
959                 }
960
961                 if (eq) {
962                         drbd_set_in_sync(mdev, e->sector, e->size);
963                         /* rs_same_csums unit is BM_BLOCK_SIZE */
964                         mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
965                         ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
966                 } else {
967                         inc_rs_pending(mdev);
968                         e->block_id = ID_SYNCER;
969                         ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
970                 }
971         } else {
972                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
973                 if (__ratelimit(&drbd_ratelimit_state))
974                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
975         }
976
977         dec_unacked(mdev);
978
979         kfree(di);
980
981         move_to_net_ee_or_free(mdev, e);
982
983         if (unlikely(!ok))
984                 dev_err(DEV, "drbd_send_block/ack() failed\n");
985         return ok;
986 }
987
988 int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
989 {
990         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
991         int digest_size;
992         void *digest;
993         int ok = 1;
994
995         if (unlikely(cancel))
996                 goto out;
997
998         if (unlikely((e->flags & EE_WAS_ERROR) != 0))
999                 goto out;
1000
1001         digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1002         /* FIXME if this allocation fails, online verify will not terminate! */
1003         digest = kmalloc(digest_size, GFP_NOIO);
1004         if (digest) {
1005                 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1006                 inc_rs_pending(mdev);
1007                 ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1008                                              digest, digest_size, P_OV_REPLY);
1009                 if (!ok)
1010                         dec_rs_pending(mdev);
1011                 kfree(digest);
1012         }
1013
1014 out:
1015         drbd_free_ee(mdev, e);
1016
1017         dec_unacked(mdev);
1018
1019         return ok;
1020 }
1021
1022 void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1023 {
1024         if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1025                 mdev->ov_last_oos_size += size>>9;
1026         } else {
1027                 mdev->ov_last_oos_start = sector;
1028                 mdev->ov_last_oos_size = size>>9;
1029         }
1030         drbd_set_out_of_sync(mdev, sector, size);
1031         set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1032 }
1033
1034 int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1035 {
1036         struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1037         struct digest_info *di;
1038         int digest_size;
1039         void *digest;
1040         int ok, eq = 0;
1041
1042         if (unlikely(cancel)) {
1043                 drbd_free_ee(mdev, e);
1044                 dec_unacked(mdev);
1045                 return 1;
1046         }
1047
1048         /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1049          * the resync lru has been cleaned up already */
1050         drbd_rs_complete_io(mdev, e->sector);
1051
1052         di = (struct digest_info *)(unsigned long)e->block_id;
1053
1054         if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1055                 digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1056                 digest = kmalloc(digest_size, GFP_NOIO);
1057                 if (digest) {
1058                         drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1059
1060                         D_ASSERT(digest_size == di->digest_size);
1061                         eq = !memcmp(digest, di->digest, digest_size);
1062                         kfree(digest);
1063                 }
1064         } else {
1065                 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1066                 if (__ratelimit(&drbd_ratelimit_state))
1067                         dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1068         }
1069
1070         dec_unacked(mdev);
1071
1072         kfree(di);
1073
1074         if (!eq)
1075                 drbd_ov_oos_found(mdev, e->sector, e->size);
1076         else
1077                 ov_oos_print(mdev);
1078
1079         ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1080                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1081
1082         drbd_free_ee(mdev, e);
1083
1084         if (--mdev->ov_left == 0) {
1085                 ov_oos_print(mdev);
1086                 drbd_resync_finished(mdev);
1087         }
1088
1089         return ok;
1090 }
1091
1092 int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1093 {
1094         struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1095         complete(&b->done);
1096         return 1;
1097 }
1098
1099 int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1100 {
1101         struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1102         struct p_barrier *p = &mdev->data.sbuf.barrier;
1103         int ok = 1;
1104
1105         /* really avoid racing with tl_clear.  w.cb may have been referenced
1106          * just before it was reassigned and re-queued, so double check that.
1107          * actually, this race was harmless, since we only try to send the
1108          * barrier packet here, and otherwise do nothing with the object.
1109          * but compare with the head of w_clear_epoch */
1110         spin_lock_irq(&mdev->req_lock);
1111         if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1112                 cancel = 1;
1113         spin_unlock_irq(&mdev->req_lock);
1114         if (cancel)
1115                 return 1;
1116
1117         if (!drbd_get_data_sock(mdev))
1118                 return 0;
1119         p->barrier = b->br_number;
1120         /* inc_ap_pending was done where this was queued.
1121          * dec_ap_pending will be done in got_BarrierAck
1122          * or (on connection loss) in w_clear_epoch.  */
1123         ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1124                                 (struct p_header *)p, sizeof(*p), 0);
1125         drbd_put_data_sock(mdev);
1126
1127         return ok;
1128 }
1129
1130 int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1131 {
1132         if (cancel)
1133                 return 1;
1134         return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1135 }
1136
1137 /**
1138  * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1139  * @mdev:       DRBD device.
1140  * @w:          work object.
1141  * @cancel:     The connection will be closed anyway
1142  */
1143 int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1144 {
1145         struct drbd_request *req = container_of(w, struct drbd_request, w);
1146         int ok;
1147
1148         if (unlikely(cancel)) {
1149                 req_mod(req, send_canceled);
1150                 return 1;
1151         }
1152
1153         ok = drbd_send_dblock(mdev, req);
1154         req_mod(req, ok ? handed_over_to_network : send_failed);
1155
1156         return ok;
1157 }
1158
1159 /**
1160  * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1161  * @mdev:       DRBD device.
1162  * @w:          work object.
1163  * @cancel:     The connection will be closed anyway
1164  */
1165 int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1166 {
1167         struct drbd_request *req = container_of(w, struct drbd_request, w);
1168         int ok;
1169
1170         if (unlikely(cancel)) {
1171                 req_mod(req, send_canceled);
1172                 return 1;
1173         }
1174
1175         ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1176                                 (unsigned long)req);
1177
1178         if (!ok) {
1179                 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1180                  * so this is probably redundant */
1181                 if (mdev->state.conn >= C_CONNECTED)
1182                         drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1183         }
1184         req_mod(req, ok ? handed_over_to_network : send_failed);
1185
1186         return ok;
1187 }
1188
1189 static int _drbd_may_sync_now(struct drbd_conf *mdev)
1190 {
1191         struct drbd_conf *odev = mdev;
1192
1193         while (1) {
1194                 if (odev->sync_conf.after == -1)
1195                         return 1;
1196                 odev = minor_to_mdev(odev->sync_conf.after);
1197                 ERR_IF(!odev) return 1;
1198                 if ((odev->state.conn >= C_SYNC_SOURCE &&
1199                      odev->state.conn <= C_PAUSED_SYNC_T) ||
1200                     odev->state.aftr_isp || odev->state.peer_isp ||
1201                     odev->state.user_isp)
1202                         return 0;
1203         }
1204 }
1205
1206 /**
1207  * _drbd_pause_after() - Pause resync on all devices that may not resync now
1208  * @mdev:       DRBD device.
1209  *
1210  * Called from process context only (admin command and after_state_ch).
1211  */
1212 static int _drbd_pause_after(struct drbd_conf *mdev)
1213 {
1214         struct drbd_conf *odev;
1215         int i, rv = 0;
1216
1217         for (i = 0; i < minor_count; i++) {
1218                 odev = minor_to_mdev(i);
1219                 if (!odev)
1220                         continue;
1221                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1222                         continue;
1223                 if (!_drbd_may_sync_now(odev))
1224                         rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1225                                != SS_NOTHING_TO_DO);
1226         }
1227
1228         return rv;
1229 }
1230
1231 /**
1232  * _drbd_resume_next() - Resume resync on all devices that may resync now
1233  * @mdev:       DRBD device.
1234  *
1235  * Called from process context only (admin command and worker).
1236  */
1237 static int _drbd_resume_next(struct drbd_conf *mdev)
1238 {
1239         struct drbd_conf *odev;
1240         int i, rv = 0;
1241
1242         for (i = 0; i < minor_count; i++) {
1243                 odev = minor_to_mdev(i);
1244                 if (!odev)
1245                         continue;
1246                 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1247                         continue;
1248                 if (odev->state.aftr_isp) {
1249                         if (_drbd_may_sync_now(odev))
1250                                 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1251                                                         CS_HARD, NULL)
1252                                        != SS_NOTHING_TO_DO) ;
1253                 }
1254         }
1255         return rv;
1256 }
1257
1258 void resume_next_sg(struct drbd_conf *mdev)
1259 {
1260         write_lock_irq(&global_state_lock);
1261         _drbd_resume_next(mdev);
1262         write_unlock_irq(&global_state_lock);
1263 }
1264
1265 void suspend_other_sg(struct drbd_conf *mdev)
1266 {
1267         write_lock_irq(&global_state_lock);
1268         _drbd_pause_after(mdev);
1269         write_unlock_irq(&global_state_lock);
1270 }
1271
1272 static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1273 {
1274         struct drbd_conf *odev;
1275
1276         if (o_minor == -1)
1277                 return NO_ERROR;
1278         if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1279                 return ERR_SYNC_AFTER;
1280
1281         /* check for loops */
1282         odev = minor_to_mdev(o_minor);
1283         while (1) {
1284                 if (odev == mdev)
1285                         return ERR_SYNC_AFTER_CYCLE;
1286
1287                 /* dependency chain ends here, no cycles. */
1288                 if (odev->sync_conf.after == -1)
1289                         return NO_ERROR;
1290
1291                 /* follow the dependency chain */
1292                 odev = minor_to_mdev(odev->sync_conf.after);
1293         }
1294 }
1295
1296 int drbd_alter_sa(struct drbd_conf *mdev, int na)
1297 {
1298         int changes;
1299         int retcode;
1300
1301         write_lock_irq(&global_state_lock);
1302         retcode = sync_after_error(mdev, na);
1303         if (retcode == NO_ERROR) {
1304                 mdev->sync_conf.after = na;
1305                 do {
1306                         changes  = _drbd_pause_after(mdev);
1307                         changes |= _drbd_resume_next(mdev);
1308                 } while (changes);
1309         }
1310         write_unlock_irq(&global_state_lock);
1311         return retcode;
1312 }
1313
1314 static void ping_peer(struct drbd_conf *mdev)
1315 {
1316         clear_bit(GOT_PING_ACK, &mdev->flags);
1317         request_ping(mdev);
1318         wait_event(mdev->misc_wait,
1319                    test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
1320 }
1321
1322 /**
1323  * drbd_start_resync() - Start the resync process
1324  * @mdev:       DRBD device.
1325  * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1326  *
1327  * This function might bring you directly into one of the
1328  * C_PAUSED_SYNC_* states.
1329  */
1330 void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1331 {
1332         union drbd_state ns;
1333         int r;
1334
1335         if (mdev->state.conn >= C_SYNC_SOURCE) {
1336                 dev_err(DEV, "Resync already running!\n");
1337                 return;
1338         }
1339
1340         /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1341         drbd_rs_cancel_all(mdev);
1342
1343         if (side == C_SYNC_TARGET) {
1344                 /* Since application IO was locked out during C_WF_BITMAP_T and
1345                    C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1346                    we check whether we may make the data inconsistent. */
1347                 r = drbd_khelper(mdev, "before-resync-target");
1348                 r = (r >> 8) & 0xff;
1349                 if (r > 0) {
1350                         dev_info(DEV, "before-resync-target handler returned %d, "
1351                              "dropping connection.\n", r);
1352                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1353                         return;
1354                 }
1355         }
1356
1357         drbd_state_lock(mdev);
1358
1359         if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1360                 drbd_state_unlock(mdev);
1361                 return;
1362         }
1363
1364         if (side == C_SYNC_TARGET) {
1365                 mdev->bm_resync_fo = 0;
1366         } else /* side == C_SYNC_SOURCE */ {
1367                 u64 uuid;
1368
1369                 get_random_bytes(&uuid, sizeof(u64));
1370                 drbd_uuid_set(mdev, UI_BITMAP, uuid);
1371                 drbd_send_sync_uuid(mdev, uuid);
1372
1373                 D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1374         }
1375
1376         write_lock_irq(&global_state_lock);
1377         ns = mdev->state;
1378
1379         ns.aftr_isp = !_drbd_may_sync_now(mdev);
1380
1381         ns.conn = side;
1382
1383         if (side == C_SYNC_TARGET)
1384                 ns.disk = D_INCONSISTENT;
1385         else /* side == C_SYNC_SOURCE */
1386                 ns.pdsk = D_INCONSISTENT;
1387
1388         r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1389         ns = mdev->state;
1390
1391         if (ns.conn < C_CONNECTED)
1392                 r = SS_UNKNOWN_ERROR;
1393
1394         if (r == SS_SUCCESS) {
1395                 mdev->rs_total     =
1396                 mdev->rs_mark_left = drbd_bm_total_weight(mdev);
1397                 mdev->rs_failed    = 0;
1398                 mdev->rs_paused    = 0;
1399                 mdev->rs_start     =
1400                 mdev->rs_mark_time = jiffies;
1401                 mdev->rs_same_csum = 0;
1402                 _drbd_pause_after(mdev);
1403         }
1404         write_unlock_irq(&global_state_lock);
1405         put_ldev(mdev);
1406
1407         if (r == SS_SUCCESS) {
1408                 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1409                      drbd_conn_str(ns.conn),
1410                      (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1411                      (unsigned long) mdev->rs_total);
1412
1413                 if (mdev->rs_total == 0) {
1414                         /* Peer still reachable? Beware of failing before-resync-target handlers! */
1415                         ping_peer(mdev);
1416                         drbd_resync_finished(mdev);
1417                 }
1418
1419                 /* ns.conn may already be != mdev->state.conn,
1420                  * we may have been paused in between, or become paused until
1421                  * the timer triggers.
1422                  * No matter, that is handled in resync_timer_fn() */
1423                 if (ns.conn == C_SYNC_TARGET)
1424                         mod_timer(&mdev->resync_timer, jiffies);
1425
1426                 drbd_md_sync(mdev);
1427         }
1428         drbd_state_unlock(mdev);
1429 }
1430
1431 int drbd_worker(struct drbd_thread *thi)
1432 {
1433         struct drbd_conf *mdev = thi->mdev;
1434         struct drbd_work *w = NULL;
1435         LIST_HEAD(work_list);
1436         int intr = 0, i;
1437
1438         sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1439
1440         while (get_t_state(thi) == Running) {
1441                 drbd_thread_current_set_cpu(mdev);
1442
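                /* If no work is immediately available (down_trylock() fails),
                 * uncork the data socket so anything already queued gets sent,
                 * block until new work arrives, then cork it again so the
                 * following packets can be batched. */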
1443                 if (down_trylock(&mdev->data.work.s)) {
1444                         mutex_lock(&mdev->data.mutex);
1445                         if (mdev->data.socket && !mdev->net_conf->no_cork)
1446                                 drbd_tcp_uncork(mdev->data.socket);
1447                         mutex_unlock(&mdev->data.mutex);
1448
1449                         intr = down_interruptible(&mdev->data.work.s);
1450
1451                         mutex_lock(&mdev->data.mutex);
1452                         if (mdev->data.socket  && !mdev->net_conf->no_cork)
1453                                 drbd_tcp_cork(mdev->data.socket);
1454                         mutex_unlock(&mdev->data.mutex);
1455                 }
1456
1457                 if (intr) {
1458                         D_ASSERT(intr == -EINTR);
1459                         flush_signals(current);
1460                         ERR_IF (get_t_state(thi) == Running)
1461                                 continue;
1462                         break;
1463                 }
1464
1465                 if (get_t_state(thi) != Running)
1466                         break;
1467                 /* With this break, we have done a down() but not consumed
1468                    the entry from the list. The cleanup code takes care of
1469                    this...   */
1470
1471                 w = NULL;
1472                 spin_lock_irq(&mdev->data.work.q_lock);
1473                 ERR_IF(list_empty(&mdev->data.work.q)) {
1474                         /* something terribly wrong in our logic.
1475                          * we were able to down() the semaphore,
1476                          * but the list is empty... doh.
1477                          *
1478                          * what is the best thing to do now?
1479                          * try again from scratch, restarting the receiver,
1480                          * asender, whatnot? could break even more ugly,
1481                          * e.g. when we are primary, but no good local data.
1482                          *
1483                          * I'll try to get away just starting over this loop.
1484                          */
1485                         spin_unlock_irq(&mdev->data.work.q_lock);
1486                         continue;
1487                 }
1488                 w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1489                 list_del_init(&w->list);
1490                 spin_unlock_irq(&mdev->data.work.q_lock);
1491
1492                 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1493                         /* dev_warn(DEV, "worker: a callback failed! \n"); */
1494                         if (mdev->state.conn >= C_CONNECTED)
1495                                 drbd_force_state(mdev,
1496                                                 NS(conn, C_NETWORK_FAILURE));
1497                 }
1498         }
1499         D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1500         D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1501
1502         spin_lock_irq(&mdev->data.work.q_lock);
1503         i = 0;
1504         while (!list_empty(&mdev->data.work.q)) {
1505                 list_splice_init(&mdev->data.work.q, &work_list);
1506                 spin_unlock_irq(&mdev->data.work.q_lock);
1507
1508                 while (!list_empty(&work_list)) {
1509                         w = list_entry(work_list.next, struct drbd_work, list);
1510                         list_del_init(&w->list);
1511                         w->cb(mdev, w, 1);
1512                         i++; /* dead debugging code */
1513                 }
1514
1515                 spin_lock_irq(&mdev->data.work.q_lock);
1516         }
1517         sema_init(&mdev->data.work.s, 0);
1518         /* DANGEROUS race: if someone did queue his work within the spinlock,
1519          * but up() ed outside the spinlock, we could get an up() on the
1520          * semaphore without corresponding list entry.
1521          * So don't do that.
1522          */
1523         spin_unlock_irq(&mdev->data.work.q_lock);
1524
1525         D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1526         /* _drbd_set_state only uses stop_nowait.
1527          * wait here for the Exiting receiver. */
1528         drbd_thread_stop(&mdev->receiver);
1529         drbd_mdev_cleanup(mdev);
1530
1531         dev_info(DEV, "worker terminated\n");
1532
1533         clear_bit(DEVICE_DYING, &mdev->flags);
1534         clear_bit(CONFIG_PENDING, &mdev->flags);
1535         wake_up(&mdev->state_wait);
1536
1537         return 0;
1538 }