1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <uapi/linux/sched/types.h>
40 #include <linux/sched/signal.h>
41 #include <linux/pkt_sched.h>
42 #define __KERNEL_SYSCALLS__
43 #include <linux/unistd.h>
44 #include <linux/vmalloc.h>
45 #include <linux/random.h>
46 #include <linux/string.h>
47 #include <linux/scatterlist.h>
48 #include "drbd_int.h"
49 #include "drbd_protocol.h"
50 #include "drbd_req.h"
51 #include "drbd_vli.h"
52
53 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
54
55 struct packet_info {
56         enum drbd_packet cmd;
57         unsigned int size;
58         unsigned int vnr;
59         void *data;
60 };
61
62 enum finish_epoch {
63         FE_STILL_LIVE,
64         FE_DESTROYED,
65         FE_RECYCLED,
66 };
67
68 static int drbd_do_features(struct drbd_connection *connection);
69 static int drbd_do_auth(struct drbd_connection *connection);
70 static int drbd_disconnected(struct drbd_peer_device *);
71 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
72 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
73 static int e_end_block(struct drbd_work *, int);
74
75
76 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
77
78 /*
79  * some helper functions to deal with singly linked page lists,
80  * page->private being our "next" pointer.
81  */
82
83 /* If at least n pages are linked at head, get n pages off.
84  * Otherwise, don't modify head, and return NULL.
85  * Locking is the responsibility of the caller.
86  */
87 static struct page *page_chain_del(struct page **head, int n)
88 {
89         struct page *page;
90         struct page *tmp;
91
92         BUG_ON(!n);
93         BUG_ON(!head);
94
95         page = *head;
96
97         if (!page)
98                 return NULL;
99
100         while (page) {
101                 tmp = page_chain_next(page);
102                 if (--n == 0)
103                         break; /* found sufficient pages */
104                 if (tmp == NULL)
105                         /* insufficient pages, don't use any of them. */
106                         return NULL;
107                 page = tmp;
108         }
109
110         /* add end of list marker for the returned list */
111         set_page_private(page, 0);
112         /* actual return value, and adjustment of head */
113         page = *head;
114         *head = tmp;
115         return page;
116 }
117
118 /* may be used outside of locks to find the tail of a (usually short)
119  * "private" page chain, before adding it back to a global chain head
120  * with page_chain_add() under a spinlock. */
121 static struct page *page_chain_tail(struct page *page, int *len)
122 {
123         struct page *tmp;
124         int i = 1;
125         while ((tmp = page_chain_next(page)))
126                 ++i, page = tmp;
127         if (len)
128                 *len = i;
129         return page;
130 }
131
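/* Drop our reference on every page in the chain; returns the number of pages. */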
132 static int page_chain_free(struct page *page)
133 {
134         struct page *tmp;
135         int i = 0;
136         page_chain_for_each_safe(page, tmp) {
137                 put_page(page);
138                 ++i;
139         }
140         return i;
141 }
142
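/* Prepend the chain chain_first..chain_last to the list at *head.
 * Callers serialize access to *head (drbd_pp_lock for the global pool),
 * see the page_chain_tail() comment above. */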
143 static void page_chain_add(struct page **head,
144                 struct page *chain_first, struct page *chain_last)
145 {
146 #if 1
147         struct page *tmp;
148         tmp = page_chain_tail(chain_first, NULL);
149         BUG_ON(tmp != chain_last);
150 #endif
151
152         /* add chain to head */
153         set_page_private(chain_last, (unsigned long)*head);
154         *head = chain_first;
155 }
156
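/* Try to get @number pages as one chain: first from the global drbd_pp_pool,
 * otherwise page by page via alloc_page(GFP_TRY).  If not all pages can be
 * allocated, give back what we got to the pool and return NULL. */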
157 static struct page *__drbd_alloc_pages(struct drbd_device *device,
158                                        unsigned int number)
159 {
160         struct page *page = NULL;
161         struct page *tmp = NULL;
162         unsigned int i = 0;
163
164         /* Yes, testing drbd_pp_vacant outside the lock is racy.
165          * So what. It saves a spin_lock. */
166         if (drbd_pp_vacant >= number) {
167                 spin_lock(&drbd_pp_lock);
168                 page = page_chain_del(&drbd_pp_pool, number);
169                 if (page)
170                         drbd_pp_vacant -= number;
171                 spin_unlock(&drbd_pp_lock);
172                 if (page)
173                         return page;
174         }
175
176         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
177          * "criss-cross" setup, that might cause write-out on some other DRBD,
178          * which in turn might block on the other node at this very place.  */
179         for (i = 0; i < number; i++) {
180                 tmp = alloc_page(GFP_TRY);
181                 if (!tmp)
182                         break;
183                 set_page_private(tmp, (unsigned long)page);
184                 page = tmp;
185         }
186
187         if (i == number)
188                 return page;
189
190         /* Not enough pages immediately available this time.
191          * No need to jump around here, drbd_alloc_pages will retry this
192          * function "soon". */
193         if (page) {
194                 tmp = page_chain_tail(page, NULL);
195                 spin_lock(&drbd_pp_lock);
196                 page_chain_add(&drbd_pp_pool, page, tmp);
197                 drbd_pp_vacant += i;
198                 spin_unlock(&drbd_pp_lock);
199         }
200         return NULL;
201 }
202
203 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
204                                            struct list_head *to_be_freed)
205 {
206         struct drbd_peer_request *peer_req, *tmp;
207
208         /* The EEs are always appended to the end of the list. Since
209            they are sent in order over the wire, they have to finish
210            in order. As soon as we see the first one that has not
211            finished, we can stop examining the list... */
212
213         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
214                 if (drbd_peer_req_has_active_page(peer_req))
215                         break;
216                 list_move(&peer_req->w.list, to_be_freed);
217         }
218 }
219
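/* Collect the already finished entries from device->net_ee under the
 * req_lock, then free them outside the lock. */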
220 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
221 {
222         LIST_HEAD(reclaimed);
223         struct drbd_peer_request *peer_req, *t;
224
225         spin_lock_irq(&device->resource->req_lock);
226         reclaim_finished_net_peer_reqs(device, &reclaimed);
227         spin_unlock_irq(&device->resource->req_lock);
228         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
229                 drbd_free_net_peer_req(device, peer_req);
230 }
231
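/* Same, but for every volume of this connection that still has pages in use
 * by the network stack; a device reference is taken so the RCU read lock can
 * be dropped while reclaiming. */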
232 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
233 {
234         struct drbd_peer_device *peer_device;
235         int vnr;
236
237         rcu_read_lock();
238         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
239                 struct drbd_device *device = peer_device->device;
240                 if (!atomic_read(&device->pp_in_use_by_net))
241                         continue;
242
243                 kref_get(&device->kref);
244                 rcu_read_unlock();
245                 drbd_reclaim_net_peer_reqs(device);
246                 kref_put(&device->kref, drbd_destroy_device);
247                 rcu_read_lock();
248         }
249         rcu_read_unlock();
250 }
251
252 /**
253  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
254  * @peer_device: DRBD peer device.
255  * @number:     number of pages requested
256  * @retry:      whether to retry, if not enough pages are available right now
257  *
258  * Tries to allocate number pages, first from our own page pool, then from
259  * the kernel.
260  * Possibly retry until DRBD frees sufficient pages somewhere else.
261  *
262  * If this allocation would exceed the max_buffers setting, we throttle
263  * allocation (schedule_timeout) to give the system some room to breathe.
264  *
265  * We do not use max-buffers as hard limit, because it could lead to
266  * congestion and further to a distributed deadlock during online-verify or
267  * (checksum based) resync, if the max-buffers, socket buffer sizes and
268  * resync-rate settings are mis-configured.
269  *
270  * Returns a page chain linked via page->private.
271  */
272 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
273                               bool retry)
274 {
275         struct drbd_device *device = peer_device->device;
276         struct page *page = NULL;
277         struct net_conf *nc;
278         DEFINE_WAIT(wait);
279         unsigned int mxb;
280
281         rcu_read_lock();
282         nc = rcu_dereference(peer_device->connection->net_conf);
283         mxb = nc ? nc->max_buffers : 1000000;
284         rcu_read_unlock();
285
286         if (atomic_read(&device->pp_in_use) < mxb)
287                 page = __drbd_alloc_pages(device, number);
288
289         /* Try to keep the fast path fast, but occasionally we need
290          * to reclaim the pages we lent to the network stack. */
291         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
292                 drbd_reclaim_net_peer_reqs(device);
293
294         while (page == NULL) {
295                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
296
297                 drbd_reclaim_net_peer_reqs(device);
298
299                 if (atomic_read(&device->pp_in_use) < mxb) {
300                         page = __drbd_alloc_pages(device, number);
301                         if (page)
302                                 break;
303                 }
304
305                 if (!retry)
306                         break;
307
308                 if (signal_pending(current)) {
309                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
310                         break;
311                 }
312
313                 if (schedule_timeout(HZ/10) == 0)
314                         mxb = UINT_MAX;
315         }
316         finish_wait(&drbd_pp_wait, &wait);
317
318         if (page)
319                 atomic_add(number, &device->pp_in_use);
320         return page;
321 }
322
323 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
324  * It is also used from inside another spin_lock_irq(&resource->req_lock);
325  * Either links the page chain back to the global pool,
326  * or returns all pages to the system. */
327 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
328 {
329         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
330         int i;
331
332         if (page == NULL)
333                 return;
334
335         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
336                 i = page_chain_free(page);
337         else {
338                 struct page *tmp;
339                 tmp = page_chain_tail(page, &i);
340                 spin_lock(&drbd_pp_lock);
341                 page_chain_add(&drbd_pp_pool, page, tmp);
342                 drbd_pp_vacant += i;
343                 spin_unlock(&drbd_pp_lock);
344         }
345         i = atomic_sub_return(i, a);
346         if (i < 0)
347                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
348                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
349         wake_up(&drbd_pp_wait);
350 }
351
352 /*
353 You need to hold the req_lock:
354  _drbd_wait_ee_list_empty()
355
356 You must not have the req_lock:
357  drbd_free_peer_req()
358  drbd_alloc_peer_req()
359  drbd_free_peer_reqs()
360  drbd_ee_fix_bhs()
361  drbd_finish_peer_reqs()
362  drbd_clear_done_ee()
363  drbd_wait_ee_list_empty()
364 */
365
366 /* normal: payload_size == request size (bi_size)
367  * w_same: payload_size == logical_block_size
368  * trim: payload_size == 0 */
369 struct drbd_peer_request *
370 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
371                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
372 {
373         struct drbd_device *device = peer_device->device;
374         struct drbd_peer_request *peer_req;
375         struct page *page = NULL;
376         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
377
378         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
379                 return NULL;
380
381         peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
382         if (!peer_req) {
383                 if (!(gfp_mask & __GFP_NOWARN))
384                         drbd_err(device, "%s: allocation failed\n", __func__);
385                 return NULL;
386         }
387
388         if (nr_pages) {
389                 page = drbd_alloc_pages(peer_device, nr_pages,
390                                         gfpflags_allow_blocking(gfp_mask));
391                 if (!page)
392                         goto fail;
393         }
394
395         memset(peer_req, 0, sizeof(*peer_req));
396         INIT_LIST_HEAD(&peer_req->w.list);
397         drbd_clear_interval(&peer_req->i);
398         peer_req->i.size = request_size;
399         peer_req->i.sector = sector;
400         peer_req->submit_jif = jiffies;
401         peer_req->peer_device = peer_device;
402         peer_req->pages = page;
403         /*
404          * The block_id is opaque to the receiver.  It is not endianness
405          * converted, and sent back to the sender unchanged.
406          */
407         peer_req->block_id = id;
408
409         return peer_req;
410
411  fail:
412         mempool_free(peer_req, &drbd_ee_mempool);
413         return NULL;
414 }
415
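/* Free a peer request: its digest (if any), its page chain, and finally the
 * mempool object.  May sleep, hence must not be called with the req_lock
 * held (see the list above). */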
416 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
417                        int is_net)
418 {
419         might_sleep();
420         if (peer_req->flags & EE_HAS_DIGEST)
421                 kfree(peer_req->digest);
422         drbd_free_pages(device, peer_req->pages, is_net);
423         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
424         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
425         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
426                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
427                 drbd_al_complete_io(device, &peer_req->i);
428         }
429         mempool_free(peer_req, &drbd_ee_mempool);
430 }
431
432 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
433 {
434         LIST_HEAD(work_list);
435         struct drbd_peer_request *peer_req, *t;
436         int count = 0;
437         int is_net = list == &device->net_ee;
438
439         spin_lock_irq(&device->resource->req_lock);
440         list_splice_init(list, &work_list);
441         spin_unlock_irq(&device->resource->req_lock);
442
443         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
444                 __drbd_free_peer_req(device, peer_req, is_net);
445                 count++;
446         }
447         return count;
448 }
449
450 /*
451  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
452  */
453 static int drbd_finish_peer_reqs(struct drbd_device *device)
454 {
455         LIST_HEAD(work_list);
456         LIST_HEAD(reclaimed);
457         struct drbd_peer_request *peer_req, *t;
458         int err = 0;
459
460         spin_lock_irq(&device->resource->req_lock);
461         reclaim_finished_net_peer_reqs(device, &reclaimed);
462         list_splice_init(&device->done_ee, &work_list);
463         spin_unlock_irq(&device->resource->req_lock);
464
465         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
466                 drbd_free_net_peer_req(device, peer_req);
467
468         /* possible callbacks here:
469          * e_end_block, and e_end_resync_block, e_send_superseded.
470          * all ignore the last argument.
471          */
472         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
473                 int err2;
474
475                 /* list_del not necessary, next/prev members not touched */
476                 err2 = peer_req->w.cb(&peer_req->w, !!err);
477                 if (!err)
478                         err = err2;
479                 drbd_free_peer_req(device, peer_req);
480         }
481         wake_up(&device->ee_wait);
482
483         return err;
484 }
485
486 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
487                                      struct list_head *head)
488 {
489         DEFINE_WAIT(wait);
490
491         /* avoids spin_lock/unlock
492          * and calling prepare_to_wait in the fast path */
493         while (!list_empty(head)) {
494                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
495                 spin_unlock_irq(&device->resource->req_lock);
496                 io_schedule();
497                 finish_wait(&device->ee_wait, &wait);
498                 spin_lock_irq(&device->resource->req_lock);
499         }
500 }
501
502 static void drbd_wait_ee_list_empty(struct drbd_device *device,
503                                     struct list_head *head)
504 {
505         spin_lock_irq(&device->resource->req_lock);
506         _drbd_wait_ee_list_empty(device, head);
507         spin_unlock_irq(&device->resource->req_lock);
508 }
509
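/* Receive up to @size bytes from @sock into @buf.  With flags == 0,
 * MSG_WAITALL | MSG_NOSIGNAL is used, i.e. we wait for the full amount.
 * Returns the number of bytes received or a negative errno. */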
510 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
511 {
512         struct kvec iov = {
513                 .iov_base = buf,
514                 .iov_len = size,
515         };
516         struct msghdr msg = {
517                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
518         };
519         iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
520         return sock_recvmsg(sock, &msg, msg.msg_flags);
521 }
522
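/* Receive on the data socket.  Logs peer resets/shutdowns and forces the
 * connection to C_BROKEN_PIPE on a short read, unless we are in the middle
 * of a deliberate disconnect. */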
523 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
524 {
525         int rv;
526
527         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
528
529         if (rv < 0) {
530                 if (rv == -ECONNRESET)
531                         drbd_info(connection, "sock was reset by peer\n");
532                 else if (rv != -ERESTARTSYS)
533                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
534         } else if (rv == 0) {
535                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
536                         long t;
537                         rcu_read_lock();
538                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
539                         rcu_read_unlock();
540
541                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
542
543                         if (t)
544                                 goto out;
545                 }
546                 drbd_info(connection, "sock was shut down by peer\n");
547         }
548
549         if (rv != size)
550                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
551
552 out:
553         return rv;
554 }
555
556 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
557 {
558         int err;
559
560         err = drbd_recv(connection, buf, size);
561         if (err != size) {
562                 if (err >= 0)
563                         err = -EIO;
564         } else
565                 err = 0;
566         return err;
567 }
568
569 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
570 {
571         int err;
572
573         err = drbd_recv_all(connection, buf, size);
574         if (err && !signal_pending(current))
575                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
576         return err;
577 }
578
579 /* quoting tcp(7):
580  *   On individual connections, the socket buffer size must be set prior to the
581  *   listen(2) or connect(2) calls in order to have it take effect.
582  * This is our wrapper to do so.
583  */
584 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
585                 unsigned int rcv)
586 {
587         /* open coded SO_SNDBUF, SO_RCVBUF */
588         if (snd) {
589                 sock->sk->sk_sndbuf = snd;
590                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
591         }
592         if (rcv) {
593                 sock->sk->sk_rcvbuf = rcv;
594                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
595         }
596 }
597
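/* Actively connect to the peer: bind to the configured source address with
 * port 0, then connect().  "Soft" errors (peer not reachable yet, timeout,
 * signal) do not trigger a state change to C_DISCONNECTING. */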
598 static struct socket *drbd_try_connect(struct drbd_connection *connection)
599 {
600         const char *what;
601         struct socket *sock;
602         struct sockaddr_in6 src_in6;
603         struct sockaddr_in6 peer_in6;
604         struct net_conf *nc;
605         int err, peer_addr_len, my_addr_len;
606         int sndbuf_size, rcvbuf_size, connect_int;
607         int disconnect_on_error = 1;
608
609         rcu_read_lock();
610         nc = rcu_dereference(connection->net_conf);
611         if (!nc) {
612                 rcu_read_unlock();
613                 return NULL;
614         }
615         sndbuf_size = nc->sndbuf_size;
616         rcvbuf_size = nc->rcvbuf_size;
617         connect_int = nc->connect_int;
618         rcu_read_unlock();
619
620         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
621         memcpy(&src_in6, &connection->my_addr, my_addr_len);
622
623         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
624                 src_in6.sin6_port = 0;
625         else
626                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
627
628         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
629         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
630
631         what = "sock_create_kern";
632         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
633                                SOCK_STREAM, IPPROTO_TCP, &sock);
634         if (err < 0) {
635                 sock = NULL;
636                 goto out;
637         }
638
639         sock->sk->sk_rcvtimeo =
640         sock->sk->sk_sndtimeo = connect_int * HZ;
641         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
642
643        /* explicitly bind to the configured IP as source IP
644         *  for the outgoing connections.
645         *  This is needed for multihomed hosts and to be
646         *  able to use lo: interfaces for drbd.
647         * Make sure to use 0 as port number, so linux selects
648         *  a free one dynamically.
649         */
650         what = "bind before connect";
651         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
652         if (err < 0)
653                 goto out;
654
655         /* connect may fail, peer not yet available.
656          * stay C_WF_CONNECTION, don't go Disconnecting! */
657         disconnect_on_error = 0;
658         what = "connect";
659         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
660
661 out:
662         if (err < 0) {
663                 if (sock) {
664                         sock_release(sock);
665                         sock = NULL;
666                 }
667                 switch (-err) {
668                         /* timeout, busy, signal pending */
669                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
670                 case EINTR: case ERESTARTSYS:
671                         /* peer not (yet) available, network problem */
672                 case ECONNREFUSED: case ENETUNREACH:
673                 case EHOSTDOWN:    case EHOSTUNREACH:
674                         disconnect_on_error = 0;
675                         break;
676                 default:
677                         drbd_err(connection, "%s failed, err = %d\n", what, err);
678                 }
679                 if (disconnect_on_error)
680                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
681         }
682
683         return sock;
684 }
685
686 struct accept_wait_data {
687         struct drbd_connection *connection;
688         struct socket *s_listen;
689         struct completion door_bell;
690         void (*original_sk_state_change)(struct sock *sk);
691
692 };
693
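/* sk_state_change callback installed on the listen socket: ring the
 * door_bell as soon as a connection reaches TCP_ESTABLISHED, then chain to
 * the original callback. */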
694 static void drbd_incoming_connection(struct sock *sk)
695 {
696         struct accept_wait_data *ad = sk->sk_user_data;
697         void (*state_change)(struct sock *sk);
698
699         state_change = ad->original_sk_state_change;
700         if (sk->sk_state == TCP_ESTABLISHED)
701                 complete(&ad->door_bell);
702         state_change(sk);
703 }
704
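/* Create, bind and listen on the socket for incoming peer connections, and
 * hook sk_state_change so drbd_wait_for_connect() gets woken up. */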
705 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
706 {
707         int err, sndbuf_size, rcvbuf_size, my_addr_len;
708         struct sockaddr_in6 my_addr;
709         struct socket *s_listen;
710         struct net_conf *nc;
711         const char *what;
712
713         rcu_read_lock();
714         nc = rcu_dereference(connection->net_conf);
715         if (!nc) {
716                 rcu_read_unlock();
717                 return -EIO;
718         }
719         sndbuf_size = nc->sndbuf_size;
720         rcvbuf_size = nc->rcvbuf_size;
721         rcu_read_unlock();
722
723         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
724         memcpy(&my_addr, &connection->my_addr, my_addr_len);
725
726         what = "sock_create_kern";
727         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
728                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
729         if (err) {
730                 s_listen = NULL;
731                 goto out;
732         }
733
734         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
735         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
736
737         what = "bind before listen";
738         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
739         if (err < 0)
740                 goto out;
741
742         ad->s_listen = s_listen;
743         write_lock_bh(&s_listen->sk->sk_callback_lock);
744         ad->original_sk_state_change = s_listen->sk->sk_state_change;
745         s_listen->sk->sk_state_change = drbd_incoming_connection;
746         s_listen->sk->sk_user_data = ad;
747         write_unlock_bh(&s_listen->sk->sk_callback_lock);
748
749         what = "listen";
750         err = s_listen->ops->listen(s_listen, 5);
751         if (err < 0)
752                 goto out;
753
754         return 0;
755 out:
756         if (s_listen)
757                 sock_release(s_listen);
758         if (err < 0) {
759                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
760                         drbd_err(connection, "%s failed, err = %d\n", what, err);
761                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
762                 }
763         }
764
765         return -EIO;
766 }
767
768 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
769 {
770         write_lock_bh(&sk->sk_callback_lock);
771         sk->sk_state_change = ad->original_sk_state_change;
772         sk->sk_user_data = NULL;
773         write_unlock_bh(&sk->sk_callback_lock);
774 }
775
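/* Wait (connect_int, with some random jitter) for the door_bell, then accept
 * the pending connection.  Returns NULL on timeout, signal, or accept
 * failure. */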
776 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
777 {
778         int timeo, connect_int, err = 0;
779         struct socket *s_estab = NULL;
780         struct net_conf *nc;
781
782         rcu_read_lock();
783         nc = rcu_dereference(connection->net_conf);
784         if (!nc) {
785                 rcu_read_unlock();
786                 return NULL;
787         }
788         connect_int = nc->connect_int;
789         rcu_read_unlock();
790
791         timeo = connect_int * HZ;
792         /* 28.5% random jitter */
793         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
794
795         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
796         if (err <= 0)
797                 return NULL;
798
799         err = kernel_accept(ad->s_listen, &s_estab, 0);
800         if (err < 0) {
801                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
802                         drbd_err(connection, "accept failed, err = %d\n", err);
803                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
804                 }
805         }
806
807         if (s_estab)
808                 unregister_state_change(s_estab->sk, ad);
809
810         return s_estab;
811 }
812
813 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
814
815 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
816                              enum drbd_packet cmd)
817 {
818         if (!conn_prepare_command(connection, sock))
819                 return -EIO;
820         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
821 }
822
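/* Read and decode the very first packet on a freshly accepted socket.
 * Returns the packet command (expected: P_INITIAL_DATA or P_INITIAL_META)
 * or a negative error. */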
823 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
824 {
825         unsigned int header_size = drbd_header_size(connection);
826         struct packet_info pi;
827         struct net_conf *nc;
828         int err;
829
830         rcu_read_lock();
831         nc = rcu_dereference(connection->net_conf);
832         if (!nc) {
833                 rcu_read_unlock();
834                 return -EIO;
835         }
836         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
837         rcu_read_unlock();
838
839         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
840         if (err != header_size) {
841                 if (err >= 0)
842                         err = -EIO;
843                 return err;
844         }
845         err = decode_header(connection, connection->data.rbuf, &pi);
846         if (err)
847                 return err;
848         return pi.cmd;
849 }
850
851 /**
852  * drbd_socket_okay() - Free the socket if its connection is not okay
853  * @sock:       pointer to the pointer to the socket.
854  */
855 static bool drbd_socket_okay(struct socket **sock)
856 {
857         int rr;
858         char tb[4];
859
860         if (!*sock)
861                 return false;
862
863         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
864
865         if (rr > 0 || rr == -EAGAIN) {
866                 return true;
867         } else {
868                 sock_release(*sock);
869                 *sock = NULL;
870                 return false;
871         }
872 }
873
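/* Once both sockets exist, wait sock_check_timeo (or ping_timeo) tenths of a
 * second, then verify that both sockets are still usable. */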
874 static bool connection_established(struct drbd_connection *connection,
875                                    struct socket **sock1,
876                                    struct socket **sock2)
877 {
878         struct net_conf *nc;
879         int timeout;
880         bool ok;
881
882         if (!*sock1 || !*sock2)
883                 return false;
884
885         rcu_read_lock();
886         nc = rcu_dereference(connection->net_conf);
887         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
888         rcu_read_unlock();
889         schedule_timeout_interruptible(timeout);
890
891         ok = drbd_socket_okay(sock1);
892         ok = drbd_socket_okay(sock2) && ok;
893
894         return ok;
895 }
896
897 /* Gets called if a connection is established, or if a new minor gets created
898    in a connection */
899 int drbd_connected(struct drbd_peer_device *peer_device)
900 {
901         struct drbd_device *device = peer_device->device;
902         int err;
903
904         atomic_set(&device->packet_seq, 0);
905         device->peer_seq = 0;
906
907         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
908                 &peer_device->connection->cstate_mutex :
909                 &device->own_state_mutex;
910
911         err = drbd_send_sync_param(peer_device);
912         if (!err)
913                 err = drbd_send_sizes(peer_device, 0, 0);
914         if (!err)
915                 err = drbd_send_uuids(peer_device);
916         if (!err)
917                 err = drbd_send_current_state(peer_device);
918         clear_bit(USE_DEGR_WFC_T, &device->flags);
919         clear_bit(RESIZE_PENDING, &device->flags);
920         atomic_set(&device->ap_in_flight, 0);
921         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
922         return err;
923 }
924
925 /*
926  * return values:
927  *   1 yes, we have a valid connection
928  *   0 oops, did not work out, please try again
929  *  -1 peer talks different language,
930  *     no point in trying again, please go standalone.
931  *  -2 We do not have a network config...
932  */
933 static int conn_connect(struct drbd_connection *connection)
934 {
935         struct drbd_socket sock, msock;
936         struct drbd_peer_device *peer_device;
937         struct net_conf *nc;
938         int vnr, timeout, h;
939         bool discard_my_data, ok;
940         enum drbd_state_rv rv;
941         struct accept_wait_data ad = {
942                 .connection = connection,
943                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
944         };
945
946         clear_bit(DISCONNECT_SENT, &connection->flags);
947         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
948                 return -2;
949
950         mutex_init(&sock.mutex);
951         sock.sbuf = connection->data.sbuf;
952         sock.rbuf = connection->data.rbuf;
953         sock.socket = NULL;
954         mutex_init(&msock.mutex);
955         msock.sbuf = connection->meta.sbuf;
956         msock.rbuf = connection->meta.rbuf;
957         msock.socket = NULL;
958
959         /* Assume that the peer only understands protocol 80 until we know better.  */
960         connection->agreed_pro_version = 80;
961
962         if (prepare_listen_socket(connection, &ad))
963                 return 0;
964
965         do {
966                 struct socket *s;
967
968                 s = drbd_try_connect(connection);
969                 if (s) {
970                         if (!sock.socket) {
971                                 sock.socket = s;
972                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
973                         } else if (!msock.socket) {
974                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
975                                 msock.socket = s;
976                                 send_first_packet(connection, &msock, P_INITIAL_META);
977                         } else {
978                                 drbd_err(connection, "Logic error in conn_connect()\n");
979                                 goto out_release_sockets;
980                         }
981                 }
982
983                 if (connection_established(connection, &sock.socket, &msock.socket))
984                         break;
985
986 retry:
987                 s = drbd_wait_for_connect(connection, &ad);
988                 if (s) {
989                         int fp = receive_first_packet(connection, s);
990                         drbd_socket_okay(&sock.socket);
991                         drbd_socket_okay(&msock.socket);
992                         switch (fp) {
993                         case P_INITIAL_DATA:
994                                 if (sock.socket) {
995                                         drbd_warn(connection, "initial packet S crossed\n");
996                                         sock_release(sock.socket);
997                                         sock.socket = s;
998                                         goto randomize;
999                                 }
1000                                 sock.socket = s;
1001                                 break;
1002                         case P_INITIAL_META:
1003                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1004                                 if (msock.socket) {
1005                                         drbd_warn(connection, "initial packet M crossed\n");
1006                                         sock_release(msock.socket);
1007                                         msock.socket = s;
1008                                         goto randomize;
1009                                 }
1010                                 msock.socket = s;
1011                                 break;
1012                         default:
1013                                 drbd_warn(connection, "Error receiving initial packet\n");
1014                                 sock_release(s);
1015 randomize:
1016                                 if (prandom_u32() & 1)
1017                                         goto retry;
1018                         }
1019                 }
1020
1021                 if (connection->cstate <= C_DISCONNECTING)
1022                         goto out_release_sockets;
1023                 if (signal_pending(current)) {
1024                         flush_signals(current);
1025                         smp_rmb();
1026                         if (get_t_state(&connection->receiver) == EXITING)
1027                                 goto out_release_sockets;
1028                 }
1029
1030                 ok = connection_established(connection, &sock.socket, &msock.socket);
1031         } while (!ok);
1032
1033         if (ad.s_listen)
1034                 sock_release(ad.s_listen);
1035
1036         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1037         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1038
1039         sock.socket->sk->sk_allocation = GFP_NOIO;
1040         msock.socket->sk->sk_allocation = GFP_NOIO;
1041
1042         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1043         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1044
1045         /* NOT YET ...
1046          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1047          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1048          * first set it to the P_CONNECTION_FEATURES timeout,
1049          * which we set to 4x the configured ping_timeout. */
1050         rcu_read_lock();
1051         nc = rcu_dereference(connection->net_conf);
1052
1053         sock.socket->sk->sk_sndtimeo =
1054         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1055
1056         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1057         timeout = nc->timeout * HZ / 10;
1058         discard_my_data = nc->discard_my_data;
1059         rcu_read_unlock();
1060
1061         msock.socket->sk->sk_sndtimeo = timeout;
1062
1063         /* we don't want delays.
1064          * we use TCP_CORK where appropriate, though */
1065         drbd_tcp_nodelay(sock.socket);
1066         drbd_tcp_nodelay(msock.socket);
1067
1068         connection->data.socket = sock.socket;
1069         connection->meta.socket = msock.socket;
1070         connection->last_received = jiffies;
1071
1072         h = drbd_do_features(connection);
1073         if (h <= 0)
1074                 return h;
1075
1076         if (connection->cram_hmac_tfm) {
1077                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1078                 switch (drbd_do_auth(connection)) {
1079                 case -1:
1080                         drbd_err(connection, "Authentication of peer failed\n");
1081                         return -1;
1082                 case 0:
1083                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1084                         return 0;
1085                 }
1086         }
1087
1088         connection->data.socket->sk->sk_sndtimeo = timeout;
1089         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1090
1091         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1092                 return -1;
1093
1094         /* Prevent a race between resync-handshake and
1095          * being promoted to Primary.
1096          *
1097          * Grab and release the state mutex, so we know that any current
1098          * drbd_set_role() is finished, and any incoming drbd_set_role
1099          * will see the STATE_SENT flag, and wait for it to be cleared.
1100          */
1101         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1102                 mutex_lock(peer_device->device->state_mutex);
1103
1104         /* avoid a race with conn_request_state( C_DISCONNECTING ) */
1105         spin_lock_irq(&connection->resource->req_lock);
1106         set_bit(STATE_SENT, &connection->flags);
1107         spin_unlock_irq(&connection->resource->req_lock);
1108
1109         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1110                 mutex_unlock(peer_device->device->state_mutex);
1111
1112         rcu_read_lock();
1113         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1114                 struct drbd_device *device = peer_device->device;
1115                 kref_get(&device->kref);
1116                 rcu_read_unlock();
1117
1118                 if (discard_my_data)
1119                         set_bit(DISCARD_MY_DATA, &device->flags);
1120                 else
1121                         clear_bit(DISCARD_MY_DATA, &device->flags);
1122
1123                 drbd_connected(peer_device);
1124                 kref_put(&device->kref, drbd_destroy_device);
1125                 rcu_read_lock();
1126         }
1127         rcu_read_unlock();
1128
1129         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1130         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1131                 clear_bit(STATE_SENT, &connection->flags);
1132                 return 0;
1133         }
1134
1135         drbd_thread_start(&connection->ack_receiver);
1136         /* opencoded create_singlethread_workqueue(),
1137          * to be able to use format string arguments */
1138         connection->ack_sender =
1139                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1140         if (!connection->ack_sender) {
1141                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1142                 return 0;
1143         }
1144
1145         mutex_lock(&connection->resource->conf_update);
1146         /* The discard_my_data flag is a single-shot modifier to the next
1147          * connection attempt, the handshake of which is now well underway.
1148          * No need for rcu style copying of the whole struct
1149          * just to clear a single value. */
1150         connection->net_conf->discard_my_data = 0;
1151         mutex_unlock(&connection->resource->conf_update);
1152
1153         return h;
1154
1155 out_release_sockets:
1156         if (ad.s_listen)
1157                 sock_release(ad.s_listen);
1158         if (sock.socket)
1159                 sock_release(sock.socket);
1160         if (msock.socket)
1161                 sock_release(msock.socket);
1162         return -1;
1163 }
1164
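/* Decode one of the three wire header formats (protocol 100, 95, 80),
 * selected by header size and magic, into *pi. */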
1165 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1166 {
1167         unsigned int header_size = drbd_header_size(connection);
1168
1169         if (header_size == sizeof(struct p_header100) &&
1170             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1171                 struct p_header100 *h = header;
1172                 if (h->pad != 0) {
1173                         drbd_err(connection, "Header padding is not zero\n");
1174                         return -EINVAL;
1175                 }
1176                 pi->vnr = be16_to_cpu(h->volume);
1177                 pi->cmd = be16_to_cpu(h->command);
1178                 pi->size = be32_to_cpu(h->length);
1179         } else if (header_size == sizeof(struct p_header95) &&
1180                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1181                 struct p_header95 *h = header;
1182                 pi->cmd = be16_to_cpu(h->command);
1183                 pi->size = be32_to_cpu(h->length);
1184                 pi->vnr = 0;
1185         } else if (header_size == sizeof(struct p_header80) &&
1186                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1187                 struct p_header80 *h = header;
1188                 pi->cmd = be16_to_cpu(h->command);
1189                 pi->size = be16_to_cpu(h->length);
1190                 pi->vnr = 0;
1191         } else {
1192                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1193                          be32_to_cpu(*(__be32 *)header),
1194                          connection->agreed_pro_version);
1195                 return -EINVAL;
1196         }
1197         pi->data = header + header_size;
1198         return 0;
1199 }
1200
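/* If the receiver currently holds its block plug, flush it so already queued
 * bios get submitted to the backing devices now. */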
1201 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1202 {
1203         if (current->plug == &connection->receiver_plug) {
1204                 blk_finish_plug(&connection->receiver_plug);
1205                 blk_start_plug(&connection->receiver_plug);
1206         } /* else: maybe just schedule() ?? */
1207 }
1208
1209 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1210 {
1211         void *buffer = connection->data.rbuf;
1212         int err;
1213
1214         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1215         if (err)
1216                 return err;
1217
1218         err = decode_header(connection, buffer, pi);
1219         connection->last_received = jiffies;
1220
1221         return err;
1222 }
1223
1224 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1225 {
1226         void *buffer = connection->data.rbuf;
1227         unsigned int size = drbd_header_size(connection);
1228         int err;
1229
1230         err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1231         if (err != size) {
1232                 /* If we have nothing in the receive buffer now, to reduce
1233                  * application latency, try to drain the backend queues as
1234                  * quickly as possible, and let remote TCP know what we have
1235                  * received so far. */
1236                 if (err == -EAGAIN) {
1237                         drbd_tcp_quickack(connection->data.socket);
1238                         drbd_unplug_all_devices(connection);
1239                 }
1240                 if (err > 0) {
1241                         buffer += err;
1242                         size -= err;
1243                 }
1244                 err = drbd_recv_all_warn(connection, buffer, size);
1245                 if (err)
1246                         return err;
1247         }
1248
1249         err = decode_header(connection, connection->data.rbuf, pi);
1250         connection->last_received = jiffies;
1251
1252         return err;
1253 }
1254 /* This is blkdev_issue_flush, but asynchronous.
1255  * We want to submit to all component volumes in parallel,
1256  * then wait for all completions.
1257  */
1258 struct issue_flush_context {
1259         atomic_t pending;
1260         int error;
1261         struct completion done;
1262 };
1263 struct one_flush_context {
1264         struct drbd_device *device;
1265         struct issue_flush_context *ctx;
1266 };
1267
1268 static void one_flush_endio(struct bio *bio)
1269 {
1270         struct one_flush_context *octx = bio->bi_private;
1271         struct drbd_device *device = octx->device;
1272         struct issue_flush_context *ctx = octx->ctx;
1273
1274         if (bio->bi_status) {
1275                 ctx->error = blk_status_to_errno(bio->bi_status);
1276                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1277         }
1278         kfree(octx);
1279         bio_put(bio);
1280
1281         clear_bit(FLUSH_PENDING, &device->flags);
1282         put_ldev(device);
1283         kref_put(&device->kref, drbd_destroy_device);
1284
1285         if (atomic_dec_and_test(&ctx->pending))
1286                 complete(&ctx->done);
1287 }
1288
1289 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1290 {
1291         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1292         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1293         if (!bio || !octx) {
1294                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1295                 /* FIXME: what else can I do now?  disconnecting or detaching
1296                  * really does not help to improve the state of the world, either.
1297                  */
1298                 kfree(octx);
1299                 if (bio)
1300                         bio_put(bio);
1301
1302                 ctx->error = -ENOMEM;
1303                 put_ldev(device);
1304                 kref_put(&device->kref, drbd_destroy_device);
1305                 return;
1306         }
1307
1308         octx->device = device;
1309         octx->ctx = ctx;
1310         bio_set_dev(bio, device->ldev->backing_bdev);
1311         bio->bi_private = octx;
1312         bio->bi_end_io = one_flush_endio;
1313         bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1314
1315         device->flush_jif = jiffies;
1316         set_bit(FLUSH_PENDING, &device->flags);
1317         atomic_inc(&ctx->pending);
1318         submit_bio(bio);
1319 }
1320
1321 static void drbd_flush(struct drbd_connection *connection)
1322 {
1323         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1324                 struct drbd_peer_device *peer_device;
1325                 struct issue_flush_context ctx;
1326                 int vnr;
1327
1328                 atomic_set(&ctx.pending, 1);
1329                 ctx.error = 0;
1330                 init_completion(&ctx.done);
1331
1332                 rcu_read_lock();
1333                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1334                         struct drbd_device *device = peer_device->device;
1335
1336                         if (!get_ldev(device))
1337                                 continue;
1338                         kref_get(&device->kref);
1339                         rcu_read_unlock();
1340
1341                         submit_one_flush(device, &ctx);
1342
1343                         rcu_read_lock();
1344                 }
1345                 rcu_read_unlock();
1346
1347                 /* Do we want to add a timeout,
1348                  * if disk-timeout is set? */
1349                 if (!atomic_dec_and_test(&ctx.pending))
1350                         wait_for_completion(&ctx.done);
1351
1352                 if (ctx.error) {
1353                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1354                          * don't try again for ANY return value != 0
1355                          * if (rv == -EOPNOTSUPP) */
1356                         /* Any error is already reported by bio_endio callback. */
1357                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1358                 }
1359         }
1360 }
1361
1362 /**
1363  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, possibly finishing it.
1364  * @connection: DRBD connection.
1365  * @epoch:      Epoch object.
1366  * @ev:         Epoch event.
1367  */
1368 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1369                                                struct drbd_epoch *epoch,
1370                                                enum epoch_event ev)
1371 {
1372         int epoch_size;
1373         struct drbd_epoch *next_epoch;
1374         enum finish_epoch rv = FE_STILL_LIVE;
1375
1376         spin_lock(&connection->epoch_lock);
1377         do {
1378                 next_epoch = NULL;
1379
1380                 epoch_size = atomic_read(&epoch->epoch_size);
1381
1382                 switch (ev & ~EV_CLEANUP) {
1383                 case EV_PUT:
1384                         atomic_dec(&epoch->active);
1385                         break;
1386                 case EV_GOT_BARRIER_NR:
1387                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1388                         break;
1389                 case EV_BECAME_LAST:
1390                         /* nothing to do*/
1391                         break;
1392                 }
1393
1394                 if (epoch_size != 0 &&
1395                     atomic_read(&epoch->active) == 0 &&
1396                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1397                         if (!(ev & EV_CLEANUP)) {
1398                                 spin_unlock(&connection->epoch_lock);
1399                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1400                                 spin_lock(&connection->epoch_lock);
1401                         }
1402 #if 0
1403                         /* FIXME: dec unacked on connection, once we have
1404                          * something to count pending connection packets in. */
1405                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1406                                 dec_unacked(epoch->connection);
1407 #endif
1408
1409                         if (connection->current_epoch != epoch) {
1410                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1411                                 list_del(&epoch->list);
1412                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1413                                 connection->epochs--;
1414                                 kfree(epoch);
1415
1416                                 if (rv == FE_STILL_LIVE)
1417                                         rv = FE_DESTROYED;
1418                         } else {
1419                                 epoch->flags = 0;
1420                                 atomic_set(&epoch->epoch_size, 0);
1421                                 /* atomic_set(&epoch->active, 0); is already zero */
1422                                 if (rv == FE_STILL_LIVE)
1423                                         rv = FE_RECYCLED;
1424                         }
1425                 }
1426
1427                 if (!next_epoch)
1428                         break;
1429
1430                 epoch = next_epoch;
1431         } while (1);
1432
1433         spin_unlock(&connection->epoch_lock);
1434
1435         return rv;
1436 }
1437
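/* Clamp the requested write ordering method to what this backing device's
 * disk_conf permits (disk_flushes, disk_drain).  Caller must hold the RCU
 * read lock. */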
1438 static enum write_ordering_e
1439 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1440 {
1441         struct disk_conf *dc;
1442
1443         dc = rcu_dereference(bdev->disk_conf);
1444
1445         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1446                 wo = WO_DRAIN_IO;
1447         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1448                 wo = WO_NONE;
1449
1450         return wo;
1451 }
1452
1453 /**
1454  * drbd_bump_write_ordering() - Fall back to another write ordering method
1455  * @resource:   DRBD resource.
1456  * @bdev:       Backing device whose limits to honor, may be NULL.
 * @wo:         Write ordering method to try.
1457  */
1458 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1459                               enum write_ordering_e wo)
1460 {
1461         struct drbd_device *device;
1462         enum write_ordering_e pwo;
1463         int vnr;
1464         static char *write_ordering_str[] = {
1465                 [WO_NONE] = "none",
1466                 [WO_DRAIN_IO] = "drain",
1467                 [WO_BDEV_FLUSH] = "flush",
1468         };
1469
1470         pwo = resource->write_ordering;
1471         if (wo != WO_BDEV_FLUSH)
1472                 wo = min(pwo, wo);
1473         rcu_read_lock();
1474         idr_for_each_entry(&resource->devices, device, vnr) {
1475                 if (get_ldev(device)) {
1476                         wo = max_allowed_wo(device->ldev, wo);
1477                         if (device->ldev == bdev)
1478                                 bdev = NULL;
1479                         put_ldev(device);
1480                 }
1481         }
1482
1483         if (bdev)
1484                 wo = max_allowed_wo(bdev, wo);
1485
1486         rcu_read_unlock();
1487
1488         resource->write_ordering = wo;
1489         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1490                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1491 }
1492
1493 /*
1494  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1495  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1496  * will directly go to fallback mode, submitting normal writes, and
1497  * never even try to UNMAP.
1498  *
1499  * And dm-thin does not do this (yet), mostly because in general it has
1500  * to assume that "skip_block_zeroing" is set.  See also:
1501  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1502  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1503  *
1504  * We *may* ignore the discard-zeroes-data setting, if so configured.
1505  *
1506  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1507  * may ignore partial unaligned discards.
1508  *
1509  * LVM/DM thin as of at least
1510  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1511  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1512  *   Driver version:  4.29.0
1513  * still behaves this way.
1514  *
1515  * For unaligned (wrt. alignment and granularity) or too small discards,
1516  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1517  * but discard all the aligned full chunks.
1518  *
1519  * At least for LVM/DM thin, with skip_block_zeroing=false,
1520  * the result is effectively "discard_zeroes_data=1".
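 *
 * Example, assuming a discard granularity of 64 KiB (128 sectors), zero
 * discard alignment, and a large max_discard_sectors: an EE_TRIM request
 * covering sectors 100..1099 is split into a zero-out of sectors 100..127,
 * a discard of the aligned sectors 128..1023, and a final zero-out of the
 * remaining sectors 1024..1099.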
1521  */
1522 /* flags: EE_TRIM|EE_ZEROOUT */
1523 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
1524 {
1525         struct block_device *bdev = device->ldev->backing_bdev;
1526         struct request_queue *q = bdev_get_queue(bdev);
1527         sector_t tmp, nr;
1528         unsigned int max_discard_sectors, granularity;
1529         int alignment;
1530         int err = 0;
1531
1532         if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
1533                 goto zero_out;
1534
1535         /* Zero-sector (unknown) and one-sector granularities are the same.  */
1536         granularity = max(q->limits.discard_granularity >> 9, 1U);
1537         alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1538
1539         max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1540         max_discard_sectors -= max_discard_sectors % granularity;
1541         if (unlikely(!max_discard_sectors))
1542                 goto zero_out;
1543
1544         if (nr_sectors < granularity)
1545                 goto zero_out;
1546
1547         tmp = start;
1548         if (sector_div(tmp, granularity) != alignment) {
1549                 if (nr_sectors < 2*granularity)
1550                         goto zero_out;
1551                 /* start + gran - (start + gran - align) % gran */
1552                 tmp = start + granularity - alignment;
1553                 tmp = start + granularity - sector_div(tmp, granularity);
1554
1555                 nr = tmp - start;
1556                 /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
1557                  * layers are below us, some may have smaller granularity */
1558                 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1559                 nr_sectors -= nr;
1560                 start = tmp;
1561         }
1562         while (nr_sectors >= max_discard_sectors) {
1563                 err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
1564                 nr_sectors -= max_discard_sectors;
1565                 start += max_discard_sectors;
1566         }
1567         if (nr_sectors) {
1568                 /* max_discard_sectors is unsigned int (and a multiple of
1569                  * granularity, we made sure of that above already);
1570                  * nr is < max_discard_sectors;
1571                  * I don't need sector_div here, even though nr is sector_t */
1572                 nr = nr_sectors;
1573                 nr -= (unsigned int)nr % granularity;
1574                 if (nr) {
1575                         err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1576                         nr_sectors -= nr;
1577                         start += nr;
1578                 }
1579         }
1580  zero_out:
1581         if (nr_sectors) {
1582                 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
1583                                 (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
1584         }
1585         return err != 0;
1586 }
1587
1588 static bool can_do_reliable_discards(struct drbd_device *device)
1589 {
1590         struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1591         struct disk_conf *dc;
1592         bool can_do;
1593
1594         if (!blk_queue_discard(q))
1595                 return false;
1596
1597         rcu_read_lock();
1598         dc = rcu_dereference(device->ldev->disk_conf);
1599         can_do = dc->discard_zeroes_if_aligned;
1600         rcu_read_unlock();
1601         return can_do;
1602 }
1603
1604 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1605 {
1606         /* If the backend cannot discard, or does not guarantee
1607          * read-back zeroes in discarded ranges, we fall back to
1608          * zero-out.  Unless configuration specifically requested
1609          * otherwise. */
1610         if (!can_do_reliable_discards(device))
1611                 peer_req->flags |= EE_ZEROOUT;
1612
1613         if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1614             peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1615                 peer_req->flags |= EE_WAS_ERROR;
1616         drbd_endio_write_sec_final(peer_req);
1617 }
1618
1619 static void drbd_issue_peer_wsame(struct drbd_device *device,
1620                                   struct drbd_peer_request *peer_req)
1621 {
1622         struct block_device *bdev = device->ldev->backing_bdev;
1623         sector_t s = peer_req->i.sector;
1624         sector_t nr = peer_req->i.size >> 9;
1625         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1626                 peer_req->flags |= EE_WAS_ERROR;
1627         drbd_endio_write_sec_final(peer_req);
1628 }
1629
1630
1631 /**
1632  * drbd_submit_peer_request()
1633  * @device:     DRBD device.
1634  * @peer_req:   peer request
1635  * @op:         REQ_OP_* request operation (see bio->bi_opf)
 * @op_flags:   additional bio flags
 * @fault_type: DRBD_FAULT_* category for fault injection
1636  *
1637  * May spread the pages to multiple bios,
1638  * depending on bio_add_page restrictions.
1639  *
1640  * Returns 0 if all bios have been submitted,
1641  * -ENOMEM if we could not allocate enough bios,
1642  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1643  *  single page to an empty bio (which should never happen and likely indicates
1644  *  that the lower level IO stack is in some way broken). This has been observed
1645  *  on certain Xen deployments.
1646  */
1647 /* TODO allocate from our own bio_set. */
1648 int drbd_submit_peer_request(struct drbd_device *device,
1649                              struct drbd_peer_request *peer_req,
1650                              const unsigned op, const unsigned op_flags,
1651                              const int fault_type)
1652 {
1653         struct bio *bios = NULL;
1654         struct bio *bio;
1655         struct page *page = peer_req->pages;
1656         sector_t sector = peer_req->i.sector;
1657         unsigned data_size = peer_req->i.size;
1658         unsigned n_bios = 0;
1659         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1660         int err = -ENOMEM;
1661
1662         /* TRIM/DISCARD, ZEROOUT and WRITE_SAME: for now, handled synchronously
1663          * by the drbd_issue_peer_* helpers below, built on blkdev_issue_discard(),
1664          * blkdev_issue_zeroout() and blkdev_issue_write_same(), which do the right
1665          * thing wrt. bio splitting.  Correctness first, performance later.
1666          * Next step is to code an asynchronous variant of the same.
1667          */
1668         if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
1669                 /* wait for all pending IO completions, before we start
1670                  * zeroing things out. */
1671                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1672                 /* add it to the active list now,
1673                  * so we can find it to present it in debugfs */
1674                 peer_req->submit_jif = jiffies;
1675                 peer_req->flags |= EE_SUBMITTED;
1676
1677                 /* If this was a resync request from receive_rs_deallocated(),
1678                  * it is already on the sync_ee list */
1679                 if (list_empty(&peer_req->w.list)) {
1680                         spin_lock_irq(&device->resource->req_lock);
1681                         list_add_tail(&peer_req->w.list, &device->active_ee);
1682                         spin_unlock_irq(&device->resource->req_lock);
1683                 }
1684
1685                 if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
1686                         drbd_issue_peer_discard_or_zero_out(device, peer_req);
1687                 else /* EE_WRITE_SAME */
1688                         drbd_issue_peer_wsame(device, peer_req);
1689                 return 0;
1690         }
1691
1692         /* In most cases, we will only need one bio.  But in case the lower
1693          * level restrictions happen to be different at this offset on this
1694          * side than those of the sending peer, we may need to submit the
1695          * request in more than one bio.
1696          *
1697          * Plain bio_alloc is good enough here, this is no DRBD internally
1698          * generated bio, but a bio allocated on behalf of the peer.
1699          */
1700 next_bio:
1701         bio = bio_alloc(GFP_NOIO, nr_pages);
1702         if (!bio) {
1703                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1704                 goto fail;
1705         }
1706         /* > peer_req->i.sector, unless this is the first bio */
1707         bio->bi_iter.bi_sector = sector;
1708         bio_set_dev(bio, device->ldev->backing_bdev);
1709         bio_set_op_attrs(bio, op, op_flags);
1710         bio->bi_private = peer_req;
1711         bio->bi_end_io = drbd_peer_request_endio;
1712
1713         bio->bi_next = bios;
1714         bios = bio;
1715         ++n_bios;
1716
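        /* Add pages to the current bio until bio_add_page() refuses (lower
         * level limits reached); the next_bio label above then chains a
         * fresh bio for the remaining pages. */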
1717         page_chain_for_each(page) {
1718                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1719                 if (!bio_add_page(bio, page, len, 0))
1720                         goto next_bio;
1721                 data_size -= len;
1722                 sector += len >> 9;
1723                 --nr_pages;
1724         }
1725         D_ASSERT(device, data_size == 0);
1726         D_ASSERT(device, page == NULL);
1727
1728         atomic_set(&peer_req->pending_bios, n_bios);
1729         /* for debugfs: update timestamp, mark as submitted */
1730         peer_req->submit_jif = jiffies;
1731         peer_req->flags |= EE_SUBMITTED;
1732         do {
1733                 bio = bios;
1734                 bios = bios->bi_next;
1735                 bio->bi_next = NULL;
1736
1737                 drbd_generic_make_request(device, fault_type, bio);
1738         } while (bios);
1739         return 0;
1740
1741 fail:
1742         while (bios) {
1743                 bio = bios;
1744                 bios = bios->bi_next;
1745                 bio_put(bio);
1746         }
1747         return err;
1748 }
1749
1750 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1751                                              struct drbd_peer_request *peer_req)
1752 {
1753         struct drbd_interval *i = &peer_req->i;
1754
1755         drbd_remove_interval(&device->write_requests, i);
1756         drbd_clear_interval(i);
1757
1758         /* Wake up any processes waiting for this peer request to complete.  */
1759         if (i->waiting)
1760                 wake_up(&device->misc_wait);
1761 }
1762
1763 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1764 {
1765         struct drbd_peer_device *peer_device;
1766         int vnr;
1767
1768         rcu_read_lock();
1769         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1770                 struct drbd_device *device = peer_device->device;
1771
1772                 kref_get(&device->kref);
1773                 rcu_read_unlock();
1774                 drbd_wait_ee_list_empty(device, &device->active_ee);
1775                 kref_put(&device->kref, drbd_destroy_device);
1776                 rcu_read_lock();
1777         }
1778         rcu_read_unlock();
1779 }
1780
1781 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1782 {
1783         int rv;
1784         struct p_barrier *p = pi->data;
1785         struct drbd_epoch *epoch;
1786
1787         /* FIXME these are unacked on connection,
1788          * not a specific (peer)device.
1789          */
1790         connection->current_epoch->barrier_nr = p->barrier;
1791         connection->current_epoch->connection = connection;
1792         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1793
1794         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1795          * the activity log, which means it would not be resynced in case the
1796          * R_PRIMARY crashes now.
1797          * Therefore we must send the barrier_ack after the barrier request was
1798          * completed. */
1799         switch (connection->resource->write_ordering) {
1800         case WO_NONE:
1801                 if (rv == FE_RECYCLED)
1802                         return 0;
1803
1804                 /* receiver context, in the writeout path of the other node.
1805                  * avoid potential distributed deadlock */
1806                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1807                 if (epoch)
1808                         break;
1809                 else
1810                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1811                         /* Fall through */
1812
1813         case WO_BDEV_FLUSH:
1814         case WO_DRAIN_IO:
1815                 conn_wait_active_ee_empty(connection);
1816                 drbd_flush(connection);
1817
1818                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1819                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1820                         if (epoch)
1821                                 break;
1822                 }
1823
1824                 return 0;
1825         default:
1826                 drbd_err(connection, "Strangeness in resource->write_ordering %d\n",
1827                          connection->resource->write_ordering);
1828                 return -EIO;
1829         }
1830
1831         epoch->flags = 0;
1832         atomic_set(&epoch->epoch_size, 0);
1833         atomic_set(&epoch->active, 0);
1834
1835         spin_lock(&connection->epoch_lock);
1836         if (atomic_read(&connection->current_epoch->epoch_size)) {
1837                 list_add(&epoch->list, &connection->current_epoch->list);
1838                 connection->current_epoch = epoch;
1839                 connection->epochs++;
1840         } else {
1841                 /* The current_epoch got recycled while we allocated this one... */
1842                 kfree(epoch);
1843         }
1844         spin_unlock(&connection->epoch_lock);
1845
1846         return 0;
1847 }
1848
1849 /* quick wrapper in case payload size != request_size (write same) */
1850 static void drbd_csum_ee_size(struct crypto_shash *h,
1851                               struct drbd_peer_request *r, void *d,
1852                               unsigned int payload_size)
1853 {
1854         unsigned int tmp = r->i.size;
1855         r->i.size = payload_size;
1856         drbd_csum_ee(h, r, d);
1857         r->i.size = tmp;
1858 }
1859
1860 /* used from receive_RSDataReply (recv_resync_read)
1861  * and from receive_Data.
1862  * data_size: actual payload ("data in")
1863  *      for normal writes that is bi_size.
1864  *      for discards, that is zero.
1865  *      for write same, it is logical_block_size.
1866  * both trim and write same have the bi_size ("data len to be affected")
1867  * as extra argument in the packet header.
1868  */
1869 static struct drbd_peer_request *
1870 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1871               struct packet_info *pi) __must_hold(local)
1872 {
1873         struct drbd_device *device = peer_device->device;
1874         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1875         struct drbd_peer_request *peer_req;
1876         struct page *page;
1877         int digest_size, err;
1878         unsigned int data_size = pi->size, ds;
1879         void *dig_in = peer_device->connection->int_dig_in;
1880         void *dig_vv = peer_device->connection->int_dig_vv;
1881         unsigned long *data;
1882         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1883         struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1884         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1885
1886         digest_size = 0;
1887         if (!trim && peer_device->connection->peer_integrity_tfm) {
1888                 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1889                 /*
1890                  * FIXME: Receive the incoming digest into the receive buffer
1891                  *        here, together with its struct p_data?
1892                  */
1893                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1894                 if (err)
1895                         return NULL;
1896                 data_size -= digest_size;
1897         }
1898
1899         /* assume request_size == data_size, but special case trim, zeroes and wsame. */
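        /* E.g. a normal 4 KiB P_DATA write has ds == data_size == 4096 (digest
         * already subtracted above), while a 1 MiB P_TRIM carries no payload:
         * data_size == 0 and ds == trim->size. */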
1900         ds = data_size;
1901         if (trim) {
1902                 if (!expect(data_size == 0))
1903                         return NULL;
1904                 ds = be32_to_cpu(trim->size);
1905         } else if (zeroes) {
1906                 if (!expect(data_size == 0))
1907                         return NULL;
1908                 ds = be32_to_cpu(zeroes->size);
1909         } else if (wsame) {
1910                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1911                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1912                                 data_size, queue_logical_block_size(device->rq_queue));
1913                         return NULL;
1914                 }
1915                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1916                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1917                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1918                         return NULL;
1919                 }
1920                 ds = be32_to_cpu(wsame->size);
1921         }
1922
1923         if (!expect(IS_ALIGNED(ds, 512)))
1924                 return NULL;
1925         if (trim || wsame || zeroes) {
1926                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1927                         return NULL;
1928         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1929                 return NULL;
1930
1931         /* even though we trust our peer,
1932          * we sometimes have to double check. */
1933         if (sector + (ds>>9) > capacity) {
1934                 drbd_err(device, "request from peer beyond end of local disk: "
1935                         "capacity: %llus < sector: %llus + size: %u\n",
1936                         (unsigned long long)capacity,
1937                         (unsigned long long)sector, ds);
1938                 return NULL;
1939         }
1940
1941         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1942          * "criss-cross" setup, that might cause write-out on some other DRBD,
1943          * which in turn might block on the other node at this very place.  */
1944         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1945         if (!peer_req)
1946                 return NULL;
1947
1948         peer_req->flags |= EE_WRITE;
1949         if (trim) {
1950                 peer_req->flags |= EE_TRIM;
1951                 return peer_req;
1952         }
1953         if (zeroes) {
1954                 peer_req->flags |= EE_ZEROOUT;
1955                 return peer_req;
1956         }
1957         if (wsame)
1958                 peer_req->flags |= EE_WRITE_SAME;
1959
1960         /* receive payload size bytes into page chain */
1961         ds = data_size;
1962         page = peer_req->pages;
1963         page_chain_for_each(page) {
1964                 unsigned len = min_t(int, ds, PAGE_SIZE);
1965                 data = kmap(page);
1966                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1967                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1968                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1969                         data[0] = data[0] ^ (unsigned long)-1;
1970                 }
1971                 kunmap(page);
1972                 if (err) {
1973                         drbd_free_peer_req(device, peer_req);
1974                         return NULL;
1975                 }
1976                 ds -= len;
1977         }
1978
1979         if (digest_size) {
1980                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1981                 if (memcmp(dig_in, dig_vv, digest_size)) {
1982                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1983                                 (unsigned long long)sector, data_size);
1984                         drbd_free_peer_req(device, peer_req);
1985                         return NULL;
1986                 }
1987         }
1988         device->recv_cnt += data_size >> 9;
1989         return peer_req;
1990 }
1991
1992 /* drbd_drain_block() just takes a data block
1993  * out of the socket input buffer, and discards it.
1994  */
1995 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1996 {
1997         struct page *page;
1998         int err = 0;
1999         void *data;
2000
2001         if (!data_size)
2002                 return 0;
2003
2004         page = drbd_alloc_pages(peer_device, 1, 1);
2005
2006         data = kmap(page);
2007         while (data_size) {
2008                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
2009
2010                 err = drbd_recv_all_warn(peer_device->connection, data, len);
2011                 if (err)
2012                         break;
2013                 data_size -= len;
2014         }
2015         kunmap(page);
2016         drbd_free_pages(peer_device->device, page, 0);
2017         return err;
2018 }
2019
2020 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2021                            sector_t sector, int data_size)
2022 {
2023         struct bio_vec bvec;
2024         struct bvec_iter iter;
2025         struct bio *bio;
2026         int digest_size, err, expect;
2027         void *dig_in = peer_device->connection->int_dig_in;
2028         void *dig_vv = peer_device->connection->int_dig_vv;
2029
2030         digest_size = 0;
2031         if (peer_device->connection->peer_integrity_tfm) {
2032                 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2033                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2034                 if (err)
2035                         return err;
2036                 data_size -= digest_size;
2037         }
2038
2039         /* optimistically update recv_cnt.  if receiving fails below,
2040          * we disconnect anyways, and counters will be reset. */
2041         peer_device->device->recv_cnt += data_size>>9;
2042
2043         bio = req->master_bio;
2044         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2045
2046         bio_for_each_segment(bvec, bio, iter) {
2047                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2048                 expect = min_t(int, data_size, bvec.bv_len);
2049                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2050                 kunmap(bvec.bv_page);
2051                 if (err)
2052                         return err;
2053                 data_size -= expect;
2054         }
2055
2056         if (digest_size) {
2057                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2058                 if (memcmp(dig_in, dig_vv, digest_size)) {
2059                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2060                         return -EINVAL;
2061                 }
2062         }
2063
2064         D_ASSERT(peer_device->device, data_size == 0);
2065         return 0;
2066 }
2067
2068 /*
2069  * e_end_resync_block() is called in ack_sender context via
2070  * drbd_finish_peer_reqs().
2071  */
2072 static int e_end_resync_block(struct drbd_work *w, int unused)
2073 {
2074         struct drbd_peer_request *peer_req =
2075                 container_of(w, struct drbd_peer_request, w);
2076         struct drbd_peer_device *peer_device = peer_req->peer_device;
2077         struct drbd_device *device = peer_device->device;
2078         sector_t sector = peer_req->i.sector;
2079         int err;
2080
2081         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2082
2083         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2084                 drbd_set_in_sync(device, sector, peer_req->i.size);
2085                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2086         } else {
2087                 /* Record failure to sync */
2088                 drbd_rs_failed_io(device, sector, peer_req->i.size);
2089
2090                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2091         }
2092         dec_unacked(device);
2093
2094         return err;
2095 }
2096
2097 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2098                             struct packet_info *pi) __releases(local)
2099 {
2100         struct drbd_device *device = peer_device->device;
2101         struct drbd_peer_request *peer_req;
2102
2103         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2104         if (!peer_req)
2105                 goto fail;
2106
2107         dec_rs_pending(device);
2108
2109         inc_unacked(device);
2110         /* corresponding dec_unacked() in e_end_resync_block()
2111          * respective _drbd_clear_done_ee */
2112
2113         peer_req->w.cb = e_end_resync_block;
2114         peer_req->submit_jif = jiffies;
2115
2116         spin_lock_irq(&device->resource->req_lock);
2117         list_add_tail(&peer_req->w.list, &device->sync_ee);
2118         spin_unlock_irq(&device->resource->req_lock);
2119
2120         atomic_add(pi->size >> 9, &device->rs_sect_ev);
2121         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2122                                      DRBD_FAULT_RS_WR) == 0)
2123                 return 0;
2124
2125         /* don't care for the reason here */
2126         drbd_err(device, "submit failed, triggering re-connect\n");
2127         spin_lock_irq(&device->resource->req_lock);
2128         list_del(&peer_req->w.list);
2129         spin_unlock_irq(&device->resource->req_lock);
2130
2131         drbd_free_peer_req(device, peer_req);
2132 fail:
2133         put_ldev(device);
2134         return -EIO;
2135 }
2136
2137 static struct drbd_request *
2138 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2139              sector_t sector, bool missing_ok, const char *func)
2140 {
2141         struct drbd_request *req;
2142
2143         /* Request object according to our peer */
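        /* The peer echoes back the kernel address of our own drbd_request as
         * block_id; before trusting it, verify that exactly this interval is
         * registered in the tree at the given sector. */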
2144         req = (struct drbd_request *)(unsigned long)id;
2145         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2146                 return req;
2147         if (!missing_ok) {
2148                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2149                         (unsigned long)id, (unsigned long long)sector);
2150         }
2151         return NULL;
2152 }
2153
2154 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2155 {
2156         struct drbd_peer_device *peer_device;
2157         struct drbd_device *device;
2158         struct drbd_request *req;
2159         sector_t sector;
2160         int err;
2161         struct p_data *p = pi->data;
2162
2163         peer_device = conn_peer_device(connection, pi->vnr);
2164         if (!peer_device)
2165                 return -EIO;
2166         device = peer_device->device;
2167
2168         sector = be64_to_cpu(p->sector);
2169
2170         spin_lock_irq(&device->resource->req_lock);
2171         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2172         spin_unlock_irq(&device->resource->req_lock);
2173         if (unlikely(!req))
2174                 return -EIO;
2175
2176         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2177          * special casing it there for the various failure cases.
2178          * still no race with drbd_fail_pending_reads */
2179         err = recv_dless_read(peer_device, req, sector, pi->size);
2180         if (!err)
2181                 req_mod(req, DATA_RECEIVED);
2182         /* else: nothing. handled from drbd_disconnect...
2183          * I don't think we may complete this just yet
2184          * in case we are "on-disconnect: freeze" */
2185
2186         return err;
2187 }
2188
2189 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2190 {
2191         struct drbd_peer_device *peer_device;
2192         struct drbd_device *device;
2193         sector_t sector;
2194         int err;
2195         struct p_data *p = pi->data;
2196
2197         peer_device = conn_peer_device(connection, pi->vnr);
2198         if (!peer_device)
2199                 return -EIO;
2200         device = peer_device->device;
2201
2202         sector = be64_to_cpu(p->sector);
2203         D_ASSERT(device, p->block_id == ID_SYNCER);
2204
2205         if (get_ldev(device)) {
2206                 /* data is submitted to disk within recv_resync_read.
2207                  * corresponding put_ldev done below on error,
2208                  * or in drbd_peer_request_endio. */
2209                 err = recv_resync_read(peer_device, sector, pi);
2210         } else {
2211                 if (__ratelimit(&drbd_ratelimit_state))
2212                         drbd_err(device, "Can not write resync data to local disk.\n");
2213
2214                 err = drbd_drain_block(peer_device, pi->size);
2215
2216                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2217         }
2218
2219         atomic_add(pi->size >> 9, &device->rs_sect_in);
2220
2221         return err;
2222 }
2223
2224 static void restart_conflicting_writes(struct drbd_device *device,
2225                                        sector_t sector, int size)
2226 {
2227         struct drbd_interval *i;
2228         struct drbd_request *req;
2229
2230         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2231                 if (!i->local)
2232                         continue;
2233                 req = container_of(i, struct drbd_request, i);
2234                 if (req->rq_state & RQ_LOCAL_PENDING ||
2235                     !(req->rq_state & RQ_POSTPONED))
2236                         continue;
2237                 /* as it is RQ_POSTPONED, this will cause it to
2238                  * be queued on the retry workqueue. */
2239                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2240         }
2241 }
2242
2243 /*
2244  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2245  */
2246 static int e_end_block(struct drbd_work *w, int cancel)
2247 {
2248         struct drbd_peer_request *peer_req =
2249                 container_of(w, struct drbd_peer_request, w);
2250         struct drbd_peer_device *peer_device = peer_req->peer_device;
2251         struct drbd_device *device = peer_device->device;
2252         sector_t sector = peer_req->i.sector;
2253         int err = 0, pcmd;
2254
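        /* For protocol C writes we took an "unacked" reference when the data
         * was received; answer with P_RS_WRITE_ACK (and mark the block in sync
         * locally) while a resync is running and the request may set it in
         * sync, otherwise with a plain P_WRITE_ACK. */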
2255         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2256                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2257                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2258                                 device->state.conn <= C_PAUSED_SYNC_T &&
2259                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2260                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2261                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2262                         if (pcmd == P_RS_WRITE_ACK)
2263                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2264                 } else {
2265                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2266                         /* we expect it to be marked out of sync anyways...
2267                          * maybe assert this?  */
2268                 }
2269                 dec_unacked(device);
2270         }
2271
2272         /* we delete from the conflict detection hash _after_ we sent out the
2273          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2274         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2275                 spin_lock_irq(&device->resource->req_lock);
2276                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2277                 drbd_remove_epoch_entry_interval(device, peer_req);
2278                 if (peer_req->flags & EE_RESTART_REQUESTS)
2279                         restart_conflicting_writes(device, sector, peer_req->i.size);
2280                 spin_unlock_irq(&device->resource->req_lock);
2281         } else
2282                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2283
2284         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2285
2286         return err;
2287 }
2288
2289 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2290 {
2291         struct drbd_peer_request *peer_req =
2292                 container_of(w, struct drbd_peer_request, w);
2293         struct drbd_peer_device *peer_device = peer_req->peer_device;
2294         int err;
2295
2296         err = drbd_send_ack(peer_device, ack, peer_req);
2297         dec_unacked(peer_device->device);
2298
2299         return err;
2300 }
2301
2302 static int e_send_superseded(struct drbd_work *w, int unused)
2303 {
2304         return e_send_ack(w, P_SUPERSEDED);
2305 }
2306
2307 static int e_send_retry_write(struct drbd_work *w, int unused)
2308 {
2309         struct drbd_peer_request *peer_req =
2310                 container_of(w, struct drbd_peer_request, w);
2311         struct drbd_connection *connection = peer_req->peer_device->connection;
2312
2313         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2314                              P_RETRY_WRITE : P_SUPERSEDED);
2315 }
2316
2317 static bool seq_greater(u32 a, u32 b)
2318 {
2319         /*
2320          * We assume 32-bit wrap-around here.
2321          * For 24-bit wrap-around, we would have to shift:
2322          *  a <<= 8; b <<= 8;
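         *  E.g. seq_greater(5, 0xfffffffe) is true: (s32)5 - (s32)0xfffffffe
         *  evaluates to 5 - (-2) == 7 > 0, so 5 is "after" the wrap.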
2323          */
2324         return (s32)a - (s32)b > 0;
2325 }
2326
2327 static u32 seq_max(u32 a, u32 b)
2328 {
2329         return seq_greater(a, b) ? a : b;
2330 }
2331
2332 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2333 {
2334         struct drbd_device *device = peer_device->device;
2335         unsigned int newest_peer_seq;
2336
2337         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2338                 spin_lock(&device->peer_seq_lock);
2339                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2340                 device->peer_seq = newest_peer_seq;
2341                 spin_unlock(&device->peer_seq_lock);
2342                 /* wake up only if we actually changed device->peer_seq */
2343                 if (peer_seq == newest_peer_seq)
2344                         wake_up(&device->seq_wait);
2345         }
2346 }
2347
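/* Overlap of [s1, s1 + l1) and [s2, s2 + l2): sectors s1/s2 are 512-byte units,
 * lengths l1/l2 are in bytes.  E.g. a 4 KiB write at sector 0 (sectors 0..7)
 * does not overlap a 512 byte write at sector 8. */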
2348 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2349 {
2350         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2351 }
2352
2353 /* maybe change sync_ee into interval trees as well? */
2354 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2355 {
2356         struct drbd_peer_request *rs_req;
2357         bool rv = false;
2358
2359         spin_lock_irq(&device->resource->req_lock);
2360         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2361                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2362                              rs_req->i.sector, rs_req->i.size)) {
2363                         rv = true;
2364                         break;
2365                 }
2366         }
2367         spin_unlock_irq(&device->resource->req_lock);
2368
2369         return rv;
2370 }
2371
2372 /* Called from receive_Data.
2373  * Synchronize packets on sock with packets on msock.
2374  *
2375  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2376  * packet traveling on msock, they are still processed in the order they have
2377  * been sent.
2378  *
2379  * Note: we don't care for Ack packets overtaking P_DATA packets.
2380  *
2381  * In case packet_seq is larger than device->peer_seq number, there are
2382  * outstanding packets on the msock. We wait for them to arrive.
2383  * In case we are the logically next packet, we update device->peer_seq
2384  * ourselves. Correctly handles 32bit wrap around.
2385  *
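 * For example, with device->peer_seq == 9 an incoming peer_seq of 10 is
 * processed immediately, while peer_seq 12 makes us wait until 10 and 11
 * have been seen (or the timeout/signal below fires).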
2386  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2387  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2388  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2389  * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
2390  *
2391  * returns 0 if we may process the packet,
2392  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2393 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2394 {
2395         struct drbd_device *device = peer_device->device;
2396         DEFINE_WAIT(wait);
2397         long timeout;
2398         int ret = 0, tp;
2399
2400         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2401                 return 0;
2402
2403         spin_lock(&device->peer_seq_lock);
2404         for (;;) {
2405                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2406                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2407                         break;
2408                 }
2409
2410                 if (signal_pending(current)) {
2411                         ret = -ERESTARTSYS;
2412                         break;
2413                 }
2414
2415                 rcu_read_lock();
2416                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2417                 rcu_read_unlock();
2418
2419                 if (!tp)
2420                         break;
2421
2422                 /* Only need to wait if two_primaries is enabled */
2423                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2424                 spin_unlock(&device->peer_seq_lock);
2425                 rcu_read_lock();
2426                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2427                 rcu_read_unlock();
2428                 timeout = schedule_timeout(timeout);
2429                 spin_lock(&device->peer_seq_lock);
2430                 if (!timeout) {
2431                         ret = -ETIMEDOUT;
2432                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2433                         break;
2434                 }
2435         }
2436         spin_unlock(&device->peer_seq_lock);
2437         finish_wait(&device->seq_wait, &wait);
2438         return ret;
2439 }
2440
2441 /* see also bio_flags_to_wire()
2442  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2443  * flags and back. We may replicate to other kernel versions. */
2444 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2445 {
2446         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2447                 (dpf & DP_FUA ? REQ_FUA : 0) |
2448                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2449 }
2450
2451 static unsigned long wire_flags_to_bio_op(u32 dpf)
2452 {
2453         if (dpf & DP_ZEROES)
2454                 return REQ_OP_WRITE_ZEROES;
2455         if (dpf & DP_DISCARD)
2456                 return REQ_OP_DISCARD;
2457         if (dpf & DP_WSAME)
2458                 return REQ_OP_WRITE_SAME;
2459
2460         return REQ_OP_WRITE;
2461 }
2462
2463 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2464                                     unsigned int size)
2465 {
2466         struct drbd_interval *i;
2467
2468     repeat:
2469         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2470                 struct drbd_request *req;
2471                 struct bio_and_error m;
2472
2473                 if (!i->local)
2474                         continue;
2475                 req = container_of(i, struct drbd_request, i);
2476                 if (!(req->rq_state & RQ_POSTPONED))
2477                         continue;
2478                 req->rq_state &= ~RQ_POSTPONED;
2479                 __req_mod(req, NEG_ACKED, &m);
2480                 spin_unlock_irq(&device->resource->req_lock);
2481                 if (m.bio)
2482                         complete_master_bio(device, &m);
2483                 spin_lock_irq(&device->resource->req_lock);
2484                 goto repeat;
2485         }
2486 }
2487
2488 static int handle_write_conflicts(struct drbd_device *device,
2489                                   struct drbd_peer_request *peer_req)
2490 {
2491         struct drbd_connection *connection = peer_req->peer_device->connection;
2492         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2493         sector_t sector = peer_req->i.sector;
2494         const unsigned int size = peer_req->i.size;
2495         struct drbd_interval *i;
2496         bool equal;
2497         int err;
2498
2499         /*
2500          * Inserting the peer request into the write_requests tree will prevent
2501          * new conflicting local requests from being added.
2502          */
2503         drbd_insert_interval(&device->write_requests, &peer_req->i);
2504
2505     repeat:
2506         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2507                 if (i == &peer_req->i)
2508                         continue;
2509                 if (i->completed)
2510                         continue;
2511
2512                 if (!i->local) {
2513                         /*
2514                          * Our peer has sent a conflicting remote request; this
2515                          * should not happen in a two-node setup.  Wait for the
2516                          * earlier peer request to complete.
2517                          */
2518                         err = drbd_wait_misc(device, i);
2519                         if (err)
2520                                 goto out;
2521                         goto repeat;
2522                 }
2523
2524                 equal = i->sector == sector && i->size == size;
2525                 if (resolve_conflicts) {
2526                         /*
2527                          * If the peer request is fully contained within the
2528                          * overlapping request, it can be considered overwritten
2529                          * and thus superseded; otherwise, it will be retried
2530                          * once all overlapping requests have completed.
2531                          */
2532                         bool superseded = i->sector <= sector && i->sector +
2533                                        (i->size >> 9) >= sector + (size >> 9);
2534
2535                         if (!equal)
2536                                 drbd_alert(device, "Concurrent writes detected: "
2537                                                "local=%llus +%u, remote=%llus +%u, "
2538                                                "assuming %s came first\n",
2539                                           (unsigned long long)i->sector, i->size,
2540                                           (unsigned long long)sector, size,
2541                                           superseded ? "local" : "remote");
2542
2543                         peer_req->w.cb = superseded ? e_send_superseded :
2544                                                    e_send_retry_write;
2545                         list_add_tail(&peer_req->w.list, &device->done_ee);
2546                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2547
2548                         err = -ENOENT;
2549                         goto out;
2550                 } else {
2551                         struct drbd_request *req =
2552                                 container_of(i, struct drbd_request, i);
2553
2554                         if (!equal)
2555                                 drbd_alert(device, "Concurrent writes detected: "
2556                                                "local=%llus +%u, remote=%llus +%u\n",
2557                                           (unsigned long long)i->sector, i->size,
2558                                           (unsigned long long)sector, size);
2559
2560                         if (req->rq_state & RQ_LOCAL_PENDING ||
2561                             !(req->rq_state & RQ_POSTPONED)) {
2562                                 /*
2563                                  * Wait for the node with the discard flag to
2564                                  * decide if this request has been superseded
2565                                  * or needs to be retried.
2566                                  * Requests that have been superseded will
2567                                  * disappear from the write_requests tree.
2568                                  *
2569                                  * In addition, wait for the conflicting
2570                                  * request to finish locally before submitting
2571                                  * the conflicting peer request.
2572                                  */
2573                                 err = drbd_wait_misc(device, &req->i);
2574                                 if (err) {
2575                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2576                                         fail_postponed_requests(device, sector, size);
2577                                         goto out;
2578                                 }
2579                                 goto repeat;
2580                         }
2581                         /*
2582                          * Remember to restart the conflicting requests after
2583                          * the new peer request has completed.
2584                          */
2585                         peer_req->flags |= EE_RESTART_REQUESTS;
2586                 }
2587         }
2588         err = 0;
2589
2590     out:
2591         if (err)
2592                 drbd_remove_epoch_entry_interval(device, peer_req);
2593         return err;
2594 }
2595
2596 /* mirrored write */
2597 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2598 {
2599         struct drbd_peer_device *peer_device;
2600         struct drbd_device *device;
2601         struct net_conf *nc;
2602         sector_t sector;
2603         struct drbd_peer_request *peer_req;
2604         struct p_data *p = pi->data;
2605         u32 peer_seq = be32_to_cpu(p->seq_num);
2606         int op, op_flags;
2607         u32 dp_flags;
2608         int err, tp;
2609
2610         peer_device = conn_peer_device(connection, pi->vnr);
2611         if (!peer_device)
2612                 return -EIO;
2613         device = peer_device->device;
2614
2615         if (!get_ldev(device)) {
2616                 int err2;
2617
2618                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2619                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2620                 atomic_inc(&connection->current_epoch->epoch_size);
2621                 err2 = drbd_drain_block(peer_device, pi->size);
2622                 if (!err)
2623                         err = err2;
2624                 return err;
2625         }
2626
2627         /*
2628          * Corresponding put_ldev done either below (on various errors), or in
2629          * drbd_peer_request_endio, if we successfully submit the data at the
2630          * end of this function.
2631          */
2632
2633         sector = be64_to_cpu(p->sector);
2634         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2635         if (!peer_req) {
2636                 put_ldev(device);
2637                 return -EIO;
2638         }
2639
2640         peer_req->w.cb = e_end_block;
2641         peer_req->submit_jif = jiffies;
2642         peer_req->flags |= EE_APPLICATION;
2643
2644         dp_flags = be32_to_cpu(p->dp_flags);
2645         op = wire_flags_to_bio_op(dp_flags);
2646         op_flags = wire_flags_to_bio_flags(dp_flags);
2647         if (pi->cmd == P_TRIM) {
2648                 D_ASSERT(peer_device, peer_req->i.size > 0);
2649                 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2650                 D_ASSERT(peer_device, peer_req->pages == NULL);
2651                 /* need to play safe: an older DRBD sender
2652                  * may mean zero-out while sending P_TRIM. */
2653                 if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2654                         peer_req->flags |= EE_ZEROOUT;
2655         } else if (pi->cmd == P_ZEROES) {
2656                 D_ASSERT(peer_device, peer_req->i.size > 0);
2657                 D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2658                 D_ASSERT(peer_device, peer_req->pages == NULL);
2659                 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2660                 if (dp_flags & DP_DISCARD)
2661                         peer_req->flags |= EE_TRIM;
2662         } else if (peer_req->pages == NULL) {
2663                 D_ASSERT(device, peer_req->i.size == 0);
2664                 D_ASSERT(device, dp_flags & DP_FLUSH);
2665         }
2666
2667         if (dp_flags & DP_MAY_SET_IN_SYNC)
2668                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2669
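        /* Account this write in the current epoch: epoch_size counts all writes
         * that belong to it, active those that have not yet been fully
         * processed (see e_end_block()). */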
2670         spin_lock(&connection->epoch_lock);
2671         peer_req->epoch = connection->current_epoch;
2672         atomic_inc(&peer_req->epoch->epoch_size);
2673         atomic_inc(&peer_req->epoch->active);
2674         spin_unlock(&connection->epoch_lock);
2675
2676         rcu_read_lock();
2677         nc = rcu_dereference(peer_device->connection->net_conf);
2678         tp = nc->two_primaries;
2679         if (peer_device->connection->agreed_pro_version < 100) {
2680                 switch (nc->wire_protocol) {
2681                 case DRBD_PROT_C:
2682                         dp_flags |= DP_SEND_WRITE_ACK;
2683                         break;
2684                 case DRBD_PROT_B:
2685                         dp_flags |= DP_SEND_RECEIVE_ACK;
2686                         break;
2687                 }
2688         }
2689         rcu_read_unlock();
2690
2691         if (dp_flags & DP_SEND_WRITE_ACK) {
2692                 peer_req->flags |= EE_SEND_WRITE_ACK;
2693                 inc_unacked(device);
2694                 /* corresponding dec_unacked() in e_end_block()
2695                  * respective _drbd_clear_done_ee */
2696         }
2697
2698         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2699                 /* I really don't like it that the receiver thread
2700                  * sends on the msock, but anyways */
2701                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2702         }
2703
2704         if (tp) {
2705                 /* two primaries implies protocol C */
2706                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2707                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2708                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2709                 if (err)
2710                         goto out_interrupted;
2711                 spin_lock_irq(&device->resource->req_lock);
2712                 err = handle_write_conflicts(device, peer_req);
2713                 if (err) {
2714                         spin_unlock_irq(&device->resource->req_lock);
2715                         if (err == -ENOENT) {
2716                                 put_ldev(device);
2717                                 return 0;
2718                         }
2719                         goto out_interrupted;
2720                 }
2721         } else {
2722                 update_peer_seq(peer_device, peer_seq);
2723                 spin_lock_irq(&device->resource->req_lock);
2724         }
2725         /* TRIM, ZEROOUT and WRITE_SAME are processed synchronously:
2726          * drbd_submit_peer_request() first waits for active_ee to become
2727          * empty (i.e. for all pending peer requests to complete), so we
2728          * better not add ourselves to that list here. */
2729         if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2730                 list_add_tail(&peer_req->w.list, &device->active_ee);
2731         spin_unlock_irq(&device->resource->req_lock);
2732
2733         if (device->state.conn == C_SYNC_TARGET)
2734                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2735
2736         if (device->state.pdsk < D_INCONSISTENT) {
2737                 /* In case we have the only disk of the cluster: remember the
                  * range as out of sync for the peer and cover it by the activity log. */
2738                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2739                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2740                 drbd_al_begin_io(device, &peer_req->i);
2741                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2742         }
2743
2744         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2745                                        DRBD_FAULT_DT_WR);
2746         if (!err)
2747                 return 0;
2748
2749         /* don't care for the reason here */
2750         drbd_err(device, "submit failed, triggering re-connect\n");
2751         spin_lock_irq(&device->resource->req_lock);
2752         list_del(&peer_req->w.list);
2753         drbd_remove_epoch_entry_interval(device, peer_req);
2754         spin_unlock_irq(&device->resource->req_lock);
2755         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2756                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2757                 drbd_al_complete_io(device, &peer_req->i);
2758         }
2759
2760 out_interrupted:
2761         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2762         put_ldev(device);
2763         drbd_free_peer_req(device, peer_req);
2764         return err;
2765 }
2766
2767 /* We may throttle resync if the lower device seems to be busy
2768  * and the current sync rate is above c_min_rate.
2769  *
2770  * To decide whether the lower device is busy, we use a scheme similar to
2771  * MD RAID's is_mddev_idle(): if the partition stats reveal a "significant"
2772  * amount of activity (more than 64 sectors) that we cannot account for with
2773  * our own resync activity, the device obviously is "busy".
2774  *
2775  * The sync rate used here is based on only the most recent two step marks,
2776  * to get a short-term average so we can react faster.
2777  */
2778 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2779                 bool throttle_if_app_is_waiting)
2780 {
2781         struct lc_element *tmp;
2782         bool throttle = drbd_rs_c_min_rate_throttle(device);
2783
2784         if (!throttle || throttle_if_app_is_waiting)
2785                 return throttle;
2786
2787         spin_lock_irq(&device->al_lock);
2788         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2789         if (tmp) {
2790                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2791                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2792                         throttle = false;
2793                 /* Do not slow down if app IO is already waiting for this extent,
2794                  * and our progress is necessary for application IO to complete. */
2795         }
2796         spin_unlock_irq(&device->al_lock);
2797
2798         return throttle;
2799 }
2800
2801 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2802 {
2803         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2804         unsigned long db, dt, dbdt;
2805         unsigned int c_min_rate;
2806         int curr_events;
2807
2808         rcu_read_lock();
2809         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2810         rcu_read_unlock();
2811
2812         /* feature disabled? */
2813         if (c_min_rate == 0)
2814                 return false;
2815
2816         curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2817                         atomic_read(&device->rs_sect_ev);
2818
2819         if (atomic_read(&device->ap_actlog_cnt)
2820             || curr_events - device->rs_last_events > 64) {
2821                 unsigned long rs_left;
2822                 int i;
2823
2824                 device->rs_last_events = curr_events;
2825
2826                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2827                  * approx. */
2828                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2829
2830                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2831                         rs_left = device->ov_left;
2832                 else
2833                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2834
2835                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2836                 if (!dt)
2837                         dt++;
2838                 db = device->rs_mark_left[i] - rs_left;
2839                 dbdt = Bit2KB(db/dt);
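                /* Illustrative numbers only: with the usual 4 KiB per bitmap
                 * bit, clearing 1536 bits since a mark taken 3 seconds ago
                 * gives db/dt = 512 bits/s, i.e. dbdt = 2048 KiB/s; with
                 * c_min_rate configured to, say, 250 KiB/s we would throttle. */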
2840
2841                 if (dbdt > c_min_rate)
2842                         return true;
2843         }
2844         return false;
2845 }
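
/* A minimal sketch, not used by the driver: the core of the c_min_rate check
 * above, reduced to plain numbers.  The helper name and parameters are made
 * up for illustration; "bits_cleared" stands for rs_mark_left[i] - rs_left
 * and "seconds" for the age of that sync mark. */
static inline bool drbd_example_rate_exceeds_c_min_rate(unsigned long bits_cleared,
                                                        unsigned long seconds,
                                                        unsigned int c_min_rate)
{
        unsigned long dt = seconds ? seconds : 1;       /* avoid division by zero */
        unsigned long dbdt = Bit2KB(bits_cleared / dt); /* KiB/s, one bit per 4 KiB */

        /* c_min_rate == 0 means the feature is disabled */
        return c_min_rate && dbdt > c_min_rate;
}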
2846
2847 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2848 {
2849         struct drbd_peer_device *peer_device;
2850         struct drbd_device *device;
2851         sector_t sector;
2852         sector_t capacity;
2853         struct drbd_peer_request *peer_req;
2854         struct digest_info *di = NULL;
2855         int size, verb;
2856         unsigned int fault_type;
2857         struct p_block_req *p = pi->data;
2858
2859         peer_device = conn_peer_device(connection, pi->vnr);
2860         if (!peer_device)
2861                 return -EIO;
2862         device = peer_device->device;
2863         capacity = drbd_get_capacity(device->this_bdev);
2864
2865         sector = be64_to_cpu(p->sector);
2866         size   = be32_to_cpu(p->blksize);
2867
2868         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2869                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2870                                 (unsigned long long)sector, size);
2871                 return -EINVAL;
2872         }
2873         if (sector + (size>>9) > capacity) {
2874                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2875                                 (unsigned long long)sector, size);
2876                 return -EINVAL;
2877         }
2878
2879         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2880                 verb = 1;
2881                 switch (pi->cmd) {
2882                 case P_DATA_REQUEST:
2883                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2884                         break;
2885                 case P_RS_THIN_REQ:
2886                 case P_RS_DATA_REQUEST:
2887                 case P_CSUM_RS_REQUEST:
2888                 case P_OV_REQUEST:
2889                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY, p);
2890                         break;
2891                 case P_OV_REPLY:
2892                         verb = 0;
2893                         dec_rs_pending(device);
2894                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2895                         break;
2896                 default:
2897                         BUG();
2898                 }
2899                 if (verb && __ratelimit(&drbd_ratelimit_state))
2900                         drbd_err(device, "Cannot satisfy peer's read request, "
2901                             "no local data.\n");
2902
2903                 /* drain the possibly remaining payload */
2904                 return drbd_drain_block(peer_device, pi->size);
2905         }
2906
2907         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2908          * "criss-cross" setup, that might cause write-out on some other DRBD,
2909          * which in turn might block on the other node at this very place.  */
2910         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2911                         size, GFP_NOIO);
2912         if (!peer_req) {
2913                 put_ldev(device);
2914                 return -ENOMEM;
2915         }
2916
2917         switch (pi->cmd) {
2918         case P_DATA_REQUEST:
2919                 peer_req->w.cb = w_e_end_data_req;
2920                 fault_type = DRBD_FAULT_DT_RD;
2921                 /* application IO, don't drbd_rs_begin_io */
2922                 peer_req->flags |= EE_APPLICATION;
2923                 goto submit;
2924
2925         case P_RS_THIN_REQ:
2926                 /* If at some point in the future we have a smart way to
2927                    find out if this data block is completely deallocated,
2928                    then we would do something smarter here than reading
2929                    the block... */
2930                 peer_req->flags |= EE_RS_THIN_REQ;
2931                 /* fall through */
2932         case P_RS_DATA_REQUEST:
2933                 peer_req->w.cb = w_e_end_rsdata_req;
2934                 fault_type = DRBD_FAULT_RS_RD;
2935                 /* used in the sector offset progress display */
2936                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2937                 break;
2938
2939         case P_OV_REPLY:
2940         case P_CSUM_RS_REQUEST:
2941                 fault_type = DRBD_FAULT_RS_RD;
2942                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2943                 if (!di)
2944                         goto out_free_e;
2945
2946                 di->digest_size = pi->size;
2947                 di->digest = (((char *)di)+sizeof(struct digest_info));
2948
2949                 peer_req->digest = di;
2950                 peer_req->flags |= EE_HAS_DIGEST;
2951
2952                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2953                         goto out_free_e;
2954
2955                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2956                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2957                         peer_req->w.cb = w_e_end_csum_rs_req;
2958                         /* used in the sector offset progress display */
2959                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2960                         /* remember to report stats in drbd_resync_finished */
2961                         device->use_csums = true;
2962                 } else if (pi->cmd == P_OV_REPLY) {
2963                         /* track progress, we may need to throttle */
2964                         atomic_add(size >> 9, &device->rs_sect_in);
2965                         peer_req->w.cb = w_e_end_ov_reply;
2966                         dec_rs_pending(device);
2967                         /* drbd_rs_begin_io done when we sent this request,
2968                          * but accounting still needs to be done. */
2969                         goto submit_for_resync;
2970                 }
2971                 break;
2972
2973         case P_OV_REQUEST:
2974                 if (device->ov_start_sector == ~(sector_t)0 &&
2975                     peer_device->connection->agreed_pro_version >= 90) {
2976                         unsigned long now = jiffies;
2977                         int i;
2978                         device->ov_start_sector = sector;
2979                         device->ov_position = sector;
2980                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2981                         device->rs_total = device->ov_left;
2982                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2983                                 device->rs_mark_left[i] = device->ov_left;
2984                                 device->rs_mark_time[i] = now;
2985                         }
2986                         drbd_info(device, "Online Verify start sector: %llu\n",
2987                                         (unsigned long long)sector);
2988                 }
2989                 peer_req->w.cb = w_e_end_ov_req;
2990                 fault_type = DRBD_FAULT_RS_RD;
2991                 break;
2992
2993         default:
2994                 BUG();
2995         }
2996
2997         /* Throttling, drbd_rs_begin_io and submit should become asynchronous
2998          * wrt the receiver, but that is not as straightforward as it may seem.
2999          * Various places in the resync start and stop logic assume resync
3000          * requests are processed in order; requeuing this on the worker thread
3001          * would introduce a bunch of new code for synchronization between threads.
3002          *
3003          * Unlimited throttling before drbd_rs_begin_io may stall the resync
3004          * "forever", while throttling after drbd_rs_begin_io would keep that
3005          * extent locked against application writes for the same time.  For now,
3006          * just throttle here, where the rest of the code expects the receiver
3007          * to sleep for a while anyway.
3008          */
3009
3010         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
3011          * this defers syncer requests for some time, before letting at least
3012          * one request through.  The resync controller on the receiving side
3013          * will adapt to the incoming rate accordingly.
3014          *
3015          * We cannot throttle here if remote is Primary/SyncTarget:
3016          * we would also throttle its application reads.
3017          * In that case, throttling is done on the SyncTarget only.
3018          */
3019
3020         /* Even though this may be a resync request, we do add to "read_ee";
3021          * "sync_ee" is only used for resync WRITEs.
3022          * Add to list early, so debugfs can find this request
3023          * even if we have to sleep below. */
3024         spin_lock_irq(&device->resource->req_lock);
3025         list_add_tail(&peer_req->w.list, &device->read_ee);
3026         spin_unlock_irq(&device->resource->req_lock);
3027
3028         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3029         if (device->state.peer != R_PRIMARY &&
3030             drbd_rs_should_slow_down(device, sector, false))
3031                 schedule_timeout_uninterruptible(HZ/10);
3032         update_receiver_timing_details(connection, drbd_rs_begin_io);
3033         if (drbd_rs_begin_io(device, sector))
3034                 goto out_free_e;
3035
3036 submit_for_resync:
3037         atomic_add(size >> 9, &device->rs_sect_ev);
3038
3039 submit:
3040         update_receiver_timing_details(connection, drbd_submit_peer_request);
3041         inc_unacked(device);
3042         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3043                                      fault_type) == 0)
3044                 return 0;
3045
3046         /* don't care for the reason here */
3047         drbd_err(device, "submit failed, triggering re-connect\n");
3048
3049 out_free_e:
3050         spin_lock_irq(&device->resource->req_lock);
3051         list_del(&peer_req->w.list);
3052         spin_unlock_irq(&device->resource->req_lock);
3053         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
3054
3055         put_ldev(device);
3056         drbd_free_peer_req(device, peer_req);
3057         return -EIO;
3058 }
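
/* A minimal sketch, not part of the driver: the sanity checks that
 * receive_DataRequest applies to a peer's read request, factored into a
 * hypothetical helper.  A request must have a positive, 512-byte aligned
 * size of at most DRBD_MAX_BIO_SIZE and must not reach past the capacity. */
static inline bool drbd_example_block_req_sane(sector_t sector, int size,
                                               sector_t capacity)
{
        if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE)
                return false;
        /* size is in bytes, capacity in 512-byte sectors */
        return sector + (size >> 9) <= capacity;
}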
3059
3060 /**
3061  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
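 * @peer_device: peer device of the connection on which the split brain was detected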
3062  */
3063 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3064 {
3065         struct drbd_device *device = peer_device->device;
3066         int self, peer, rv = -100;
3067         unsigned long ch_self, ch_peer;
3068         enum drbd_after_sb_p after_sb_0p;
3069
3070         self = device->ldev->md.uuid[UI_BITMAP] & 1;
3071         peer = device->p_uuid[UI_BITMAP] & 1;
3072
3073         ch_peer = devi