/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/smp_lock.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

struct flush_work {
        struct drbd_work w;
        struct drbd_epoch *epoch;
};

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);

static struct drbd_epoch *previous_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
{
        struct drbd_epoch *prev;
        spin_lock(&mdev->epoch_lock);
        prev = list_entry(epoch->list.prev, struct drbd_epoch, list);
        if (prev == epoch || prev == mdev->current_epoch)
                prev = NULL;
        spin_unlock(&mdev->epoch_lock);
        return prev;
}

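/* Allocation flags for the receiver's page pool fallback path:
 * __GFP_HIGHMEM allows highmem pages, __GFP_NOWARN keeps failed attempts
 * quiet.  __GFP_WAIT is intentionally absent, so alloc_page(GFP_TRY) never
 * blocks or triggers write-out; the waiting variant is drbd_pp_alloc()
 * below, which retries until a page gets recycled or a signal arrives. */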
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

static struct page *drbd_pp_first_page_or_try_alloc(struct drbd_conf *mdev)
{
        struct page *page = NULL;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant > 0) {
                spin_lock(&drbd_pp_lock);
                page = drbd_pp_pool;
                if (page) {
                        drbd_pp_pool = (struct page *)page_private(page);
                        set_page_private(page, 0); /* just to be polite */
                        drbd_pp_vacant--;
                }
                spin_unlock(&drbd_pp_lock);
        }
        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        if (!page)
                page = alloc_page(GFP_TRY);
        if (page)
                atomic_inc(&mdev->pp_in_use);
        return page;
}

/* kick lower level device, if we have more than (arbitrary number)
 * reference counts on it, which typically are locally submitted io
 * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
static void maybe_kick_lo(struct drbd_conf *mdev)
{
        if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
                drbd_kick_lo(mdev);
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
        struct drbd_epoch_entry *e;
        struct list_head *le, *tle;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first one that has not finished,
           we can stop examining the list... */

        list_for_each_safe(le, tle, &mdev->net_ee) {
                e = list_entry(le, struct drbd_epoch_entry, w.list);
                if (drbd_bio_has_active_page(e->private_bio))
                        break;
                list_move(le, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
        LIST_HEAD(reclaimed);
        struct drbd_epoch_entry *e, *t;

        maybe_kick_lo(mdev);
        spin_lock_irq(&mdev->req_lock);
        reclaim_net_ee(mdev, &reclaimed);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &reclaimed, w.list)
                drbd_free_ee(mdev, e);
}

/**
 * drbd_pp_alloc() - Returns a page, fails only if a signal comes in
 * @mdev:       DRBD device.
 * @retry:      whether or not to retry allocation forever (or until signalled)
 *
 * Tries to allocate a page, first from our own page pool, then from the
 * kernel, unless this allocation would exceed the max_buffers setting.
 * If @retry is non-zero, retry until DRBD frees a page somewhere else.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, int retry)
{
        struct page *page = NULL;
        DEFINE_WAIT(wait);

        if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
                page = drbd_pp_first_page_or_try_alloc(mdev);
                if (page)
                        return page;
        }

        for (;;) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(mdev);

                if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
                        page = drbd_pp_first_page_or_try_alloc(mdev);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        return page;
}

/* Must not be used from irq, as that may deadlock: see drbd_pp_alloc.
 * It is also used from inside another spin_lock_irq(&mdev->req_lock) */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page)
{
        int free_it;

        spin_lock(&drbd_pp_lock);
        if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
                free_it = 1;
        } else {
                set_page_private(page, (unsigned long)drbd_pp_pool);
                drbd_pp_pool = page;
                drbd_pp_vacant++;
                free_it = 0;
        }
        spin_unlock(&drbd_pp_lock);

        atomic_dec(&mdev->pp_in_use);

        if (free_it)
                __free_page(page);

        wake_up(&drbd_pp_wait);
}

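/* Hand all pages of a bio back to the DRBD page pool, or release them
 * directly once the pool already holds enough spare pages.  Pages not kept
 * in the pool are chained via their page_private field and put outside the
 * pool spinlock.  Also adjusts pp_in_use and wakes waiters in
 * drbd_pp_alloc(). */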
static void drbd_pp_free_bio_pages(struct drbd_conf *mdev, struct bio *bio)
{
        struct page *p_to_be_freed = NULL;
        struct page *page;
        struct bio_vec *bvec;
        int i;

        spin_lock(&drbd_pp_lock);
        __bio_for_each_segment(bvec, bio, i, 0) {
                if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count) {
                        set_page_private(bvec->bv_page, (unsigned long)p_to_be_freed);
                        p_to_be_freed = bvec->bv_page;
                } else {
                        set_page_private(bvec->bv_page, (unsigned long)drbd_pp_pool);
                        drbd_pp_pool = bvec->bv_page;
                        drbd_pp_vacant++;
                }
        }
        spin_unlock(&drbd_pp_lock);
        atomic_sub(bio->bi_vcnt, &mdev->pp_in_use);

        while (p_to_be_freed) {
                page = p_to_be_freed;
                p_to_be_freed = (struct page *)page_private(page);
                set_page_private(page, 0); /* just to be polite */
                put_page(page);
        }

        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_ee()
 drbd_alloc_ee()
 drbd_init_ee()
 drbd_release_ee()
 drbd_ee_fix_bhs()
 drbd_process_done_ee()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/

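/* Allocate an epoch entry together with a private bio large enough for
 * data_size bytes.  The data pages come from drbd_pp_alloc(); if any
 * allocation or bio_add_page() step fails, everything acquired so far is
 * released again and NULL is returned.  Needs a local disk reference, since
 * the bio is prepared against mdev->ldev->backing_bdev. */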
struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
                                     u64 id,
                                     sector_t sector,
                                     unsigned int data_size,
                                     gfp_t gfp_mask) __must_hold(local)
{
        struct request_queue *q;
        struct drbd_epoch_entry *e;
        struct page *page;
        struct bio *bio;
        unsigned int ds;

        if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
                return NULL;

        e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!e) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
                return NULL;
        }

        bio = bio_alloc(gfp_mask & ~__GFP_HIGHMEM, div_ceil(data_size, PAGE_SIZE));
        if (!bio) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "alloc_ee: Allocation of a bio failed\n");
                goto fail1;
        }

        bio->bi_bdev = mdev->ldev->backing_bdev;
        bio->bi_sector = sector;

        ds = data_size;
        while (ds) {
                page = drbd_pp_alloc(mdev, (gfp_mask & __GFP_WAIT));
                if (!page) {
                        if (!(gfp_mask & __GFP_NOWARN))
                                dev_err(DEV, "alloc_ee: Allocation of a page failed\n");
                        goto fail2;
                }
                if (!bio_add_page(bio, page, min_t(int, ds, PAGE_SIZE), 0)) {
                        drbd_pp_free(mdev, page);
                        dev_err(DEV, "alloc_ee: bio_add_page(s=%llu,"
                            "data_size=%u,ds=%u) failed\n",
                            (unsigned long long)sector, data_size, ds);

                        q = bdev_get_queue(bio->bi_bdev);
                        if (q->merge_bvec_fn) {
                                struct bvec_merge_data bvm = {
                                        .bi_bdev = bio->bi_bdev,
                                        .bi_sector = bio->bi_sector,
                                        .bi_size = bio->bi_size,
                                        .bi_rw = bio->bi_rw,
                                };
                                int l = q->merge_bvec_fn(q, &bvm,
                                                &bio->bi_io_vec[bio->bi_vcnt]);
                                dev_err(DEV, "merge_bvec_fn() = %d\n", l);
                        }

                        /* dump more of the bio. */
                        dev_err(DEV, "bio->bi_max_vecs = %d\n", bio->bi_max_vecs);
                        dev_err(DEV, "bio->bi_vcnt = %d\n", bio->bi_vcnt);
                        dev_err(DEV, "bio->bi_size = %d\n", bio->bi_size);
                        dev_err(DEV, "bio->bi_phys_segments = %d\n", bio->bi_phys_segments);

                        goto fail2;
                }
                ds -= min_t(int, ds, PAGE_SIZE);
        }

        D_ASSERT(data_size == bio->bi_size);

        bio->bi_private = e;
        e->mdev = mdev;
        e->sector = sector;
        e->size = bio->bi_size;

        e->private_bio = bio;
        e->block_id = id;
        INIT_HLIST_NODE(&e->colision);
        e->epoch = NULL;
        e->flags = 0;

        return e;

 fail2:
        drbd_pp_free_bio_pages(mdev, bio);
        bio_put(bio);
 fail1:
        mempool_free(e, drbd_ee_mempool);

        return NULL;
}

void drbd_free_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
{
        struct bio *bio = e->private_bio;
        drbd_pp_free_bio_pages(mdev, bio);
        bio_put(bio);
        D_ASSERT(hlist_unhashed(&e->colision));
        mempool_free(e, drbd_ee_mempool);
}

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_epoch_entry *e, *t;
        int count = 0;

        spin_lock_irq(&mdev->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &work_list, w.list) {
                drbd_free_ee(mdev, e);
                count++;
        }
        return count;
}


/*
 * This function is called from _asender only_
 * but see also comments in _req_mod(,barrier_acked)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_epoch_entry *e, *t;
        int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

        spin_lock_irq(&mdev->req_lock);
        reclaim_net_ee(mdev, &reclaimed);
        list_splice_init(&mdev->done_ee, &work_list);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &reclaimed, w.list)
                drbd_free_ee(mdev, e);

        /* possible callbacks here:
         * e_end_block, and e_end_resync_block, e_send_discard_ack.
         * all ignore the last argument.
         */
        list_for_each_entry_safe(e, t, &work_list, w.list) {
                /* list_del not necessary, next/prev members not touched */
                ok = e->w.cb(mdev, &e->w, !ok) && ok;
                drbd_free_ee(mdev, e);
        }
        wake_up(&mdev->ee_wait);

        return ok;
}

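/* Wait until the given epoch entry list has drained.  Per the locking rules
 * above, the caller holds req_lock; it is dropped around schedule() and
 * re-acquired before the list is checked again. */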
void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&mdev->req_lock);
                drbd_kick_lo(mdev);
                schedule();
                finish_wait(&mdev->ee_wait, &wait);
                spin_lock_irq(&mdev->req_lock);
        }
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
        spin_lock_irq(&mdev->req_lock);
        _drbd_wait_ee_list_empty(mdev, head);
        spin_unlock_irq(&mdev->req_lock);
}

/* see also kernel_accept; which is only present since 2.6.18.
 * also we want to log which part of it failed, exactly */
static int drbd_accept(struct drbd_conf *mdev, const char **what,
                struct socket *sock, struct socket **newsock)
{
        struct sock *sk = sock->sk;
        int err = 0;

        *what = "listen";
        err = sock->ops->listen(sock, 5);
        if (err < 0)
                goto out;

        *what = "sock_create_lite";
        err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
                               newsock);
        if (err < 0)
                goto out;

        *what = "accept";
        err = sock->ops->accept(sock, *newsock, 0);
        if (err < 0) {
                sock_release(*newsock);
                *newsock = NULL;
                goto out;
        }
        (*newsock)->ops  = sock->ops;

out:
        return err;
}

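/* Receive up to @size bytes with a single sock_recvmsg() call into a
 * kernel buffer (hence the temporary KERNEL_DS override).  Returns the
 * number of bytes received or a negative error code; unlike drbd_recv()
 * below it neither loops nor touches the connection state. */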
static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
                    void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}

static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);

        for (;;) {
                rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
                if (rv == size)
                        break;

                /* Note:
                 * ECONNRESET   other side closed the connection
                 * ERESTARTSYS  (on  sock) we got a signal
                 */

                if (rv < 0) {
                        if (rv == -ECONNRESET)
                                dev_info(DEV, "sock was reset by peer\n");
                        else if (rv != -ERESTARTSYS)
                                dev_err(DEV, "sock_recvmsg returned %d\n", rv);
                        break;
                } else if (rv == 0) {
                        dev_info(DEV, "sock was shut down by peer\n");
                        break;
                } else  {
                        /* signal came in, or peer/link went down,
                         * after we read a partial message
                         */
                        /* D_ASSERT(signal_pending(current)); */
                        break;
                }
        }

        set_fs(oldfs);

        if (rv != size)
                drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));

        return rv;
}

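/* One active connect attempt to the configured peer.  The socket is first
 * bound to the configured local address with port 0, so multihomed hosts
 * use the intended source IP.  Errors that merely mean "peer not reachable
 * yet" do not force the connection state to C_DISCONNECTING. */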
static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        int err;
        int disconnect_on_error = 1;

        if (!get_net_conf(mdev))
                return NULL;

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
                SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo =  mdev->net_conf->try_connect_int*HZ;

       /* explicitly bind to the configured IP as source IP
        *  for the outgoing connections.
        *  This is needed for multihomed hosts and to be
        *  able to use lo: interfaces for drbd.
        * Make sure to use 0 as port number, so linux selects
        *  a free one dynamically.
        */
        memcpy(&src_in6, mdev->net_conf->my_addr,
               min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
        if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        what = "bind before connect";
        err = sock->ops->bind(sock,
                              (struct sockaddr *) &src_in6,
                              mdev->net_conf->my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock,
                                 (struct sockaddr *)mdev->net_conf->peer_addr,
                                 mdev->net_conf->peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        dev_err(DEV, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
        }
        put_net_conf(mdev);
        return sock;
}

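/* Passive side of connection setup: listen on the configured local address,
 * with the accept timeout jittered by about one seventh of try_connect_int,
 * and accept at most one incoming connection.  Returns the established
 * socket, or NULL on timeout or error. */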
static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
{
        int timeo, err;
        struct socket *s_estab = NULL, *s_listen;
        const char *what;

        if (!get_net_conf(mdev))
                return NULL;

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
                SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        timeo = mdev->net_conf->try_connect_int * HZ;
        timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

        s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
        s_listen->sk->sk_rcvtimeo = timeo;
        s_listen->sk->sk_sndtimeo = timeo;

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen,
                              (struct sockaddr *) mdev->net_conf->my_addr,
                              mdev->net_conf->my_addr_len);
        if (err < 0)
                goto out;

        err = drbd_accept(mdev, &what, s_listen, &s_estab);

out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        dev_err(DEV, "%s failed, err = %d\n", what, err);
                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
                }
        }
        put_net_conf(mdev);

        return s_estab;
}

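/* The "first packet" sent over a freshly established socket is just a bare
 * header; its command field tells the peer whether this socket is intended
 * as the data socket (P_HAND_SHAKE_S) or as the meta-data socket
 * (P_HAND_SHAKE_M).  See drbd_connect() below. */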
static int drbd_send_fp(struct drbd_conf *mdev,
        struct socket *sock, enum drbd_packets cmd)
{
        struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;

        return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
}

static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
{
        struct p_header *h = (struct p_header *) &mdev->data.sbuf.header;
        int rr;

        rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);

        if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
                return be16_to_cpu(h->command);

        return 0xffff;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @mdev:       DRBD device.
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return FALSE;

        rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return TRUE;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return FALSE;
        }
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_conf *mdev)
{
        struct socket *s, *sock, *msock;
        int try, h, ok;

        D_ASSERT(!mdev->data.socket);

        if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags))
                dev_err(DEV, "CREATE_BARRIER flag was set in drbd_connect - now cleared!\n");

        if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
                return -2;

        clear_bit(DISCARD_CONCURRENT, &mdev->flags);

        sock  = NULL;
        msock = NULL;

        do {
                for (try = 0;;) {
                        /* 3 tries, this should take less than a second! */
                        s = drbd_try_connect(mdev);
                        if (s || ++try >= 3)
                                break;
                        /* give the other side time to call bind() & listen() */
                        __set_current_state(TASK_INTERRUPTIBLE);
                        schedule_timeout(HZ / 10);
                }

                if (s) {
                        if (!sock) {
                                drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
                                sock = s;
                                s = NULL;
                        } else if (!msock) {
                                drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
                                msock = s;
                                s = NULL;
                        } else {
                                dev_err(DEV, "Logic error in drbd_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock && msock) {
                        __set_current_state(TASK_INTERRUPTIBLE);
                        schedule_timeout(HZ / 10);
                        ok = drbd_socket_okay(mdev, &sock);
                        ok = drbd_socket_okay(mdev, &msock) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(mdev);
                if (s) {
                        try = drbd_recv_fp(mdev, s);
                        drbd_socket_okay(mdev, &sock);
                        drbd_socket_okay(mdev, &msock);
                        switch (try) {
                        case P_HAND_SHAKE_S:
                                if (sock) {
                                        dev_warn(DEV, "initial packet S crossed\n");
                                        sock_release(sock);
                                }
                                sock = s;
                                break;
                        case P_HAND_SHAKE_M:
                                if (msock) {
                                        dev_warn(DEV, "initial packet M crossed\n");
                                        sock_release(msock);
                                }
                                msock = s;
                                set_bit(DISCARD_CONCURRENT, &mdev->flags);
                                break;
                        default:
                                dev_warn(DEV, "Error receiving initial packet\n");
                                sock_release(s);
                                if (random32() & 1)
                                        goto retry;
                        }
                }

                if (mdev->state.conn <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&mdev->receiver) == Exiting)
                                goto out_release_sockets;
                }

                if (sock && msock) {
                        ok = drbd_socket_okay(mdev, &sock);
                        ok = drbd_socket_okay(mdev, &msock) && ok;
                        if (ok)
                                break;
                }
        } while (1);

        msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
        sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

        sock->sk->sk_allocation = GFP_NOIO;
        msock->sk->sk_allocation = GFP_NOIO;

        sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

        if (mdev->net_conf->sndbuf_size) {
                sock->sk->sk_sndbuf = mdev->net_conf->sndbuf_size;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }

        if (mdev->net_conf->rcvbuf_size) {
                sock->sk->sk_rcvbuf = mdev->net_conf->rcvbuf_size;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }

        /* NOT YET ...
         * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
         * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_HAND_SHAKE timeout,
         * which we set to 4x the configured ping_timeout. */
        sock->sk->sk_sndtimeo =
        sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;

        msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
        msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock);
        drbd_tcp_nodelay(msock);

        mdev->data.socket = sock;
        mdev->meta.socket = msock;
        mdev->last_received = jiffies;

        D_ASSERT(mdev->asender.task == NULL);

        h = drbd_do_handshake(mdev);
        if (h <= 0)
                return h;

        if (mdev->cram_hmac_tfm) {
                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
                if (!drbd_do_auth(mdev)) {
                        dev_err(DEV, "Authentication of peer failed\n");
                        return -1;
                }
        }

        if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
                return 0;

        sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
        sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        atomic_set(&mdev->packet_seq, 0);
        mdev->peer_seq = 0;

        drbd_thread_start(&mdev->asender);

        drbd_send_protocol(mdev);
        drbd_send_sync_param(mdev, &mdev->sync_conf);
        drbd_send_sizes(mdev, 0);
        drbd_send_uuids(mdev);
        drbd_send_state(mdev);
        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
        clear_bit(RESIZE_PENDING, &mdev->flags);

        return 1;

out_release_sockets:
        if (sock)
                sock_release(sock);
        if (msock)
                sock_release(msock);
        return -1;
}

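/* Receive and validate one packet header: verify the magic and convert the
 * command and length fields from network to host byte order.  Returns TRUE
 * on success, FALSE on a short read or bad magic. */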
static int drbd_recv_header(struct drbd_conf *mdev, struct p_header *h)
{
        int r;

        r = drbd_recv(mdev, h, sizeof(*h));

        if (unlikely(r != sizeof(*h))) {
                dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
                return FALSE;
        }
        h->command = be16_to_cpu(h->command);
        h->length  = be16_to_cpu(h->length);
        if (unlikely(h->magic != BE_DRBD_MAGIC)) {
                dev_err(DEV, "magic?? on data m: 0x%lx c: %d l: %d\n",
                    (long)be32_to_cpu(h->magic),
                    h->command, h->length);
                return FALSE;
        }
        mdev->last_received = jiffies;

        return TRUE;
}

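/* If the current write ordering method calls for it, flush the backing
 * device after an epoch.  A failing flush downgrades the write ordering to
 * WO_drain_io, since retrying is not expected to do better. */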
static enum finish_epoch drbd_flush_after_epoch(struct drbd_conf *mdev, struct drbd_epoch *epoch)
{
        int rv;

        if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
                rv = blkdev_issue_flush(mdev->ldev->backing_bdev, NULL);
                if (rv) {
                        dev_err(DEV, "local disk flush failed with status %d\n", rv);
                        /* would rather check on EOPNOTSUPP, but that is not reliable.
                         * don't try again for ANY return value != 0
                         * if (rv == -EOPNOTSUPP) */
                        drbd_bump_write_ordering(mdev, WO_drain_io);
                }
                put_ldev(mdev);
        }

        return drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
}

static int w_flush(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
        struct flush_work *fw = (struct flush_work *)w;
        struct drbd_epoch *epoch = fw->epoch;

        kfree(w);

        if (!test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags))
                drbd_flush_after_epoch(mdev, epoch);

        drbd_may_finish_epoch(mdev, epoch, EV_PUT |
                              (mdev->state.conn < C_CONNECTED ? EV_CLEANUP : 0));

        return 1;
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @mdev:       DRBD device.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int finish, epoch_size;
        struct drbd_epoch *next_epoch;
        int schedule_flush = 0;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&mdev->epoch_lock);
        do {
                next_epoch = NULL;
                finish = 0;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);

                        /* Special case: If we just switched from WO_bio_barrier to
                           WO_bdev_flush we should not finish the current epoch */
                        if (test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags) && epoch_size == 1 &&
                            mdev->write_ordering != WO_bio_barrier &&
                            epoch == mdev->current_epoch)
                                clear_bit(DE_CONTAINS_A_BARRIER, &epoch->flags);
                        break;
                case EV_BARRIER_DONE:
                        set_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) &&
                    epoch->list.prev == &mdev->current_epoch->list &&
                    !test_bit(DE_IS_FINISHING, &epoch->flags)) {
                        /* Nearly all conditions are met to finish that epoch... */
                        if (test_bit(DE_BARRIER_IN_NEXT_EPOCH_DONE, &epoch->flags) ||
                            mdev->write_ordering == WO_none ||
                            (epoch_size == 1 && test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) ||
                            ev & EV_CLEANUP) {
                                finish = 1;
                                set_bit(DE_IS_FINISHING, &epoch->flags);
                        } else if (!test_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags) &&
                                 mdev->write_ordering == WO_bio_barrier) {
                                atomic_inc(&epoch->active);
                                schedule_flush = 1;
                        }
                }
                if (finish) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&mdev->epoch_lock);
                                drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
                                spin_lock(&mdev->epoch_lock);
                        }
                        dec_unacked(mdev);

                        if (mdev->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                mdev->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&mdev->epoch_lock);

        if (schedule_flush) {
                struct flush_work *fw;
                fw = kmalloc(sizeof(*fw), GFP_ATOMIC);
                if (fw) {
                        fw->w.cb = w_flush;
                        fw->epoch = epoch;
                        drbd_queue_work(&mdev->data.work, &fw->w);
                } else {
                        dev_warn(DEV, "Could not kmalloc a flush_work obj\n");
                        set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
                        /* That is not a recursion, only one level */
                        drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE);
                        drbd_may_finish_epoch(mdev, epoch, EV_PUT);
                }
        }

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:       DRBD device.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
        enum write_ordering_e pwo;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
                [WO_bio_barrier] = "barrier",
        };

        pwo = mdev->write_ordering;
        wo = min(pwo, wo);
        if (wo == WO_bio_barrier && mdev->ldev->dc.no_disk_barrier)
                wo = WO_bdev_flush;
        if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
                wo = WO_drain_io;
        if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
                wo = WO_none;
        mdev->write_ordering = wo;
        if (pwo != mdev->write_ordering || wo == WO_bio_barrier)
                dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

/**
 * w_e_reissue() - Worker callback; Resubmit a bio, without BIO_RW_BARRIER set
 * @mdev:       DRBD device.
 * @w:          work object.
 * @cancel:     The connection will be closed anyways (unused in this callback)
 */
int w_e_reissue(struct drbd_conf *mdev, struct drbd_work *w, int cancel) __releases(local)
{
        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
        struct bio *bio = e->private_bio;

        /* We leave DE_CONTAINS_A_BARRIER and EE_IS_BARRIER in place,
           (and DE_BARRIER_IN_NEXT_EPOCH_ISSUED in the previous Epoch)
           so that we can finish that epoch in drbd_may_finish_epoch().
           That is necessary if we already have a long chain of Epochs, before
           we realize that BIO_RW_BARRIER is actually not supported */

        /* As long as the -ENOTSUPP on the barrier is reported immediately
           that will never trigger. If it is reported late, we will just
           print that warning and continue correctly for all future requests
           with WO_bdev_flush */
        if (previous_epoch(mdev, e->epoch))
                dev_warn(DEV, "Write ordering was not enforced (one time event)\n");

        /* prepare bio for re-submit,
         * re-init volatile members */
        /* we still have a local reference,
         * get_ldev was done in receive_Data. */
        bio->bi_bdev = mdev->ldev->backing_bdev;
        bio->bi_sector = e->sector;
        bio->bi_size = e->size;
        bio->bi_idx = 0;

        bio->bi_flags &= ~(BIO_POOL_MASK - 1);
        bio->bi_flags |= 1 << BIO_UPTODATE;

        /* don't know whether this is necessary: */
        bio->bi_phys_segments = 0;
        bio->bi_next = NULL;

        /* these should be unchanged: */
        /* bio->bi_end_io = drbd_endio_write_sec; */
        /* bio->bi_vcnt = whatever; */

        e->w.cb = e_end_block;

        /* This is no longer a barrier request. */
        bio->bi_rw &= ~(1UL << BIO_RW_BARRIER);

        drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, bio);

        return 1;
}

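/* Handle an incoming P_BARRIER packet: record the peer's barrier number in
 * the current epoch and, depending on the local write ordering method,
 * either recycle that epoch, flush/drain the backing device, and/or install
 * a freshly allocated epoch object for the writes that follow. */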
static int receive_Barrier(struct drbd_conf *mdev, struct p_header *h)
{
        int rv, issue_flush;
        struct p_barrier *p = (struct p_barrier *)h;
        struct drbd_epoch *epoch;

        ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;

        rv = drbd_recv(mdev, h->payload, h->length);
        ERR_IF(rv != h->length) return FALSE;

        inc_unacked(mdev);

        if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
                drbd_kick_lo(mdev);

        mdev->current_epoch->barrier_nr = p->barrier;
        rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (mdev->write_ordering) {
        case WO_bio_barrier:
        case WO_none:
                if (rv == FE_RECYCLED)
                        return TRUE;
                break;

        case WO_bdev_flush:
        case WO_drain_io:
                D_ASSERT(rv == FE_STILL_LIVE);
                set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
                if (rv == FE_RECYCLED)
                        return TRUE;

                /* The asender will send all the ACKs and barrier ACKs out, since
                   all EEs moved from the active_ee to the done_ee. We need to
                   provide a new epoch object for the EEs that come in soon */
                break;
        }

        /* receiver context, in the writeout path of the other node.
         * avoid potential distributed deadlock */
        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
        if (!epoch) {
                dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
                issue_flush = !test_and_set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &mdev->current_epoch->flags);
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                if (issue_flush) {
                        rv = drbd_flush_after_epoch(mdev, mdev->current_epoch);
                        if (rv == FE_RECYCLED)
                                return TRUE;
                }

                drbd_wait_ee_list_empty(mdev, &mdev->done_ee);

                return TRUE;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&mdev->epoch_lock);
        if (atomic_read(&mdev->current_epoch->epoch_size)) {
                list_add(&epoch->list, &mdev->current_epoch->list);
                mdev->current_epoch = epoch;
                mdev->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&mdev->epoch_lock);

        return TRUE;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_epoch_entry *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
{
        struct drbd_epoch_entry *e;
        struct bio_vec *bvec;
        struct page *page;
        struct bio *bio;
        int dgs, ds, i, rr;
        void *dig_in = mdev->int_dig_in;
        void *dig_vv = mdev->int_dig_vv;

        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
                crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

        if (dgs) {
                rr = drbd_recv(mdev, dig_in, dgs);
                if (rr != dgs) {
                        dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
                             rr, dgs);
                        return NULL;
                }
        }

        data_size -= dgs;

        ERR_IF(data_size &  0x1ff) return NULL;
        ERR_IF(data_size >  DRBD_MAX_SEGMENT_SIZE) return NULL;

        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
        if (!e)
                return NULL;
        bio = e->private_bio;
        ds = data_size;
        bio_for_each_segment(bvec, bio, i) {
                page = bvec->bv_page;
                rr = drbd_recv(mdev, kmap(page), min_t(int, ds, PAGE_SIZE));
                kunmap(page);
                if (rr != min_t(int, ds, PAGE_SIZE)) {
                        drbd_free_ee(mdev, e);
                        dev_warn(DEV, "short read receiving data: read %d expected %d\n",
                             rr, min_t(int, ds, PAGE_SIZE));
                        return NULL;
                }
                ds -= rr;
        }

        if (dgs) {
                drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED.\n");
                        drbd_bcast_ee(mdev, "digest failed",
                                        dgs, dig_in, dig_vv, e);
                        drbd_free_ee(mdev, e);
                        return NULL;
                }
        }
        mdev->recv_cnt += data_size>>9;
        return e;
}

/* drbd_drain_block() just takes a data block
 * out of the socket input buffer, and discards it.
 */
static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
{
        struct page *page;
        int rr, rv = 1;
        void *data;

        page = drbd_pp_alloc(mdev, 1);

        data = kmap(page);
        while (data_size) {
                rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
                if (rr != min_t(int, data_size, PAGE_SIZE)) {
                        rv = 0;
                        dev_warn(DEV, "short read receiving data: read %d expected %d\n",
                             rr, min_t(int, data_size, PAGE_SIZE));
                        break;
                }
                data_size -= rr;
        }
        kunmap(page);
        drbd_pp_free(mdev, page);
        return rv;
}

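/* Receive the payload of a data reply straight into the pages of the
 * original request's master bio, verifying the optional data digest.
 * (The "dless" in the name presumably stands for "disk-less": nothing is
 * written locally, the data only completes a pending read request.) */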
static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
                           sector_t sector, int data_size)
{
        struct bio_vec *bvec;
        struct bio *bio;
        int dgs, rr, i, expect;
        void *dig_in = mdev->int_dig_in;
        void *dig_vv = mdev->int_dig_vv;

        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
                crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

        if (dgs) {
                rr = drbd_recv(mdev, dig_in, dgs);
                if (rr != dgs) {
                        dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
                             rr, dgs);
                        return 0;
                }
        }

        data_size -= dgs;

        /* optimistically update recv_cnt.  if receiving fails below,
         * we disconnect anyways, and counters will be reset. */
        mdev->recv_cnt += data_size>>9;

        bio = req->master_bio;
        D_ASSERT(sector == bio->bi_sector);

        bio_for_each_segment(bvec, bio, i) {
                expect = min_t(int, data_size, bvec->bv_len);
                rr = drbd_recv(mdev,
                             kmap(bvec->bv_page)+bvec->bv_offset,
                             expect);
                kunmap(bvec->bv_page);
                if (rr != expect) {
                        dev_warn(DEV, "short read receiving data reply: "
                             "read %d expected %d\n",
                             rr, expect);
                        return 0;
                }
                data_size -= rr;
        }

        if (dgs) {
                drbd_csum(mdev, mdev->integrity_r_tfm, bio, dig_vv);
                if (memcmp(dig_in, dig_vv, dgs)) {
                        dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
                        return 0;
                }
        }

        D_ASSERT(data_size == 0);
        return 1;
}

/* e_end_resync_block() is called via
 * drbd_process_done_ee() by asender only */
static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
        sector_t sector = e->sector;
        int ok;

        D_ASSERT(hlist_unhashed(&e->colision));

        if (likely(drbd_bio_uptodate(e->private_bio))) {
                drbd_set_in_sync(mdev, sector, e->size);
                ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
        } else {
                /* Record failure to sync */
                drbd_rs_failed_io(mdev, sector, e->size);

                ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
        }
        dec_unacked(mdev);

        return ok;
}

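/* Wrap incoming resync data into an epoch entry, queue it on sync_ee and
 * submit the write to the local disk.  The P_RS_WRITE_ACK or P_NEG_ACK is
 * sent later from e_end_resync_block() once the write has completed. */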
static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
{
        struct drbd_epoch_entry *e;

        e = read_in_block(mdev, ID_SYNCER, sector, data_size);
        if (!e) {
                put_ldev(mdev);
                return FALSE;
        }

        dec_rs_pending(mdev);

        e->private_bio->bi_end_io = drbd_endio_write_sec;
        e->private_bio->bi_rw = WRITE;
        e->w.cb = e_end_resync_block;

        inc_unacked(mdev);
        /* corresponding dec_unacked() in e_end_resync_block()
         * respective _drbd_clear_done_ee */

        spin_lock_irq(&mdev->req_lock);
        list_add(&e->w.list, &mdev->sync_ee);
        spin_unlock_irq(&mdev->req_lock);

        drbd_generic_make_request(mdev, DRBD_FAULT_RS_WR, e->private_bio);
        /* accounting done in endio */

        maybe_kick_lo(mdev);
        return TRUE;
}

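/* The peer answered one of our read requests.  Look up the pending request
 * by block_id and sector, copy the payload into its bio via
 * recv_dless_read(), and on success advance the request with
 * data_received. */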
1457 static int receive_DataReply(struct drbd_conf *mdev, struct p_header *h)
1458 {
1459         struct drbd_request *req;
1460         sector_t sector;
1461         unsigned int header_size, data_size;
1462         int ok;
1463         struct p_data *p = (struct p_data *)h;
1464
1465         header_size = sizeof(*p) - sizeof(*h);
1466         data_size   = h->length  - header_size;
1467
1468         ERR_IF(data_size == 0) return FALSE;
1469
1470         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1471                 return FALSE;
1472
1473         sector = be64_to_cpu(p->sector);
1474
1475         spin_lock_irq(&mdev->req_lock);
1476         req = _ar_id_to_req(mdev, p->block_id, sector);
1477         spin_unlock_irq(&mdev->req_lock);
1478         if (unlikely(!req)) {
1479                 dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1480                 return FALSE;
1481         }
1482
1483         /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1484          * special casing it there for the various failure cases.
1485          * still no race with drbd_fail_pending_reads */
1486         ok = recv_dless_read(mdev, req, sector, data_size);
1487
1488         if (ok)
1489                 req_mod(req, data_received);
1490         /* else: nothing. handled from drbd_disconnect...
1491          * I don't think we may complete this just yet
1492          * in case we are "on-disconnect: freeze" */
1493
1494         return ok;
1495 }
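
/* Note on the receive pattern used by receive_DataReply() above and most
 * handlers below: the generic header (struct p_header) has already been read
 * by the caller, so each handler first fetches its fixed sub-header into
 * h->payload and treats the rest of h->length as bulk payload:
 *
 *	header_size = sizeof(*p) - sizeof(*h);
 *	data_size   = h->length  - header_size;
 *	if (drbd_recv(mdev, h->payload, header_size) != header_size)
 *		return FALSE;
 */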
1496
1497 static int receive_RSDataReply(struct drbd_conf *mdev, struct p_header *h)
1498 {
1499         sector_t sector;
1500         unsigned int header_size, data_size;
1501         int ok;
1502         struct p_data *p = (struct p_data *)h;
1503
1504         header_size = sizeof(*p) - sizeof(*h);
1505         data_size   = h->length  - header_size;
1506
1507         ERR_IF(data_size == 0) return FALSE;
1508
1509         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1510                 return FALSE;
1511
1512         sector = be64_to_cpu(p->sector);
1513         D_ASSERT(p->block_id == ID_SYNCER);
1514
1515         if (get_ldev(mdev)) {
1516                 /* data is submitted to disk within recv_resync_read.
1517                  * corresponding put_ldev done below on error,
1518                  * or in drbd_endio_write_sec. */
1519                 ok = recv_resync_read(mdev, sector, data_size);
1520         } else {
1521                 if (__ratelimit(&drbd_ratelimit_state))
1522                         dev_err(DEV, "Can not write resync data to local disk.\n");
1523
1524                 ok = drbd_drain_block(mdev, data_size);
1525
1526                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1527         }
1528
1529         return ok;
1530 }
1531
1532 /* e_end_block() is called via drbd_process_done_ee().
1533  * this means this function only runs in the asender thread
1534  */
1535 static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1536 {
1537         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1538         sector_t sector = e->sector;
1539         struct drbd_epoch *epoch;
1540         int ok = 1, pcmd;
1541
1542         if (e->flags & EE_IS_BARRIER) {
1543                 epoch = previous_epoch(mdev, e->epoch);
1544                 if (epoch)
1545                         drbd_may_finish_epoch(mdev, epoch, EV_BARRIER_DONE + (cancel ? EV_CLEANUP : 0));
1546         }
1547
1548         if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1549                 if (likely(drbd_bio_uptodate(e->private_bio))) {
1550                         pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1551                                 mdev->state.conn <= C_PAUSED_SYNC_T &&
1552                                 e->flags & EE_MAY_SET_IN_SYNC) ?
1553                                 P_RS_WRITE_ACK : P_WRITE_ACK;
1554                         ok &= drbd_send_ack(mdev, pcmd, e);
1555                         if (pcmd == P_RS_WRITE_ACK)
1556                                 drbd_set_in_sync(mdev, sector, e->size);
1557                 } else {
1558                         ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1559                         /* we expect it to be marked out of sync anyways...
1560                          * maybe assert this?  */
1561                 }
1562                 dec_unacked(mdev);
1563         }
1564         /* we delete from the conflict detection hash _after_ we sent out the
1565          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1566         if (mdev->net_conf->two_primaries) {
1567                 spin_lock_irq(&mdev->req_lock);
1568                 D_ASSERT(!hlist_unhashed(&e->colision));
1569                 hlist_del_init(&e->colision);
1570                 spin_unlock_irq(&mdev->req_lock);
1571         } else {
1572                 D_ASSERT(hlist_unhashed(&e->colision));
1573         }
1574
1575         drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1576
1577         return ok;
1578 }
1579
1580 static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1581 {
1582         struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1583         int ok = 1;
1584
1585         D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1586         ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1587
1588         spin_lock_irq(&mdev->req_lock);
1589         D_ASSERT(!hlist_unhashed(&e->colision));
1590         hlist_del_init(&e->colision);
1591         spin_unlock_irq(&mdev->req_lock);
1592
1593         dec_unacked(mdev);
1594
1595         return ok;
1596 }
1597
1598 /* Called from receive_Data.
1599  * Synchronize packets on sock with packets on msock.
1600  *
1601  * This is here so that even when a P_DATA packet traveling via sock overtakes
1602  * an Ack packet traveling on msock, they are still processed in the order they
1603  * have been sent.
1604  *
1605  * Note: we don't care for Ack packets overtaking P_DATA packets.
1606  *
1607  * In case packet_seq is larger than mdev->peer_seq, there are still
1608  * outstanding packets on the msock. We wait for them to arrive.
1609  * In case this is the logically next packet, we update mdev->peer_seq
1610  * ourselves. Correctly handles 32bit wrap around.
1611  *
1612  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1613  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1614  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1615  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1616  *
1617  * returns 0 if we may process the packet,
1618  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1619 static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1620 {
1621         DEFINE_WAIT(wait);
1622         unsigned int p_seq;
1623         long timeout;
1624         int ret = 0;
1625         spin_lock(&mdev->peer_seq_lock);
1626         for (;;) {
1627                 prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1628                 if (seq_le(packet_seq, mdev->peer_seq+1))
1629                         break;
1630                 if (signal_pending(current)) {
1631                         ret = -ERESTARTSYS;
1632                         break;
1633                 }
1634                 p_seq = mdev->peer_seq;
1635                 spin_unlock(&mdev->peer_seq_lock);
1636                 timeout = schedule_timeout(30*HZ);
1637                 spin_lock(&mdev->peer_seq_lock);
1638                 if (timeout == 0 && p_seq == mdev->peer_seq) {
1639                         ret = -ETIMEDOUT;
1640                         dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1641                         break;
1642                 }
1643         }
1644         finish_wait(&mdev->seq_wait, &wait);
1645         if (mdev->peer_seq+1 == packet_seq)
1646                 mdev->peer_seq++;
1647         spin_unlock(&mdev->peer_seq_lock);
1648         return ret;
1649 }
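
/* Illustrative sketch only (not drbd's actual helper): seq_le(), used above,
 * has to compare 32bit sequence numbers in a wrap-safe way.  A common way to
 * do that is via the signed difference, which stays correct as long as the
 * two counters are less than 2^31 apart:
 *
 *	static inline int example_seq_le(u32 a, u32 b)
 *	{
 *		return (s32)(a - b) <= 0;
 *	}
 */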
1650
1651 /* mirrored write */
1652 static int receive_Data(struct drbd_conf *mdev, struct p_header *h)
1653 {
1654         sector_t sector;
1655         struct drbd_epoch_entry *e;
1656         struct p_data *p = (struct p_data *)h;
1657         int header_size, data_size;
1658         int rw = WRITE;
1659         u32 dp_flags;
1660
1661         header_size = sizeof(*p) - sizeof(*h);
1662         data_size   = h->length  - header_size;
1663
1664         ERR_IF(data_size == 0) return FALSE;
1665
1666         if (drbd_recv(mdev, h->payload, header_size) != header_size)
1667                 return FALSE;
1668
1669         if (!get_ldev(mdev)) {
1670                 if (__ratelimit(&drbd_ratelimit_state))
1671                         dev_err(DEV, "Can not write mirrored data block "
1672                             "to local disk.\n");
1673                 spin_lock(&mdev->peer_seq_lock);
1674                 if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1675                         mdev->peer_seq++;
1676                 spin_unlock(&mdev->peer_seq_lock);
1677
1678                 drbd_send_ack_dp(mdev, P_NEG_ACK, p);
1679                 atomic_inc(&mdev->current_epoch->epoch_size);
1680                 return drbd_drain_block(mdev, data_size);
1681         }
1682
1683         /* get_ldev(mdev) successful.
1684          * Corresponding put_ldev done either below (on various errors),
1685          * or in drbd_endio_write_sec, if we successfully submit the data at
1686          * the end of this function. */
1687
1688         sector = be64_to_cpu(p->sector);
1689         e = read_in_block(mdev, p->block_id, sector, data_size);
1690         if (!e) {
1691                 put_ldev(mdev);
1692                 return FALSE;
1693         }
1694
1695         e->private_bio->bi_end_io = drbd_endio_write_sec;
1696         e->w.cb = e_end_block;
1697
1698         spin_lock(&mdev->epoch_lock);
1699         e->epoch = mdev->current_epoch;
1700         atomic_inc(&e->epoch->epoch_size);
1701         atomic_inc(&e->epoch->active);
1702
1703         if (mdev->write_ordering == WO_bio_barrier && atomic_read(&e->epoch->epoch_size) == 1) {
1704                 struct drbd_epoch *epoch;
1705                 /* Issue a barrier if we start a new epoch, and the previous epoch
1706                    was not an epoch containing a single request which already was
1707                    a Barrier. */
1708                 epoch = list_entry(e->epoch->list.prev, struct drbd_epoch, list);
1709                 if (epoch == e->epoch) {
1710                         set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1711                         rw |= (1<<BIO_RW_BARRIER);
1712                         e->flags |= EE_IS_BARRIER;
1713                 } else {
1714                         if (atomic_read(&epoch->epoch_size) > 1 ||
1715                             !test_bit(DE_CONTAINS_A_BARRIER, &epoch->flags)) {
1716                                 set_bit(DE_BARRIER_IN_NEXT_EPOCH_ISSUED, &epoch->flags);
1717                                 set_bit(DE_CONTAINS_A_BARRIER, &e->epoch->flags);
1718                                 rw |= (1<<BIO_RW_BARRIER);
1719                                 e->flags |= EE_IS_BARRIER;
1720                         }
1721                 }
1722         }
1723         spin_unlock(&mdev->epoch_lock);
1724
1725         dp_flags = be32_to_cpu(p->dp_flags);
1726         if (dp_flags & DP_HARDBARRIER) {
1727                 dev_err(DEV, "ASSERT FAILED would have submitted barrier request\n");
1728                 /* rw |= (1<<BIO_RW_BARRIER); */
1729         }
1730         if (dp_flags & DP_RW_SYNC)
1731                 rw |= (1<<BIO_RW_SYNCIO) | (1<<BIO_RW_UNPLUG);
1732         if (dp_flags & DP_MAY_SET_IN_SYNC)
1733                 e->flags |= EE_MAY_SET_IN_SYNC;
1734
1735         /* I'm the receiver, I do hold a net_cnt reference. */
1736         if (!mdev->net_conf->two_primaries) {
1737                 spin_lock_irq(&mdev->req_lock);
1738         } else {
1739                 /* don't get the req_lock yet,
1740                  * we may sleep in drbd_wait_peer_seq */
1741                 const int size = e->size;
1742                 const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1743                 DEFINE_WAIT(wait);
1744                 struct drbd_request *i;
1745                 struct hlist_node *n;
1746                 struct hlist_head *slot;
1747                 int first;
1748
1749                 D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1750                 BUG_ON(mdev->ee_hash == NULL);
1751                 BUG_ON(mdev->tl_hash == NULL);
1752
1753                 /* conflict detection and handling:
1754                  * 1. wait on the sequence number,
1755                  *    in case this data packet overtook ACK packets.
1756                  * 2. check our hash tables for conflicting requests.
1757                  *    we only need to walk the tl_hash, since an ee can not
1758                  *    have a conflict with another ee: on the submitting
1759                  *    node, the corresponding req had already been conflicting,
1760                  *    and a conflicting req is never sent.
1761                  *
1762                  * Note: for two_primaries, we are protocol C,
1763                  * so there cannot be any request that is DONE
1764                  * but still on the transfer log.
1765                  *
1766                  * unconditionally add to the ee_hash.
1767                  *
1768                  * if no conflicting request is found:
1769                  *    submit.
1770                  *
1771                  * if any conflicting request is found
1772                  * that has not yet been acked,
1773                  * AND I have the "discard concurrent writes" flag:
1774                  *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1775                  *
1776                  * if any conflicting request is found:
1777                  *       block the receiver, waiting on misc_wait
1778                  *       until no more conflicting requests are there,
1779                  *       or we get interrupted (disconnect).
1780                  *
1781                  *       we do not just write after local io completion of those
1782                  *       requests, but only after req is done completely, i.e.
1783                  *       we wait for the P_DISCARD_ACK to arrive!
1784                  *
1785                  *       then proceed normally, i.e. submit.
1786                  */
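                /* For illustration only (the real overlaps() helper lives
                 * elsewhere in drbd): an overlap test on (sector, byte size)
                 * pairs boils down to interval intersection,
                 *
                 *	s1 < s2 + (l2 >> 9)  &&  s2 < s1 + (l1 >> 9)
                 *
                 * which is roughly what the OVERLAPS shorthand below
                 * expands to. */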
1787                 if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1788                         goto out_interrupted;
1789
1790                 spin_lock_irq(&mdev->req_lock);
1791
1792                 hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1793
1794 #define OVERLAPS overlaps(i->sector, i->size, sector, size)
1795                 slot = tl_hash_slot(mdev, sector);
1796                 first = 1;
1797                 for (;;) {
1798                         int have_unacked = 0;
1799                         int have_conflict = 0;
1800                         prepare_to_wait(&mdev->misc_wait, &wait,
1801                                 TASK_INTERRUPTIBLE);
1802                         hlist_for_each_entry(i, n, slot, colision) {
1803                                 if (OVERLAPS) {
1804                                         /* only ALERT on first iteration,
1805                                          * we may be woken up early... */
1806                                         if (first)
1807                                                 dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1808                                                       " new: %llus +%u; pending: %llus +%u\n",
1809                                                       current->comm, current->pid,
1810                                                       (unsigned long long)sector, size,
1811                                                       (unsigned long long)i->sector, i->size);
1812                                         if (i->rq_state & RQ_NET_PENDING)
1813                                                 ++have_unacked;
1814                                         ++have_conflict;
1815                                 }
1816                         }
1817 #undef OVERLAPS
1818                         if (!have_conflict)
1819                                 break;
1820
1821                         /* Discard Ack only for the _first_ iteration */
1822                         if (first && discard && have_unacked) {
1823                                 dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1824                                      (unsigned long long)sector);
1825                                 inc_unacked(mdev);
1826                                 e->w.cb = e_send_discard_ack;
1827                                 list_add_tail(&e->w.list, &mdev->done_ee);
1828
1829                                 spin_unlock_irq(&mdev->req_lock);
1830
1831                                 /* we could probably send that P_DISCARD_ACK ourselves,
1832                                  * but I don't like the receiver using the msock */
1833
1834                                 put_ldev(mdev);
1835                                 wake_asender(mdev);
1836                                 finish_wait(&mdev->misc_wait, &wait);
1837                                 return TRUE;
1838                         }
1839
1840                         if (signal_pending(current)) {
1841                                 hlist_del_init(&e->colision);
1842
1843                                 spin_unlock_irq(&mdev->req_lock);
1844
1845                                 finish_wait(&mdev->misc_wait, &wait);
1846                                 goto out_interrupted;
1847                         }
1848
1849                         spin_unlock_irq(&mdev->req_lock);
1850                         if (first) {
1851                                 first = 0;
1852                                 dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1853                                      "sec=%llus\n", (unsigned long long)sector);
1854                         } else if (discard) {
1855                                 /* we had none on the first iteration.
1856                                  * there must be none now. */
1857                                 D_ASSERT(have_unacked == 0);
1858                         }
1859                         schedule();
1860                         spin_lock_irq(&mdev->req_lock);
1861                 }
1862                 finish_wait(&mdev->misc_wait, &wait);
1863         }
1864
1865         list_add(&e->w.list, &mdev->active_ee);
1866         spin_unlock_irq(&mdev->req_lock);
1867
1868         switch (mdev->net_conf->wire_protocol) {
1869         case DRBD_PROT_C:
1870                 inc_unacked(mdev);
1871                 /* corresponding dec_unacked() in e_end_block(),
1872                  * or in _drbd_clear_done_ee respectively */
1873                 break;
1874         case DRBD_PROT_B:
1875                 /* I really don't like it that the receiver thread
1876                  * sends on the msock, but anyways */
1877                 drbd_send_ack(mdev, P_RECV_ACK, e);
1878                 break;
1879         case DRBD_PROT_A:
1880                 /* nothing to do */
1881                 break;
1882         }
1883
1884         if (mdev->state.pdsk == D_DISKLESS) {
1885                 /* In case we have the only disk of the cluster, */
1886                 drbd_set_out_of_sync(mdev, e->sector, e->size);
1887                 e->flags |= EE_CALL_AL_COMPLETE_IO;
1888                 drbd_al_begin_io(mdev, e->sector);
1889         }
1890
1891         e->private_bio->bi_rw = rw;
1892         drbd_generic_make_request(mdev, DRBD_FAULT_DT_WR, e->private_bio);
1893         /* accounting done in endio */
1894
1895         maybe_kick_lo(mdev);
1896         return TRUE;
1897
1898 out_interrupted:
1899         /* yes, the epoch_size now is imbalanced.
1900          * but we drop the connection anyways, so we don't have a chance to
1901          * receive a barrier... atomic_inc(&mdev->epoch_size); */
1902         put_ldev(mdev);
1903         drbd_free_ee(mdev, e);
1904         return FALSE;
1905 }
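
/* Acknowledgement behaviour per wire protocol, as implemented above:
 *   DRBD_PROT_A: no ack is sent for the mirrored write at all,
 *   DRBD_PROT_B: P_RECV_ACK is sent as soon as the data has been received,
 *   DRBD_PROT_C: the write ack (P_WRITE_ACK / P_RS_WRITE_ACK / P_NEG_ACK) is
 *                only sent from e_end_block(), i.e. after local completion.
 */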
1906
1907 static int receive_DataRequest(struct drbd_conf *mdev, struct p_header *h)
1908 {
1909         sector_t sector;
1910         const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1911         struct drbd_epoch_entry *e;
1912         struct digest_info *di = NULL;
1913         int size, digest_size;
1914         unsigned int fault_type;
1915         struct p_block_req *p =
1916                 (struct p_block_req *)h;
1917         const int brps = sizeof(*p)-sizeof(*h);
1918
1919         if (drbd_recv(mdev, h->payload, brps) != brps)
1920                 return FALSE;
1921
1922         sector = be64_to_cpu(p->sector);
1923         size   = be32_to_cpu(p->blksize);
1924
1925         if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1926                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1927                                 (unsigned long long)sector, size);
1928                 return FALSE;
1929         }
1930         if (sector + (size>>9) > capacity) {
1931                 dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1932                                 (unsigned long long)sector, size);
1933                 return FALSE;
1934         }
1935
1936         if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1937                 if (__ratelimit(&drbd_ratelimit_state))
1938                         dev_err(DEV, "Can not satisfy peer's read request, "
1939                             "no local data.\n");
1940                 drbd_send_ack_rp(mdev, h->command == P_DATA_REQUEST ? P_NEG_DREPLY :
1941                                  P_NEG_RS_DREPLY , p);
1942                 return TRUE;
1943         }
1944
1945         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1946          * "criss-cross" setup, that might cause write-out on some other DRBD,
1947          * which in turn might block on the other node at this very place.  */
1948         e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1949         if (!e) {
1950                 put_ldev(mdev);
1951                 return FALSE;
1952         }
1953
1954         e->private_bio->bi_rw = READ;
1955         e->private_bio->bi_end_io = drbd_endio_read_sec;
1956
1957         switch (h->command) {
1958         case P_DATA_REQUEST:
1959                 e->w.cb = w_e_end_data_req;
1960                 fault_type = DRBD_FAULT_DT_RD;
1961                 break;
1962         case P_RS_DATA_REQUEST:
1963                 e->w.cb = w_e_end_rsdata_req;
1964                 fault_type = DRBD_FAULT_RS_RD;
1965                 /* Eventually this should become asynchronous. Currently it
1966                  * blocks the whole receiver just to delay the reading of a
1967                  * resync data block.
1968                  * the drbd_work_queue mechanism is made for this...
1969                  */
1970                 if (!drbd_rs_begin_io(mdev, sector)) {
1971                         /* we have been interrupted,
1972                          * probably connection lost! */
1973                         D_ASSERT(signal_pending(current));
1974                         goto out_free_e;
1975                 }
1976                 break;
1977
1978         case P_OV_REPLY:
1979         case P_CSUM_RS_REQUEST:
1980                 fault_type = DRBD_FAULT_RS_RD;
1981                 digest_size = h->length - brps;
1982                 di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
1983                 if (!di)
1984                         goto out_free_e;
1985
1986                 di->digest_size = digest_size;
1987                 di->digest = (((char *)di)+sizeof(struct digest_info));
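                /* di and its digest share a single allocation: the
                 * kmalloc(sizeof(*di) + digest_size) above places the digest
                 * buffer directly behind the struct, so one kfree(di) frees
                 * both (see out_free_e). */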
1988
1989                 if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
1990                         goto out_free_e;
1991
1992                 e->block_id = (u64)(unsigned long)di;
1993                 if (h->command == P_CSUM_RS_REQUEST) {
1994                         D_ASSERT(mdev->agreed_pro_version >= 89);
1995                         e->w.cb = w_e_end_csum_rs_req;
1996                 } else if (h->command == P_OV_REPLY) {
1997                         e->w.cb = w_e_end_ov_reply;
1998                         dec_rs_pending(mdev);
1999                         break;
2000                 }
2001
2002                 if (!drbd_rs_begin_io(mdev, sector)) {
2003                         /* we have been interrupted, probably connection lost! */
2004                         D_ASSERT(signal_pending(current));
2005                         goto out_free_e;
2006                 }
2007                 break;
2008
2009         case P_OV_REQUEST:
2010                 if (mdev->state.conn >= C_CONNECTED &&
2011                     mdev->state.conn != C_VERIFY_T)
2012                         dev_warn(DEV, "ASSERT FAILED: got P_OV_REQUEST while being %s\n",
2013                                 drbd_conn_str(mdev->state.conn));
2014                 if (mdev->ov_start_sector == ~(sector_t)0 &&
2015                     mdev->agreed_pro_version >= 90) {
2016                         mdev->ov_start_sector = sector;
2017                         mdev->ov_position = sector;
2018                         mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2019                         dev_info(DEV, "Online Verify start sector: %llu\n",
2020                                         (unsigned long long)sector);
2021                 }
2022                 e->w.cb = w_e_end_ov_req;
2023                 fault_type = DRBD_FAULT_RS_RD;
2024                 /* Eventually this should become asynchronous. Currently it
2025                  * blocks the whole receiver just to delay the reading of a
2026                  * resync data block.
2027                  * the drbd_work_queue mechanism is made for this...
2028                  */
2029                 if (!drbd_rs_begin_io(mdev, sector)) {
2030                         /* we have been interrupted,
2031                          * probably connection lost! */
2032                         D_ASSERT(signal_pending(current));
2033                         goto out_free_e;
2034                 }
2035                 break;
2036
2037
2038         default:
2039                 dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2040                     cmdname(h->command));
2041                 fault_type = DRBD_FAULT_MAX;
2042         }
2043
2044         spin_lock_irq(&mdev->req_lock);
2045         list_add(&e->w.list, &mdev->read_ee);
2046         spin_unlock_irq(&mdev->req_lock);
2047
2048         inc_unacked(mdev);
2049
2050         drbd_generic_make_request(mdev, fault_type, e->private_bio);
2051         maybe_kick_lo(mdev);
2052
2053         return TRUE;
2054
2055 out_free_e:
2056         kfree(di);
2057         put_ldev(mdev);
2058         drbd_free_ee(mdev, e);
2059         return FALSE;
2060 }
2061
2062 static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2063 {
2064         int self, peer, rv = -100;
2065         unsigned long ch_self, ch_peer;
2066
2067         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2068         peer = mdev->p_uuid[UI_BITMAP] & 1;
2069
2070         ch_peer = mdev->p_uuid[UI_SIZE];
2071         ch_self = mdev->comm_bm_set;
2072
2073         switch (mdev->net_conf->after_sb_0p) {
2074         case ASB_CONSENSUS:
2075         case ASB_DISCARD_SECONDARY:
2076         case ASB_CALL_HELPER:
2077                 dev_err(DEV, "Configuration error.\n");
2078                 break;
2079         case ASB_DISCONNECT:
2080                 break;
2081         case ASB_DISCARD_YOUNGER_PRI:
2082                 if (self == 0 && peer == 1) {
2083                         rv = -1;
2084                         break;
2085                 }
2086                 if (self == 1 && peer == 0) {
2087                         rv =  1;
2088                         break;
2089                 }
2090                 /* Else fall through to one of the other strategies... */
2091         case ASB_DISCARD_OLDER_PRI:
2092                 if (self == 0 && peer == 1) {
2093                         rv = 1;
2094                         break;
2095                 }
2096                 if (self == 1 && peer == 0) {
2097                         rv = -1;
2098                         break;
2099                 }
2100                 /* Else fall through to one of the other strategies... */
2101                 dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2102                      "Using discard-least-changes instead\n");
2103         case ASB_DISCARD_ZERO_CHG:
2104                 if (ch_peer == 0 && ch_self == 0) {
2105                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2106                                 ? -1 : 1;
2107                         break;
2108                 } else {
2109                         if (ch_peer == 0) { rv =  1; break; }
2110                         if (ch_self == 0) { rv = -1; break; }
2111                 }
2112                 if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2113                         break;
2114         case ASB_DISCARD_LEAST_CHG:
2115                 if      (ch_self < ch_peer)
2116                         rv = -1;
2117                 else if (ch_self > ch_peer)
2118                         rv =  1;
2119                 else /* ( ch_self == ch_peer ) */
2120                      /* Well, then use something else. */
2121                         rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2122                                 ? -1 : 1;
2123                 break;
2124         case ASB_DISCARD_LOCAL:
2125                 rv = -1;
2126                 break;
2127         case ASB_DISCARD_REMOTE:
2128                 rv =  1;
2129         }
2130
2131         return rv;
2132 }
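
/* Return convention shared by the drbd_asb_recover_*p() helpers:
 *    1  discard the peer's data, this node becomes sync source,
 *   -1  discard the local data, this node becomes sync target,
 * -100  no automatic decision possible.
 * The _1p and _2p variants below mostly delegate to the 0p strategy and
 * additionally take the local role into account. */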
2133
2134 static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2135 {
2136         int self, peer, hg, rv = -100;
2137
2138         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2139         peer = mdev->p_uuid[UI_BITMAP] & 1;
2140
2141         switch (mdev->net_conf->after_sb_1p) {
2142         case ASB_DISCARD_YOUNGER_PRI:
2143         case ASB_DISCARD_OLDER_PRI:
2144         case ASB_DISCARD_LEAST_CHG:
2145         case ASB_DISCARD_LOCAL:
2146         case ASB_DISCARD_REMOTE:
2147                 dev_err(DEV, "Configuration error.\n");
2148                 break;
2149         case ASB_DISCONNECT:
2150                 break;
2151         case ASB_CONSENSUS:
2152                 hg = drbd_asb_recover_0p(mdev);
2153                 if (hg == -1 && mdev->state.role == R_SECONDARY)
2154                         rv = hg;
2155                 if (hg == 1  && mdev->state.role == R_PRIMARY)
2156                         rv = hg;
2157                 break;
2158         case ASB_VIOLENTLY:
2159                 rv = drbd_asb_recover_0p(mdev);
2160                 break;
2161         case ASB_DISCARD_SECONDARY:
2162                 return mdev->state.role == R_PRIMARY ? 1 : -1;
2163         case ASB_CALL_HELPER:
2164                 hg = drbd_asb_recover_0p(mdev);
2165                 if (hg == -1 && mdev->state.role == R_PRIMARY) {
2166                         self = drbd_set_role(mdev, R_SECONDARY, 0);
2167                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2168                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2169                           * we do not need to wait for the after state change work either. */
2170                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2171                         if (self != SS_SUCCESS) {
2172                                 drbd_khelper(mdev, "pri-lost-after-sb");
2173                         } else {
2174                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2175                                 rv = hg;
2176                         }
2177                 } else
2178                         rv = hg;
2179         }
2180
2181         return rv;
2182 }
2183
2184 static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2185 {
2186         int self, peer, hg, rv = -100;
2187
2188         self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2189         peer = mdev->p_uuid[UI_BITMAP] & 1;
2190
2191         switch (mdev->net_conf->after_sb_2p) {
2192         case ASB_DISCARD_YOUNGER_PRI:
2193         case ASB_DISCARD_OLDER_PRI:
2194         case ASB_DISCARD_LEAST_CHG:
2195         case ASB_DISCARD_LOCAL:
2196         case ASB_DISCARD_REMOTE:
2197         case ASB_CONSENSUS:
2198         case ASB_DISCARD_SECONDARY:
2199                 dev_err(DEV, "Configuration error.\n");
2200                 break;
2201         case ASB_VIOLENTLY:
2202                 rv = drbd_asb_recover_0p(mdev);
2203                 break;
2204         case ASB_DISCONNECT:
2205                 break;
2206         case ASB_CALL_HELPER:
2207                 hg = drbd_asb_recover_0p(mdev);
2208                 if (hg == -1) {
2209                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2210                           * we might be here in C_WF_REPORT_PARAMS which is transient.
2211                           * we do not need to wait for the after state change work either. */
2212                         self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2213                         if (self != SS_SUCCESS) {
2214                                 drbd_khelper(mdev, "pri-lost-after-sb");
2215                         } else {
2216                                 dev_warn(DEV, "Successfully gave up primary role.\n");
2217                                 rv = hg;
2218                         }
2219                 } else
2220                         rv = hg;
2221         }
2222
2223         return rv;
2224 }
2225
2226 static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2227                            u64 bits, u64 flags)
2228 {
2229         if (!uuid) {
2230                 dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2231                 return;
2232         }
2233         dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2234              text,
2235              (unsigned long long)uuid[UI_CURRENT],
2236              (unsigned long long)uuid[UI_BITMAP],
2237              (unsigned long long)uuid[UI_HISTORY_START],
2238              (unsigned long long)uuid[UI_HISTORY_END],
2239              (unsigned long long)bits,
2240              (unsigned long long)flags);
2241 }
2242
2243 /*
2244   100   after split brain try auto recover
2245     2   C_SYNC_SOURCE set BitMap
2246     1   C_SYNC_SOURCE use BitMap
2247     0   no Sync
2248    -1   C_SYNC_TARGET use BitMap
2249    -2   C_SYNC_TARGET set BitMap
2250  -100   after split brain, disconnect
2251 -1000   unrelated data
2252  */
2253 static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2254 {
2255         u64 self, peer;
2256         int i, j;
2257
2258         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2259         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2260
2261         *rule_nr = 10;
2262         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2263                 return 0;
2264
2265         *rule_nr = 20;
2266         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2267              peer != UUID_JUST_CREATED)
2268                 return -2;
2269
2270         *rule_nr = 30;
2271         if (self != UUID_JUST_CREATED &&
2272             (peer == UUID_JUST_CREATED || peer == (u64)0))
2273                 return 2;
2274
2275         if (self == peer) {
2276                 int rct, dc; /* roles at crash time */
2277
2278                 if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2279
2280                         if (mdev->agreed_pro_version < 91)
2281                                 return -1001;
2282
2283                         if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2284                             (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2285                                 dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2286                                 drbd_uuid_set_bm(mdev, 0UL);
2287
2288                                 drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2289                                                mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2290                                 *rule_nr = 34;
2291                         } else {
2292                                 dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2293                                 *rule_nr = 36;
2294                         }
2295
2296                         return 1;
2297                 }
2298
2299                 if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2300
2301                         if (mdev->agreed_pro_version < 91)
2302                                 return -1001;
2303
2304                         if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2305                             (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2306                                 dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2307
2308                                 mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2309                                 mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2310                                 mdev->p_uuid[UI_BITMAP] = 0UL;
2311
2312                                 drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2313                                 *rule_nr = 35;
2314                         } else {
2315                                 dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2316                                 *rule_nr = 37;
2317                         }
2318
2319                         return -1;
2320                 }
2321
2322                 /* Common power [off|failure] */
2323                 rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2324                         (mdev->p_uuid[UI_FLAGS] & 2);
2325                 /* lowest bit is set when we were primary,
2326                  * next bit (weight 2) is set when peer was primary */
2327                 *rule_nr = 40;
2328
2329                 switch (rct) {
2330                 case 0: /* !self_pri && !peer_pri */ return 0;
2331                 case 1: /*  self_pri && !peer_pri */ return 1;
2332                 case 2: /* !self_pri &&  peer_pri */ return -1;
2333                 case 3: /*  self_pri &&  peer_pri */
2334                         dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2335                         return dc ? -1 : 1;
2336                 }
2337         }
2338
2339         *rule_nr = 50;
2340         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2341         if (self == peer)
2342                 return -1;
2343
2344         *rule_nr = 51;
2345         peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2346         if (self == peer) {
2347                 self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2348                 peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2349                 if (self == peer) {
2350                         /* The last P_SYNC_UUID did not get through. Undo the peer's UUID
2351                            modifications from the last start of resync as sync source. */
2352
2353                         if (mdev->agreed_pro_version < 91)
2354                                 return -1001;
2355
2356                         mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2357                         mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2358                         return -1;
2359                 }
2360         }
2361
2362         *rule_nr = 60;
2363         self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2364         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2365                 peer = mdev->p_uuid[i] & ~((u64)1);
2366                 if (self == peer)
2367                         return -2;
2368         }
2369
2370         *rule_nr = 70;
2371         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2372         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2373         if (self == peer)
2374                 return 1;
2375
2376         *rule_nr = 71;
2377         self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2378         if (self == peer) {
2379                 self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2380                 peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2381                 if (self == peer) {
2382                         /* The last P_SYNC_UUID did not get through. Undo the modifications
2383                            of our UUIDs from the last start of resync as sync source. */
2384
2385                         if (mdev->agreed_pro_version < 91)
2386                                 return -1001;
2387
2388                         _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2389                         _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2390
2391                         dev_info(DEV, "Undid last start of resync:\n");
2392
2393                         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2394                                        mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2395
2396                         return 1;
2397                 }
2398         }
2399
2400
2401         *rule_nr = 80;
2402         peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2403         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2404                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2405                 if (self == peer)
2406                         return 2;
2407         }
2408
2409         *rule_nr = 90;
2410         self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2411         peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2412         if (self == peer && self != ((u64)0))
2413                 return 100;
2414
2415         *rule_nr = 100;
2416         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2417                 self = mdev->ldev->md.uuid[i] & ~((u64)1);
2418                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2419                         peer = mdev->p_uuid[j] & ~((u64)1);
2420                         if (self == peer)
2421                                 return -100;
2422                 }
2423         }
2424
2425         return -1000;
2426 }
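
/* Note on the UUID arithmetic above: the lowest bit of each UUID is used as
 * a flag, so it is masked away before any equality test, e.g.
 *
 *	self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
 *
 * and two UUIDs that differ only in that bit still identify the same data
 * generation.  The meaning of the return values is listed in the comment
 * right before drbd_uuid_compare(). */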
2427
2428 /* drbd_sync_handshake() returns the new conn state on success, or
2429    CONN_MASK (-1) on failure.
2430  */
2431 static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2432                                            enum drbd_disk_state peer_disk) __must_hold(local)
2433 {
2434         int hg, rule_nr;
2435         enum drbd_conns rv = C_MASK;
2436         enum drbd_disk_state mydisk;
2437
2438         mydisk = mdev->state.disk;
2439         if (mydisk == D_NEGOTIATING)
2440                 mydisk = mdev->new_state_tmp.disk;
2441
2442         dev_info(DEV, "drbd_sync_handshake:\n");
2443         drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2444         drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2445                        mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2446
2447         hg = drbd_uuid_compare(mdev, &rule_nr);
2448
2449         dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2450
2451         if (hg == -1000) {
2452                 dev_alert(DEV, "Unrelated data, aborting!\n");
2453                 return C_MASK;
2454         }
2455         if (hg == -1001) {
2456                 dev_alert(DEV, "To resolve this both sides have to support at least protocol 91\n");
2457                 return C_MASK;
2458         }
2459
2460         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2461             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2462                 int f = (hg == -100) || abs(hg) == 2;
2463                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
2464                 if (f)
2465                         hg = hg*2;
2466                 dev_info(DEV, "Becoming sync %s due to disk states.\n",
2467                      hg > 0 ? "source" : "target");
2468         }
2469
2470         if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2471                 int pcount = (mdev->state.role == R_PRIMARY)
2472                            + (peer_role == R_PRIMARY);
2473                 int forced = (hg == -100);
2474
2475                 switch (pcount) {
2476                 case 0:
2477                         hg = drbd_asb_recover_0p(mdev);
2478                         break;
2479                 case 1:
2480                         hg = drbd_asb_recover_1p(mdev);
2481                         break;
2482                 case 2:
2483                         hg = drbd_asb_recover_2p(mdev);
2484                         break;
2485                 }
2486                 if (abs(hg) < 100) {
2487                         dev_warn(DEV, "Split-Brain detected, %d primaries, "
2488                              "automatically solved. Sync from %s node\n",
2489                              pcount, (hg < 0) ? "peer" : "this");
2490                         if (forced) {
2491                                 dev_warn(DEV, "Doing a full sync, since"
2492                                      " UUIDs were ambiguous.\n");
2493                                 hg = hg*2;
2494                         }
2495                 }
2496         }
2497
2498         if (hg == -100) {
2499                 if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2500                         hg = -1;
2501                 if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2502                         hg = 1;
2503
2504                 if (abs(hg) < 100)
2505                         dev_warn(DEV, "Split-Brain detected, manually solved. "
2506                              "Sync from %s node\n",
2507                              (hg < 0) ? "peer" : "this");
2508         }
2509
2510         if (hg == -100) {
2511                 dev_alert(DEV, "Split-Brain detected, dropping connection!\n");
2512                 drbd_khelper(mdev, "split-brain");
2513                 return C_MASK;
2514         }
2515
2516         if (hg > 0 && mydisk <= D_INCONSISTENT) {
2517                 dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2518                 return C_MASK;
2519         }
2520
2521         if (hg < 0 && /* by intention we do not use mydisk here. */
2522             mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2523                 switch (mdev->net_conf->rr_conflict) {
2524                 case ASB_CALL_HELPER:
2525                         drbd_khelper(mdev, "pri-lost");
2526                         /* fall through */
2527                 case ASB_DISCONNECT:
2528                         dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2529                         return C_MASK;
2530                 case ASB_VIOLENTLY:
2531                         dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2532                              " assumption\n");
2533                 }
2534         }
2535
2536         if (abs(hg) >= 2) {
2537                 dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2538                 if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2539                         return C_MASK;
2540         }
2541
2542         if (hg > 0) { /* become sync source. */
2543                 rv = C_WF_BITMAP_S;
2544         } else if (hg < 0) { /* become sync target */
2545                 rv = C_WF_BITMAP_T;
2546         } else {
2547                 rv = C_CONNECTED;
2548                 if (drbd_bm_total_weight(mdev)) {
2549                         dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2550                              drbd_bm_total_weight(mdev));
2551                 }
2552         }
2553
2554         return rv;
2555 }
2556
2557 /* returns 1 if invalid */
2558 static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2559 {
2560         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2561         if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2562             (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2563                 return 0;
2564
2565         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2566         if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2567             self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2568                 return 1;
2569
2570         /* everything else is valid if they are equal on both sides. */
2571         if (peer == self)
2572                 return 0;
2573
2574         /* everything else is invalid. */
2575         return 1;
2576 }
2577
2578 static int receive_protocol(struct drbd_conf *mdev, struct p_header *h)
2579 {
2580         struct p_protocol *p = (struct p_protocol *)h;
2581         int header_size, data_size;
2582         int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2583         int p_want_lose, p_two_primaries;
2584         char p_integrity_alg[SHARED_SECRET_MAX] = "";
2585
2586         header_size = sizeof(*p) - sizeof(*h);
2587         data_size   = h->length  - header_size;
2588
2589         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2590                 return FALSE;
2591
2592         p_proto         = be32_to_cpu(p->protocol);
2593         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2594         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2595         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2596         p_want_lose     = be32_to_cpu(p->want_lose);
2597         p_two_primaries = be32_to_cpu(p->two_primaries);
2598
2599         if (p_proto != mdev->net_conf->wire_protocol) {
2600                 dev_err(DEV, "incompatible communication protocols\n");
2601                 goto disconnect;
2602         }
2603
2604         if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2605                 dev_err(DEV, "incompatible after-sb-0pri settings\n");
2606                 goto disconnect;
2607         }
2608
2609         if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2610                 dev_err(DEV, "incompatible after-sb-1pri settings\n");
2611                 goto disconnect;
2612         }
2613
2614         if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2615                 dev_err(DEV, "incompatible after-sb-2pri settings\n");
2616                 goto disconnect;
2617         }
2618
2619         if (p_want_lose && mdev->net_conf->want_lose) {
2620                 dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2621                 goto disconnect;
2622         }
2623
2624         if (p_two_primaries != mdev->net_conf->two_primaries) {
2625                 dev_err(DEV, "incompatible setting of the two-primaries options\n");
2626                 goto disconnect;
2627         }
2628
2629         if (mdev->agreed_pro_version >= 87) {
2630                 unsigned char *my_alg = mdev->net_conf->integrity_alg;
2631
2632                 if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2633                         return FALSE;
2634
2635                 p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2636                 if (strcmp(p_integrity_alg, my_alg)) {
2637                         dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2638                         goto disconnect;
2639                 }
2640                 dev_info(DEV, "data-integrity-alg: %s\n",
2641                      my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2642         }
2643
2644         return TRUE;
2645
2646 disconnect:
2647         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2648         return FALSE;
2649 }
2650
2651 /* helper function
2652  * input: alg name, feature name
2653  * return: NULL (alg name was "")
2654  *         ERR_PTR(error) if something goes wrong
2655  *         or the crypto hash ptr, if it worked out ok. */
2656 struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2657                 const char *alg, const char *name)
2658 {
2659         struct crypto_hash *tfm;
2660
2661         if (!alg[0])
2662                 return NULL;
2663
2664         tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2665         if (IS_ERR(tfm)) {
2666                 dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2667                         alg, name, PTR_ERR(tfm));
2668                 return tfm;
2669         }
2670         if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2671                 crypto_free_hash(tfm);
2672                 dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2673                 return ERR_PTR(-EINVAL);
2674         }
2675         return tfm;
2676 }
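
/* Typical use of the helper above (see receive_SyncParam() below): a NULL
 * return means the algorithm name was empty, i.e. the feature is not used;
 * an IS_ERR() return means allocation or validation failed:
 *
 *	tfm = drbd_crypto_alloc_digest_safe(mdev, p->verify_alg, "verify-alg");
 *	if (IS_ERR(tfm))
 *		goto disconnect;
 *	if (tfm)
 *		swap it in, crypto_free_hash() the previous one
 */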
2677
2678 static int receive_SyncParam(struct drbd_conf *mdev, struct p_header *h)
2679 {
2680         int ok = TRUE;
2681         struct p_rs_param_89 *p = (struct p_rs_param_89 *)h;
2682         unsigned int header_size, data_size, exp_max_sz;
2683         struct crypto_hash *verify_tfm = NULL;
2684         struct crypto_hash *csums_tfm = NULL;
2685         const int apv = mdev->agreed_pro_version;
2686
2687         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2688                     : apv == 88 ? sizeof(struct p_rs_param)
2689                                         + SHARED_SECRET_MAX
2690                     : /* 89 */    sizeof(struct p_rs_param_89);
2691
2692         if (h->length > exp_max_sz) {
2693                 dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2694                     h->length, exp_max_sz);
2695                 return FALSE;
2696         }
2697
2698         if (apv <= 88) {
2699                 header_size = sizeof(struct p_rs_param) - sizeof(*h);
2700                 data_size   = h->length  - header_size;
2701         } else /* apv >= 89 */ {
2702                 header_size = sizeof(struct p_rs_param_89) - sizeof(*h);
2703                 data_size   = h->length  - header_size;
2704                 D_ASSERT(data_size == 0);
2705         }
2706
2707         /* initialize verify_alg and csums_alg */
2708         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2709
2710         if (drbd_recv(mdev, h->payload, header_size) != header_size)
2711                 return FALSE;
2712
2713         mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2714
2715         if (apv >= 88) {
2716                 if (apv == 88) {
2717                         if (data_size > SHARED_SECRET_MAX) {
2718                                 dev_err(DEV, "verify-alg too long, "
2719                                     "peer wants %u, accepting only %u byte\n",
2720                                                 data_size, SHARED_SECRET_MAX);
2721                                 return FALSE;
2722                         }
2723
2724                         if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2725                                 return FALSE;
2726
2727                         /* we expect NUL terminated string */
2728                         /* but just in case someone tries to be evil */
2729                         D_ASSERT(p->verify_alg[data_size-1] == 0);
2730                         p->verify_alg[data_size-1] = 0;
2731
2732                 } else /* apv >= 89 */ {
2733                         /* we still expect NUL terminated strings */
2734                         /* but just in case someone tries to be evil */
2735                         D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2736                         D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2737                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2738                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2739                 }
2740
2741                 if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2742                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2743                                 dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2744                                     mdev->sync_conf.verify_alg, p->verify_alg);
2745                                 goto disconnect;
2746                         }
2747                         verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2748                                         p->verify_alg, "verify-alg");
2749                         if (IS_ERR(verify_tfm)) {
2750                                 verify_tfm = NULL;
2751                                 goto disconnect;
2752                         }
2753                 }
2754
2755                 if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2756                         if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2757                                 dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2758                                     mdev->sync_conf.csums_alg, p->csums_alg);
2759                                 goto disconnect;
2760                         }
2761                         csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2762                                         p->csums_alg, "csums-alg");
2763                         if (IS_ERR(csums_tfm)) {
2764                                 csums_tfm = NULL;
2765                                 goto disconnect;
2766                         }
2767                 }
2768
2769
2770                 spin_lock(&mdev->peer_seq_lock);
2771                 /* lock against drbd_nl_syncer_conf() */
2772                 if (verify_tfm) {
2773                         strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2774                         mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2775                         crypto_free_hash(mdev->verify_tfm);
2776                         mdev->verify_tfm = verify_tfm;
2777                         dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2778                 }
2779                 if (csums_tfm) {
2780                         strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2781                         mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2782                         crypto_free_hash(mdev->csums_tfm);
2783                         mdev->csums_tfm = csums_tfm;
2784                         dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2785                 }
2786                 spin_unlock(&mdev->peer_seq_lock);
2787         }
2788
2789         return ok;
2790 disconnect:
2791         /* for completeness only: csums_tfm is still NULL when we get here,
2792          * since no failure path follows its successful allocation */
2793         crypto_free_hash(csums_tfm);
2794         /* but do free verify_tfm, in case csums_tfm did not work out */
2795         crypto_free_hash(verify_tfm);
2796         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2797         return FALSE;
2798 }
2799
2800 static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2801 {
2802         /* sorry, we currently have no working implementation
2803          * of distributed TCQ */
2804 }
2805
2806 /* warn if the arguments differ by more than 12.5% */
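     /* Worked example with illustrative numbers: a = 1000 and b = 1200 sectors
      * gives d = 200 > (a>>3) = 125, i.e. a 20% difference, so the warning below
      * is printed; differences up to 1/8th (12.5%) stay silent. */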
2807 static void warn_if_differ_considerably(struct drbd_conf *mdev,
2808         const char *s, sector_t a, sector_t b)
2809 {
2810         sector_t d;
2811         if (a == 0 || b == 0)
2812                 return;
2813         d = (a > b) ? (a - b) : (b - a);
2814         if (d > (a>>3) || d > (b>>3))
2815                 dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2816                      (unsigned long long)a, (unsigned long long)b);
2817 }
2818
2819 static int receive_sizes(struct drbd_conf *mdev, struct p_header *h)
2820 {
2821         struct p_sizes *p = (struct p_sizes *)h;
2822         enum determine_dev_size dd = unchanged;
2823         unsigned int max_seg_s;
2824         sector_t p_size, p_usize, my_usize;
2825         int ldsc = 0; /* local disk size changed */
2826         enum drbd_conns nconn;
2827
2828         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2829         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2830                 return FALSE;
2831
2832         p_size = be64_to_cpu(p->d_size);
2833         p_usize = be64_to_cpu(p->u_size);
2834
2835         if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2836                 dev_err(DEV, "some backing storage is needed\n");
2837                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2838                 return FALSE;
2839         }
2840
2841         /* just store the peer's disk size for now.
2842          * we still need to figure out whether we accept that. */
2843         mdev->p_size = p_size;
2844
2845 #define min_not_zero(l, r) ((l) == 0 ? (r) : ((r) == 0 ? (l) : min((l), (r))))
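     /* min_not_zero() treats 0 as "not configured":
      * min_not_zero(0, 8) == 8, min_not_zero(4, 0) == 4, min_not_zero(4, 8) == 4 */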
2846         if (get_ldev(mdev)) {
2847                 warn_if_differ_considerably(mdev, "lower level device sizes",
2848                            p_size, drbd_get_max_capacity(mdev->ldev));
2849                 warn_if_differ_considerably(mdev, "user requested size",
2850                                             p_usize, mdev->ldev->dc.disk_size);
2851
2852                 /* if this is the first connect, or an otherwise expected
2853                  * param exchange, choose the minimum */
2854                 if (mdev->state.conn == C_WF_REPORT_PARAMS)
2855                         p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2856                                              p_usize);
2857
2858                 my_usize = mdev->ldev->dc.disk_size;
2859
2860                 if (mdev->ldev->dc.disk_size != p_usize) {
2861                         mdev->ldev->dc.disk_size = p_usize;
2862                         dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2863                              (unsigned long)mdev->ldev->dc.disk_size);
2864                 }
2865
2866                 /* Never shrink a device with usable data during connect.
2867                    But allow online shrinking if we are connected. */
2868                 if (drbd_new_dev_size(mdev, mdev->ldev) <
2869                    drbd_get_capacity(mdev->this_bdev) &&
2870                    mdev->state.disk >= D_OUTDATED &&
2871                    mdev->state.conn < C_CONNECTED) {
2872                         dev_err(DEV, "The peer's disk size is too small!\n");
2873                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2874                         mdev->ldev->dc.disk_size = my_usize;
2875                         put_ldev(mdev);
2876                         return FALSE;
2877                 }
2878                 put_ldev(mdev);
2879         }
2880 #undef min_not_zero
2881
2882         if (get_ldev(mdev)) {
2883                 dd = drbd_determin_dev_size(mdev);
2884                 put_ldev(mdev);
2885                 if (dd == dev_size_error)
2886                         return FALSE;
2887                 drbd_md_sync(mdev);
2888         } else {
2889                 /* I am diskless, need to accept the peer's size. */
2890                 drbd_set_my_capacity(mdev, p_size);
2891         }
2892
2893         if (mdev->p_uuid && mdev->state.conn <= C_CONNECTED && get_ldev(mdev)) {
2894                 nconn = drbd_sync_handshake(mdev,
2895                                 mdev->state.peer, mdev->state.pdsk);
2896                 put_ldev(mdev);
2897
2898                 if (nconn == C_MASK) {
2899                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2900                         return FALSE;
2901                 }
2902
2903                 if (drbd_request_state(mdev, NS(conn, nconn)) < SS_SUCCESS) {
2904                         drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2905                         return FALSE;
2906                 }
2907         }
2908
2909         if (get_ldev(mdev)) {
2910                 if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2911                         mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2912                         ldsc = 1;
2913                 }
2914
2915                 max_seg_s = be32_to_cpu(p->max_segment_size);
2916                 if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2917                         drbd_setup_queue_param(mdev, max_seg_s);
2918
2919                 drbd_setup_order_type(mdev, be32_to_cpu(p->queue_order_type));
2920                 put_ldev(mdev);
2921         }
2922
2923         if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2924                 if (be64_to_cpu(p->c_size) !=
2925                     drbd_get_capacity(mdev->this_bdev) || ldsc) {
2926                         /* we have different sizes; the peer probably
2927                          * needs to know my new size... */
2928                         drbd_send_sizes(mdev, 0);
2929                 }
2930                 if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2931                     (dd == grew && mdev->state.conn == C_CONNECTED)) {
2932                         if (mdev->state.pdsk >= D_INCONSISTENT &&
2933                             mdev->state.disk >= D_INCONSISTENT)
2934                                 resync_after_online_grow(mdev);
2935                         else
2936                                 set_bit(RESYNC_AFTER_NEG, &mdev->flags);
2937                 }
2938         }
2939
2940         return TRUE;
2941 }
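
     /* Illustrative helper, an assumption and not part of the driver: the shrink
      * guard inside receive_sizes() boils down to this predicate, namely refusing
      * a peer-supplied size that would shrink a local disk still holding usable
      * data while the connection is not yet fully established.  A caller would
      * need to hold a get_ldev() reference, as the code above does. */
     static inline int example_would_shrink_during_connect(struct drbd_conf *mdev)
     {
             return drbd_new_dev_size(mdev, mdev->ldev) <
                             drbd_get_capacity(mdev->this_bdev) &&
                    mdev->state.disk >= D_OUTDATED &&
                    mdev->state.conn < C_CONNECTED;
     }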
2942
2943 static int receive_uuids(struct drbd_conf *mdev, struct p_header *h)
2944 {
2945         struct p_uuids *p = (struct p_uuids *)h;
2946         u64 *p_uuid;
2947         int i;
2948
2949         ERR_IF(h->length != (sizeof(*p)-sizeof(*h))) return FALSE;
2950         if (drbd_recv(mdev, h->payload, h->length) != h->length)
2951                 return FALSE;
2952
2953         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
             if (!p_uuid) {
                     dev_err(DEV, "kmalloc of p_uuid failed\n");
                     return FALSE;
             }
2954
2955         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
2956                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
2957
2958         kfree(mdev->p_uuid);
2959         mdev->p_uuid = p_uuid;
2960
2961         if (mdev->state.conn < C_CONNECTED &&
2962             mdev->state.disk < D_INCONSISTENT &&
2963             mdev->state.role == R_PRIMARY &&
2964             (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
2965                 dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
2966                     (unsigned long long)mdev->ed_uuid);
2967                 drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2968                 return FALSE;
2969         }
2970
2971         if (get_ldev(mdev)) {
2972                 int skip_initial_sync =
2973                         mdev->state.conn == C_CONNECTED &&
2974                         mdev->agreed_pro_version >= 90 &&
2975                         mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
2976                         (p_uuid[UI_FLAGS] & 8);
2977                 if (skip_initial_sync) {
2978                         dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
2979                         drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
2980                                         "clear_n_write from receive_uuids");
2981                         _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
2982                         _drbd_uuid_set(mdev, UI_BITMAP, 0);
2983                         _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
2984                                         CS_VERBOSE, NULL);