crypto: shash - remove shash_desc::flags
[sfrench/cifs-2.6.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
54
/*
 * Description of one received packet.  receive_first_packet() passes one of
 * these to decode_header() and afterwards hands back the cmd member.
 */
struct packet_info {
        enum drbd_packet cmd;   /* packet type */
        unsigned int size;      /* presumably the payload size in bytes -- TODO confirm */
        unsigned int vnr;       /* presumably the volume number addressed -- TODO confirm */
        void *data;             /* presumably points at received packet data -- TODO confirm */
};
61
/* Result type of drbd_may_finish_epoch(). */
enum finish_epoch {
        FE_STILL_LIVE,  /* presumably: the epoch is still in use -- TODO confirm */
        FE_DESTROYED,   /* presumably: the epoch object was freed -- TODO confirm */
        FE_RECYCLED,    /* presumably: the epoch object was reused -- TODO confirm */
};
67
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74
75
/*
 * Allocation flags for the opportunistic alloc_page() calls in
 * __drbd_alloc_pages(): highmem pages are acceptable, and allocation
 * failure warnings are suppressed.
 */
#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78 /*
79  * some helper functions to deal with single linked page lists,
80  * page->private being our "next" pointer.
81  */
82
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89         struct page *page;
90         struct page *tmp;
91
92         BUG_ON(!n);
93         BUG_ON(!head);
94
95         page = *head;
96
97         if (!page)
98                 return NULL;
99
100         while (page) {
101                 tmp = page_chain_next(page);
102                 if (--n == 0)
103                         break; /* found sufficient pages */
104                 if (tmp == NULL)
105                         /* insufficient pages, don't use any of them. */
106                         return NULL;
107                 page = tmp;
108         }
109
110         /* add end of list marker for the returned list */
111         set_page_private(page, 0);
112         /* actual return value, and adjustment of head */
113         page = *head;
114         *head = tmp;
115         return page;
116 }
117
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123         struct page *tmp;
124         int i = 1;
125         while ((tmp = page_chain_next(page)))
126                 ++i, page = tmp;
127         if (len)
128                 *len = i;
129         return page;
130 }
131
/*
 * Drop one reference on every page of the chain starting at @page.
 * Returns the number of pages visited.
 */
static int page_chain_free(struct page *page)
{
        int freed = 0;

        while (page) {
                /* fetch the successor first; put_page() may release the page */
                struct page *next = page_chain_next(page);

                put_page(page);
                freed++;
                page = next;
        }
        return freed;
}
142
143 static void page_chain_add(struct page **head,
144                 struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147         struct page *tmp;
148         tmp = page_chain_tail(chain_first, NULL);
149         BUG_ON(tmp != chain_last);
150 #endif
151
152         /* add chain to head */
153         set_page_private(chain_last, (unsigned long)*head);
154         *head = chain_first;
155 }
156
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158                                        unsigned int number)
159 {
160         struct page *page = NULL;
161         struct page *tmp = NULL;
162         unsigned int i = 0;
163
164         /* Yes, testing drbd_pp_vacant outside the lock is racy.
165          * So what. It saves a spin_lock. */
166         if (drbd_pp_vacant >= number) {
167                 spin_lock(&drbd_pp_lock);
168                 page = page_chain_del(&drbd_pp_pool, number);
169                 if (page)
170                         drbd_pp_vacant -= number;
171                 spin_unlock(&drbd_pp_lock);
172                 if (page)
173                         return page;
174         }
175
176         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177          * "criss-cross" setup, that might cause write-out on some other DRBD,
178          * which in turn might block on the other node at this very place.  */
179         for (i = 0; i < number; i++) {
180                 tmp = alloc_page(GFP_TRY);
181                 if (!tmp)
182                         break;
183                 set_page_private(tmp, (unsigned long)page);
184                 page = tmp;
185         }
186
187         if (i == number)
188                 return page;
189
190         /* Not enough pages immediately available this time.
191          * No need to jump around here, drbd_alloc_pages will retry this
192          * function "soon". */
193         if (page) {
194                 tmp = page_chain_tail(page, NULL);
195                 spin_lock(&drbd_pp_lock);
196                 page_chain_add(&drbd_pp_pool, page, tmp);
197                 drbd_pp_vacant += i;
198                 spin_unlock(&drbd_pp_lock);
199         }
200         return NULL;
201 }
202
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204                                            struct list_head *to_be_freed)
205 {
206         struct drbd_peer_request *peer_req, *tmp;
207
208         /* The EEs are always appended to the end of the list. Since
209            they are sent in order over the wire, they have to finish
210            in order. As soon as we see the first not finished we can
211            stop to examine the list... */
212
213         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214                 if (drbd_peer_req_has_active_page(peer_req))
215                         break;
216                 list_move(&peer_req->w.list, to_be_freed);
217         }
218 }
219
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222         LIST_HEAD(reclaimed);
223         struct drbd_peer_request *peer_req, *t;
224
225         spin_lock_irq(&device->resource->req_lock);
226         reclaim_finished_net_peer_reqs(device, &reclaimed);
227         spin_unlock_irq(&device->resource->req_lock);
228         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229                 drbd_free_net_peer_req(device, peer_req);
230 }
231
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234         struct drbd_peer_device *peer_device;
235         int vnr;
236
237         rcu_read_lock();
238         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239                 struct drbd_device *device = peer_device->device;
240                 if (!atomic_read(&device->pp_in_use_by_net))
241                         continue;
242
243                 kref_get(&device->kref);
244                 rcu_read_unlock();
245                 drbd_reclaim_net_peer_reqs(device);
246                 kref_put(&device->kref, drbd_destroy_device);
247                 rcu_read_lock();
248         }
249         rcu_read_unlock();
250 }
251
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @device:     DRBD device.
255  * @number:     number of pages requested
256  * @retry:      whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273                               bool retry)
274 {
275         struct drbd_device *device = peer_device->device;
276         struct page *page = NULL;
277         struct net_conf *nc;
278         DEFINE_WAIT(wait);
279         unsigned int mxb;
280
281         rcu_read_lock();
282         nc = rcu_dereference(peer_device->connection->net_conf);
283         mxb = nc ? nc->max_buffers : 1000000;
284         rcu_read_unlock();
285
286         if (atomic_read(&device->pp_in_use) < mxb)
287                 page = __drbd_alloc_pages(device, number);
288
289         /* Try to keep the fast path fast, but occasionally we need
290          * to reclaim the pages we lended to the network stack. */
291         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292                 drbd_reclaim_net_peer_reqs(device);
293
294         while (page == NULL) {
295                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296
297                 drbd_reclaim_net_peer_reqs(device);
298
299                 if (atomic_read(&device->pp_in_use) < mxb) {
300                         page = __drbd_alloc_pages(device, number);
301                         if (page)
302                                 break;
303                 }
304
305                 if (!retry)
306                         break;
307
308                 if (signal_pending(current)) {
309                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310                         break;
311                 }
312
313                 if (schedule_timeout(HZ/10) == 0)
314                         mxb = UINT_MAX;
315         }
316         finish_wait(&drbd_pp_wait, &wait);
317
318         if (page)
319                 atomic_add(number, &device->pp_in_use);
320         return page;
321 }
322
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330         int i;
331
332         if (page == NULL)
333                 return;
334
335         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336                 i = page_chain_free(page);
337         else {
338                 struct page *tmp;
339                 tmp = page_chain_tail(page, &i);
340                 spin_lock(&drbd_pp_lock);
341                 page_chain_add(&drbd_pp_pool, page, tmp);
342                 drbd_pp_vacant += i;
343                 spin_unlock(&drbd_pp_lock);
344         }
345         i = atomic_sub_return(i, a);
346         if (i < 0)
347                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349         wake_up(&drbd_pp_wait);
350 }
351
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373         struct drbd_device *device = peer_device->device;
374         struct drbd_peer_request *peer_req;
375         struct page *page = NULL;
376         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377
378         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379                 return NULL;
380
381         peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382         if (!peer_req) {
383                 if (!(gfp_mask & __GFP_NOWARN))
384                         drbd_err(device, "%s: allocation failed\n", __func__);
385                 return NULL;
386         }
387
388         if (nr_pages) {
389                 page = drbd_alloc_pages(peer_device, nr_pages,
390                                         gfpflags_allow_blocking(gfp_mask));
391                 if (!page)
392                         goto fail;
393         }
394
395         memset(peer_req, 0, sizeof(*peer_req));
396         INIT_LIST_HEAD(&peer_req->w.list);
397         drbd_clear_interval(&peer_req->i);
398         peer_req->i.size = request_size;
399         peer_req->i.sector = sector;
400         peer_req->submit_jif = jiffies;
401         peer_req->peer_device = peer_device;
402         peer_req->pages = page;
403         /*
404          * The block_id is opaque to the receiver.  It is not endianness
405          * converted, and sent back to the sender unchanged.
406          */
407         peer_req->block_id = id;
408
409         return peer_req;
410
411  fail:
412         mempool_free(peer_req, &drbd_ee_mempool);
413         return NULL;
414 }
415
/*
 * Release a peer request and what is attached to it: the digest buffer
 * (only when EE_HAS_DIGEST is set), the page chain (via drbd_free_pages(),
 * accounted as network pages when @is_net is non-zero), and finally the
 * request itself, which goes back to drbd_ee_mempool.
 * Annotated with might_sleep().
 */
void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
                       int is_net)
{
        might_sleep();
        if (peer_req->flags & EE_HAS_DIGEST)
                kfree(peer_req->digest);
        drbd_free_pages(device, peer_req->pages, is_net);
        /* at this point no bios may be pending and the interval must be empty */
        D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
        D_ASSERT(device, drbd_interval_empty(&peer_req->i));
        /* EE_CALL_AL_COMPLETE_IO is not expected to be set any more; if it
         * still is, clear it and call drbd_al_complete_io() now.
         * NOTE(review): expect() presumably logs when its argument is
         * false and yields that value -- confirm against its definition. */
        if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
                peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
                drbd_al_complete_io(device, &peer_req->i);
        }
        mempool_free(peer_req, &drbd_ee_mempool);
}
431
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434         LIST_HEAD(work_list);
435         struct drbd_peer_request *peer_req, *t;
436         int count = 0;
437         int is_net = list == &device->net_ee;
438
439         spin_lock_irq(&device->resource->req_lock);
440         list_splice_init(list, &work_list);
441         spin_unlock_irq(&device->resource->req_lock);
442
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 __drbd_free_peer_req(device, peer_req, is_net);
445                 count++;
446         }
447         return count;
448 }
449
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455         LIST_HEAD(work_list);
456         LIST_HEAD(reclaimed);
457         struct drbd_peer_request *peer_req, *t;
458         int err = 0;
459
460         spin_lock_irq(&device->resource->req_lock);
461         reclaim_finished_net_peer_reqs(device, &reclaimed);
462         list_splice_init(&device->done_ee, &work_list);
463         spin_unlock_irq(&device->resource->req_lock);
464
465         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466                 drbd_free_net_peer_req(device, peer_req);
467
468         /* possible callbacks here:
469          * e_end_block, and e_end_resync_block, e_send_superseded.
470          * all ignore the last argument.
471          */
472         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473                 int err2;
474
475                 /* list_del not necessary, next/prev members not touched */
476                 err2 = peer_req->w.cb(&peer_req->w, !!err);
477                 if (!err)
478                         err = err2;
479                 drbd_free_peer_req(device, peer_req);
480         }
481         wake_up(&device->ee_wait);
482
483         return err;
484 }
485
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487                                      struct list_head *head)
488 {
489         DEFINE_WAIT(wait);
490
491         /* avoids spin_lock/unlock
492          * and calling prepare_to_wait in the fast path */
493         while (!list_empty(head)) {
494                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495                 spin_unlock_irq(&device->resource->req_lock);
496                 io_schedule();
497                 finish_wait(&device->ee_wait, &wait);
498                 spin_lock_irq(&device->resource->req_lock);
499         }
500 }
501
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503                                     struct list_head *head)
504 {
505         spin_lock_irq(&device->resource->req_lock);
506         _drbd_wait_ee_list_empty(device, head);
507         spin_unlock_irq(&device->resource->req_lock);
508 }
509
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512         struct kvec iov = {
513                 .iov_base = buf,
514                 .iov_len = size,
515         };
516         struct msghdr msg = {
517                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518         };
519         iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
520         return sock_recvmsg(sock, &msg, msg.msg_flags);
521 }
522
523 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
524 {
525         int rv;
526
527         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
528
529         if (rv < 0) {
530                 if (rv == -ECONNRESET)
531                         drbd_info(connection, "sock was reset by peer\n");
532                 else if (rv != -ERESTARTSYS)
533                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
534         } else if (rv == 0) {
535                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
536                         long t;
537                         rcu_read_lock();
538                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
539                         rcu_read_unlock();
540
541                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
542
543                         if (t)
544                                 goto out;
545                 }
546                 drbd_info(connection, "sock was shut down by peer\n");
547         }
548
549         if (rv != size)
550                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
551
552 out:
553         return rv;
554 }
555
556 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
557 {
558         int err;
559
560         err = drbd_recv(connection, buf, size);
561         if (err != size) {
562                 if (err >= 0)
563                         err = -EIO;
564         } else
565                 err = 0;
566         return err;
567 }
568
569 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
570 {
571         int err;
572
573         err = drbd_recv_all(connection, buf, size);
574         if (err && !signal_pending(current))
575                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
576         return err;
577 }
578
579 /* quoting tcp(7):
580  *   On individual connections, the socket buffer size must be set prior to the
581  *   listen(2) or connect(2) calls in order to have it take effect.
582  * This is our wrapper to do so.
583  */
584 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
585                 unsigned int rcv)
586 {
587         /* open coded SO_SNDBUF, SO_RCVBUF */
588         if (snd) {
589                 sock->sk->sk_sndbuf = snd;
590                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
591         }
592         if (rcv) {
593                 sock->sk->sk_rcvbuf = rcv;
594                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
595         }
596 }
597
/*
 * Make one outgoing connection attempt to the peer.
 *
 * Returns the connected socket, or NULL.  On failure the socket is released.
 * Failures of socket creation or bind with an error outside the "transient"
 * set below request C_DISCONNECTING; a failing connect never does.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
        struct sockaddr_in6 src_in6, peer_in6;
        int sndbuf_size, rcvbuf_size, connect_int;
        int my_addr_len, peer_addr_len, err;
        int disconnect_on_error = 1;
        struct net_conf *nc;
        struct socket *sock;
        const char *what;
        long timeo;

        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);
        if (!nc) {
                rcu_read_unlock();
                return NULL;
        }
        sndbuf_size = nc->sndbuf_size;
        rcvbuf_size = nc->rcvbuf_size;
        connect_int = nc->connect_int;
        rcu_read_unlock();

        my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
        memcpy(&src_in6, &connection->my_addr, my_addr_len);

        /* port 0: let linux pick a free source port dynamically */
        if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
        memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

        what = "sock_create_kern";
        err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
                               SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto done;
        }

        timeo = connect_int * HZ;
        sock->sk->sk_sndtimeo = timeo;
        sock->sk->sk_rcvtimeo = timeo;
        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

        /* Bind explicitly to the configured IP as source address of the
         * outgoing connection.  Multihomed hosts need this, and it allows
         * using lo: interfaces for drbd. */
        what = "bind before connect";
        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
        if (err < 0)
                goto done;

        /* connect may fail simply because the peer is not there yet:
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

done:
        if (err >= 0)
                return sock;

        if (sock) {
                sock_release(sock);
                sock = NULL;
        }

        switch (err) {
        /* timeout, busy, signal pending */
        case -ETIMEDOUT:
        case -EAGAIN:
        case -EINPROGRESS:
        case -EINTR:
        case -ERESTARTSYS:
        /* peer not (yet) available, network problem */
        case -ECONNREFUSED:
        case -ENETUNREACH:
        case -EHOSTDOWN:
        case -EHOSTUNREACH:
                disconnect_on_error = 0;
                break;
        default:
                drbd_err(connection, "%s failed, err = %d\n", what, err);
        }

        if (disconnect_on_error)
                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);

        return sock;
}
685
/*
 * State shared between the code waiting for an incoming connection and the
 * sk_state_change hook (drbd_incoming_connection) on the listen socket.
 */
struct accept_wait_data {
        struct drbd_connection *connection;
        struct socket *s_listen;        /* listen socket, set by prepare_listen_socket() */
        struct completion door_bell;    /* completed by the hook on TCP_ESTABLISHED */
        void (*original_sk_state_change)(struct sock *sk); /* callback the hook replaced */

};
693
694 static void drbd_incoming_connection(struct sock *sk)
695 {
696         struct accept_wait_data *ad = sk->sk_user_data;
697         void (*state_change)(struct sock *sk);
698
699         state_change = ad->original_sk_state_change;
700         if (sk->sk_state == TCP_ESTABLISHED)
701                 complete(&ad->door_bell);
702         state_change(sk);
703 }
704
705 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
706 {
707         int err, sndbuf_size, rcvbuf_size, my_addr_len;
708         struct sockaddr_in6 my_addr;
709         struct socket *s_listen;
710         struct net_conf *nc;
711         const char *what;
712
713         rcu_read_lock();
714         nc = rcu_dereference(connection->net_conf);
715         if (!nc) {
716                 rcu_read_unlock();
717                 return -EIO;
718         }
719         sndbuf_size = nc->sndbuf_size;
720         rcvbuf_size = nc->rcvbuf_size;
721         rcu_read_unlock();
722
723         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
724         memcpy(&my_addr, &connection->my_addr, my_addr_len);
725
726         what = "sock_create_kern";
727         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
728                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
735         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
736
737         what = "bind before listen";
738         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
739         if (err < 0)
740                 goto out;
741
742         ad->s_listen = s_listen;
743         write_lock_bh(&s_listen->sk->sk_callback_lock);
744         ad->original_sk_state_change = s_listen->sk->sk_state_change;
745         s_listen->sk->sk_state_change = drbd_incoming_connection;
746         s_listen->sk->sk_user_data = ad;
747         write_unlock_bh(&s_listen->sk->sk_callback_lock);
748
749         what = "listen";
750         err = s_listen->ops->listen(s_listen, 5);
751         if (err < 0)
752                 goto out;
753
754         return 0;
755 out:
756         if (s_listen)
757                 sock_release(s_listen);
758         if (err < 0) {
759                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
760                         drbd_err(connection, "%s failed, err = %d\n", what, err);
761                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
762                 }
763         }
764
765         return -EIO;
766 }
767
768 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
769 {
770         write_lock_bh(&sk->sk_callback_lock);
771         sk->sk_state_change = ad->original_sk_state_change;
772         sk->sk_user_data = NULL;
773         write_unlock_bh(&sk->sk_callback_lock);
774 }
775
776 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
777 {
778         int timeo, connect_int, err = 0;
779         struct socket *s_estab = NULL;
780         struct net_conf *nc;
781
782         rcu_read_lock();
783         nc = rcu_dereference(connection->net_conf);
784         if (!nc) {
785                 rcu_read_unlock();
786                 return NULL;
787         }
788         connect_int = nc->connect_int;
789         rcu_read_unlock();
790
791         timeo = connect_int * HZ;
792         /* 28.5% random jitter */
793         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
794
795         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
796         if (err <= 0)
797                 return NULL;
798
799         err = kernel_accept(ad->s_listen, &s_estab, 0);
800         if (err < 0) {
801                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
802                         drbd_err(connection, "accept failed, err = %d\n", err);
803                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
804                 }
805         }
806
807         if (s_estab)
808                 unregister_state_change(s_estab->sk, ad);
809
810         return s_estab;
811 }
812
813 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
814
815 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
816                              enum drbd_packet cmd)
817 {
818         if (!conn_prepare_command(connection, sock))
819                 return -EIO;
820         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
821 }
822
823 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
824 {
825         unsigned int header_size = drbd_header_size(connection);
826         struct packet_info pi;
827         struct net_conf *nc;
828         int err;
829
830         rcu_read_lock();
831         nc = rcu_dereference(connection->net_conf);
832         if (!nc) {
833                 rcu_read_unlock();
834                 return -EIO;
835         }
836         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
837         rcu_read_unlock();
838
839         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
840         if (err != header_size) {
841                 if (err >= 0)
842                         err = -EIO;
843                 return err;
844         }
845         err = decode_header(connection, connection->data.rbuf, &pi);
846         if (err)
847                 return err;
848         return pi.cmd;
849 }
850
851 /**
852  * drbd_socket_okay() - Free the socket if its connection is not okay
853  * @sock:       pointer to the pointer to the socket.
854  */
855 static bool drbd_socket_okay(struct socket **sock)
856 {
857         int rr;
858         char tb[4];
859
860         if (!*sock)
861                 return false;
862
863         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
864
865         if (rr > 0 || rr == -EAGAIN) {
866                 return true;
867         } else {
868                 sock_release(*sock);
869                 *sock = NULL;
870                 return false;
871         }
872 }
873
874 static bool connection_established(struct drbd_connection *connection,
875                                    struct socket **sock1,
876                                    struct socket **sock2)
877 {
878         struct net_conf *nc;
879         int timeout;
880         bool ok;
881
882         if (!*sock1 || !*sock2)
883                 return false;
884
885         rcu_read_lock();
886         nc = rcu_dereference(connection->net_conf);
887         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
888         rcu_read_unlock();
889         schedule_timeout_interruptible(timeout);
890
891         ok = drbd_socket_okay(sock1);
892         ok = drbd_socket_okay(sock2) && ok;
893
894         return ok;
895 }
896
897 /* Gets called if a connection is established, or if a new minor gets created
898    in a connection */
899 int drbd_connected(struct drbd_peer_device *peer_device)
900 {
901         struct drbd_device *device = peer_device->device;
902         int err;
903
904         atomic_set(&device->packet_seq, 0);
905         device->peer_seq = 0;
906
907         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
908                 &peer_device->connection->cstate_mutex :
909                 &device->own_state_mutex;
910
911         err = drbd_send_sync_param(peer_device);
912         if (!err)
913                 err = drbd_send_sizes(peer_device, 0, 0);
914         if (!err)
915                 err = drbd_send_uuids(peer_device);
916         if (!err)
917                 err = drbd_send_current_state(peer_device);
918         clear_bit(USE_DEGR_WFC_T, &device->flags);
919         clear_bit(RESIZE_PENDING, &device->flags);
920         atomic_set(&device->ap_in_flight, 0);
921         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
922         return err;
923 }
924
925 /*
926  * return values:
927  *   1 yes, we have a valid connection
928  *   0 oops, did not work out, please try again
929  *  -1 peer talks different language,
930  *     no point in trying again, please go standalone.
931  *  -2 We do not have a network config...
932  */
933 static int conn_connect(struct drbd_connection *connection)
934 {
935         struct drbd_socket sock, msock;
936         struct drbd_peer_device *peer_device;
937         struct net_conf *nc;
938         int vnr, timeout, h;
939         bool discard_my_data, ok;
940         enum drbd_state_rv rv;
941         struct accept_wait_data ad = {
942                 .connection = connection,
943                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
944         };
945
946         clear_bit(DISCONNECT_SENT, &connection->flags);
947         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
948                 return -2;
949
950         mutex_init(&sock.mutex);
951         sock.sbuf = connection->data.sbuf;
952         sock.rbuf = connection->data.rbuf;
953         sock.socket = NULL;
954         mutex_init(&msock.mutex);
955         msock.sbuf = connection->meta.sbuf;
956         msock.rbuf = connection->meta.rbuf;
957         msock.socket = NULL;
958
959         /* Assume that the peer only understands protocol 80 until we know better.  */
960         connection->agreed_pro_version = 80;
961
962         if (prepare_listen_socket(connection, &ad))
963                 return 0;
964
965         do {
966                 struct socket *s;
967
968                 s = drbd_try_connect(connection);
969                 if (s) {
970                         if (!sock.socket) {
971                                 sock.socket = s;
972                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
973                         } else if (!msock.socket) {
974                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
975                                 msock.socket = s;
976                                 send_first_packet(connection, &msock, P_INITIAL_META);
977                         } else {
978                                 drbd_err(connection, "Logic error in conn_connect()\n");
979                                 goto out_release_sockets;
980                         }
981                 }
982
983                 if (connection_established(connection, &sock.socket, &msock.socket))
984                         break;
985
986 retry:
987                 s = drbd_wait_for_connect(connection, &ad);
988                 if (s) {
989                         int fp = receive_first_packet(connection, s);
990                         drbd_socket_okay(&sock.socket);
991                         drbd_socket_okay(&msock.socket);
992                         switch (fp) {
993                         case P_INITIAL_DATA:
994                                 if (sock.socket) {
995                                         drbd_warn(connection, "initial packet S crossed\n");
996                                         sock_release(sock.socket);
997                                         sock.socket = s;
998                                         goto randomize;
999                                 }
1000                                 sock.socket = s;
1001                                 break;
1002                         case P_INITIAL_META:
1003                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1004                                 if (msock.socket) {
1005                                         drbd_warn(connection, "initial packet M crossed\n");
1006                                         sock_release(msock.socket);
1007                                         msock.socket = s;
1008                                         goto randomize;
1009                                 }
1010                                 msock.socket = s;
1011                                 break;
1012                         default:
1013                                 drbd_warn(connection, "Error receiving initial packet\n");
1014                                 sock_release(s);
1015 randomize:
1016                                 if (prandom_u32() & 1)
1017                                         goto retry;
1018                         }
1019                 }
1020
1021                 if (connection->cstate <= C_DISCONNECTING)
1022                         goto out_release_sockets;
1023                 if (signal_pending(current)) {
1024                         flush_signals(current);
1025                         smp_rmb();
1026                         if (get_t_state(&connection->receiver) == EXITING)
1027                                 goto out_release_sockets;
1028                 }
1029
1030                 ok = connection_established(connection, &sock.socket, &msock.socket);
1031         } while (!ok);
1032
1033         if (ad.s_listen)
1034                 sock_release(ad.s_listen);
1035
1036         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1038
1039         sock.socket->sk->sk_allocation = GFP_NOIO;
1040         msock.socket->sk->sk_allocation = GFP_NOIO;
1041
1042         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1043         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1044
1045         /* NOT YET ...
1046          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1047          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1048          * first set it to the P_CONNECTION_FEATURES timeout,
1049          * which we set to 4x the configured ping_timeout. */
1050         rcu_read_lock();
1051         nc = rcu_dereference(connection->net_conf);
1052
1053         sock.socket->sk->sk_sndtimeo =
1054         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1055
1056         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1057         timeout = nc->timeout * HZ / 10;
1058         discard_my_data = nc->discard_my_data;
1059         rcu_read_unlock();
1060
1061         msock.socket->sk->sk_sndtimeo = timeout;
1062
1063         /* we don't want delays.
1064          * we use TCP_CORK where appropriate, though */
1065         drbd_tcp_nodelay(sock.socket);
1066         drbd_tcp_nodelay(msock.socket);
1067
1068         connection->data.socket = sock.socket;
1069         connection->meta.socket = msock.socket;
1070         connection->last_received = jiffies;
1071
1072         h = drbd_do_features(connection);
1073         if (h <= 0)
1074                 return h;
1075
1076         if (connection->cram_hmac_tfm) {
1077                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1078                 switch (drbd_do_auth(connection)) {
1079                 case -1:
1080                         drbd_err(connection, "Authentication of peer failed\n");
1081                         return -1;
1082                 case 0:
1083                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1084                         return 0;
1085                 }
1086         }
1087
1088         connection->data.socket->sk->sk_sndtimeo = timeout;
1089         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1090
1091         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1092                 return -1;
1093
1094         /* Prevent a race between resync-handshake and
1095          * being promoted to Primary.
1096          *
1097          * Grab and release the state mutex, so we know that any current
1098          * drbd_set_role() is finished, and any incoming drbd_set_role
1099          * will see the STATE_SENT flag, and wait for it to be cleared.
1100          */
1101         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1102                 mutex_lock(peer_device->device->state_mutex);
1103
1104         /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1105         spin_lock_irq(&connection->resource->req_lock);
1106         set_bit(STATE_SENT, &connection->flags);
1107         spin_unlock_irq(&connection->resource->req_lock);
1108
1109         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1110                 mutex_unlock(peer_device->device->state_mutex);
1111
1112         rcu_read_lock();
1113         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1114                 struct drbd_device *device = peer_device->device;
1115                 kref_get(&device->kref);
1116                 rcu_read_unlock();
1117
1118                 if (discard_my_data)
1119                         set_bit(DISCARD_MY_DATA, &device->flags);
1120                 else
1121                         clear_bit(DISCARD_MY_DATA, &device->flags);
1122
1123                 drbd_connected(peer_device);
1124                 kref_put(&device->kref, drbd_destroy_device);
1125                 rcu_read_lock();
1126         }
1127         rcu_read_unlock();
1128
1129         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1130         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1131                 clear_bit(STATE_SENT, &connection->flags);
1132                 return 0;
1133         }
1134
1135         drbd_thread_start(&connection->ack_receiver);
1136         /* opencoded create_singlethread_workqueue(),
1137          * to be able to use format string arguments */
1138         connection->ack_sender =
1139                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1140         if (!connection->ack_sender) {
1141                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1142                 return 0;
1143         }
1144
1145         mutex_lock(&connection->resource->conf_update);
1146         /* The discard_my_data flag is a single-shot modifier to the next
1147          * connection attempt, the handshake of which is now well underway.
1148          * No need for rcu style copying of the whole struct
1149          * just to clear a single value. */
1150         connection->net_conf->discard_my_data = 0;
1151         mutex_unlock(&connection->resource->conf_update);
1152
1153         return h;
1154
1155 out_release_sockets:
1156         if (ad.s_listen)
1157                 sock_release(ad.s_listen);
1158         if (sock.socket)
1159                 sock_release(sock.socket);
1160         if (msock.socket)
1161                 sock_release(msock.socket);
1162         return -1;
1163 }
1164
1165 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1166 {
1167         unsigned int header_size = drbd_header_size(connection);
1168
1169         if (header_size == sizeof(struct p_header100) &&
1170             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1171                 struct p_header100 *h = header;
1172                 if (h->pad != 0) {
1173                         drbd_err(connection, "Header padding is not zero\n");
1174                         return -EINVAL;
1175                 }
1176                 pi->vnr = be16_to_cpu(h->volume);
1177                 pi->cmd = be16_to_cpu(h->command);
1178                 pi->size = be32_to_cpu(h->length);
1179         } else if (header_size == sizeof(struct p_header95) &&
1180                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1181                 struct p_header95 *h = header;
1182                 pi->cmd = be16_to_cpu(h->command);
1183                 pi->size = be32_to_cpu(h->length);
1184                 pi->vnr = 0;
1185         } else if (header_size == sizeof(struct p_header80) &&
1186                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1187                 struct p_header80 *h = header;
1188                 pi->cmd = be16_to_cpu(h->command);
1189                 pi->size = be16_to_cpu(h->length);
1190                 pi->vnr = 0;
1191         } else {
1192                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1193                          be32_to_cpu(*(__be32 *)header),
1194                          connection->agreed_pro_version);
1195                 return -EINVAL;
1196         }
1197         pi->data = header + header_size;
1198         return 0;
1199 }
1200
1201 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1202 {
1203         if (current->plug == &connection->receiver_plug) {
1204                 blk_finish_plug(&connection->receiver_plug);
1205                 blk_start_plug(&connection->receiver_plug);
1206         } /* else: maybe just schedule() ?? */
1207 }
1208
1209 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1210 {
1211         void *buffer = connection->data.rbuf;
1212         int err;
1213
1214         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1215         if (err)
1216                 return err;
1217
1218         err = decode_header(connection, buffer, pi);
1219         connection->last_received = jiffies;
1220
1221         return err;
1222 }
1223
1224 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1225 {
1226         void *buffer = connection->data.rbuf;
1227         unsigned int size = drbd_header_size(connection);
1228         int err;
1229
1230         err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1231         if (err != size) {
1232                 /* If we have nothing in the receive buffer now, to reduce
1233                  * application latency, try to drain the backend queues as
1234                  * quickly as possible, and let remote TCP know what we have
1235                  * received so far. */
1236                 if (err == -EAGAIN) {
1237                         drbd_tcp_quickack(connection->data.socket);
1238                         drbd_unplug_all_devices(connection);
1239                 }
1240                 if (err > 0) {
1241                         buffer += err;
1242                         size -= err;
1243                 }
1244                 err = drbd_recv_all_warn(connection, buffer, size);
1245                 if (err)
1246                         return err;
1247         }
1248
1249         err = decode_header(connection, connection->data.rbuf, pi);
1250         connection->last_received = jiffies;
1251
1252         return err;
1253 }
/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 *
 * One issue_flush_context is shared by all flushes submitted from a
 * single drbd_flush() call.
 */
struct issue_flush_context {
        /* Outstanding references: drbd_flush() starts it at 1,
         * submit_one_flush() increments it per submitted bio, and
         * one_flush_endio() / drbd_flush() decrement it. */
        atomic_t pending;
        /* 0, or a negative errno recorded by a failed flush or a failed
         * allocation; later failures overwrite earlier ones. */
        int error;
        /* Completed by one_flush_endio() when pending drops to zero. */
        struct completion done;
};
/* Per-bio context: allocated in submit_one_flush(), attached to the flush
 * bio as bi_private, and freed in one_flush_endio(). */
struct one_flush_context {
        /* device whose backing device is being flushed */
        struct drbd_device *device;
        /* shared context of the whole flush round */
        struct issue_flush_context *ctx;
};
1267
1268 static void one_flush_endio(struct bio *bio)
1269 {
1270         struct one_flush_context *octx = bio->bi_private;
1271         struct drbd_device *device = octx->device;
1272         struct issue_flush_context *ctx = octx->ctx;
1273
1274         if (bio->bi_status) {
1275                 ctx->error = blk_status_to_errno(bio->bi_status);
1276                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1277         }
1278         kfree(octx);
1279         bio_put(bio);
1280
1281         clear_bit(FLUSH_PENDING, &device->flags);
1282         put_ldev(device);
1283         kref_put(&device->kref, drbd_destroy_device);
1284
1285         if (atomic_dec_and_test(&ctx->pending))
1286                 complete(&ctx->done);
1287 }
1288
1289 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1290 {
1291         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1292         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1293         if (!bio || !octx) {
1294                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1295                 /* FIXME: what else can I do now?  disconnecting or detaching
1296                  * really does not help to improve the state of the world, either.
1297                  */
1298                 kfree(octx);
1299                 if (bio)
1300                         bio_put(bio);
1301
1302                 ctx->error = -ENOMEM;
1303                 put_ldev(device);
1304                 kref_put(&device->kref, drbd_destroy_device);
1305                 return;
1306         }
1307
1308         octx->device = device;
1309         octx->ctx = ctx;
1310         bio_set_dev(bio, device->ldev->backing_bdev);
1311         bio->bi_private = octx;
1312         bio->bi_end_io = one_flush_endio;
1313         bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1314
1315         device->flush_jif = jiffies;
1316         set_bit(FLUSH_PENDING, &device->flags);
1317         atomic_inc(&ctx->pending);
1318         submit_bio(bio);
1319 }
1320
1321 static void drbd_flush(struct drbd_connection *connection)
1322 {
1323         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1324                 struct drbd_peer_device *peer_device;
1325                 struct issue_flush_context ctx;
1326                 int vnr;
1327
1328                 atomic_set(&ctx.pending, 1);
1329                 ctx.error = 0;
1330                 init_completion(&ctx.done);
1331
1332                 rcu_read_lock();
1333                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1334                         struct drbd_device *device = peer_device->device;
1335
1336                         if (!get_ldev(device))
1337                                 continue;
1338                         kref_get(&device->kref);
1339                         rcu_read_unlock();
1340
1341                         submit_one_flush(device, &ctx);
1342
1343                         rcu_read_lock();
1344                 }
1345                 rcu_read_unlock();
1346
1347                 /* Do we want to add a timeout,
1348                  * if disk-timeout is set? */
1349                 if (!atomic_dec_and_test(&ctx.pending))
1350                         wait_for_completion(&ctx.done);
1351
1352                 if (ctx.error) {
1353                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1354                          * don't try again for ANY return value != 0
1355                          * if (rv == -EOPNOTSUPP) */
1356                         /* Any error is already reported by bio_endio callback. */
1357                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1358                 }
1359         }
1360 }
1361
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event, optionally or-ed with EV_CLEANUP.
 *
 * An epoch is finished once it has a non-zero size, no active users and
 * either its barrier number is known or EV_CLEANUP is set.  A finished
 * epoch that is not the connection's current epoch is freed, and the
 * following epoch in the list is then examined with EV_BECAME_LAST; the
 * current epoch is reset for reuse instead.
 *
 * Return: FE_STILL_LIVE if @epoch was not finished, FE_DESTROYED if it was
 * freed, FE_RECYCLED if it was reset.  Only the outcome for the first
 * finished epoch is reported.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                /* apply the event itself; EV_CLEANUP is only a modifier */
                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do*/
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                /* the barrier ack is sent with epoch_lock dropped */
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                /* pick up the successor before this epoch is
                                 * unlinked and freed */
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                /* the current epoch is reset and reused */
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}
1437
1438 static enum write_ordering_e
1439 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1440 {
1441         struct disk_conf *dc;
1442
1443         dc = rcu_dereference(bdev->disk_conf);
1444
1445         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1446                 wo = WO_DRAIN_IO;
1447         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1448                 wo = WO_NONE;
1449
1450         return wo;
1451 }
1452
1453 /**
1454  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1455  * @connection: DRBD connection.
1456  * @wo:         Write ordering method to try.
1457  */
1458 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1459                               enum write_ordering_e wo)
1460 {
1461         struct drbd_device *device;
1462         enum write_ordering_e pwo;
1463         int vnr;
1464         static char *write_ordering_str[] = {
1465                 [WO_NONE] = "none",
1466                 [WO_DRAIN_IO] = "drain",
1467                 [WO_BDEV_FLUSH] = "flush",
1468         };
1469
1470         pwo = resource->write_ordering;
1471         if (wo != WO_BDEV_FLUSH)
1472                 wo = min(pwo, wo);
1473         rcu_read_lock();
1474         idr_for_each_entry(&resource->devices, device, vnr) {
1475                 if (get_ldev(device)) {
1476                         wo = max_allowed_wo(device->ldev, wo);
1477                         if (device->ldev == bdev)
1478                                 bdev = NULL;
1479                         put_ldev(device);
1480                 }
1481         }
1482
1483         if (bdev)
1484                 wo = max_allowed_wo(bdev, wo);
1485
1486         rcu_read_unlock();
1487
1488         resource->write_ordering = wo;
1489         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1490                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1491 }
1492
1493 /*
1494  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1495  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1496  * will directly go to fallback mode, submitting normal writes, and
1497  * never even try to UNMAP.
1498  *
1499  * And dm-thin does not do this (yet), mostly because in general it has
1500  * to assume that "skip_block_zeroing" is set.  See also:
1501  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1502  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1503  *
1504  * We *may* ignore the discard-zeroes-data setting, if so configured.
1505  *
1506  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1507  * may ignore partial unaligned discards.
1508  *
1509  * LVM/DM thin as of at least
1510  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1511  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1512  *   Driver version:  4.29.0
1513  * still behaves this way.
1514  *
1515  * For unaligned (wrt. alignment and granularity) or too small discards,
1516  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1517  * but discard all the aligned full chunks.
1518  *
1519  * At least for LVM/DM thin, with skip_block_zeroing=false,
1520  * the result is effectively "discard_zeroes_data=1".
1521  */
/*
 * drbd_issue_discard_or_zero_out() - discard and/or zero out a sector
 * range on the backing device of @device.
 *
 * @device:     DRBD device; device->ldev is dereferenced without a check here.
 * @start:      first sector of the range.
 * @nr_sectors: length of the range in sectors.
 * @flags:      EE_TRIM and/or EE_ZEROOUT, see below.
 *
 * If EE_ZEROOUT is set or EE_TRIM is not set, the whole range is zeroed
 * out.  Otherwise an unaligned head is zeroed out, the aligned part is
 * discarded in chunks of at most max_discard_sectors, and whatever is left
 * at the end is zeroed out.  The final zero-out passes BLKDEV_ZERO_NOUNMAP
 * unless EE_TRIM is set.
 *
 * Returns 0 if none of the issued operations reported an error, 1 otherwise.
 */
/* flags: EE_TRIM|EE_ZEROOUT */
int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
{
        struct block_device *bdev = device->ldev->backing_bdev;
        struct request_queue *q = bdev_get_queue(bdev);
        sector_t tmp, nr;
        unsigned int max_discard_sectors, granularity;
        int alignment;
        int err = 0;

        if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
                goto zero_out;

        /* Zero-sector (unknown) and one-sector granularities are the same.  */
        granularity = max(q->limits.discard_granularity >> 9, 1U);
        alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;

        /* cap at 1U << 22 sectors and round down to a multiple of granularity */
        max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
        max_discard_sectors -= max_discard_sectors % granularity;
        if (unlikely(!max_discard_sectors))
                goto zero_out;

        /* too small to contain even one full chunk */
        if (nr_sectors < granularity)
                goto zero_out;

        /* sector_div() replaces tmp by the quotient and yields the remainder */
        tmp = start;
        if (sector_div(tmp, granularity) != alignment) {
                if (nr_sectors < 2*granularity)
                        goto zero_out;
                /* start + gran - (start + gran - align) % gran */
                tmp = start + granularity - alignment;
                tmp = start + granularity - sector_div(tmp, granularity);

                /* zero out the unaligned head up to the next chunk boundary */
                nr = tmp - start;
                /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
                 * layers are below us, some may have smaller granularity */
                err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
                nr_sectors -= nr;
                start = tmp;
        }
        /* discard as many maximum-sized chunks as fit */
        while (nr_sectors >= max_discard_sectors) {
                err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
                nr_sectors -= max_discard_sectors;
                start += max_discard_sectors;
        }
        if (nr_sectors) {
                /* max_discard_sectors is unsigned int (and a multiple of
                 * granularity, we made sure of that above already);
                 * nr is < max_discard_sectors;
                 * I don't need sector_div here, even though nr is sector_t */
                nr = nr_sectors;
                nr -= (unsigned int)nr % granularity;
                if (nr) {
                        err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
                        nr_sectors -= nr;
                        start += nr;
                }
        }
 zero_out:
        /* whatever has not been discarded above gets zeroed out */
        if (nr_sectors) {
                err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
                                (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
        }
        return err != 0;
}
1587
1588 static bool can_do_reliable_discards(struct drbd_device *device)
1589 {
1590         struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1591         struct disk_conf *dc;
1592         bool can_do;
1593
1594         if (!blk_queue_discard(q))
1595                 return false;
1596
1597         rcu_read_lock();
1598         dc = rcu_dereference(device->ldev->disk_conf);
1599         can_do = dc->discard_zeroes_if_aligned;
1600         rcu_read_unlock();
1601         return can_do;
1602 }
1603
1604 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1605 {
1606         /* If the backend cannot discard, or does not guarantee
1607          * read-back zeroes in discarded ranges, we fall back to
1608          * zero-out.  Unless configuration specifically requested
1609          * otherwise. */
1610         if (!can_do_reliable_discards(device))
1611                 peer_req->flags |= EE_ZEROOUT;
1612
1613         if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1614             peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1615                 peer_req->flags |= EE_WAS_ERROR;
1616         drbd_endio_write_sec_final(peer_req);
1617 }
1618
1619 static void drbd_issue_peer_wsame(struct drbd_device *device,
1620                                   struct drbd_peer_request *peer_req)
1621 {
1622         struct block_device *bdev = device->ldev->backing_bdev;
1623         sector_t s = peer_req->i.sector;
1624         sector_t nr = peer_req->i.size >> 9;
1625         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1626                 peer_req->flags |= EE_WAS_ERROR;
1627         drbd_endio_write_sec_final(peer_req);
1628 }
1629
1630
1631 /**
1632  * drbd_submit_peer_request()
1633  * @device:     DRBD device.
1634  * @peer_req:   peer request
1635  * @rw:         flag field, see bio->bi_opf
1636  *
1637  * May spread the pages to multiple bios,
1638  * depending on bio_add_page restrictions.
1639  *
1640  * Returns 0 if all bios have been submitted,
1641  * -ENOMEM if we could not allocate enough bios,
1642  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1643  *  single page to an empty bio (which should never happen and likely indicates
1644  *  that the lower level IO stack is in some way broken). This has been observed
1645  *  on certain Xen deployments.
1646  */
1647 /* TODO allocate from our own bio_set. */
1648 int drbd_submit_peer_request(struct drbd_device *device,
1649                              struct drbd_peer_request *peer_req,
1650                              const unsigned op, const unsigned op_flags,
1651                              const int fault_type)
1652 {
1653         struct bio *bios = NULL;
1654         struct bio *bio;
1655         struct page *page = peer_req->pages;
1656         sector_t sector = peer_req->i.sector;
1657         unsigned data_size = peer_req->i.size;
1658         unsigned n_bios = 0;
1659         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1660         int err = -ENOMEM;
1661
1662         /* TRIM/DISCARD: for now, always use the helper function
1663          * blkdev_issue_zeroout(..., discard=true).
1664          * It's synchronous, but it does the right thing wrt. bio splitting.
1665          * Correctness first, performance later.  Next step is to code an
1666          * asynchronous variant of the same.
1667          */
1668         if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1669                 /* wait for all pending IO completions, before we start
1670                  * zeroing things out. */
1671                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1672                 /* add it to the active list now,
1673                  * so we can find it to present it in debugfs */
1674                 peer_req->submit_jif = jiffies;
1675                 peer_req->flags |= EE_SUBMITTED;
1676
1677                 /* If this was a resync request from receive_rs_deallocated(),
1678                  * it is already on the sync_ee list */
1679                 if (list_empty(&peer_req->w.list)) {
1680                         spin_lock_irq(&device->resource->req_lock);
1681                         list_add_tail(&peer_req->w.list, &device->active_ee);
1682                         spin_unlock_irq(&device->resource->req_lock);
1683                 }
1684
1685                 if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1686                         drbd_issue_peer_discard_or_zero_out(device, peer_req);
1687                 else /* EE_WRITE_SAME */
1688                         drbd_issue_peer_wsame(device, peer_req);
1689                 return 0;
1690         }
1691
1692         /* In most cases, we will only need one bio.  But in case the lower
1693          * level restrictions happen to be different at this offset on this
1694          * side than those of the sending peer, we may need to submit the
1695          * request in more than one bio.
1696          *
1697          * Plain bio_alloc is good enough here, this is no DRBD internally
1698          * generated bio, but a bio allocated on behalf of the peer.
1699          */
1700 next_bio:
1701         bio = bio_alloc(GFP_NOIO, nr_pages);
1702         if (!bio) {
1703                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1704                 goto fail;
1705         }
1706         /* > peer_req->i.sector, unless this is the first bio */
1707         bio->bi_iter.bi_sector = sector;
1708         bio_set_dev(bio, device->ldev->backing_bdev);
1709         bio_set_op_attrs(bio, op, op_flags);
1710         bio->bi_private = peer_req;
1711         bio->bi_end_io = drbd_peer_request_endio;
1712
1713         bio->bi_next = bios;
1714         bios = bio;
1715         ++n_bios;
1716
1717         page_chain_for_each(page) {
1718                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1719                 if (!bio_add_page(bio, page, len, 0))
1720                         goto next_bio;
1721                 data_size -= len;
1722                 sector += len >> 9;
1723                 --nr_pages;
1724         }
1725         D_ASSERT(device, data_size == 0);
1726         D_ASSERT(device, page == NULL);
1727
1728         atomic_set(&peer_req->pending_bios, n_bios);
1729         /* for debugfs: update timestamp, mark as submitted */
1730         peer_req->submit_jif = jiffies;
1731         peer_req->flags |= EE_SUBMITTED;
1732         do {
1733                 bio = bios;
1734                 bios = bios->bi_next;
1735                 bio->bi_next = NULL;
1736
1737                 drbd_generic_make_request(device, fault_type, bio);
1738         } while (bios);
1739         return 0;
1740
1741 fail:
1742         while (bios) {
1743                 bio = bios;
1744                 bios = bios->bi_next;
1745                 bio_put(bio);
1746         }
1747         return err;
1748 }
1749
1750 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1751                                              struct drbd_peer_request *peer_req)
1752 {
1753         struct drbd_interval *i = &peer_req->i;
1754
1755         drbd_remove_interval(&device->write_requests, i);
1756         drbd_clear_interval(i);
1757
1758         /* Wake up any processes waiting for this peer request to complete.  */
1759         if (i->waiting)
1760                 wake_up(&device->misc_wait);
1761 }
1762
1763 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1764 {
1765         struct drbd_peer_device *peer_device;
1766         int vnr;
1767
1768         rcu_read_lock();
1769         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1770                 struct drbd_device *device = peer_device->device;
1771
1772                 kref_get(&device->kref);
1773                 rcu_read_unlock();
1774                 drbd_wait_ee_list_empty(device, &device->active_ee);
1775                 kref_put(&device->kref, drbd_destroy_device);
1776                 rcu_read_lock();
1777         }
1778         rcu_read_unlock();
1779 }
1780
1781 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1782 {
1783         int rv;
1784         struct p_barrier *p = pi->data;
1785         struct drbd_epoch *epoch;
1786
1787         /* FIXME these are unacked on connection,
1788          * not a specific (peer)device.
1789          */
1790         connection->current_epoch->barrier_nr = p->barrier;
1791         connection->current_epoch->connection = connection;
1792         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1793
1794         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1795          * the activity log, which means it would not be resynced in case the
1796          * R_PRIMARY crashes now.
1797          * Therefore we must send the barrier_ack after the barrier request was
1798          * completed. */
1799         switch (connection->resource->write_ordering) {
1800         case WO_NONE:
1801                 if (rv == FE_RECYCLED)
1802                         return 0;
1803
1804                 /* receiver context, in the writeout path of the other node.
1805                  * avoid potential distributed deadlock */
1806                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1807                 if (epoch)
1808                         break;
1809                 else
1810                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1811                         /* Fall through */
1812
1813         case WO_BDEV_FLUSH:
1814         case WO_DRAIN_IO:
1815                 conn_wait_active_ee_empty(connection);
1816                 drbd_flush(connection);
1817
1818                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1819                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1820                         if (epoch)
1821                                 break;
1822                 }
1823
1824                 return 0;
1825         default:
1826                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1827                          connection->resource->write_ordering);
1828                 return -EIO;
1829         }
1830
1831         epoch->flags = 0;
1832         atomic_set(&epoch->epoch_size, 0);
1833         atomic_set(&epoch->active, 0);
1834
1835         spin_lock(&connection->epoch_lock);
1836         if (atomic_read(&connection->current_epoch->epoch_size)) {
1837                 list_add(&epoch->list, &connection->current_epoch->list);
1838                 connection->current_epoch = epoch;
1839                 connection->epochs++;
1840         } else {
1841                 /* The current_epoch got recycled while we allocated this one... */
1842                 kfree(epoch);
1843         }
1844         spin_unlock(&connection->epoch_lock);
1845
1846         return 0;
1847 }
1848
1849 /* quick wrapper in case payload size != request_size (write same) */
1850 static void drbd_csum_ee_size(struct crypto_shash *h,
1851                               struct drbd_peer_request *r, void *d,
1852                               unsigned int payload_size)
1853 {
1854         unsigned int tmp = r->i.size;
1855         r->i.size = payload_size;
1856         drbd_csum_ee(h, r, d);
1857         r->i.size = tmp;
1858 }
1859
1860 /* used from receive_RSDataReply (recv_resync_read)
1861  * and from receive_Data.
1862  * data_size: actual payload ("data in")
1863  *      for normal writes that is bi_size.
1864  *      for discards, that is zero.
1865  *      for write same, it is logical_block_size.
1866  * both trim and write same have the bi_size ("data len to be affected")
1867  * as extra argument in the packet header.
1868  */
1869 static struct drbd_peer_request *
1870 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1871               struct packet_info *pi) __must_hold(local)
1872 {
1873         struct drbd_device *device = peer_device->device;
1874         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1875         struct drbd_peer_request *peer_req;
1876         struct page *page;
1877         int digest_size, err;
1878         unsigned int data_size = pi->size, ds;
1879         void *dig_in = peer_device->connection->int_dig_in;
1880         void *dig_vv = peer_device->connection->int_dig_vv;
1881         unsigned long *data;
1882         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1883         struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1884         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1885
1886         digest_size = 0;
1887         if (!trim && peer_device->connection->peer_integrity_tfm) {
1888                 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1889                 /*
1890                  * FIXME: Receive the incoming digest into the receive buffer
1891                  *        here, together with its struct p_data?
1892                  */
1893                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1894                 if (err)
1895                         return NULL;
1896                 data_size -= digest_size;
1897         }
1898
1899         /* assume request_size == data_size, but special case trim and wsame. */
1900         ds = data_size;
1901         if (trim) {
1902                 if (!expect(data_size == 0))
1903                         return NULL;
1904                 ds = be32_to_cpu(trim->size);
1905         } else if (zeroes) {
1906                 if (!expect(data_size == 0))
1907                         return NULL;
1908                 ds = be32_to_cpu(zeroes->size);
1909         } else if (wsame) {
1910                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1911                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1912                                 data_size, queue_logical_block_size(device->rq_queue));
1913                         return NULL;
1914                 }
1915                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1916                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1917                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1918                         return NULL;
1919                 }
1920                 ds = be32_to_cpu(wsame->size);
1921         }
1922
1923         if (!expect(IS_ALIGNED(ds, 512)))
1924                 return NULL;
1925         if (trim || wsame || zeroes) {
1926                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1927                         return NULL;
1928         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1929                 return NULL;
1930
1931         /* even though we trust out peer,
1932          * we sometimes have to double check. */
1933         if (sector + (ds>>9) > capacity) {
1934                 drbd_err(device, "request from peer beyond end of local disk: "
1935                         "capacity: %llus < sector: %llus + size: %u\n",
1936                         (unsigned long long)capacity,
1937                         (unsigned long long)sector, ds);
1938                 return NULL;
1939         }
1940
1941         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1942          * "criss-cross" setup, that might cause write-out on some other DRBD,
1943          * which in turn might block on the other node at this very place.  */
1944         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1945         if (!peer_req)
1946                 return NULL;
1947
1948         peer_req->flags |= EE_WRITE;
1949         if (trim) {
1950                 peer_req->flags |= EE_TRIM;
1951                 return peer_req;
1952         }
1953         if (zeroes) {
1954                 peer_req->flags |= EE_ZEROOUT;
1955                 return peer_req;
1956         }
1957         if (wsame)
1958                 peer_req->flags |= EE_WRITE_SAME;
1959
1960         /* receive payload size bytes into page chain */
1961         ds = data_size;
1962         page = peer_req->pages;
1963         page_chain_for_each(page) {
1964                 unsigned len = min_t(int, ds, PAGE_SIZE);
1965                 data = kmap(page);
1966                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1967                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1968                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1969                         data[0] = data[0] ^ (unsigned long)-1;
1970                 }
1971                 kunmap(page);
1972                 if (err) {
1973                         drbd_free_peer_req(device, peer_req);
1974                         return NULL;
1975                 }
1976                 ds -= len;
1977         }
1978
1979         if (digest_size) {
1980                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1981                 if (memcmp(dig_in, dig_vv, digest_size)) {
1982                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1983                                 (unsigned long long)sector, data_size);
1984                         drbd_free_peer_req(device, peer_req);
1985                         return NULL;
1986                 }
1987         }
1988         device->recv_cnt += data_size >> 9;
1989         return peer_req;
1990 }
1991
1992 /* drbd_drain_block() just takes a data block
1993  * out of the socket input buffer, and discards it.
1994  */
1995 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1996 {
1997         struct page *page;
1998         int err = 0;
1999         void *data;
2000
2001         if (!data_size)
2002                 return 0;
2003
2004         page = drbd_alloc_pages(peer_device, 1, 1);
2005
2006         data = kmap(page);
2007         while (data_size) {
2008                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
2009
2010                 err = drbd_recv_all_warn(peer_device->connection, data, len);
2011                 if (err)
2012                         break;
2013                 data_size -= len;
2014         }
2015         kunmap(page);
2016         drbd_free_pages(peer_device->device, page, 0);
2017         return err;
2018 }
2019
2020 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2021                            sector_t sector, int data_size)
2022 {
2023         struct bio_vec bvec;
2024         struct bvec_iter iter;
2025         struct bio *bio;
2026         int digest_size, err, expect;
2027         void *dig_in = peer_device->connection->int_dig_in;
2028         void *dig_vv = peer_device->connection->int_dig_vv;
2029
2030         digest_size = 0;
2031         if (peer_device->connection->peer_integrity_tfm) {
2032                 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2033                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2034                 if (err)
2035                         return err;
2036                 data_size -= digest_size;
2037         }
2038
2039         /* optimistically update recv_cnt.  if receiving fails below,
2040          * we disconnect anyways, and counters will be reset. */
2041         peer_device->device->recv_cnt += data_size>>9;
2042
2043         bio = req->master_bio;
2044         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2045
2046         bio_for_each_segment(bvec, bio, iter) {
2047                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2048                 expect = min_t(int, data_size, bvec.bv_len);
2049                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2050                 kunmap(bvec.bv_page);
2051                 if (err)
2052                         return err;
2053                 data_size -= expect;
2054         }
2055
2056         if (digest_size) {
2057                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2058                 if (memcmp(dig_in, dig_vv, digest_size)) {
2059                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2060                         return -EINVAL;
2061                 }
2062         }
2063
2064         D_ASSERT(peer_device->device, data_size == 0);
2065         return 0;
2066 }
2067
2068 /*
2069  * e_end_resync_block() is called in ack_sender context via
2070  * drbd_finish_peer_reqs().
2071  */
2072 static int e_end_resync_block(struct drbd_work *w, int unused)
2073 {
2074         struct drbd_peer_request *peer_req =
2075                 container_of(w, struct drbd_peer_request, w);
2076         struct drbd_peer_device *peer_device = peer_req->peer_device;
2077         struct drbd_device *device = peer_device->device;
2078         sector_t sector = peer_req->i.sector;
2079         int err;
2080
2081         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2082
2083         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2084                 drbd_set_in_sync(device, sector, peer_req->i.size);
2085                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2086         } else {
2087                 /* Record failure to sync */
2088                 drbd_rs_failed_io(device, sector, peer_req->i.size);
2089
2090                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2091         }
2092         dec_unacked(device);
2093
2094         return err;
2095 }
2096
2097 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2098                             struct packet_info *pi) __releases(local)
2099 {
2100         struct drbd_device *device = peer_device->device;
2101         struct drbd_peer_request *peer_req;
2102
2103         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2104         if (!peer_req)
2105                 goto fail;
2106
2107         dec_rs_pending(device);
2108
2109         inc_unacked(device);
2110         /* corresponding dec_unacked() in e_end_resync_block()
2111          * respective _drbd_clear_done_ee */
2112
2113         peer_req->w.cb = e_end_resync_block;
2114         peer_req->submit_jif = jiffies;
2115
2116         spin_lock_irq(&device->resource->req_lock);
2117         list_add_tail(&peer_req->w.list, &device->sync_ee);
2118         spin_unlock_irq(&device->resource->req_lock);
2119
2120         atomic_add(pi->size >> 9, &device->rs_sect_ev);
2121         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2122                                      DRBD_FAULT_RS_WR) == 0)
2123                 return 0;
2124
2125         /* don't care for the reason here */
2126         drbd_err(device, "submit failed, triggering re-connect\n");
2127         spin_lock_irq(&device->resource->req_lock);
2128         list_del(&peer_req->w.list);
2129         spin_unlock_irq(&device->resource->req_lock);
2130
2131         drbd_free_peer_req(device, peer_req);
2132 fail:
2133         put_ldev(device);
2134         return -EIO;
2135 }
2136
2137 static struct drbd_request *
2138 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2139              sector_t sector, bool missing_ok, const char *func)
2140 {
2141         struct drbd_request *req;
2142
2143         /* Request object according to our peer */
2144         req = (struct drbd_request *)(unsigned long)id;
2145         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2146                 return req;
2147         if (!missing_ok) {
2148                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2149                         (unsigned long)id, (unsigned long long)sector);
2150         }
2151         return NULL;
2152 }
2153
2154 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2155 {
2156         struct drbd_peer_device *peer_device;
2157         struct drbd_device *device;
2158         struct drbd_request *req;
2159         sector_t sector;
2160         int err;
2161         struct p_data *p = pi->data;
2162
2163         peer_device = conn_peer_device(connection, pi->vnr);
2164         if (!peer_device)
2165                 return -EIO;
2166         device = peer_device->device;
2167
2168         sector = be64_to_cpu(p->sector);
2169
2170         spin_lock_irq(&device->resource->req_lock);
2171         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2172         spin_unlock_irq(&device->resource->req_lock);
2173         if (unlikely(!req))
2174                 return -EIO;
2175
2176         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2177          * special casing it there for the various failure cases.
2178          * still no race with drbd_fail_pending_reads */
2179         err = recv_dless_read(peer_device, req, sector, pi->size);
2180         if (!err)
2181                 req_mod(req, DATA_RECEIVED);
2182         /* else: nothing. handled from drbd_disconnect...
2183          * I don't think we may complete this just yet
2184          * in case we are "on-disconnect: freeze" */
2185
2186         return err;
2187 }
2188
2189 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2190 {
2191         struct drbd_peer_device *peer_device;
2192         struct drbd_device *device;
2193         sector_t sector;
2194         int err;
2195         struct p_data *p = pi->data;
2196
2197         peer_device = conn_peer_device(connection, pi->vnr);
2198         if (!peer_device)
2199                 return -EIO;
2200         device = peer_device->device;
2201
2202         sector = be64_to_cpu(p->sector);
2203         D_ASSERT(device, p->block_id == ID_SYNCER);
2204
2205         if (get_ldev(device)) {
2206                 /* data is submitted to disk within recv_resync_read.
2207                  * corresponding put_ldev done below on error,
2208                  * or in drbd_peer_request_endio. */
2209                 err = recv_resync_read(peer_device, sector, pi);
2210         } else {
2211                 if (__ratelimit(&drbd_ratelimit_state))
2212                         drbd_err(device, "Can not write resync data to local disk.\n");
2213
2214                 err = drbd_drain_block(peer_device, pi->size);
2215
2216                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2217         }
2218
2219         atomic_add(pi->size >> 9, &device->rs_sect_in);
2220
2221         return err;
2222 }
2223
2224 static void restart_conflicting_writes(struct drbd_device *device,
2225                                        sector_t sector, int size)
2226 {
2227         struct drbd_interval *i;
2228         struct drbd_request *req;
2229
2230         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2231                 if (!i->local)
2232                         continue;
2233                 req = container_of(i, struct drbd_request, i);
2234                 if (req->rq_state & RQ_LOCAL_PENDING ||
2235                     !(req->rq_state & RQ_POSTPONED))
2236                         continue;
2237                 /* as it is RQ_POSTPONED, this will cause it to
2238                  * be queued on the retry workqueue. */
2239                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2240         }
2241 }
2242
2243 /*
2244  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2245  */
2246 static int e_end_block(struct drbd_work *w, int cancel)
2247 {
2248         struct drbd_peer_request *peer_req =
2249                 container_of(w, struct drbd_peer_request, w);
2250         struct drbd_peer_device *peer_device = peer_req->peer_device;
2251         struct drbd_device *device = peer_device->device;
2252         sector_t sector = peer_req->i.sector;
2253         int err = 0, pcmd;
2254
2255         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2256                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2257                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2258                                 device->state.conn <= C_PAUSED_SYNC_T &&
2259                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2260                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2261                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2262                         if (pcmd == P_RS_WRITE_ACK)
2263                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2264                 } else {
2265                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2266                         /* we expect it to be marked out of sync anyways...
2267                          * maybe assert this?  */
2268                 }
2269                 dec_unacked(device);
2270         }
2271
2272         /* we delete from the conflict detection hash _after_ we sent out the
2273          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2274         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2275                 spin_lock_irq(&device->resource->req_lock);
2276                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2277                 drbd_remove_epoch_entry_interval(device, peer_req);
2278                 if (peer_req->flags & EE_RESTART_REQUESTS)
2279                         restart_conflicting_writes(device, sector, peer_req->i.size);
2280                 spin_unlock_irq(&device->resource->req_lock);
2281         } else
2282                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2283
2284         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2285
2286         return err;
2287 }
2288
2289 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2290 {
2291         struct drbd_peer_request *peer_req =
2292                 container_of(w, struct drbd_peer_request, w);
2293         struct drbd_peer_device *peer_device = peer_req->peer_device;
2294         int err;
2295
2296         err = drbd_send_ack(peer_device, ack, peer_req);
2297         dec_unacked(peer_device->device);
2298
2299         return err;
2300 }
2301
2302 static int e_send_superseded(struct drbd_work *w, int unused)
2303 {
2304         return e_send_ack(w, P_SUPERSEDED);
2305 }
2306
2307 static int e_send_retry_write(struct drbd_work *w, int unused)
2308 {
2309         struct drbd_peer_request *peer_req =
2310                 container_of(w, struct drbd_peer_request, w);
2311         struct drbd_connection *connection = peer_req->peer_device->connection;
2312
2313         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2314                              P_RETRY_WRITE : P_SUPERSEDED);
2315 }
2316
/*
 * Wrap-around aware "a is newer than b" for 32-bit sequence numbers.
 *
 * Returns true if @a is ahead of @b by less than half the sequence space.
 */
static bool seq_greater(u32 a, u32 b)
{
	/*
	 * We assume 32-bit wrap-around here.
	 * For 24-bit wrap-around, we would have to shift:
	 *  a <<= 8; b <<= 8;
	 *
	 * The difference is taken in unsigned arithmetic, where wrap-around
	 * is well defined, and only then interpreted as signed.  Subtracting
	 * two s32 values directly could overflow, which is undefined unless
	 * the compiler is told to wrap.
	 */
	return (s32)(a - b) > 0;
}
2326
2327 static u32 seq_max(u32 a, u32 b)
2328 {
2329         return seq_greater(a, b) ? a : b;
2330 }
2331
2332 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2333 {
2334         struct drbd_device *device = peer_device->device;
2335         unsigned int newest_peer_seq;
2336
2337         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2338                 spin_lock(&device->peer_seq_lock);
2339                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2340                 device->peer_seq = newest_peer_seq;
2341                 spin_unlock(&device->peer_seq_lock);
2342                 /* wake up only if we actually changed device->peer_seq */
2343                 if (peer_seq == newest_peer_seq)
2344                         wake_up(&device->seq_wait);
2345         }
2346 }
2347
/*
 * Do the two ranges intersect?  s1/s2 are start sectors, l1/l2 are
 * lengths in bytes (converted to 512 byte sectors here).
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
	/* each range has to begin before the other one ends */
	return (s1 + (l1>>9) > s2) && (s1 < s2 + (l2>>9));
}
2352
2353 /* maybe change sync_ee into interval trees as well? */
2354 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2355 {
2356         struct drbd_peer_request *rs_req;
2357         bool rv = false;
2358
2359         spin_lock_irq(&device->resource->req_lock);
2360         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2361                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2362                              rs_req->i.sector, rs_req->i.size)) {
2363                         rv = true;
2364                         break;
2365                 }
2366         }
2367         spin_unlock_irq(&device->resource->req_lock);
2368
2369         return rv;
2370 }
2371
2372 /* Called from receive_Data.
2373  * Synchronize packets on sock with packets on msock.
2374  *
2375  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2376  * packet traveling on msock, they are still processed in the order they have
2377  * been sent.
2378  *
2379  * Note: we don't care for Ack packets overtaking P_DATA packets.
2380  *
2381  * In case packet_seq is larger than device->peer_seq number, there are
2382  * outstanding packets on the msock. We wait for them to arrive.
2383  * In case we are the logically next packet, we update device->peer_seq
2384  * ourselves. Correctly handles 32bit wrap around.
2385  *
2386  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2387  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2388  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2389  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2390  *
2391  * returns 0 if we may process the packet,
2392  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2393 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2394 {
2395         struct drbd_device *device = peer_device->device;
2396         DEFINE_WAIT(wait);
2397         long timeout;
2398         int ret = 0, tp;
2399
2400         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2401                 return 0;
2402
2403         spin_lock(&device->peer_seq_lock);
2404         for (;;) {
2405                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2406                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2407                         break;
2408                 }
2409
2410                 if (signal_pending(current)) {
2411                         ret = -ERESTARTSYS;
2412                         break;
2413                 }
2414
2415                 rcu_read_lock();
2416                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2417                 rcu_read_unlock();
2418
2419                 if (!tp)
2420                         break;
2421
2422                 /* Only need to wait if two_primaries is enabled */
2423                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2424                 spin_unlock(&device->peer_seq_lock);
2425                 rcu_read_lock();
2426                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2427                 rcu_read_unlock();
2428                 timeout = schedule_timeout(timeout);
2429                 spin_lock(&device->peer_seq_lock);
2430                 if (!timeout) {
2431                         ret = -ETIMEDOUT;
2432                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2433                         break;
2434                 }
2435         }
2436         spin_unlock(&device->peer_seq_lock);
2437         finish_wait(&device->seq_wait, &wait);
2438         return ret;
2439 }
2440
2441 /* see also bio_flags_to_wire()
2442  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2443  * flags and back. We may replicate to other kernel versions. */
2444 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2445 {
2446         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2447                 (dpf & DP_FUA ? REQ_FUA : 0) |
2448                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2449 }
2450
2451 static unsigned long wire_flags_to_bio_op(u32 dpf)
2452 {
2453         if (dpf & DP_ZEROES)
2454                 return REQ_OP_WRITE_ZEROES;
2455         if (dpf & DP_DISCARD)
2456                 return REQ_OP_DISCARD;
2457         if (dpf & DP_WSAME)
2458                 return REQ_OP_WRITE_SAME;
2459         else
2460                 return REQ_OP_WRITE;
2461 }
2462
2463 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2464                                     unsigned int size)
2465 {
2466         struct drbd_interval *i;
2467
2468     repeat:
2469         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2470                 struct drbd_request *req;
2471                 struct bio_and_error m;
2472
2473                 if (!i->local)
2474                         continue;
2475                 req = container_of(i, struct drbd_request, i);
2476                 if (!(req->rq_state & RQ_POSTPONED))
2477                         continue;
2478                 req->rq_state &= ~RQ_POSTPONED;
2479                 __req_mod(req, NEG_ACKED, &m);
2480                 spin_unlock_irq(&device->resource->req_lock);
2481                 if (m.bio)
2482                         complete_master_bio(device, &m);
2483                 spin_lock_irq(&device->resource->req_lock);
2484                 goto repeat;
2485         }
2486 }
2487
2488 static int handle_write_conflicts(struct drbd_device *device,
2489                                   struct drbd_peer_request *peer_req)
2490 {
2491         struct drbd_connection *connection = peer_req->peer_device->connection;
2492         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2493         sector_t sector = peer_req->i.sector;
2494         const unsigned int size = peer_req->i.size;
2495         struct drbd_interval *i;
2496         bool equal;
2497         int err;
2498
2499         /*
2500          * Inserting the peer request into the write_requests tree will prevent
2501          * new conflicting local requests from being added.
2502          */
2503         drbd_insert_interval(&device->write_requests, &peer_req->i);
2504
2505     repeat:
2506         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2507                 if (i == &peer_req->i)
2508                         continue;
2509                 if (i->completed)
2510                         continue;
2511
2512                 if (!i->local) {
2513                         /*
2514                          * Our peer has sent a conflicting remote request; this
2515                          * should not happen in a two-node setup.  Wait for the
2516                          * earlier peer request to complete.
2517                          */
2518                         err = drbd_wait_misc(device, i);
2519                         if (err)
2520                                 goto out;
2521                         goto repeat;
2522                 }
2523
2524                 equal = i->sector == sector && i->size == size;
2525                 if (resolve_conflicts) {
2526                         /*
2527                          * If the peer request is fully contained within the
2528                          * overlapping request, it can be considered overwritten
2529                          * and thus superseded; otherwise, it will be retried
2530                          * once all overlapping requests have completed.
2531                          */
2532                         bool superseded = i->sector <= sector && i->sector +
2533                                        (i->size >> 9) >= sector + (size >> 9);
2534
2535                         if (!equal)
2536                                 drbd_alert(device, "Concurrent writes detected: "
2537                                                "local=%llus +%u, remote=%llus +%u, "
2538                                                "assuming %s came first\n",
2539                                           (unsigned long long)i->sector, i->size,
2540                                           (unsigned long long)sector, size,
2541                                           superseded ? "local" : "remote");
2542
2543                         peer_req->w.cb = superseded ? e_send_superseded :
2544                                                    e_send_retry_write;
2545                         list_add_tail(&peer_req->w.list, &device->done_ee);
2546                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2547
2548                         err = -ENOENT;
2549                         goto out;
2550                 } else {
2551                         struct drbd_request *req =
2552                                 container_of(i, struct drbd_request, i);
2553
2554                         if (!equal)
2555                                 drbd_alert(device, "Concurrent writes detected: "
2556                                                "local=%llus +%u, remote=%llus +%u\n",
2557                                           (unsigned long long)i->sector, i->size,
2558                                           (unsigned long long)sector, size);
2559
2560                         if (req->rq_state & RQ_LOCAL_PENDING ||
2561                             !(req->rq_state & RQ_POSTPONED)) {
2562                                 /*
2563                                  * Wait for the node with the discard flag to
2564                                  * decide if this request has been superseded
2565                                  * or needs to be retried.
2566                                  * Requests that have been superseded will
2567                                  * disappear from the write_requests tree.
2568                                  *
2569                                  * In addition, wait for the conflicting
2570                                  * request to finish locally before submitting
2571                                  * the conflicting peer request.
2572                                  */
2573                                 err = drbd_wait_misc(device, &req->i);
2574                                 if (err) {
2575                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2576                                         fail_postponed_requests(device, sector, size);
2577                                         goto out;
2578                                 }
2579                                 goto repeat;
2580                         }
2581                         /*
2582                          * Remember to restart the conflicting requests after
2583                          * the new peer request has completed.
2584                          */
2585                         peer_req->flags |= EE_RESTART_REQUESTS;
2586                 }
2587         }
2588         err = 0;
2589
2590     out:
2591         if (err)
2592                 drbd_remove_epoch_entry_interval(device, peer_req);
2593         return err;
2594 }
2595
2596 /* mirrored write */
2597 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2598 {
2599         struct drbd_peer_device *peer_device;
2600         struct drbd_device *device;
2601         struct net_conf *nc;
2602         sector_t sector;
2603         struct drbd_peer_request *peer_req;
2604         struct p_data *p = pi->data;
2605         u32 peer_seq = be32_to_cpu(p->seq_num);
2606         int op, op_flags;
2607         u32 dp_flags;
2608         int err, tp;
2609
2610         peer_device = conn_peer_device(connection, pi->vnr);
2611         if (!peer_device)
2612                 return -EIO;
2613         device = peer_device->device;
2614
2615         if (!get_ldev(device)) {
2616                 int err2;
2617
2618                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2619                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2620                 atomic_inc(&connection->current_epoch->epoch_size);
2621                 err2 = drbd_drain_block(peer_device, pi->size);
2622                 if (!err)
2623                         err = err2;
2624                 return err;
2625         }
2626
2627         /*
2628          * Corresponding put_ldev done either below (on various errors), or in
2629          * drbd_peer_request_endio, if we successfully submit the data at the
2630          * end of this function.
2631          */
2632
2633         sector = be64_to_cpu(p->sector);
2634         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2635         if (!peer_req) {
2636                 put_ldev(device);
2637                 return -EIO;
2638         }
2639
2640         peer_req->w.cb = e_end_block;
2641         peer_req->submit_jif = jiffies;
2642         peer_req->flags |= EE_APPLICATION;
2643
2644         dp_flags = be32_to_cpu(p->dp_flags);
2645         op = wire_flags_to_bio_op(dp_flags);
2646         op_flags = wire_flags_to_bio_flags(dp_flags);
2647         if (pi->cmd == P_TRIM) {
2648                 D_ASSERT(peer_device, peer_req->i.size > 0);
2649                 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2650                 D_ASSERT(peer_device, peer_req->pages == NULL);
2651                 /* need to play safe: an older DRBD sender
2652                  * may mean zero-out while sending P_TRIM. */
2653                 if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2654                         peer_req->flags |= EE_ZEROOUT;
2655         } else if (pi->cmd == P_ZEROES) {
2656                 D_ASSERT(peer_device, peer_req->i.size > 0);
2657                 D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2658                 D_ASSERT(peer_device, peer_req->pages == NULL);
2659                 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2660                 if (dp_flags & DP_DISCARD)
2661                         peer_req->flags |= EE_TRIM;
2662         } else if (peer_req->pages == NULL) {
2663                 D_ASSERT(device, peer_req->i.size == 0);
2664                 D_ASSERT(device, dp_flags & DP_FLUSH);
2665         }
2666
2667         if (dp_flags & DP_MAY_SET_IN_SYNC)
2668                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2669
2670         spin_lock(&connection->epoch_lock);
2671         peer_req->epoch = connection->current_epoch;
2672         atomic_inc(&peer_req->epoch->epoch_size);
2673         atomic_inc(&peer_req->epoch->active);
2674         spin_unlock(&connection->epoch_lock);
2675
2676         rcu_read_lock();
2677         nc = rcu_dereference(peer_device->connection->net_conf);
2678         tp = nc->two_primaries;
2679         if (peer_device->connection->agreed_pro_version < 100) {
2680                 switch (nc->wire_protocol) {
2681                 case DRBD_PROT_C:
2682                         dp_flags |= DP_SEND_WRITE_ACK;
2683                         break;
2684                 case DRBD_PROT_B:
2685                         dp_flags |= DP_SEND_RECEIVE_ACK;
2686                         break;
2687                 }
2688         }
2689         rcu_read_unlock();
2690
2691         if (dp_flags & DP_SEND_WRITE_ACK) {
2692                 peer_req->flags |= EE_SEND_WRITE_ACK;
2693                 inc_unacked(device);
2694                 /* corresponding dec_unacked() in e_end_block()
2695                  * respective _drbd_clear_done_ee */
2696         }
2697
2698         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2699                 /* I really don't like it that the receiver thread
2700                  * sends on the msock, but anyways */
2701                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2702         }
2703
2704         if (tp) {
2705                 /* two primaries implies protocol C */
2706                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2707                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2708                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2709                 if (err)
2710                         goto out_interrupted;
2711                 spin_lock_irq(&device->resource->req_lock);
2712                 err = handle_write_conflicts(device, peer_req);
2713                 if (err) {
2714                         spin_unlock_irq(&device->resource->req_lock);
2715                         if (err == -ENOENT) {
2716                                 put_ldev(device);
2717                                 return 0;
2718                         }
2719                         goto out_interrupted;
2720                 }
2721         } else {
2722                 update_peer_seq(peer_device, peer_seq);
2723                 spin_lock_irq(&device->resource->req_lock);
2724         }
2725         /* TRIM and WRITE_SAME are processed synchronously,
2726          * we wait for all pending requests, respectively wait for
2727          * active_ee to become empty in drbd_submit_peer_request();
2728          * better not add ourselves here. */
2729         if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2730                 list_add_tail(&peer_req->w.list, &device->active_ee);
2731         spin_unlock_irq(&device->resource->req_lock);
2732
2733         if (device->state.conn == C_SYNC_TARGET)
2734                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2735
2736         if (device->state.pdsk < D_INCONSISTENT) {
2737                 /* In case we have the only disk of the cluster, */
2738                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2739                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2740                 drbd_al_begin_io(device, &peer_req->i);
2741                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2742         }
2743
2744         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2745                                        DRBD_FAULT_DT_WR);
2746         if (!err)
2747                 return 0;
2748
2749         /* don't care for the reason here */
2750         drbd_err(device, "submit failed, triggering re-connect\n");
2751         spin_lock_irq(&device->resource->req_lock);
2752         list_del(&peer_req->w.list);
2753         drbd_remove_epoch_entry_interval(device, peer_req);
2754         spin_unlock_irq(&device->resource->req_lock);
2755         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2756                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2757                 drbd_al_complete_io(device, &peer_req->i);
2758         }
2759
2760 out_interrupted:
2761         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2762         put_ldev(device);
2763         drbd_free_peer_req(device, peer_req);
2764         return err;
2765 }
2766
2767 /* We may throttle resync, if the lower device seems to be busy,
2768  * and current sync rate is above c_min_rate.
2769  *
2770  * To decide whether or not the lower device is busy, we use a scheme similar
2771  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2772  * (more than 64 sectors) of activity we cannot account for with our own resync
2773  * activity, it obviously is "busy".
2774  *
2775  * The current sync rate used here uses only the most recent two step marks,
2776  * to have a short time average so we can react faster.
2777  */
2778 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2779                 bool throttle_if_app_is_waiting)
2780 {
2781         struct lc_element *tmp;
2782         bool throttle = drbd_rs_c_min_rate_throttle(device);
2783
2784         if (!throttle || throttle_if_app_is_waiting)
2785                 return throttle;
2786
2787         spin_lock_irq(&device->al_lock);
2788         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2789         if (tmp) {
2790                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2791                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2792                         throttle = false;
2793                 /* Do not slow down if app IO is already waiting for this extent,
2794                  * and our progress is necessary for application IO to complete. */
2795         }
2796         spin_unlock_irq(&device->al_lock);
2797
2798         return throttle;
2799 }
2800
2801 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2802 {
2803         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2804         unsigned long db, dt, dbdt;
2805         unsigned int c_min_rate;
2806         int curr_events;
2807
2808         rcu_read_lock();
2809         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2810         rcu_read_unlock();
2811
2812         /* feature disabled? */
2813         if (c_min_rate == 0)
2814                 return false;
2815
2816         curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2817                         atomic_read(&device->rs_sect_ev);
2818
2819         if (atomic_read(&device->ap_actlog_cnt)
2820             || curr_events - device->rs_last_events > 64) {
2821                 unsigned long rs_left;
2822                 int i;
2823
2824                 device->rs_last_events = curr_events;
2825
2826                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2827                  * approx. */
2828                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2829
2830                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2831                         rs_left = device->ov_left;
2832                 else
2833                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2834
2835                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2836                 if (!dt)
2837                         dt++;
2838                 db = device->rs_mark_left[i] - rs_left;
2839                 dbdt = Bit2KB(db/dt);
2840
2841                 if (dbdt > c_min_rate)
2842                         return true;
2843         }
2844         return false;
2845 }
2846
2847 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2848 {
2849         struct drbd_peer_device *peer_device;
2850         struct drbd_device *device;
2851         sector_t sector;
2852         sector_t capacity;
2853         struct drbd_peer_request *peer_req;
2854         struct digest_info *di = NULL;
2855         int size, verb;
2856         unsigned int fault_type;
2857         struct p_block_req *p = pi->data;
2858
2859         peer_device = conn_peer_device(connection, pi->vnr);
2860         if (!peer_device)
2861                 return -EIO;
2862         device = peer_device->device;
2863         capacity = drbd_get_capacity(device->this_bdev);
2864
2865         sector = be64_to_cpu(p->sector);
2866         size   = be32_to_cpu(p->blksize);
2867
2868         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2869                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2870                                 (unsigned long long)sector, size);
2871                 return -EINVAL;
2872         }
2873         if (sector + (size>>9) > capacity) {
2874                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2875                                 (unsigned long long)sector, size);
2876                 return -EINVAL;
2877         }
2878
2879         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2880                 verb = 1;
2881                 switch (pi->cmd) {
2882                 case P_DATA_REQUEST:
2883                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2884                         break;
2885                 case P_RS_THIN_REQ:
2886                 case P_RS_DATA_REQUEST:
2887                 case P_CSUM_RS_REQUEST:
2888                 case P_OV_REQUEST:
2889                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2890                         break;
2891                 case P_OV_REPLY:
2892                         verb = 0;
2893                         dec_rs_pending(device);
2894                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2895                         break;
2896                 default:
2897                         BUG();
2898                 }
2899                 if (verb && __ratelimit(&drbd_ratelimit_state))
2900                         drbd_err(device, "Can not satisfy peer's read request, "
2901                             "no local data.\n");
2902
2903                 /* drain possibly payload */
2904                 return drbd_drain_block(peer_device, pi->size);
2905         }
2906
2907         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2908          * "criss-cross" setup, that might cause write-out on some other DRBD,
2909          * which in turn might block on the other node at this very place.  */
2910         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2911                         size, GFP_NOIO);
2912         if (!peer_req) {
2913                 put_ldev(device);
2914                 return -ENOMEM;
2915         }
2916
2917         switch (pi->cmd) {
2918         case P_DATA_REQUEST:
2919                 peer_req->w.cb = w_e_end_data_req;
2920                 fault_type = DRBD_FAULT_DT_RD;
2921                 /* application IO, don't drbd_rs_begin_io */
2922                 peer_req->flags |= EE_APPLICATION;
2923                 goto submit;
2924
2925         case P_RS_THIN_REQ:
2926                 /* If at some point in the future we have a smart way to
2927                    find out if this data block is completely deallocated,
2928                    then we would do something smarter here than reading
2929                    the block... */
2930                 peer_req->flags |= EE_RS_THIN_REQ;
2931                 /* fall through */
2932         case P_RS_DATA_REQUEST:
2933                 peer_req->w.cb = w_e_end_rsdata_req;
2934                 fault_type = DRBD_FAULT_RS_RD;
2935                 /* used in the sector offset progress display */
2936                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2937                 break;
2938
2939         case P_OV_REPLY:
2940         case P_CSUM_RS_REQUEST:
2941                 fault_type = DRBD_FAULT_RS_RD;
2942                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2943                 if (!di)
2944                         goto out_free_e;
2945
2946                 di->digest_size = pi->size;
2947                 di->digest = (((char *)di)+sizeof(struct digest_info));
2948
2949                 peer_req->digest = di;
2950                 peer_req->flags |= EE_HAS_DIGEST;
2951
2952                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2953                         goto out_free_e;
2954
2955                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2956                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2957                         peer_req->w.cb = w_e_end_csum_rs_req;
2958                         /* used in the sector offset progress display */
2959                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2960                         /* remember to report stats in drbd_resync_finished */
2961                         device->use_csums = true;
2962                 } else if (pi->cmd == P_OV_REPLY) {
2963                         /* track progress, we may need to throttle */
2964                         atomic_add(size >> 9, &device->rs_sect_in);
2965                         peer_req->w.cb = w_e_end_ov_reply;
2966                         dec_rs_pending(device);
2967                         /* drbd_rs_begin_io done when we sent this request,
2968                          * but accounting still needs to be done. */
2969                         goto submit_for_resync;
2970                 }
2971                 break;
2972
2973         case P_OV_REQUEST:
2974                 if (device->ov_start_sector == ~(sector_t)0 &&
2975                     peer_device->connection->agreed_pro_version >= 90) {
2976                         unsigned long now = jiffies;
2977                         int i;
2978                         device->ov_start_sector = sector;
2979                         device->ov_position = sector;
2980                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2981                         device->rs_total = device->ov_left;
2982                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2983                                 device->rs_mark_left[i] = device->ov_left;
2984                                 device->rs_mark_time[i] = now;
2985                         }
2986                         drbd_info(device, "Online Verify start sector: %llu\n",
2987                                         (unsigned long long)sector);
2988                 }
2989                 peer_req->w.cb = w_e_end_ov_req;
2990                 fault_type = DRBD_FAULT_RS_RD;
2991                 break;
2992
2993         default:
2994                 BUG();
2995         }
2996
2997         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2998          * wrt the receiver, but it is not as straightforward as it may seem.
2999          * Various places in the resync start and stop logic assume resync
3000          * requests are processed in order, requeuing this on the worker thread
3001          * introduces a bunch of new code for synchronization between threads.
3002          *
3003          * Unlimited throttling before drbd_rs_begin_io may stall the resync
3004          * "forever", throttling after drbd_rs_begin_io will lock that extent
3005          * for application writes for the same time.  For now, just throttle
3006          * here, where the rest of the code expects the receiver to sleep for
3007          * a while, anyways.
3008          */
3009
3010         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
3011          * this defers syncer requests for some time, before letting at least
3012          * on request through.  The resync controller on the receiving side
3013          * will adapt to the incoming rate accordingly.
3014          *
3015          * We cannot throttle here if remote is Primary/SyncTarget:
3016          * we would also throttle its application reads.
3017          * In that case, throttling is done on the SyncTarget only.
3018          */
3019
3020         /* Even though this may be a resync request, we do add to "read_ee";
3021          * "sync_ee" is only used for resync WRITEs.
3022          * Add to list early, so debugfs can find this request
3023          * even if we have to sleep below. */
3024         spin_lock_irq(&device->resource->req_lock);
3025         list_add_tail(&peer_req->w.list, &device->read_ee);
3026         spin_unlock_irq(&device->resource->req_lock);
3027
3028         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3029         if (device->state.peer != R_PRIMARY
3030         && drbd_rs_should_slow_down(device, sector, false))
3031                 schedule_timeout_uninterruptible(HZ/10);
3032         update_receiver_timing_details(connection, drbd_rs_begin_io);
3033         if (drbd_rs_begin_io(device, sector))
3034                 goto out_free_e;
3035
3036 submit_for_resync:
3037         atomic_add(size >> 9, &device->rs_sect_ev);
3038
3039 submit:
3040         update_receiver_timing_details(connection, drbd_submit_peer_request);
3041         inc_unacked(device);
3042         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3043                                      fault_type) == 0)
3044                 return 0;
3045
3046         /* don't care for the reason here */
3047         drbd_err(device, "submit failed, triggering re-connect\n");
3048
3049 out_free_e:
3050         spin_lock_irq(&device->resource->req_lock);
3051         list_del(&peer_req->w.list);
3052         spin_unlock_irq(&device->resource->req_lock);
3053         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
3054
3055         put_ldev(device);
3056         drbd_free_peer_req(device, peer_req);
3057         return -EIO;
3058 }
3059
3060 /**
3061  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3062  */
3063 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3064 {
3065         struct drbd_device *device = peer_device->device;
3066         int self, peer, rv = -100;
3067         unsigned long ch_self, ch_peer;
3068         enum drbd_after_sb_p after_sb_0p;
3069
3070         self = device->ldev->md.uuid[UI_BITMAP] & 1;
3071         peer = device->p_uuid[UI_BITMAP] & 1;
3072
3073         ch_peer = device->p_uuid[UI_SIZE];
3074         ch_self = device->comm_bm_set;
3075
3076         rcu_read_lock();
3077         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3078         rcu_read_unlock();
3079         switch (after_sb_0p) {
3080         case ASB_CONSENSUS:
3081         case ASB_DISCARD_SECONDARY:
3082         case ASB_CALL_HELPER:
3083         case ASB_VIOLENTLY:
3084                 drbd_err(device, "Configuration error.\n");
3085                 break;
3086         case ASB_DISCONNECT:
3087                 break;
3088         case ASB_DISCARD_YOUNGER_PRI:
3089                 if (self == 0 && peer == 1) {
3090                         rv = -1;
3091                         break;
3092                 }
3093                 if (self == 1 && peer == 0) {
3094                         rv =  1;
3095                         break;
3096                 }
3097                 /* Else fall through to one of the other strategies... */
3098         case ASB_DISCARD_OLDER_PRI:
3099                 if (self == 0 && peer == 1) {
3100                         rv = 1;
3101                         break;
3102                 }
3103                 if (self == 1 && peer == 0) {
3104                         rv = -1;
3105                         break;
3106                 }
3107                 /* Else fall through to one of the other strategies... */
3108                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3109                      "Using discard-least-changes instead\n");
3110                 /* fall through */
3111         case ASB_DISCARD_ZERO_CHG:
3112                 if (ch_peer == 0 && ch_self == 0) {
3113                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3114                                 ? -1 : 1;
3115                         break;
3116                 } else {
3117                         if (ch_peer == 0) { rv =  1; break; }
3118                         if (ch_self == 0) { rv = -1; break; }
3119                 }
3120                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3121                         break;
3122                 /* else: fall through */
3123         case ASB_DISCARD_LEAST_CHG:
3124                 if      (ch_self < ch_peer)
3125                         rv = -1;
3126                 else if (ch_self > ch_peer)
3127                         rv =  1;
3128                 else /* ( ch_self == ch_peer ) */
3129                      /* Well, then use something else. */
3130                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3131                                 ? -1 : 1;
3132                 break;
3133         case ASB_DISCARD_LOCAL:
3134                 rv = -1;
3135                 break;
3136         case ASB_DISCARD_REMOTE:
3137                 rv =  1;
3138         }
3139
3140         return rv;
3141 }
3142
3143 /**
3144  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3145  */
3146 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3147 {
3148         struct drbd_device *device = peer_device->device;
3149         int hg, rv = -100;
3150         enum drbd_after_sb_p after_sb_1p;
3151
3152         rcu_read_lock();
3153         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3154         rcu_read_unlock();
3155         switch (after_sb_1p) {
3156         case ASB_DISCARD_YOUNGER_PRI:
3157         case ASB_DISCARD_OLDER_PRI:
3158         case ASB_DISCARD_LEAST_CHG:
3159         case ASB_DISCARD_LOCAL:
3160         case ASB_DISCARD_REMOTE:
3161         case ASB_DISCARD_ZERO_CHG:
3162                 drbd_err(device, "Configuration error.\n");
3163                 break;
3164         case ASB_DISCONNECT:
3165                 break;
3166         case ASB_CONSENSUS:
3167                 hg = drbd_asb_recover_0p(peer_device);
3168                 if (hg == -1 && device->state.role == R_SECONDARY)
3169                         rv = hg;
3170                 if (hg == 1  && device->state.role == R_PRIMARY)
3171                         rv = hg;
3172                 break;
3173         case ASB_VIOLENTLY:
3174                 rv = drbd_asb_recover_0p(peer_device);
3175                 break;
3176         case ASB_DISCARD_SECONDARY:
3177                 return device->state.role == R_PRIMARY ? 1 : -1;
3178         case ASB_CALL_HELPER:
3179                 hg = drbd_asb_recover_0p(peer_device);
3180                 if (hg == -1 && device->state.role == R_PRIMARY) {
3181                         enum drbd_state_rv rv2;
3182
3183                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3184                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3185                           * we do not need to wait for the after state change work either. */
3186                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3187                         if (rv2 != SS_SUCCESS) {
3188                                 drbd_khelper(device, "pri-lost-after-sb");
3189                         } else {
3190                                 drbd_warn(device, "Successfully gave up primary role.\n");
3191                                 rv = hg;
3192                         }
3193                 } else
3194                         rv = hg;
3195         }
3196
3197         return rv;
3198 }
3199
3200 /**
3201  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3202  */
3203 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3204 {
3205         struct drbd_device *device = peer_device->device;
3206         int hg, rv = -100;
3207         enum drbd_after_sb_p after_sb_2p;
3208
3209         rcu_read_lock();
3210         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3211         rcu_read_unlock();
3212         switch (after_sb_2p) {
3213         case ASB_DISCARD_YOUNGER_PRI:
3214         case ASB_DISCARD_OLDER_PRI:
3215         case ASB_DISCARD_LEAST_CHG:
3216         case ASB_DISCARD_LOCAL:
3217         case ASB_DISCARD_REMOTE:
3218         case ASB_CONSENSUS:
3219         case ASB_DISCARD_SECONDARY:
3220         case ASB_DISCARD_ZERO_CHG:
3221                 drbd_err(device, "Configuration error.\n");
3222                 break;
3223         case ASB_VIOLENTLY:
3224                 rv = drbd_asb_recover_0p(peer_device);
3225                 break;
3226         case ASB_DISCONNECT:
3227                 break;
3228         case ASB_CALL_HELPER:
3229                 hg = drbd_asb_recover_0p(peer_device);
3230                 if (hg == -1) {
3231                         enum drbd_state_rv rv2;
3232
3233                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3234                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3235                           * we do not need to wait for the after state change work either. */
3236                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3237                         if (rv2 != SS_SUCCESS) {
3238                                 drbd_khelper(device, "pri-lost-after-sb");
3239                         } else {
3240                                 drbd_warn(device, "Successfully gave up primary role.\n");
3241                                 rv = hg;
3242                         }
3243                 } else
3244                         rv = hg;
3245         }
3246
3247         return rv;
3248 }
3249
3250 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3251                            u64 bits, u64 flags)
3252 {
3253         if (!uuid) {
3254                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3255                 return;
3256         }
3257         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3258              text,
3259              (unsigned long long)uuid[UI_CURRENT],
3260              (unsigned long long)uuid[UI_BITMAP],
3261              (unsigned long long)uuid[UI_HISTORY_START],
3262              (unsigned long long)uuid[UI_HISTORY_END],
3263              (unsigned long long)bits,
3264              (unsigned long long)flags);
3265 }
3266
3267 /* Return values of drbd_uuid_compare(); *rule_nr reports the last rule number reached before returning.
3268   100   after split brain try auto recover
3269     2   C_SYNC_SOURCE set BitMap
3270     1   C_SYNC_SOURCE use BitMap
3271     0   no Sync
3272    -1   C_SYNC_TARGET use BitMap
3273    -2   C_SYNC_TARGET set BitMap
3274  -100   after split brain, disconnect
3275 -1000   unrelated data
3276 -1091   requires proto 91
3277 -1096   requires proto 96
3278  */
3279 /* Rule 41 may also return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8)); some rules fix up UUIDs in place. */
3280 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3281 {
3282         struct drbd_peer_device *const peer_device = first_peer_device(device);
3283         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3284         u64 self, peer;
3285         int i, j;
3286         /* All UUID comparisons below ignore bit 0 (masked with ~1). */
3287         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3288         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3289         /* Rule 10: both current UUIDs are UUID_JUST_CREATED. */
3290         *rule_nr = 10;
3291         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3292                 return 0;
3293         /* Rule 20: our current UUID is just-created or zero, the peer's is not just-created. */
3294         *rule_nr = 20;
3295         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3296              peer != UUID_JUST_CREATED)
3297                 return -2;
3298         /* Rule 30: mirror of rule 20, the peer's current UUID is just-created or zero. */
3299         *rule_nr = 30;
3300         if (self != UUID_JUST_CREATED &&
3301             (peer == UUID_JUST_CREATED || peer == (u64)0))
3302                 return 2;
3303         /* Equal current UUIDs: decide on the bitmap UUIDs, the crashed-primary flags and the roles. */
3304         if (self == peer) {
3305                 int rct, dc; /* roles at crash time */
3306                 /* Only we have a bitmap UUID: return 1 (rule 34 if our UUIDs get rotated here, else rule 36). */
3307                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3308
3309                         if (connection->agreed_pro_version < 91)
3310                                 return -1091;
3311
3312                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3313                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3314                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3315                                 drbd_uuid_move_history(device);
3316                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3317                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3318
3319                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3320                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3321                                 *rule_nr = 34;
3322                         } else {
3323                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3324                                 *rule_nr = 36;
3325                         }
3326
3327                         return 1;
3328                 }
3329                 /* Only the peer has a bitmap UUID: return -1 (rule 35 if the peer's UUIDs get rotated here, else rule 37). */
3330                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3331
3332                         if (connection->agreed_pro_version < 91)
3333                                 return -1091;
3334
3335                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3336                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3337                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3338
3339                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3340                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3341                                 device->p_uuid[UI_BITMAP] = 0UL;
3342
3343                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3344                                 *rule_nr = 35;
3345                         } else {
3346                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3347                                 *rule_nr = 37;
3348                         }
3349
3350                         return -1;
3351                 }
3352
3353                 /* Common power [off|failure] */
3354                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3355                         (device->p_uuid[UI_FLAGS] & 2);
3356                 /* lowest bit is set when we were primary,
3357                  * next bit (weight 2) is set when peer was primary */
3358                 *rule_nr = 40;
3359
3360                 /* Neither has the "crashed primary" flag set,
3361                  * only a replication link hiccup. */
3362                 if (rct == 0)
3363                         return 0;
3364
3365                 /* Current UUID equal and no bitmap uuid; does not necessarily
3366                  * mean this was a "simultaneous hard crash", maybe IO was
3367                  * frozen, so no UUID-bump happened.
3368                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3369                  * for "new-enough" peer DRBD version. */
3370                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3371                         *rule_nr = 41;
3372                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3373                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3374                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3375                         }
3376                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3377                                 /* At least one has the "crashed primary" bit set,
3378                                  * both are primary now, but neither has rotated its UUIDs?
3379                                  * "Can not happen." */
3380                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3381                                 return -100;
3382                         }
3383                         if (device->state.role == R_PRIMARY)
3384                                 return 1;
3385                         return -1;
3386                 }
3387
3388                 /* Both are secondary.
3389                  * Really looks like recovery from simultaneous hard crash.
3390                  * Check which had been primary before, and arbitrate. */
3391                 switch (rct) {
3392                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3393                 case 1: /*  self_pri && !peer_pri */ return 1;
3394                 case 2: /* !self_pri &&  peer_pri */ return -1;
3395                 case 3: /*  self_pri &&  peer_pri */
3396                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3397                         return dc ? -1 : 1;
3398                 }
3399         }
3400         /* Rule 50: our current UUID equals the peer's bitmap UUID. */
3401         *rule_nr = 50;
3402         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3403         if (self == peer)
3404                 return -1;
3405         /* Rule 51: our current UUID equals the peer's first history UUID. */
3406         *rule_nr = 51;
3407         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3408         if (self == peer) {
3409                 if (connection->agreed_pro_version < 96 ?
3410                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3411                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3412                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3413                         /* The last P_SYNC_UUID did not get through. Undo the last start of
3414                            resync as sync source modifications of the peer's UUIDs. */
3415
3416                         if (connection->agreed_pro_version < 91)
3417                                 return -1091;
3418
3419                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3420                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3421
3422                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3423                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3424
3425                         return -1;
3426                 }
3427         }
3428         /* Rule 60: our current UUID is found in any of the peer's history slots. */
3429         *rule_nr = 60;
3430         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3431         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3432                 peer = device->p_uuid[i] & ~((u64)1);
3433                 if (self == peer)
3434                         return -2;
3435         }
3436         /* Rule 70: our bitmap UUID equals the peer's current UUID (mirror of rule 50). */
3437         *rule_nr = 70;
3438         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3439         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3440         if (self == peer)
3441                 return 1;
3442         /* Rule 71: our first history UUID equals the peer's current UUID (mirror of rule 51). */
3443         *rule_nr = 71;
3444         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3445         if (self == peer) {
3446                 if (connection->agreed_pro_version < 96 ?
3447                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3448                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3449                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3450                         /* The last P_SYNC_UUID did not get through. Undo the last start of
3451                            resync as sync source modifications of our UUIDs. */
3452
3453                         if (connection->agreed_pro_version < 91)
3454                                 return -1091;
3455
3456                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3457                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3458
3459                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3460                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3461                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3462
3463                         return 1;
3464                 }
3465         }
3466
3467         /* Rule 80: the peer's current UUID is found in any of our history slots (mirror of rule 60). */
3468         *rule_nr = 80;
3469         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3470         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3471                 self = device->ldev->md.uuid[i] & ~((u64)1);
3472                 if (self == peer)
3473                         return 2;
3474         }
3475         /* Rule 90: both bitmap UUIDs are equal and non-zero. */
3476         *rule_nr = 90;
3477         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3478         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3479         if (self == peer && self != ((u64)0))
3480                 return 100;
3481         /* Rule 100: any of our history UUIDs equals any of the peer's history UUIDs. */
3482         *rule_nr = 100;
3483         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3484                 self = device->ldev->md.uuid[i] & ~((u64)1);
3485                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3486                         peer = device->p_uuid[j] & ~((u64)1);
3487                         if (self == peer)
3488                                 return -100;
3489                 }
3490         }
3491         /* No rule matched: no UUID in common. */
3492         return -1000;
3493 }
3494
3495 /* drbd_sync_handshake() returns the new conn state on success, or
3496    CONN_MASK (-1) on failure.
3497  */
3498 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3499                                            enum drbd_role peer_role,
3500                                            enum drbd_disk_state peer_disk) __must_hold(local)
3501 {
3502         struct drbd_device *device = peer_device->device;
3503         enum drbd_conns rv = C_MASK;
3504         enum drbd_disk_state mydisk;
3505         struct net_conf *nc;
3506         int hg, rule_nr, rr_conflict, tentative, always_asbp;
3507
3508         mydisk = device->state.disk;
3509         if (mydisk == D_NEGOTIATING)
3510                 mydisk = device->new_state_tmp.disk;
3511
3512         drbd_info(device, "drbd_sync_handshake:\n");
3513
3514         spin_lock_irq(&device->ldev->md.uuid_lock);
3515         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3516         drbd_uuid_dump(device, "peer", device->p_uuid,
3517                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3518
3519         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3520         spin_unlock_irq(&device->ldev->md.uuid_lock);
3521
3522         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3523
3524         if (hg == -1000) {
3525                 drbd_alert(device, "Unrelated data, aborting!\n");
3526                 return C_MASK;
3527         }
3528         if (hg < -0x10000) {
3529                 int proto, fflags;
3530                 hg = -hg;
3531                 proto = hg & 0xff;
3532                 fflags = (hg >> 8) & 0xff;
3533                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3534                                         proto, fflags);
3535                 return C_MASK;
3536         }
3537         if (hg < -1000) {
3538                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3539                 return C_MASK;
3540         }
3541
3542         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3543             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3544                 int f = (hg == -100) || abs(hg) == 2;
3545                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3546                 if (f)
3547                         hg = hg*2;
3548                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3549                      hg > 0 ? "source" : "target");
3550         }
3551
3552         if (abs(hg) == 100)
3553                 drbd_khelper(device, "initial-split-brain");
3554
3555         rcu_read_lock();
3556         nc = rcu_dereference(peer_device->connection->net_conf);
3557         always_asbp = nc->always_asbp;
3558         rr_conflict = nc->rr_conflict;
3559         tentative = nc->tentative;
3560         rcu_read_unlock();
3561
3562         if (hg == 100 || (hg == -100 && always_asbp)) {
3563                 int pcount = (device->state.role == R_PRIMARY)
3564                            + (peer_role == R_PRIMARY);
3565                 int forced = (hg == -100);
3566
3567                 switch (pcount) {
3568                 case 0:
3569                         hg = drbd_asb_recover_0p(peer_device);
3570                         break;
3571                 case 1:
3572                         hg = drbd_asb_recover_1p(peer_device);
3573                         break;
3574                 case 2:
3575                         hg = drbd_asb_recover_2p(peer_device);
3576                         break;
3577                 }
3578                 if (abs(hg) < 100) {
3579                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3580                              "automatically solved. Sync from %s node\n",
3581                              pcount, (hg < 0) ? "peer" : "this");
3582                         if (forced) {
3583                                 drbd_warn(device, "Doing a full sync, since"
3584                                      " UUIDs where ambiguous.\n");
3585                                 hg = hg*2;
3586                         }
3587                 }
3588         }
3589
3590         if (hg == -100) {
3591                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3592                         hg = -1;
3593                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3594                         hg = 1;
3595
3596                 if (abs(hg) < 100)
3597                         drbd_warn(device, "Split-Brain detected, manually solved. "
3598                              "Sync from %s node\n",
3599                              (hg < 0) ? "peer" : "this");
3600         }
3601
3602         if (hg == -100) {
3603                 /* FIXME this log message is not correct if we end up here
3604                  * after an attempted attach on a diskless node.
3605                  * We just refuse to attach -- well, we drop the "connection"
3606                  * to that disk, in a way... */
3607                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3608                 drbd_khelper(device, "split-brain");
3609                 return C_MASK;
3610         }
3611
3612         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3613                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3614                 return C_MASK;
3615         }
3616
3617         if (hg < 0 && /* by intention we do not use mydisk here. */
3618             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3619                 switch (rr_conflict) {
3620                 case ASB_CALL_HELPER:
3621                         drbd_khelper(device, "pri-lost");
3622                         /* fall through */
3623                 case ASB_DISCONNECT:
3624                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3625                         return C_MASK;
3626                 case ASB_VIOLENTLY:
3627                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3628                              "assumption\n");
3629                 }
3630         }
3631
3632         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3633                 if (hg == 0)
3634                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3635                 else
3636                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3637                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3638                                  abs(hg) >= 2 ? "full" : "bit-map based");
3639                 return C_MASK;
3640         }
3641
3642         if (abs(hg) >= 2) {
3643                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3644                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3645                                         BM_LOCKED_SET_ALLOWED))
3646                         return C_MASK;
3647         }
3648
3649         if (hg > 0) { /* become sync source. */
3650                 rv = C_WF_BITMAP_S;
3651         } else if (hg < 0) { /* become sync target */
3652                 rv = C_WF_BITMAP_T;
3653         } else {
3654                 rv = C_CONNECTED;
3655                 if (drbd_bm_total_weight(device)) {
3656                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3657                              drbd_bm_total_weight(device));
3658                 }
3659         }
3660
3661         return rv;
3662 }
3663
3664 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3665 {
3666         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3667         if (peer == ASB_DISCARD_REMOTE)
3668                 return ASB_DISCARD_LOCAL;
3669
3670         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3671         if (peer == ASB_DISCARD_LOCAL)
3672                 return ASB_DISCARD_REMOTE;
3673
3674         /* everything else is valid if they are equal on both sides. */
3675         return peer;
3676 }
3677
3678 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3679 {
3680         struct p_protocol *p = pi->data;
3681         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3682         int p_proto, p_discard_my_data, p_two_primaries, cf;
3683         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3684         char integrity_alg[SHARED_SECRET_MAX] = "";
3685         struct crypto_shash *peer_integrity_tfm = NULL;
3686         void *int_dig_in = NULL, *int_dig_vv = NULL;
3687
3688         p_proto         = be32_to_cpu(p->protocol);
3689         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3690         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3691         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3692         p_two_primaries = be32_to_cpu(p->two_primaries);
3693         cf              = be32_to_cpu(p->conn_flags);
3694         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3695
3696         if (connection->agreed_pro_version >= 87) {
3697                 int err;
3698
3699                 if (pi->size > sizeof(integrity_alg))
3700                         return -EIO;
3701                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3702                 if (err)
3703                         return err;
3704                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3705         }
3706
3707         if (pi->cmd != P_PROTOCOL_UPDATE) {
3708                 clear_bit(CONN_DRY_RUN, &connection->flags);
3709
3710                 if (cf & CF_DRY_RUN)
3711                         set_bit(CONN_DRY_RUN, &connection->flags);
3712
3713                 rcu_read_lock();
3714                 nc = rcu_dereference(connection->net_conf);
3715
3716                 if (p_proto != nc->wire_protocol) {
3717                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3718                         goto disconnect_rcu_unlock;
3719                 }
3720
3721                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3722                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3723                         goto disconnect_rcu_unlock;
3724                 }
3725
3726                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3727                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3728                         goto disconnect_rcu_unlock;
3729                 }
3730
3731                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3732                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3733                         goto disconnect_rcu_unlock;
3734                 }
3735
3736                 if (p_discard_my_data && nc->discard_my_data) {
3737                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3738                         goto disconnect_rcu_unlock;
3739                 }
3740
3741                 if (p_two_primaries != nc->two_primaries) {
3742                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3743                         goto disconnect_rcu_unlock;
3744                 }
3745
3746                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3747                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3748                         goto disconnect_rcu_unlock;
3749                 }
3750
3751                 rcu_read_unlock();
3752         }
3753
3754         if (integrity_alg[0]) {
3755                 int hash_size;
3756
3757                 /*
3758                  * We can only change the peer data integrity algorithm
3759                  * here.  Changing our own data integrity algorithm
3760                  * requires that we send a P_PROTOCOL_UPDATE packet at
3761                  * the same time; otherwise, the peer has no way to
3762                  * tell between which packets the algorithm should
3763                  * change.
3764                  */
3765
3766                 peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3767                 if (IS_ERR(peer_integrity_tfm)) {
3768                         peer_integrity_tfm = NULL;
3769                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3770                                  integrity_alg);
3771                         goto disconnect;
3772                 }
3773
3774                 hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3775                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3776                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3777                 if (!(int_dig_in && int_dig_vv)) {
3778                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3779                         goto disconnect;
3780                 }
3781         }
3782
3783         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3784         if (!new_net_conf) {
3785                 drbd_err(connection, "Allocation of new net_conf failed\n");
3786                 goto disconnect;
3787         }
3788
3789         mutex_lock(&connection->data.mutex);
3790         mutex_lock(&connection->resource->conf_update);
3791         old_net_conf = connection->net_conf;
3792         *new_net_conf = *old_net_conf;
3793
3794         new_net_conf->wire_protocol = p_proto;
3795         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3796         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3797         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3798         new_net_conf->two_primaries = p_two_primaries;
3799
3800         rcu_assign_pointer(connection->net_conf, new_net_conf);
3801         mutex_unlock(&connection->resource->conf_update);
3802         mutex_unlock(&connection->data.mutex);
3803
3804         crypto_free_shash(connection->peer_integrity_tfm);
3805         kfree(connection->int_dig_in);
3806         kfree(connection->int_dig_vv);
3807         connection->peer_integrity_tfm = peer_integrity_tfm;
3808         connection->int_dig_in = int_dig_in;
3809         connection->int_dig_vv = int_dig_vv;
3810
3811         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3812                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3813                           integrity_alg[0] ? integrity_alg : "(none)");
3814
3815         synchronize_rcu();
3816         kfree(old_net_conf);
3817         return 0;
3818
3819 disconnect_rcu_unlock:
3820         rcu_read_unlock();
3821 disconnect:
3822         crypto_free_shash(peer_integrity_tfm);
3823         kfree(int_dig_in);
3824         kfree(int_dig_vv);
3825         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3826         return -EIO;
3827 }
3828
3829 /* helper function
3830  * input: alg name, feature name
3831  * return: NULL (alg name was "")
3832  *         ERR_PTR(error) if something goes wrong
3833  *         or the crypto hash ptr, if it worked out ok. */
3834 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3835                 const struct drbd_device *device,
3836                 const char *alg, const char *name)
3837 {
3838         struct crypto_shash *tfm;
3839
3840         if (!alg[0])
3841                 return NULL;
3842
3843         tfm = crypto_alloc_shash(alg, 0, 0);
3844         if (IS_ERR(tfm)) {
3845                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3846                         alg, name, PTR_ERR(tfm));
3847                 return tfm;
3848         }
3849         return tfm;
3850 }
3851
3852 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3853 {
3854         void *buffer = connection->data.rbuf;
3855         int size = pi->size;
3856
3857         while (size) {
3858                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3859                 s = drbd_recv(connection, buffer, s);
3860                 if (s <= 0) {
3861                         if (s < 0)
3862                                 return s;
3863                         break;
3864                 }
3865                 size -= s;
3866         }
3867         if (size)
3868                 return -EIO;
3869         return 0;
3870 }
3871
3872 /*
3873  * config_unknown_volume  -  device configuration command for unknown volume
3874  *
3875  * When a device is added to an existing connection, the node on which the
3876  * device is added first will send configuration commands to its peer but the
3877  * peer will not know about the device yet.  It will warn and ignore these
3878  * commands.  Once the device is added on the second node, the second node will
3879  * send the same device configuration commands, but in the other direction.
3880  *
3881  * (We can also end up here if drbd is misconfigured.)
3882  */
3883 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3884 {
3885         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3886                   cmdname(pi->cmd), pi->vnr);
3887         return ignore_remaining_packet(connection, pi);
3888 }
3889
3890 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3891 {
3892         struct drbd_peer_device *peer_device;
3893         struct drbd_device *device;
3894         struct p_rs_param_95 *p;
3895         unsigned int header_size, data_size, exp_max_sz;
3896         struct crypto_shash *verify_tfm = NULL;
3897         struct crypto_shash *csums_tfm = NULL;
3898         struct net_conf *old_net_conf, *new_net_conf = NULL;
3899         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3900         const int apv = connection->agreed_pro_version;
3901         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3902         int fifo_size = 0;
3903         int err;
3904
3905         peer_device = conn_peer_device(connection, pi->vnr);
3906         if (!peer_device)
3907                 return config_unknown_volume(connection, pi);
3908         device = peer_device->device;
3909
3910         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3911                     : apv == 88 ? sizeof(struct p_rs_param)
3912                                         + SHARED_SECRET_MAX
3913                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3914                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3915
3916         if (pi->size > exp_max_sz) {
3917                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3918                     pi->size, exp_max_sz);
3919                 return -EIO;
3920         }
3921
3922         if (apv <= 88) {
3923                 header_size = sizeof(struct p_rs_param);
3924                 data_size = pi->size - header_size;
3925         } else if (apv <= 94) {
3926                 header_size = sizeof(struct p_rs_param_89);
3927                 data_size = pi->size - header_size;
3928                 D_ASSERT(device, data_size == 0);
3929         } else {
3930                 header_size = sizeof(struct p_rs_param_95);
3931                 data_size = pi->size - header_size;
3932                 D_ASSERT(device, data_size == 0);
3933         }
3934
3935         /* initialize verify_alg and csums_alg */
3936         p = pi->data;
3937         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3938
3939         err = drbd_recv_all(peer_device->connection, p, header_size);
3940         if (err)
3941                 return err;
3942
3943         mutex_lock(&connection->resource->conf_update);
3944         old_net_conf = peer_device->connection->net_conf;
3945         if (get_ldev(device)) {
3946                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3947                 if (!new_disk_conf) {
3948                         put_ldev(device);
3949                         mutex_unlock(&connection->resource->conf_update);
3950                         drbd_err(device, "Allocation of new disk_conf failed\n");
3951                         return -ENOMEM;
3952                 }
3953
3954                 old_disk_conf = device->ldev->disk_conf;
3955                 *new_disk_conf = *old_disk_conf;
3956
3957                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3958         }
3959
3960         if (apv >= 88) {
3961                 if (apv == 88) {
3962                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3963                                 drbd_err(device, "verify-alg of wrong size, "
3964                                         "peer wants %u, accepting only up to %u byte\n",
3965                                         data_size, SHARED_SECRET_MAX);
3966                                 err = -EIO;
3967                                 goto reconnect;
3968                         }
3969
3970                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3971                         if (err)
3972                                 goto reconnect;
3973                         /* we expect NUL terminated string */
3974                         /* but just in case someone tries to be evil */
3975                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3976                         p->verify_alg[data_size-1] = 0;
3977
3978                 } else /* apv >= 89 */ {
3979                         /* we still expect NUL terminated strings */
3980                         /* but just in case someone tries to be evil */
3981                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3982                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3983                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3984                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3985                 }
3986
3987                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3988                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3989                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3990                                     old_net_conf->verify_alg, p->verify_alg);
3991                                 goto disconnect;
3992                         }
3993                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3994                                         p->verify_alg, "verify-alg");
3995                         if (IS_ERR(verify_tfm)) {
3996                                 verify_tfm = NULL;
3997                                 goto disconnect;
3998                         }
3999                 }
4000
4001                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
4002                         if (device->state.conn == C_WF_REPORT_PARAMS) {
4003                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
4004                                     old_net_conf->csums_alg, p->csums_alg);
4005                                 goto disconnect;
4006                         }
4007                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
4008                                         p->csums_alg, "csums-alg");
4009                         if (IS_ERR(csums_tfm)) {
4010                                 csums_tfm = NULL;
4011                                 goto disconnect;
4012                         }
4013                 }
4014
4015                 if (apv > 94 && new_disk_conf) {
4016                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4017                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4018                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4019                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4020
4021                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4022                         if (fifo_size != device->rs_plan_s->size) {
4023                                 new_plan = fifo_alloc(fifo_size);
4024                                 if (!new_plan) {
4025                                         drbd_err(device, "kmalloc of fifo_buffer failed");
4026                                         put_ldev(device);
4027                                         goto disconnect;
4028                                 }
4029                         }
4030                 }
4031
4032                 if (verify_tfm || csums_tfm) {
4033                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4034                         if (!new_net_conf) {
4035                                 drbd_err(device, "Allocation of new net_conf failed\n");
4036                                 goto disconnect;
4037                         }
4038
4039                         *new_net_conf = *old_net_conf;
4040
4041                         if (verify_tfm) {
4042                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
4043                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4044                                 crypto_free_shash(peer_device->connection->verify_tfm);
4045                                 peer_device->connection->verify_tfm = verify_tfm;
4046                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4047                         }
4048                         if (csums_tfm) {
4049                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
4050                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4051                                 crypto_free_shash(peer_device->connection->csums_tfm);
4052                                 peer_device->connection->csums_tfm = csums_tfm;
4053                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4054                         }
4055                         rcu_assign_pointer(connection->net_conf, new_net_conf);
4056                 }
4057         }
4058
4059         if (new_disk_conf) {
4060                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4061                 put_ldev(device);
4062         }
4063
4064         if (new_plan) {
4065                 old_plan = device->rs_plan_s;
4066                 rcu_assign_pointer(device->rs_plan_s, new_plan);
4067         }
4068
4069         mutex_unlock(&connection->resource->conf_update);
4070         synchronize_rcu();
4071         if (new_net_conf)
4072                 kfree(old_net_conf);
4073         kfree(old_disk_conf);
4074         kfree(old_plan);
4075
4076         return 0;
4077
4078 reconnect:
4079         if (new_disk_conf) {
4080                 put_ldev(device);
4081                 kfree(new_disk_conf);
4082         }
4083         mutex_unlock(&connection->resource->conf_update);
4084         return -EIO;
4085
4086 disconnect:
4087         kfree(new_plan);
4088         if (new_disk_conf) {
4089                 put_ldev(device);
4090                 kfree(new_disk_conf);
4091         }
4092         mutex_unlock(&connection->resource->conf_update);
4093         /* just for completeness: actually not needed,
4094          * as this is not reached if csums_tfm was ok. */
4095         crypto_free_shash(csums_tfm);
4096         /* but free the verify_tfm again, if csums_tfm did not work out */
4097         crypto_free_shash(verify_tfm);
4098         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4099         return -EIO;
4100 }
4101
4102 /* warn if the arguments differ by more than 12.5% */
4103 static void warn_if_differ_considerably(struct drbd_device *device,
4104         const char *s, sector_t a, sector_t b)
4105 {
4106         sector_t d;
4107         if (a == 0 || b == 0)
4108                 return;
4109         d = (a > b) ? (a - b) : (b - a);
4110         if (d > (a>>3) || d > (b>>3))
4111                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4112                      (unsigned long long)a, (unsigned long long)b);
4113 }
4114
4115 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4116 {
4117         struct drbd_peer_device *peer_device;
4118         struct drbd_device *device;
4119         struct p_sizes *p = pi->data;
4120         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4121         enum determine_dev_size dd = DS_UNCHANGED;
4122         sector_t p_size, p_usize, p_csize, my_usize;
4123         sector_t new_size, cur_size;
4124         int ldsc = 0; /* local disk size changed */
4125         enum dds_flags ddsf;
4126
4127         peer_device = conn_peer_device(connection, pi->vnr);
4128         if (!peer_device)
4129                 return config_unknown_volume(connection, pi);
4130         device = peer_device->device;
4131         cur_size = drbd_get_capacity(device->this_bdev);
4132
4133         p_size = be64_to_cpu(p->d_size);
4134         p_usize = be64_to_cpu(p->u_size);
4135         p_csize = be64_to_cpu(p->c_size);
4136
4137         /* just store the peer's disk size for now.
4138          * we still need to figure out whether we accept that. */
4139         device->p_size = p_size;
4140
4141         if (get_ldev(device)) {
4142                 rcu_read_lock();
4143                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4144                 rcu_read_unlock();
4145
4146                 warn_if_differ_considerably(device, "lower level device sizes",
4147                            p_size, drbd_get_max_capacity(device->ldev));
4148                 warn_if_differ_considerably(device, "user requested size",
4149                                             p_usize, my_usize);
4150
4151                 /* if this is the first connect, or an otherwise expected
4152                  * param exchange, choose the minimum */
4153                 if (device->state.conn == C_WF_REPORT_PARAMS)
4154                         p_usize = min_not_zero(my_usize, p_usize);
4155
4156                 /* Never shrink a device with usable data during connect,
4157                  * or "attach" on the peer.
4158                  * But allow online shrinking if we are connected. */
4159                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4160                 if (new_size < cur_size &&
4161                     device->state.disk >= D_OUTDATED &&
4162                     (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4163                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4164                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4165                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4166                         put_ldev(device);
4167                         return -EIO;
4168                 }
4169
4170                 if (my_usize != p_usize) {
4171                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4172
4173                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4174                         if (!new_disk_conf) {
4175                                 drbd_err(device, "Allocation of new disk_conf failed\n");
4176                                 put_ldev(device);
4177                                 return -ENOMEM;
4178                         }
4179
4180                         mutex_lock(&connection->resource->conf_update);
4181                         old_disk_conf = device->ldev->disk_conf;
4182                         *new_disk_conf = *old_disk_conf;
4183                         new_disk_conf->disk_size = p_usize;
4184
4185                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4186                         mutex_unlock(&connection->resource->conf_update);
4187                         synchronize_rcu();
4188                         kfree(old_disk_conf);
4189
4190                         drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4191                                  (unsigned long)p_usize, (unsigned long)my_usize);
4192                 }
4193
4194                 put_ldev(device);
4195         }
4196
4197         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4198         /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4199            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4200            drbd_reconsider_queue_parameters(), we can be sure that after
4201            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4202
4203         ddsf = be16_to_cpu(p->dds_flags);
4204         if (get_ldev(device)) {
4205                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4206                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4207                 put_ldev(device);
4208                 if (dd == DS_ERROR)
4209                         return -EIO;
4210                 drbd_md_sync(device);
4211         } else {
4212                 /*
4213                  * I am diskless, need to accept the peer's *current* size.
4214                  * I must NOT accept the peers backing disk size,
4215                  * it may have been larger than mine all along...
4216                  *
4217                  * At this point, the peer knows more about my disk, or at
4218                  * least about what we last agreed upon, than myself.
4219                  * So if his c_size is less than his d_size, the most likely
4220                  * reason is that *my* d_size was smaller last time we checked.
4221                  *
4222                  * However, if he sends a zero current size,
4223                  * take his (user-capped or) backing disk size anyways.
4224                  *
4225                  * Unless of course he does not have a disk himself.
4226                  * In which case we ignore this completely.
4227                  */
4228                 sector_t new_size = p_csize ?: p_usize ?: p_size;
4229                 drbd_reconsider_queue_parameters(device, NULL, o);
4230                 if (new_size == 0) {
4231                         /* Ignore, peer does not know nothing. */
4232                 } else if (new_size == cur_size) {
4233                         /* nothing to do */
4234                 } else if (cur_size != 0 && p_size == 0) {
4235                         drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4236                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4237                 } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4238                         drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4239                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4240                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4241                         return -EIO;
4242                 } else {
4243                         /* I believe the peer, if
4244                          *  - I don't have a current size myself
4245                          *  - we agree on the size anyways
4246                          *  - I do have a current size, am Secondary,
4247                          *    and he has the only disk
4248                          *  - I do have a current size, am Primary,
4249                          *    and he has the only disk,
4250                          *    which is larger than my current size
4251                          */
4252                         drbd_set_my_capacity(device, new_size);
4253                 }
4254         }
4255
4256         if (get_ldev(device)) {
4257                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4258                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4259                         ldsc = 1;
4260                 }
4261
4262                 put_ldev(device);
4263         }
4264
4265         if (device->state.conn > C_WF_REPORT_PARAMS) {
4266                 if (be64_to_cpu(p->c_size) !=
4267                     drbd_get_capacity(device->this_bdev) || ldsc) {
4268                         /* we have different sizes, probably peer
4269                          * needs to know my new size... */
4270                         drbd_send_sizes(peer_device, 0, ddsf);
4271                 }
4272                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4273                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4274                         if (device->state.pdsk >= D_INCONSISTENT &&
4275                             device->state.disk >= D_INCONSISTENT) {
4276                                 if (ddsf & DDSF_NO_RESYNC)
4277                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4278                                 else
4279                                         resync_after_online_grow(device);
4280                         } else
4281                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4282                 }
4283         }
4284
4285         return 0;
4286 }
4287
4288 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4289 {
4290         struct drbd_peer_device *peer_device;
4291         struct drbd_device *device;
4292         struct p_uuids *p = pi->data;
4293         u64 *p_uuid;
4294         int i, updated_uuids = 0;
4295
4296         peer_device = conn_peer_device(connection, pi->vnr);
4297         if (!peer_device)
4298                 return config_unknown_volume(connection, pi);
4299         device = peer_device->device;
4300
4301         p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4302         if (!p_uuid) {
4303                 drbd_err(device, "kmalloc of p_uuid failed\n");
4304                 return false;
4305         }
4306
4307         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4308                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4309
4310         kfree(device->p_uuid);
4311         device->p_uuid = p_uuid;
4312
4313         if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4314             device->state.disk < D_INCONSISTENT &&
4315             device->state.role == R_PRIMARY &&
4316             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4317                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4318                     (unsigned long long)device->ed_uuid);
4319                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4320                 return -EIO;
4321         }
4322
4323         if (get_ldev(device)) {
4324                 int skip_initial_sync =
4325                         device->state.conn == C_CONNECTED &&
4326                         peer_device->connection->agreed_pro_version >= 90 &&
4327                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4328                         (p_uuid[UI_FLAGS] & 8);
4329                 if (skip_initial_sync) {
4330                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4331                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4332                                         "clear_n_write from receive_uuids",
4333                                         BM_LOCKED_TEST_ALLOWED);
4334                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4335                         _drbd_uuid_set(device, UI_BITMAP, 0);
4336                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4337                                         CS_VERBOSE, NULL);
4338                         drbd_md_sync(device);
4339                         updated_uuids = 1;
4340                 }
4341                 put_ldev(device);
4342         } else if (device->state.disk < D_INCONSISTENT &&
4343                    device->state.role == R_PRIMARY) {
4344                 /* I am a diskless primary, the peer just created a new current UUID
4345                    for me. */
4346                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4347         }
4348
4349         /* Before we test for the disk state, we should wait until an eventually
4350            ongoing cluster wide state change is finished. That is important if
4351            we are primary and are detaching from our disk. We need to see the
4352            new disk state... */
4353         mutex_lock(device->state_mutex);
4354         mutex_unlock(device->state_mutex);
4355         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4356                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4357
4358         if (updated_uuids)
4359                 drbd_print_uuids(device, "receiver updated UUIDs to");
4360
4361         return 0;
4362 }
4363
4364 /**
4365  * convert_state() - Converts the peer's view of the cluster state to our point of view
4366  * @ps:         The state as seen by the peer.
4367  */
4368 static union drbd_state convert_state(union drbd_state ps)
4369 {
4370         union drbd_state ms;
4371
4372         static enum drbd_conns c_tab[] = {
4373                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4374                 [C_CONNECTED] = C_CONNECTED,
4375
4376                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4377                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4378                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4379                 [C_VERIFY_S]       = C_VERIFY_T,
4380                 [C_MASK]   = C_MASK,
4381         };
4382
4383         ms.i = ps.i;
4384
4385         ms.conn = c_tab[ps.conn];
4386         ms.peer = ps.role;
4387         ms.role = ps.peer;
4388         ms.pdsk = ps.disk;
4389         ms.disk = ps.pdsk;
4390         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4391
4392         return ms;
4393 }
4394
4395 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4396 {
4397         struct drbd_peer_device *peer_device;
4398         struct drbd_device *device;
4399         struct p_req_state *p = pi->data;
4400         union drbd_state mask, val;
4401         enum drbd_state_rv rv;
4402
4403         peer_device = conn_peer_device(connection, pi->vnr);
4404         if (!peer_device)
4405                 return -EIO;
4406         device = peer_device->device;
4407
4408         mask.i = be32_to_cpu(p->mask);
4409         val.i = be32_to_cpu(p->val);
4410
4411         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4412             mutex_is_locked(device->state_mutex)) {
4413                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4414                 return 0;
4415         }
4416
4417         mask = convert_state(mask);
4418         val = convert_state(val);
4419
4420         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4421         drbd_send_sr_reply(peer_device, rv);
4422
4423         drbd_md_sync(device);
4424
4425         return 0;
4426 }
4427
4428 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4429 {
4430         struct p_req_state *p = pi->data;
4431         union drbd_state mask, val;
4432         enum drbd_state_rv rv;
4433
4434         mask.i = be32_to_cpu(p->mask);
4435         val.i = be32_to_cpu(p->val);
4436
4437         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4438             mutex_is_locked(&connection->cstate_mutex)) {
4439                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4440                 return 0;
4441         }
4442
4443         mask = convert_state(mask);
4444         val = convert_state(val);
4445
4446         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4447         conn_send_sr_reply(connection, rv);
4448
4449         return 0;
4450 }
4451
/*
 * receive_state() - process a state packet (struct p_state) sent by the peer
 * @connection: connection the packet arrived on
 * @pi:         packet info; pi->vnr selects the volume, pi->data holds the
 *              big endian state word
 *
 * Derives a new local state (ns) from the current one (os) and from what the
 * peer reports, possibly running the sync handshake, and applies it with
 * _drbd_set_state() under the resource's req_lock.  If the device state
 * changed while the lock was dropped, the whole evaluation is retried.
 *
 * Returns 0 on success (including the cases where the packet only triggers
 * drbd_resync_finished()), -ECONNRESET if the connection is already being
 * torn down, -EIO if the connection has to be dropped, or the result of
 * config_unknown_volume() if pi->vnr is not known.
 */
static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
{
        struct drbd_peer_device *peer_device;
        struct drbd_device *device;
        struct p_state *p = pi->data;
        union drbd_state os, ns, peer_state;
        enum drbd_disk_state real_peer_disk;
        enum chg_state_flags cs_flags;
        int rv;

        peer_device = conn_peer_device(connection, pi->vnr);
        if (!peer_device)
                return config_unknown_volume(connection, pi);
        device = peer_device->device;

        peer_state.i = be32_to_cpu(p->state);

        /* While the peer is still negotiating, guess its real disk state
         * from bit 2 (value 4) of the UUID flags it sent.
         * NOTE(review): device->p_uuid is dereferenced here without the
         * NULL check that is applied further down - confirm it cannot be
         * NULL when a D_NEGOTIATING state arrives. */
        real_peer_disk = peer_state.disk;
        if (peer_state.disk == D_NEGOTIATING) {
                real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
                drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
        }

        /* Snapshot the current state under the lock.  The "retry" label is
         * entered with req_lock held (see the goto below). */
        spin_lock_irq(&device->resource->req_lock);
 retry:
        os = ns = drbd_read_state(device);
        spin_unlock_irq(&device->resource->req_lock);

        /* If some other part of the code (ack_receiver thread, timeout)
         * already decided to close the connection again,
         * we must not "re-establish" it here. */
        if (os.conn <= C_TEAR_DOWN)
                return -ECONNRESET;

        /* If this is the "end of sync" confirmation, usually the peer disk
         * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
         * set) resync started in PausedSyncT, or if the timing of pause-/
         * unpause-sync events has been "just right", the peer disk may
         * transition from D_CONSISTENT to D_UP_TO_DATE as well.
         */
        if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
            real_peer_disk == D_UP_TO_DATE &&
            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
                /* If we are (becoming) SyncSource, but peer is still in sync
                 * preparation, ignore its uptodate-ness to avoid flapping, it
                 * will change to inconsistent once the peer reaches active
                 * syncing states.
                 * It may have changed syncer-paused flags, however, so we
                 * cannot ignore this completely. */
                if (peer_state.conn > C_CONNECTED &&
                    peer_state.conn < C_SYNC_SOURCE)
                        real_peer_disk = D_INCONSISTENT;

                /* if peer_state changes to connected at the same time,
                 * it explicitly notifies us that it finished resync.
                 * Maybe we should finish it up, too? */
                else if (os.conn >= C_SYNC_SOURCE &&
                         peer_state.conn == C_CONNECTED) {
                        if (drbd_bm_total_weight(device) <= device->rs_failed)
                                drbd_resync_finished(device);
                        return 0;
                }
        }

        /* explicit verify finished notification, stop sector reached. */
        if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
            peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
                ov_out_of_sync_print(device);
                drbd_resync_finished(device);
                return 0;
        }

        /* peer says his disk is inconsistent, while we think it is uptodate,
         * and this happens while the peer still thinks we have a sync going on,
         * but we think we are already done with the sync.
         * We ignore this to avoid flapping pdsk.
         * This should not happen, if the peer is a recent version of drbd. */
        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
                real_peer_disk = D_UP_TO_DATE;

        /* Receiving the peer's state completes the parameter exchange. */
        if (ns.conn == C_WF_REPORT_PARAMS)
                ns.conn = C_CONNECTED;

        /* A peer that is "ahead" makes us "behind". */
        if (peer_state.conn == C_AHEAD)
                ns.conn = C_BEHIND;

        /* TODO:
         * if (primary and diskless and peer uuid != effective uuid)
         *     abort attach on peer;
         *
         * If this node does not have good data, was already connected, but
         * the peer did a late attach only now, trying to "negotiate" with me,
         * AND I am currently Primary, possibly frozen, with some specific
         * "effective" uuid, this should never be reached, really, because
         * we first send the uuids, then the current state.
         *
         * In this scenario, we already dropped the connection hard
         * when we received the unsuitable uuids (receive_uuids()).
         *
         * Should we want to change this, that is: not drop the connection in
         * receive_uuids() already, then we would need to add a branch here
         * that aborts the attach of "unsuitable uuids" on the peer in case
         * this node is currently Diskless Primary.
         */

        /* Only consider a resync if we have the peer's UUIDs, the peer has
         * (or is attaching) a disk, and we can get a reference on our own
         * local disk in at least D_NEGOTIATING state. */
        if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
            get_ldev_if_state(device, D_NEGOTIATING)) {
                int cr; /* consider resync */

                /* if we established a new connection */
                cr  = (os.conn < C_CONNECTED);
                /* if we had an established connection
                 * and one of the nodes newly attaches a disk */
                cr |= (os.conn == C_CONNECTED &&
                       (peer_state.disk == D_NEGOTIATING ||
                        os.disk == D_NEGOTIATING));
                /* if we have both been inconsistent, and the peer has been
                 * forced to be UpToDate with --force */
                cr |= test_bit(CONSIDER_RESYNC, &device->flags);
                /* if we had been plain connected, and the admin requested to
                 * start a sync by "invalidate" or "invalidate-remote" */
                cr |= (os.conn == C_CONNECTED &&
                                (peer_state.conn >= C_STARTING_SYNC_S &&
                                 peer_state.conn <= C_WF_BITMAP_T));

                if (cr)
                        ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);

                put_ldev(device);
                /* C_MASK is used here as the "handshake failed" marker. */
                if (ns.conn == C_MASK) {
                        ns.conn = C_CONNECTED;
                        if (device->state.disk == D_NEGOTIATING) {
                                /* our own attach fails */
                                drbd_force_state(device, NS(disk, D_FAILED));
                        } else if (peer_state.disk == D_NEGOTIATING) {
                                drbd_err(device, "Disk attach process on the peer node was aborted.\n");
                                peer_state.disk = D_DISKLESS;
                                real_peer_disk = D_DISKLESS;
                        } else {
                                /* neither side is attaching: give up on
                                 * this connection */
                                if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
                                        return -EIO;
                                D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
                                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
                                return -EIO;
                        }
                }
        }

        /* Re-take the lock; if the state changed behind our back, all
         * decisions above are stale and have to be redone. */
        spin_lock_irq(&device->resource->req_lock);
        if (os.i != drbd_read_state(device).i)
                goto retry;
        clear_bit(CONSIDER_RESYNC, &device->flags);
        ns.peer = peer_state.role;
        ns.pdsk = real_peer_disk;
        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
                ns.disk = device->new_state_tmp.disk;
        /* CS_HARD is left out only for the transition from "not connected"
         * to "connected or beyond". */
        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
        if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
            test_bit(NEW_CUR_UUID, &device->flags)) {
                /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
                   for temporal network outages! */
                spin_unlock_irq(&device->resource->req_lock);
                drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
                tl_clear(peer_device->connection);
                drbd_uuid_new_current(device);
                clear_bit(NEW_CUR_UUID, &device->flags);
                conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
                return -EIO;
        }
        rv = _drbd_set_state(device, ns, cs_flags, NULL);
        /* Re-read: the state actually set may differ from what we asked for. */
        ns = drbd_read_state(device);
        spin_unlock_irq(&device->resource->req_lock);

        if (rv < SS_SUCCESS) {
                conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
                return -EIO;
        }

        if (os.conn > C_WF_REPORT_PARAMS) {
                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
                    peer_state.disk != D_NEGOTIATING ) {
                        /* we want resync, peer has not yet decided to sync... */
                        /* Nowadays only used when forcing a node into primary role and
                           setting its disk to UpToDate with that */
                        drbd_send_uuids(peer_device);
                        drbd_send_current_state(peer_device);
                }
        }

        clear_bit(DISCARD_MY_DATA, &device->flags);

        drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */

        return 0;
}
4648
4649 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4650 {
4651         struct drbd_peer_device *peer_device;
4652         struct drbd_device *device;
4653         struct p_rs_uuid *p = pi->data;
4654
4655         peer_device = conn_peer_device(connection, pi->vnr);
4656         if (!peer_device)
4657                 return -EIO;
4658         device = peer_device->device;
4659
4660         wait_event(device->misc_wait,
4661                    device->state.conn == C_WF_SYNC_UUID ||
4662                    device->state.conn == C_BEHIND ||
4663                    device->state.conn < C_CONNECTED ||
4664                    device->state.disk < D_NEGOTIATING);
4665
4666         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4667
4668         /* Here the _drbd_uuid_ functions are right, current should
4669            _not_ be rotated into the history */
4670         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4671                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4672                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4673
4674                 drbd_print_uuids(device, "updated sync uuid");
4675                 drbd_start_resync(device, C_SYNC_TARGET);
4676
4677                 put_ldev(device);
4678         } else
4679                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4680
4681         return 0;
4682 }
4683
4684 /**
4685  * receive_bitmap_plain
4686  *
4687  * Return 0 when done, 1 when another iteration is needed, and a negative error
4688  * code upon failure.
4689  */
4690 static int
4691 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4692                      unsigned long *p, struct bm_xfer_ctx *c)
4693 {
4694         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4695                                  drbd_header_size(peer_device->connection);
4696         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4697                                        c->bm_words - c->word_offset);
4698         unsigned int want = num_words * sizeof(*p);
4699         int err;
4700
4701         if (want != size) {
4702                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4703                 return -EIO;
4704         }
4705         if (want == 0)
4706                 return 0;
4707         err = drbd_recv_all(peer_device->connection, p, want);
4708         if (err)
4709                 return err;
4710
4711         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4712
4713         c->word_offset += num_words;
4714         c->bit_offset = c->word_offset * BITS_PER_LONG;
4715         if (c->bit_offset > c->bm_bits)
4716                 c->bit_offset = c->bm_bits;
4717
4718         return 1;
4719 }
4720
4721 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4722 {
4723         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4724 }
4725
4726 static int dcbp_get_start(struct p_compressed_bm *p)
4727 {
4728         return (p->encoding & 0x80) != 0;
4729 }
4730
4731 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4732 {
4733         return (p->encoding >> 4) & 0x7;
4734 }
4735
/**
 * recv_bm_rle_bits
 *
 * Decodes one run-length encoded bitmap packet.  The payload (@p->code,
 * @len bytes) is read as a bit stream of variable length encoded run
 * lengths.  Runs alternate between "leave as is" and "set these bits",
 * starting with the kind given by dcbp_get_start(); for each "set" run the
 * corresponding bit range is set in the device bitmap.  Decoding starts at
 * @c->bit_offset, and @c is advanced to where decoding stopped.
 *
 * Return 0 when done, 1 when another iteration is needed, and a negative error
 * code upon failure.
 */
static int
recv_bm_rle_bits(struct drbd_peer_device *peer_device,
                struct p_compressed_bm *p,
                 struct bm_xfer_ctx *c,
                 unsigned int len)
{
        struct bitstream bs;
        u64 look_ahead;         /* not yet decoded bits, lowest bits first */
        u64 rl;                 /* length of the run just decoded */
        u64 tmp;
        unsigned long s = c->bit_offset;        /* first bit of the current run */
        unsigned long e;                        /* last bit of the current run */
        int toggle = dcbp_get_start(p);         /* non-zero: current run sets bits */
        int have;               /* number of valid bits in look_ahead */
        int bits;

        bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));

        /* prime the look-ahead window with up to 64 bits */
        bits = bitstream_get_bits(&bs, &look_ahead, 64);
        if (bits < 0)
                return -EIO;

        /* one run per iteration; stop once the window is empty */
        for (have = bits; have > 0; s += rl, toggle = !toggle) {
                /* the return value is used below as the number of bits
                 * this code word occupied */
                bits = vli_decode_bits(&rl, look_ahead);
                if (bits <= 0)
                        return -EIO;

                if (toggle) {
                        e = s + rl -1;
                        /* refuse runs that reach beyond the bitmap */
                        if (e >= c->bm_bits) {
                                drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
                                return -EIO;
                        }
                        _drbd_bm_set_bits(peer_device->device, s, e);
                }

                /* the code word claims more bits than the window held */
                if (have < bits) {
                        drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
                                have, bits, look_ahead,
                                (unsigned int)(bs.cur.b - p->code),
                                (unsigned int)bs.buf_len);
                        return -EIO;
                }
                /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
                if (likely(bits < 64))
                        look_ahead >>= bits;
                else
                        look_ahead = 0;
                have -= bits;

                /* refill the window: append fresh bits above the ones
                 * still waiting to be decoded */
                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
                if (bits < 0)
                        return -EIO;
                look_ahead |= tmp << have;
                have += bits;
        }

        c->bit_offset = s;
        bm_xfer_ctx_bit_to_word_offset(c);

        /* more packets are needed unless we reached the end of the bitmap */
        return (s != c->bm_bits);
}
4804
4805 /**
4806  * decode_bitmap_c
4807  *
4808  * Return 0 when done, 1 when another iteration is needed, and a negative error
4809  * code upon failure.
4810  */
4811 static int
4812 decode_bitmap_c(struct drbd_peer_device *peer_device,
4813                 struct p_compressed_bm *p,
4814                 struct bm_xfer_ctx *c,
4815                 unsigned int len)
4816 {
4817         if (dcbp_get_code(p) == RLE_VLI_Bits)
4818                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4819
4820         /* other variants had been implemented for evaluation,
4821          * but have been dropped as this one turned out to be "best"
4822          * during all our tests. */
4823
4824         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4825         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4826         return -EIO;
4827 }
4828
4829 void INFO_bm_xfer_stats(struct drbd_device *device,
4830                 const char *direction, struct bm_xfer_ctx *c)
4831 {
4832         /* what would it take to transfer it "plaintext" */
4833         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4834         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4835         unsigned int plain =
4836                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4837                 c->bm_words * sizeof(unsigned long);
4838         unsigned int total = c->bytes[0] + c->bytes[1];
4839         unsigned int r;
4840
4841         /* total can not be zero. but just in case: */
4842         if (total == 0)
4843                 return;
4844
4845         /* don't report if not compressed */
4846         if (total >= plain)
4847                 return;
4848
4849         /* total < plain. check for overflow, still */
4850         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4851                                     : (1000 * total / plain);
4852
4853         if (r > 1000)
4854                 r = 1000;
4855
4856         r = 1000 - r;
4857         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4858              "total %u; compression: %u.%u%%\n",
4859                         direction,
4860                         c->bytes[1], c->packets[1],
4861                         c->bytes[0], c->packets[0],
4862                         total, r/10, r % 10);
4863 }
4864
4865 /* Since we are processing the bitfield from lower addresses to higher,
4866    it does not matter if the process it in 32 bit chunks or 64 bit
4867    chunks as long as it is little endian. (Understand it as byte stream,
4868    beginning with the lowest byte...) If we would use big endian
4869    we would need to process it from the highest address to the lowest,
4870    in order to be agnostic to the 32 vs 64 bits issue.
4871
4872    returns 0 on failure, 1 if we successfully received it. */
4873 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4874 {
4875         struct drbd_peer_device *peer_device;
4876         struct drbd_device *device;
4877         struct bm_xfer_ctx c;
4878         int err;
4879
4880         peer_device = conn_peer_device(connection, pi->vnr);
4881         if (!peer_device)
4882                 return -EIO;
4883         device = peer_device->device;
4884
4885         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4886         /* you are supposed to send additional out-of-sync information
4887          * if you actually set bits during this phase */
4888
4889         c = (struct bm_xfer_ctx) {
4890                 .bm_bits = drbd_bm_bits(device),
4891                 .bm_words = drbd_bm_words(device),
4892         };
4893
4894         for(;;) {
4895                 if (pi->cmd == P_BITMAP)
4896                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4897                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4898                         /* MAYBE: sanity check that we speak proto >= 90,
4899                          * and the feature is enabled! */
4900                         struct p_compressed_bm *p = pi->data;
4901
4902                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4903                                 drbd_err(device, "ReportCBitmap packet too large\n");
4904                                 err = -EIO;
4905                                 goto out;
4906                         }
4907                         if (pi->size <= sizeof(*p)) {
4908                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4909                                 err = -EIO;
4910                                 goto out;
4911                         }
4912                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4913                         if (err)
4914                                goto out;
4915                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4916                 } else {
4917                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4918                         err = -EIO;
4919                         goto out;
4920                 }
4921
4922                 c.packets[pi->cmd == P_BITMAP]++;
4923                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4924
4925                 if (err <= 0) {
4926                         if (err < 0)
4927                                 goto out;
4928                         break;
4929                 }
4930                 err = drbd_recv_header(peer_device->connection, pi);
4931                 if (err)
4932                         goto out;
4933         }
4934
4935         INFO_bm_xfer_stats(device, "receive", &c);
4936
4937         if (device->state.conn == C_WF_BITMAP_T) {
4938                 enum drbd_state_rv rv;
4939
4940                 err = drbd_send_bitmap(device);
4941                 if (err)
4942                         goto out;
4943                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4944                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4945                 D_ASSERT(device, rv == SS_SUCCESS);
4946         } else if (device->state.conn != C_WF_BITMAP_S) {
4947                 /* admin may have requested C_DISCONNECTING,
4948                  * other threads may have noticed network errors */
4949                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4950                     drbd_conn_str(device->state.conn));
4951         }
4952         err = 0;
4953
4954  out:
4955         drbd_bm_unlock(device);
4956         if (!err && device->state.conn == C_WF_BITMAP_S)
4957                 drbd_start_resync(device, C_SYNC_SOURCE);
4958         return err;
4959 }
4960
4961 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4962 {
4963         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4964                  pi->cmd, pi->size);
4965
4966         return ignore_remaining_packet(connection, pi);
4967 }
4968
4969 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4970 {
4971         /* Make sure we've acked all the TCP data associated
4972          * with the data requests being unplugged */
4973         drbd_tcp_quickack(connection->data.socket);
4974
4975         return 0;
4976 }
4977
4978 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4979 {
4980         struct drbd_peer_device *peer_device;
4981         struct drbd_device *device;
4982         struct p_block_desc *p = pi->data;
4983
4984         peer_device = conn_peer_device(connection, pi->vnr);
4985         if (!peer_device)
4986                 return -EIO;
4987         device = peer_device->device;
4988
4989         switch (device->state.conn) {
4990         case C_WF_SYNC_UUID:
4991         case C_WF_BITMAP_T:
4992         case C_BEHIND:
4993                         break;
4994         default:
4995                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4996                                 drbd_conn_str(device->state.conn));
4997         }
4998
4999         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
5000
5001         return 0;
5002 }
5003
5004 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
5005 {
5006         struct drbd_peer_device *peer_device;
5007         struct p_block_desc *p = pi->data;
5008         struct drbd_device *device;
5009         sector_t sector;
5010         int size, err = 0;
5011
5012         peer_device = conn_peer_device(connection, pi->vnr);
5013         if (!peer_device)
5014                 return -EIO;
5015         device = peer_device->device;
5016
5017         sector = be64_to_cpu(p->sector);
5018         size = be32_to_cpu(p->blksize);
5019
5020         dec_rs_pending(device);
5021
5022         if (get_ldev(device)) {
5023                 struct drbd_peer_request *peer_req;
5024                 const int op = REQ_OP_WRITE_ZEROES;
5025
5026                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5027                                                size, 0, GFP_NOIO);
5028                 if (!peer_req) {
5029                         put_ldev(device);
5030                         return -ENOMEM;
5031                 }
5032
5033                 peer_req->w.cb = e_end_resync_block;
5034                 peer_req->submit_jif = jiffies;
5035                 peer_req->flags |= EE_TRIM;
5036
5037                 spin_lock_irq(&device->resource->req_lock);
5038                 list_add_tail(&peer_req->w.list, &device->sync_ee);
5039                 spin_unlock_irq(&device->resource->req_lock);
5040
5041                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
5042                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
5043
5044                 if (err) {
5045                         spin_lock_irq(&device->resource->req_lock);
5046                         list_del(&peer_req->w.list);
5047                         spin_unlock_irq(&device->resource->req_lock);
5048
5049                         drbd_free_peer_req(device, peer_req);
5050                         put_ldev(device);
5051                         err = 0;
5052                         goto fail;
5053                 }
5054
5055                 inc_unacked(device);
5056
5057                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5058                    as well as drbd_rs_complete_io() */
5059         } else {
5060         fail:
5061                 drbd_rs_complete_io(device, sector);
5062                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5063         }
5064
5065         atomic_add(size >> 9, &device->rs_sect_in);
5066
5067         return err;
5068 }
5069
/*
 * One entry of the receiver's packet dispatch table.
 * The table below initializes entries positionally, so the order of the
 * members matters.
 */
struct data_cmd {
        /* non-zero if the packet may be larger than pkt_size; otherwise
         * drbdd() rejects such a packet ("No payload expected") */
        int expect_payload;
        /* size of the fixed sub header; drbdd() rejects packets that are
         * smaller than this */
        unsigned int pkt_size;
        /* function handling the packet; drbdd() treats a command without
         * fn as an unexpected packet */
        int (*fn)(struct drbd_connection *, struct packet_info *);
};
5075
/*
 * Dispatch table for packets received on the data socket, indexed by the
 * packet command.  Each entry reads { expect_payload, pkt_size, fn }.
 * Commands without an entry are zero initialized, that is, they have no fn.
 */
static struct data_cmd drbd_cmd_handler[] = {
        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
        /* both bitmap flavours have no fixed sub header */
        [P_BITMAP]          = { 1, 0, receive_bitmap } ,
        [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
        [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
        [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
        [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
        [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
        [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
        [P_ZEROES]          = { 0, sizeof(struct p_trim), receive_Data },
        [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
        [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
};
5107
5108 static void drbdd(struct drbd_connection *connection)
5109 {
5110         struct packet_info pi;
5111         size_t shs; /* sub header size */
5112         int err;
5113
5114         while (get_t_state(&connection->receiver) == RUNNING) {
5115                 struct data_cmd const *cmd;
5116
5117                 drbd_thread_current_set_cpu(&connection->receiver);
5118                 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5119                 if (drbd_recv_header_maybe_unplug(connection, &pi))
5120                         goto err_out;
5121
5122                 cmd = &drbd_cmd_handler[pi.cmd];
5123                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5124                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5125                                  cmdname(pi.cmd), pi.cmd);
5126                         goto err_out;
5127                 }
5128
5129                 shs = cmd->pkt_size;
5130                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5131                         shs += sizeof(struct o_qlim);
5132                 if (pi.size > shs && !cmd->expect_payload) {
5133                         drbd_err(connection, "No payload expected %s l:%d\n",
5134                                  cmdname(pi.cmd), pi.size);
5135                         goto err_out;
5136                 }
5137                 if (pi.size < shs) {
5138                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5139                                  cmdname(pi.cmd), (int)shs, pi.size);
5140                         goto err_out;
5141                 }
5142
5143                 if (shs) {
5144                         update_receiver_timing_details(connection, drbd_recv_all_warn);
5145                         err = drbd_recv_all_warn(connection, pi.data, shs);
5146                         if (err)
5147                                 goto err_out;
5148                         pi.size -= shs;
5149                 }
5150
5151                 update_receiver_timing_details(connection, cmd->fn);
5152                 err = cmd->fn(connection, &pi);
5153                 if (err) {
5154                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5155                                  cmdname(pi.cmd), err, pi.size);
5156                         goto err_out;
5157                 }
5158         }
5159         return;
5160
5161     err_out:
5162         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5163 }
5164
/*
 * conn_disconnect() - clean up after the connection to the peer is gone.
 *
 * Does nothing if the connection is already C_STANDALONE.  Otherwise it
 * forces C_NETWORK_FAILURE, stops the ack receiver, destroys the ack_sender
 * workqueue, frees the sockets, runs drbd_disconnected() for every peer
 * device, resets the current epoch size and finally moves the connection
 * state to C_UNCONNECTED (or on to C_STANDALONE if it was C_DISCONNECTING).
 */
static void conn_disconnect(struct drbd_connection *connection)
{
        struct drbd_peer_device *peer_device;
        enum drbd_conns oc;
        int vnr;

        if (connection->cstate == C_STANDALONE)
                return;

        /* We are about to start the cleanup after connection loss.
         * Make sure drbd_make_request knows about that.
         * Usually we should be in some network failure state already,
         * but just in case we are not, we fix it up here.
         */
        conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);

        /* ack_receiver does not clean up anything. it must not interfere, either */
        drbd_thread_stop(&connection->ack_receiver);
        if (connection->ack_sender) {
                destroy_workqueue(connection->ack_sender);
                connection->ack_sender = NULL;
        }
        drbd_free_sock(connection);

        /* Walk all peer devices.  Each device is pinned with a kref so the
         * RCU read lock can be dropped around drbd_disconnected(), which
         * waits for lists to drain and flushes work queues. */
        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();
                drbd_disconnected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        if (!list_empty(&connection->current_epoch->list))
                drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
        atomic_set(&connection->current_epoch->epoch_size, 0);
        connection->send.seen_any_write_yet = false;

        drbd_info(connection, "Connection closed\n");

        if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
                conn_try_outdate_peer_async(connection);

        /* Sample cstate and request C_UNCONNECTED under req_lock; the
         * sampled value decides below whether to go on to C_STANDALONE. */
        spin_lock_irq(&connection->resource->req_lock);
        oc = connection->cstate;
        if (oc >= C_UNCONNECTED)
                _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);

        spin_unlock_irq(&connection->resource->req_lock);

        if (oc == C_DISCONNECTING)
                conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
}
5221
/*
 * drbd_disconnected() - per-device cleanup after connection loss.
 *
 * Waits for the active/sync/read peer request lists to drain, cancels
 * resync bookkeeping, flushes the sender work queue around
 * drbd_finish_peer_reqs(), drops the peer's UUIDs, syncs meta data, writes
 * out the bitmap if a local disk is attached, and releases peer requests
 * still parked on net_ee.
 *
 * Always returns 0.
 */
static int drbd_disconnected(struct drbd_peer_device *peer_device)
{
        struct drbd_device *device = peer_device->device;
        unsigned int i;

        /* wait for current activity to cease. */
        spin_lock_irq(&device->resource->req_lock);
        _drbd_wait_ee_list_empty(device, &device->active_ee);
        _drbd_wait_ee_list_empty(device, &device->sync_ee);
        _drbd_wait_ee_list_empty(device, &device->read_ee);
        spin_unlock_irq(&device->resource->req_lock);

        /* We do not have data structures that would allow us to
         * get the rs_pending_cnt down to 0 again.
         *  * On C_SYNC_TARGET we do not have any data structures describing
         *    the pending RSDataRequest's we have sent.
         *  * On C_SYNC_SOURCE there is no data structure that tracks
         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
         *  And no, it is not the sum of the reference counts in the
         *  resync_LRU. The resync_LRU tracks the whole operation including
         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
         *  on the fly. */
        drbd_rs_cancel_all(device);
        device->rs_total = 0;
        device->rs_failed = 0;
        atomic_set(&device->rs_pending_cnt, 0);
        wake_up(&device->misc_wait);

        /* Stop the resync timer, then invoke its handler once by hand. */
        del_timer_sync(&device->resync_timer);
        resync_timer_fn(&device->resync_timer);

        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
         * w_make_resync_request etc. which may still be on the worker queue
         * to be "canceled" */
        drbd_flush_workqueue(&peer_device->connection->sender_work);

        drbd_finish_peer_reqs(device);

        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
           might have issued a work again. The one before drbd_finish_peer_reqs() is
           necessary to reclaim net_ee in drbd_finish_peer_reqs(). */
        drbd_flush_workqueue(&peer_device->connection->sender_work);

        /* need to do it again, drbd_finish_peer_reqs() may have populated it
         * again via drbd_try_clear_on_disk_bm(). */
        drbd_rs_cancel_all(device);

        kfree(device->p_uuid);
        device->p_uuid = NULL;

        if (!drbd_suspended(device))
                tl_clear(peer_device->connection);

        drbd_md_sync(device);

        /* Bitmap write-out only happens while a local disk reference can be taken. */
        if (get_ldev(device)) {
                drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
                                "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
                put_ldev(device);
        }

        /* tcp_close and release of sendpage pages can be deferred.  I don't
         * want to use SO_LINGER, because apparently it can be deferred for
         * more than 20 seconds (longest time I checked).
         *
         * Actually we don't care for exactly when the network stack does its
         * put_page(), but release our reference on these pages right here.
         */
        i = drbd_free_peer_reqs(device, &device->net_ee);
        if (i)
                drbd_info(device, "net_ee not empty, killed %u entries\n", i);
        /* NOTE(review): i is unsigned but the two messages below print it
         * with %d - harmless for the expected small counts. */
        i = atomic_read(&device->pp_in_use_by_net);
        if (i)
                drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
        i = atomic_read(&device->pp_in_use);
        if (i)
                drbd_info(device, "pp_in_use = %d, expected 0\n", i);

        /* Every peer request list is expected to be empty by now. */
        D_ASSERT(device, list_empty(&device->read_ee));
        D_ASSERT(device, list_empty(&device->active_ee));
        D_ASSERT(device, list_empty(&device->sync_ee));
        D_ASSERT(device, list_empty(&device->done_ee));

        return 0;
}
5307
5308 /*
5309  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5310  * we can agree on is stored in agreed_pro_version.
5311  *
5312  * feature flags and the reserved array should be enough room for future
5313  * enhancements of the handshake protocol, and possible plugins...
5314  *
5315  * for now, they are expected to be zero, but ignored.
5316  */
5317 static int drbd_send_features(struct drbd_connection *connection)
5318 {
5319         struct drbd_socket *sock;
5320         struct p_connection_features *p;
5321
5322         sock = &connection->data;
5323         p = conn_prepare_command(connection, sock);
5324         if (!p)
5325                 return -EIO;
5326         memset(p, 0, sizeof(*p));
5327         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5328         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5329         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5330         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5331 }
5332
5333 /*
5334  * return values:
5335  *   1 yes, we have a valid connection
5336  *   0 oops, did not work out, please try again
5337  *  -1 peer talks different language,
5338  *     no point in trying again, please go standalone.
5339  */
5340 static int drbd_do_features(struct drbd_connection *connection)
5341 {
5342         /* ASSERT current == connection->receiver ... */
5343         struct p_connection_features *p;
5344         const int expect = sizeof(struct p_connection_features);
5345         struct packet_info pi;
5346         int err;
5347
5348         err = drbd_send_features(connection);
5349         if (err)
5350                 return 0;
5351
5352         err = drbd_recv_header(connection, &pi);
5353         if (err)
5354                 return 0;
5355
5356         if (pi.cmd != P_CONNECTION_FEATURES) {
5357                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5358                          cmdname(pi.cmd), pi.cmd);
5359                 return -1;
5360         }
5361
5362         if (pi.size != expect) {
5363                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5364                      expect, pi.size);
5365                 return -1;
5366         }
5367
5368         p = pi.data;
5369         err = drbd_recv_all_warn(connection, p, expect);
5370         if (err)
5371                 return 0;
5372
5373         p->protocol_min = be32_to_cpu(p->protocol_min);
5374         p->protocol_max = be32_to_cpu(p->protocol_max);
5375         if (p->protocol_max == 0)
5376                 p->protocol_max = p->protocol_min;
5377
5378         if (PRO_VERSION_MAX < p->protocol_min ||
5379             PRO_VERSION_MIN > p->protocol_max)
5380                 goto incompat;
5381
5382         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5383         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5384
5385         drbd_info(connection, "Handshake successful: "
5386              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5387
5388         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5389                   connection->agreed_features,
5390                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5391                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5392                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5393                   connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5394                   connection->agreed_features ? "" : " none");
5395
5396         return 1;
5397
5398  incompat:
5399         drbd_err(connection, "incompatible DRBD dialects: "
5400             "I support %d-%d, peer supports %d-%d\n",
5401             PRO_VERSION_MIN, PRO_VERSION_MAX,
5402             p->protocol_min, p->protocol_max);
5403         return -1;
5404 }
5405
5406 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Stub used when the kernel is built without CONFIG_CRYPTO_HMAC (neither
 * built in nor as a module): log why authentication is impossible and
 * return -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
        drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
        drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
        return -1;
}
5413 #else
5414 #define CHALLENGE_LEN 64
5415
5416 /* Return value:
5417         1 - auth succeeded,
5418         0 - failed, try again (network error),
5419         -1 - auth failed, don't try again.
5420 */
5421
5422 static int drbd_do_auth(struct drbd_connection *connection)
5423 {
5424         struct drbd_socket *sock;
5425         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5426         char *response = NULL;
5427         char *right_response = NULL;
5428         char *peers_ch = NULL;
5429         unsigned int key_len;
5430         char secret[SHARED_SECRET_MAX]; /* 64 byte */
5431         unsigned int resp_size;
5432         SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5433         struct packet_info pi;
5434         struct net_conf *nc;
5435         int err, rv;
5436
5437         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5438
5439         rcu_read_lock();
5440         nc = rcu_dereference(connection->net_conf);
5441         key_len = strlen(nc->shared_secret);
5442         memcpy(secret, nc->shared_secret, key_len);
5443         rcu_read_unlock();
5444
5445         desc->tfm = connection->cram_hmac_tfm;
5446
5447         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5448         if (rv) {
5449                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5450                 rv = -1;
5451                 goto fail;
5452         }
5453
5454         get_random_bytes(my_challenge, CHALLENGE_LEN);
5455
5456         sock = &connection->data;
5457         if (!conn_prepare_command(connection, sock)) {
5458                 rv = 0;
5459                 goto fail;
5460         }
5461         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5462                                 my_challenge, CHALLENGE_LEN);
5463         if (!rv)
5464                 goto fail;
5465
5466         err = drbd_recv_header(connection, &pi);
5467         if (err) {
5468                 rv = 0;
5469                 goto fail;
5470         }
5471
5472         if (pi.cmd != P_AUTH_CHALLENGE) {
5473                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5474                          cmdname(pi.cmd), pi.cmd);
5475                 rv = -1;
5476                 goto fail;
5477         }
5478
5479         if (pi.size > CHALLENGE_LEN * 2) {
5480                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5481                 rv = -1;
5482                 goto fail;
5483         }
5484
5485         if (pi.size < CHALLENGE_LEN) {
5486                 drbd_err(connection, "AuthChallenge payload too small.\n");
5487                 rv = -1;
5488                 goto fail;
5489         }
5490
5491         peers_ch = kmalloc(pi.size, GFP_NOIO);
5492         if (peers_ch == NULL) {
5493                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5494                 rv = -1;
5495                 goto fail;
5496         }
5497
5498         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5499         if (err) {
5500                 rv = 0;
5501                 goto fail;
5502         }
5503
5504         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5505                 drbd_err(connection, "Peer presented the same challenge!\n");
5506                 rv = -1;
5507                 goto fail;
5508         }
5509
5510         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5511         response = kmalloc(resp_size, GFP_NOIO);
5512         if (response == NULL) {
5513                 drbd_err(connection, "kmalloc of response failed\n");
5514                 rv = -1;
5515                 goto fail;
5516         }
5517
5518         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5519         if (rv) {
5520                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5521                 rv = -1;
5522                 goto fail;
5523         }
5524
5525         if (!conn_prepare_command(connection, sock)) {
5526                 rv = 0;
5527                 goto fail;
5528         }
5529         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5530                                 response, resp_size);
5531         if (!rv)
5532                 goto fail;
5533
5534         err = drbd_recv_header(connection, &pi);
5535         if (err) {
5536                 rv = 0;
5537                 goto fail;
5538         }
5539
5540         if (pi.cmd != P_AUTH_RESPONSE) {
5541                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5542                          cmdname(pi.cmd), pi.cmd);
5543                 rv = 0;
5544                 goto fail;
5545         }
5546
5547         if (pi.size != resp_size) {
5548                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5549                 rv = 0;
5550                 goto fail;
5551         }
5552
5553         err = drbd_recv_all_warn(connection, response , resp_size);
5554         if (err) {
5555                 rv = 0;
5556                 goto fail;
5557         }
5558
5559         right_response = kmalloc(resp_size, GFP_NOIO);
5560         if (right_response == NULL) {
5561                 drbd_err(connection, "kmalloc of right_response failed\n");
5562                 rv = -1;
5563                 goto fail;
5564         }
5565
5566         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5567                                  right_response);
5568         if (rv) {
5569                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5570                 rv = -1;
5571                 goto fail;
5572         }
5573
5574         rv = !memcmp(response, right_response, resp_size);
5575
5576         if (rv)
5577                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5578                      resp_size);
5579         else
5580                 rv = -1;
5581
5582  fail:
5583         kfree(peers_ch);
5584         kfree(response);
5585         kfree(right_response);
5586         shash_desc_zero(desc);
5587
5588         return rv;
5589 }
5590 #endif
5591
5592 int drbd_receiver(struct drbd_thread *thi)
5593 {
5594         struct drbd_connection *connection = thi->connection;
5595         int h;
5596
5597         drbd_info(connection, "receiver (re)started\n");
5598
5599         do {
5600                 h = conn_connect(connection);
5601                 if (h == 0) {
5602                         conn_disconnect(connection);
5603                         schedule_timeout_interruptible(HZ);
5604                 }
5605                 if (h == -1) {
5606                         drbd_warn(connection, "Discarding network configuration.\n");
5607                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5608                 }
5609         } while (h == 0);
5610
5611         if (h > 0) {
5612                 blk_start_plug(&connection->receiver_plug);
5613                 drbdd(connection);
5614                 blk_finish_plug(&connection->receiver_plug);
5615         }
5616
5617         conn_disconnect(connection);
5618
5619         drbd_info(connection, "receiver terminated\n");
5620         return 0;
5621 }
5622
5623 /* ********* acknowledge sender ******** */
5624
5625 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5626 {
5627         struct p_req_state_reply *p = pi->data;
5628         int retcode = be32_to_cpu(p->retcode);
5629
5630         if (retcode >= SS_SUCCESS) {
5631                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5632         } else {
5633                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5634                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5635                          drbd_set_st_err_str(retcode), retcode);
5636         }
5637         wake_up(&connection->ping_wait);
5638
5639         return 0;
5640 }
5641
5642 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5643 {
5644         struct drbd_peer_device *peer_device;
5645         struct drbd_device *device;
5646         struct p_req_state_reply *p = pi->data;
5647         int retcode = be32_to_cpu(p->retcode);
5648
5649         peer_device = conn_peer_device(connection, pi->vnr);
5650         if (!peer_device)
5651                 return -EIO;
5652         device = peer_device->device;
5653
5654         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5655                 D_ASSERT(device, connection->agreed_pro_version < 100);
5656                 return got_conn_RqSReply(connection, pi);
5657         }
5658
5659         if (retcode >= SS_SUCCESS) {
5660                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5661         } else {
5662                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5663                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5664                         drbd_set_st_err_str(retcode), retcode);
5665         }
5666         wake_up(&device->state_wait);
5667
5668         return 0;
5669 }
5670
/* A ping from the peer is answered with a ping ack; its result is returned. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
        int rc = drbd_send_ping_ack(connection);

        return rc;
}
5676
5677 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5678 {
5679         /* restore idle timeout */
5680         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5681         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5682                 wake_up(&connection->ping_wait);
5683
5684         return 0;
5685 }
5686
5687 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5688 {
5689         struct drbd_peer_device *peer_device;
5690         struct drbd_device *device;
5691         struct p_block_ack *p = pi->data;
5692         sector_t sector = be64_to_cpu(p->sector);
5693         int blksize = be32_to_cpu(p->blksize);
5694
5695         peer_device = conn_peer_device(connection, pi->vnr);
5696         if (!peer_device)
5697                 return -EIO;
5698         device = peer_device->device;
5699
5700         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5701
5702         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5703
5704         if (get_ldev(device)) {
5705                 drbd_rs_complete_io(device, sector);
5706                 drbd_set_in_sync(device, sector, blksize);
5707                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5708                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5709                 put_ldev(device);
5710         }
5711         dec_rs_pending(device);
5712         atomic_add(blksize >> 9, &device->rs_sect_in);
5713
5714         return 0;
5715 }
5716
5717 static int
5718 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5719                               struct rb_root *root, const char *func,
5720                               enum drbd_req_event what, bool missing_ok)
5721 {
5722         struct drbd_request *req;
5723         struct bio_and_error m;
5724
5725         spin_lock_irq(&device->resource->req_lock);
5726         req = find_request(device, root, id, sector, missing_ok, func);
5727         if (unlikely(!req)) {
5728                 spin_unlock_irq(&device->resource->req_lock);
5729                 return -EIO;
5730         }
5731         __req_mod(req, what, &m);
5732         spin_unlock_irq(&device->resource->req_lock);
5733
5734         if (m.bio)
5735                 complete_master_bio(device, &m);
5736         return 0;
5737 }
5738
5739 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5740 {
5741         struct drbd_peer_device *peer_device;
5742         struct drbd_device *device;
5743         struct p_block_ack *p = pi->data;
5744         sector_t sector = be64_to_cpu(p->sector);
5745         int blksize = be32_to_cpu(p->blksize);
5746         enum drbd_req_event what;
5747
5748         peer_device = conn_peer_device(connection, pi->vnr);
5749         if (!peer_device)
5750                 return -EIO;
5751         device = peer_device->device;
5752
5753         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5754
5755         if (p->block_id == ID_SYNCER) {
5756                 drbd_set_in_sync(device, sector, blksize);
5757                 dec_rs_pending(device);
5758                 return 0;
5759         }
5760         switch (pi->cmd) {
5761         case P_RS_WRITE_ACK:
5762                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5763                 break;
5764         case P_WRITE_ACK:
5765                 what = WRITE_ACKED_BY_PEER;
5766                 break;
5767         case P_RECV_ACK:
5768                 what = RECV_ACKED_BY_PEER;
5769                 break;
5770         case P_SUPERSEDED:
5771                 what = CONFLICT_RESOLVED;
5772                 break;
5773         case P_RETRY_WRITE:
5774                 what = POSTPONE_WRITE;
5775                 break;
5776         default:
5777                 BUG();
5778         }
5779
5780         return validate_req_change_req_state(device, p->block_id, sector,
5781                                              &device->write_requests, __func__,
5782                                              what, false);
5783 }
5784
5785 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5786 {
5787         struct drbd_peer_device *peer_device;
5788         struct drbd_device *device;
5789         struct p_block_ack *p = pi->data;
5790         sector_t sector = be64_to_cpu(p->sector);
5791         int size = be32_to_cpu(p->blksize);
5792         int err;
5793
5794         peer_device = conn_peer_device(connection, pi->vnr);
5795         if (!peer_device)
5796                 return -EIO;
5797         device = peer_device->device;
5798
5799         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5800
5801         if (p->block_id == ID_SYNCER) {
5802                 dec_rs_pending(device);
5803                 drbd_rs_failed_io(device, sector, size);
5804                 return 0;
5805         }
5806
5807         err = validate_req_change_req_state(device, p->block_id, sector,
5808                                             &device->write_requests, __func__,
5809                                             NEG_ACKED, true);
5810         if (err) {
5811                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5812                    The master bio might already be completed, therefore the
5813                    request is no longer in the collision hash. */
5814                 /* In Protocol B we might already have got a P_RECV_ACK
5815                    but then get a P_NEG_ACK afterwards. */
5816                 drbd_set_out_of_sync(device, sector, size);
5817         }
5818         return 0;
5819 }
5820
5821 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5822 {
5823         struct drbd_peer_device *peer_device;
5824         struct drbd_device *device;
5825         struct p_block_ack *p = pi->data;
5826         sector_t sector = be64_to_cpu(p->sector);
5827
5828         peer_device = conn_peer_device(connection, pi->vnr);
5829         if (!peer_device)
5830                 return -EIO;
5831         device = peer_device->device;
5832
5833         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5834
5835         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5836             (unsigned long long)sector, be32_to_cpu(p->blksize));
5837
5838         return validate_req_change_req_state(device, p->block_id, sector,
5839                                              &device->read_requests, __func__,
5840                                              NEG_ACKED, false);
5841 }
5842
5843 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5844 {
5845         struct drbd_peer_device *peer_device;
5846         struct drbd_device *device;
5847         sector_t sector;
5848         int size;
5849         struct p_block_ack *p = pi->data;
5850
5851         peer_device = conn_peer_device(connection, pi->vnr);
5852         if (!peer_device)
5853                 return -EIO;
5854         device = peer_device->device;
5855
5856         sector = be64_to_cpu(p->sector);
5857         size = be32_to_cpu(p->blksize);
5858
5859         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5860
5861         dec_rs_pending(device);
5862
5863         if (get_ldev_if_state(device, D_FAILED)) {
5864                 drbd_rs_complete_io(device, sector);
5865                 switch (pi->cmd) {
5866                 case P_NEG_RS_DREPLY:
5867                         drbd_rs_failed_io(device, sector, size);
5868                 case P_RS_CANCEL:
5869                         break;
5870                 default:
5871                         BUG();
5872                 }
5873                 put_ldev(device);
5874         }
5875
5876         return 0;
5877 }
5878
5879 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5880 {
5881         struct p_barrier_ack *p = pi->data;
5882         struct drbd_peer_device *peer_device;
5883         int vnr;
5884
5885         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5886
5887         rcu_read_lock();
5888         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5889                 struct drbd_device *device = peer_device->device;
5890
5891                 if (device->state.conn == C_AHEAD &&
5892                     atomic_read(&device->ap_in_flight) == 0 &&
5893                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5894                         device->start_resync_timer.expires = jiffies + HZ;
5895                         add_timer(&device->start_resync_timer);
5896                 }
5897         }
5898         rcu_read_unlock();
5899
5900         return 0;
5901 }
5902
5903 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5904 {
5905         struct drbd_peer_device *peer_device;
5906         struct drbd_device *device;
5907         struct p_block_ack *p = pi->data;
5908         struct drbd_device_work *dw;
5909         sector_t sector;
5910         int size;
5911
5912         peer_device = conn_peer_device(connection, pi->vnr);
5913         if (!peer_device)
5914                 return -EIO;
5915         device = peer_device->device;
5916
5917         sector = be64_to_cpu(p->sector);
5918         size = be32_to_cpu(p->blksize);
5919
5920         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5921
5922         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5923                 drbd_ov_out_of_sync_found(device, sector, size);
5924         else
5925                 ov_out_of_sync_print(device);
5926
5927         if (!get_ldev(device))
5928                 return 0;
5929
5930         drbd_rs_complete_io(device, sector);
5931         dec_rs_pending(device);
5932
5933         --device->ov_left;
5934
5935         /* let's advance progress step marks only for every other megabyte */
5936         if ((device->ov_left & 0x200) == 0x200)
5937                 drbd_advance_rs_marks(device, device->ov_left);
5938
5939         if (device->ov_left == 0) {
5940                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5941                 if (dw) {
5942                         dw->w.cb = w_ov_finished;
5943                         dw->device = device;
5944                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5945                 } else {
5946                         drbd_err(device, "kmalloc(dw) failed.");
5947                         ov_out_of_sync_print(device);
5948                         drbd_resync_finished(device);
5949                 }
5950         }
5951         put_ldev(device);
5952         return 0;
5953 }
5954
/*
 * Meta-socket handler that deliberately ignores the packet.
 * Both arguments are unused; always returns 0 (success).
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
        /* Nothing to do: the packet is silently dropped. */
        return 0;
}
5959
/*
 * One entry of the meta-socket dispatch table: how many payload bytes a
 * packet carries after the header, and which handler processes it.
 */
struct meta_sock_cmd {
        /* payload size in bytes, not counting the packet header */
        size_t pkt_size;
        /* handler; a non-zero return value is treated as failure by the caller */
        int (*fn)(struct drbd_connection *connection, struct packet_info *pi);
};
5964
5965 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5966 {
5967         long t;
5968         struct net_conf *nc;
5969
5970         rcu_read_lock();
5971         nc = rcu_dereference(connection->net_conf);
5972         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5973         rcu_read_unlock();
5974
5975         t *= HZ;
5976         if (ping_timeout)
5977                 t /= 10;
5978
5979         connection->meta.socket->sk->sk_rcvtimeo = t;
5980 }
5981
/* Switch the meta socket receive timeout to the ping timeout. */
static void set_ping_timeout(struct drbd_connection *connection)
{
        set_rcvtimeo(connection, true);
}
5986
/* Switch the meta socket receive timeout to the idle (ping interval) value. */
static void set_idle_timeout(struct drbd_connection *connection)
{
        set_rcvtimeo(connection, false);
}
5991
5992 static struct meta_sock_cmd ack_receiver_tbl[] = {
5993         [P_PING]            = { 0, got_Ping },
5994         [P_PING_ACK]        = { 0, got_PingAck },
5995         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5996         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5997         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5998         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5999         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
6000         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
6001         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
6002         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
6003         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
6004         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
6005         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
6006         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
6007         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
6008         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
6009         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
6010 };
6011
6012 int drbd_ack_receiver(struct drbd_thread *thi)
6013 {
6014         struct drbd_connection *connection = thi->connection;
6015         struct meta_sock_cmd *cmd = NULL;
6016         struct packet_info pi;
6017         unsigned long pre_recv_jif;
6018         int rv;
6019         void *buf    = connection->meta.rbuf;
6020         int received = 0;
6021         unsigned int header_size = drbd_header_size(connection);
6022         int expect   = header_size;
6023         bool ping_timeout_active = false;
6024         struct sched_param param = { .sched_priority = 2 };
6025
6026         rv = sched_setscheduler(current, SCHED_RR, &param);
6027         if (rv < 0)
6028                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
6029
6030         while (get_t_state(thi) == RUNNING) {
6031                 drbd_thread_current_set_cpu(thi);
6032
6033                 conn_reclaim_net_peer_reqs(connection);
6034
6035                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6036                         if (drbd_send_ping(connection)) {
6037                                 drbd_err(connection, "drbd_send_ping has failed\n");
6038                                 goto reconnect;
6039                         }
6040                         set_ping_timeout(connection);
6041                         ping_timeout_active = true;
6042                 }
6043
6044                 pre_recv_jif = jiffies;
6045                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6046
6047                 /* Note:
6048                  * -EINTR        (on meta) we got a signal
6049                  * -EAGAIN       (on meta) rcvtimeo expired
6050                  * -ECONNRESET   other side closed the connection
6051                  * -ERESTARTSYS  (on data) we got a signal
6052                  * rv <  0       other than above: unexpected error!
6053                  * rv == expected: full header or command
6054                  * rv <  expected: "woken" by signal during receive
6055                  * rv == 0       : "connection shut down by peer"
6056                  */
6057                 if (likely(rv > 0)) {
6058                         received += rv;
6059                         buf      += rv;
6060                 } else if (rv == 0) {
6061                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6062                                 long t;
6063                                 rcu_read_lock();
6064                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6065                                 rcu_read_unlock();
6066
6067                                 t = wait_event_timeout(connection->ping_wait,
6068                                                        connection->cstate < C_WF_REPORT_PARAMS,
6069                                                        t);
6070                                 if (t)
6071                                         break;
6072                         }
6073                         drbd_err(connection, "meta connection shut down by peer.\n");
6074                         goto reconnect;
6075                 } else if (rv == -EAGAIN) {
6076                         /* If the data socket received something meanwhile,
6077                          * that is good enough: peer is still alive. */
6078                         if (time_after(connection->last_received, pre_recv_jif))
6079                                 continue;
6080                         if (ping_timeout_active) {
6081                                 drbd_err(connection, "PingAck did not arrive in time.\n");
6082                                 goto reconnect;
6083                         }
6084                         set_bit(SEND_PING, &connection->flags);
6085                         continue;
6086                 } else if (rv == -EINTR) {
6087                         /* maybe drbd_thread_stop(): the while condition will notice.
6088                          * maybe woken for send_ping: we'll send a ping above,
6089                          * and change the rcvtimeo */
6090                         flush_signals(current);
6091                         continue;
6092                 } else {
6093                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6094                         goto reconnect;
6095                 }
6096
6097                 if (received == expect && cmd == NULL) {
6098                         if (decode_header(connection, connection->meta.rbuf, &pi))
6099                                 goto reconnect;
6100                         cmd = &ack_receiver_tbl[pi.cmd];
6101                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6102                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6103                                          cmdname(pi.cmd), pi.cmd);
6104                                 goto disconnect;
6105                         }
6106                         expect = header_size + cmd->pkt_size;
6107                         if (pi.size != expect - header_size) {
6108                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6109                                         pi.cmd, pi.size);
6110                                 goto reconnect;
6111                         }
6112                 }
6113                 if (received == expect) {
6114                         bool err;
6115
6116                         err = cmd->fn(connection, &pi);
6117                         if (err) {
6118                                 drbd_err(connection, "%pf failed\n", cmd->fn);
6119                                 goto reconnect;
6120                         }
6121
6122                         connection->last_received = jiffies;
6123
6124                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6125                                 set_idle_timeout(connection);
6126                                 ping_timeout_active = false;
6127                         }
6128
6129                         buf      = connection->meta.rbuf;
6130                         received = 0;
6131                         expect   = header_size;
6132                         cmd      = NULL;
6133                 }
6134         }
6135
6136         if (0) {
6137 reconnect:
6138                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6139                 conn_md_sync(connection);
6140         }
6141         if (0) {
6142 disconnect:
6143                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6144         }
6145
6146         drbd_info(connection, "ack_receiver terminated\n");
6147
6148         return 0;
6149 }
6150
6151 void drbd_send_acks_wf(struct work_struct *ws)
6152 {
6153         struct drbd_peer_device *peer_device =
6154                 container_of(ws, struct drbd_peer_device, send_acks_work);
6155         struct drbd_connection *connection = peer_device->connection;
6156         struct drbd_device *device = peer_device->device;
6157         struct net_conf *nc;
6158         int tcp_cork, err;
6159
6160         rcu_read_lock();
6161         nc = rcu_dereference(connection->net_conf);
6162         tcp_cork = nc->tcp_cork;
6163         rcu_read_unlock();
6164
6165         if (tcp_cork)
6166                 drbd_tcp_cork(connection->meta.socket);
6167
6168         err = drbd_finish_peer_reqs(device);
6169         kref_put(&device->kref, drbd_destroy_device);
6170         /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6171            struct work_struct send_acks_work alive, which is in the peer_device object */
6172
6173         if (err) {
6174                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6175                 return;
6176         }
6177
6178         if (tcp_cork)
6179                 drbd_tcp_uncork(connection->meta.socket);
6180
6181         return;
6182 }