Merge tag 'tag-chrome-platform-fixes-for-v5.3-rc6' of git://git.kernel.org/pub/scm...
[sfrench/cifs-2.6.git] / drivers / block / drbd / drbd_receiver.c
1 // SPDX-License-Identifier: GPL-2.0-or-later
2 /*
3    drbd_receiver.c
4
5    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
6
7    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
8    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
9    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
10
11  */
12
13
14 #include <linux/module.h>
15
16 #include <linux/uaccess.h>
17 #include <net/sock.h>
18
19 #include <linux/drbd.h>
20 #include <linux/fs.h>
21 #include <linux/file.h>
22 #include <linux/in.h>
23 #include <linux/mm.h>
24 #include <linux/memcontrol.h>
25 #include <linux/mm_inline.h>
26 #include <linux/slab.h>
27 #include <uapi/linux/sched/types.h>
28 #include <linux/sched/signal.h>
29 #include <linux/pkt_sched.h>
30 #define __KERNEL_SYSCALLS__
31 #include <linux/unistd.h>
32 #include <linux/vmalloc.h>
33 #include <linux/random.h>
34 #include <linux/string.h>
35 #include <linux/scatterlist.h>
36 #include "drbd_int.h"
37 #include "drbd_protocol.h"
38 #include "drbd_req.h"
39 #include "drbd_vli.h"
40
41 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME|DRBD_FF_WZEROES)
42
43 struct packet_info {	/* decoded packet header; decode_header() takes a pointer to one */
44         enum drbd_packet cmd;	/* packet type; receive_first_packet() returns this value */
45         unsigned int size;	/* NOTE(review): presumably the payload size in bytes - confirm in decode_header() */
46         unsigned int vnr;	/* NOTE(review): presumably the volume number the packet addresses - confirm */
47         void *data;	/* NOTE(review): presumably points at the received payload - confirm */
48 };
49
50 enum finish_epoch {	/* result type of drbd_may_finish_epoch() */
51         FE_STILL_LIVE,	/* NOTE(review): meaning of each value is set by drbd_may_finish_epoch(), not shown here */
52         FE_DESTROYED,
53         FE_RECYCLED,
54 };
55
56 static int drbd_do_features(struct drbd_connection *connection);
57 static int drbd_do_auth(struct drbd_connection *connection);
58 static int drbd_disconnected(struct drbd_peer_device *);
59 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
60 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
61 static int e_end_block(struct drbd_work *, int);
62
63
64 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
65
66 /*
67  * some helper functions to deal with single linked page lists,
68  * page->private being our "next" pointer.
69  */
70
71 /* If at least n pages are linked at head, get n pages off.
72  * Otherwise, don't modify head, and return NULL.
73  * Locking is the responsibility of the caller.
74  */
75 static struct page *page_chain_del(struct page **head, int n)
76 {
77         struct page *page;
78         struct page *tmp;
79
80         BUG_ON(!n);
81         BUG_ON(!head);
82
83         page = *head;
84
85         if (!page)
86                 return NULL;
87
88         while (page) {
89                 tmp = page_chain_next(page);
90                 if (--n == 0)
91                         break; /* found sufficient pages */
92                 if (tmp == NULL)
93                         /* insufficient pages, don't use any of them. */
94                         return NULL;
95                 page = tmp;
96         }
97
98         /* add end of list marker for the returned list */
99         set_page_private(page, 0);
100         /* actual return value, and adjustment of head */
101         page = *head;
102         *head = tmp;
103         return page;
104 }
105
/*
 * Find the last page of a (usually short) "private" page chain.
 * May be used outside of locks, before the chain is handed back to a
 * global chain head with page_chain_add() under a spinlock.
 * If @len is not NULL, the number of pages in the chain is stored there.
 */
static struct page *page_chain_tail(struct page *page, int *len)
{
	struct page *next;
	int count = 1;

	for (;;) {
		next = page_chain_next(page);
		if (!next)
			break;
		count++;
		page = next;
	}

	if (len)
		*len = count;
	return page;
}
119
120 static int page_chain_free(struct page *page)
121 {
122         struct page *tmp;
123         int i = 0;
124         page_chain_for_each_safe(page, tmp) {
125                 put_page(page);
126                 ++i;
127         }
128         return i;
129 }
130
131 static void page_chain_add(struct page **head,
132                 struct page *chain_first, struct page *chain_last)
133 {
134 #if 1
135         struct page *tmp;
136         tmp = page_chain_tail(chain_first, NULL);
137         BUG_ON(tmp != chain_last);
138 #endif
139
140         /* add chain to head */
141         set_page_private(chain_last, (unsigned long)*head);
142         *head = chain_first;
143 }
144
145 static struct page *__drbd_alloc_pages(struct drbd_device *device,
146                                        unsigned int number)
147 {
148         struct page *page = NULL;
149         struct page *tmp = NULL;
150         unsigned int i = 0;
151
152         /* Yes, testing drbd_pp_vacant outside the lock is racy.
153          * So what. It saves a spin_lock. */
154         if (drbd_pp_vacant >= number) {
155                 spin_lock(&drbd_pp_lock);
156                 page = page_chain_del(&drbd_pp_pool, number);
157                 if (page)
158                         drbd_pp_vacant -= number;
159                 spin_unlock(&drbd_pp_lock);
160                 if (page)
161                         return page;
162         }
163
164         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
165          * "criss-cross" setup, that might cause write-out on some other DRBD,
166          * which in turn might block on the other node at this very place.  */
167         for (i = 0; i < number; i++) {
168                 tmp = alloc_page(GFP_TRY);
169                 if (!tmp)
170                         break;
171                 set_page_private(tmp, (unsigned long)page);
172                 page = tmp;
173         }
174
175         if (i == number)
176                 return page;
177
178         /* Not enough pages immediately available this time.
179          * No need to jump around here, drbd_alloc_pages will retry this
180          * function "soon". */
181         if (page) {
182                 tmp = page_chain_tail(page, NULL);
183                 spin_lock(&drbd_pp_lock);
184                 page_chain_add(&drbd_pp_pool, page, tmp);
185                 drbd_pp_vacant += i;
186                 spin_unlock(&drbd_pp_lock);
187         }
188         return NULL;
189 }
190
191 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
192                                            struct list_head *to_be_freed)
193 {
194         struct drbd_peer_request *peer_req, *tmp;
195
196         /* The EEs are always appended to the end of the list. Since
197            they are sent in order over the wire, they have to finish
198            in order. As soon as we see the first not finished we can
199            stop to examine the list... */
200
201         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
202                 if (drbd_peer_req_has_active_page(peer_req))
203                         break;
204                 list_move(&peer_req->w.list, to_be_freed);
205         }
206 }
207
208 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
209 {
210         LIST_HEAD(reclaimed);
211         struct drbd_peer_request *peer_req, *t;
212
213         spin_lock_irq(&device->resource->req_lock);
214         reclaim_finished_net_peer_reqs(device, &reclaimed);
215         spin_unlock_irq(&device->resource->req_lock);
216         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
217                 drbd_free_net_peer_req(device, peer_req);
218 }
219
220 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
221 {
222         struct drbd_peer_device *peer_device;
223         int vnr;
224
225         rcu_read_lock();
226         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
227                 struct drbd_device *device = peer_device->device;
228                 if (!atomic_read(&device->pp_in_use_by_net))
229                         continue;
230
231                 kref_get(&device->kref);
232                 rcu_read_unlock();
233                 drbd_reclaim_net_peer_reqs(device);
234                 kref_put(&device->kref, drbd_destroy_device);
235                 rcu_read_lock();
236         }
237         rcu_read_unlock();
238 }
239
240 /**
241  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
242  * @device:     DRBD device.
243  * @number:     number of pages requested
244  * @retry:      whether to retry, if not enough pages are available right now
245  *
246  * Tries to allocate number pages, first from our own page pool, then from
247  * the kernel.
248  * Possibly retry until DRBD frees sufficient pages somewhere else.
249  *
250  * If this allocation would exceed the max_buffers setting, we throttle
251  * allocation (schedule_timeout) to give the system some room to breathe.
252  *
253  * We do not use max-buffers as hard limit, because it could lead to
254  * congestion and further to a distributed deadlock during online-verify or
255  * (checksum based) resync, if the max-buffers, socket buffer sizes and
256  * resync-rate settings are mis-configured.
257  *
258  * Returns a page chain linked via page->private.
259  */
260 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
261                               bool retry)
262 {
263         struct drbd_device *device = peer_device->device;
264         struct page *page = NULL;
265         struct net_conf *nc;
266         DEFINE_WAIT(wait);
267         unsigned int mxb;
268
269         rcu_read_lock();
270         nc = rcu_dereference(peer_device->connection->net_conf);
271         mxb = nc ? nc->max_buffers : 1000000;
272         rcu_read_unlock();
273
274         if (atomic_read(&device->pp_in_use) < mxb)
275                 page = __drbd_alloc_pages(device, number);
276
277         /* Try to keep the fast path fast, but occasionally we need
278          * to reclaim the pages we lended to the network stack. */
279         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
280                 drbd_reclaim_net_peer_reqs(device);
281
282         while (page == NULL) {
283                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
284
285                 drbd_reclaim_net_peer_reqs(device);
286
287                 if (atomic_read(&device->pp_in_use) < mxb) {
288                         page = __drbd_alloc_pages(device, number);
289                         if (page)
290                                 break;
291                 }
292
293                 if (!retry)
294                         break;
295
296                 if (signal_pending(current)) {
297                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
298                         break;
299                 }
300
301                 if (schedule_timeout(HZ/10) == 0)
302                         mxb = UINT_MAX;
303         }
304         finish_wait(&drbd_pp_wait, &wait);
305
306         if (page)
307                 atomic_add(number, &device->pp_in_use);
308         return page;
309 }
310
311 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
312  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
313  * Either links the page chain back to the global pool,
314  * or returns all pages to the system. */
315 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
316 {
317         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
318         int i;
319
320         if (page == NULL)
321                 return;
322
323         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * drbd_minor_count)
324                 i = page_chain_free(page);
325         else {
326                 struct page *tmp;
327                 tmp = page_chain_tail(page, &i);
328                 spin_lock(&drbd_pp_lock);
329                 page_chain_add(&drbd_pp_pool, page, tmp);
330                 drbd_pp_vacant += i;
331                 spin_unlock(&drbd_pp_lock);
332         }
333         i = atomic_sub_return(i, a);
334         if (i < 0)
335                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
336                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
337         wake_up(&drbd_pp_wait);
338 }
339
340 /*
341 You need to hold the req_lock:
342  _drbd_wait_ee_list_empty()
343
344 You must not have the req_lock:
345  drbd_free_peer_req()
346  drbd_alloc_peer_req()
347  drbd_free_peer_reqs()
348  drbd_ee_fix_bhs()
349  drbd_finish_peer_reqs()
350  drbd_clear_done_ee()
351  drbd_wait_ee_list_empty()
352 */
353
354 /* normal: payload_size == request size (bi_size)
355  * w_same: payload_size == logical_block_size
356  * trim: payload_size == 0 */
357 struct drbd_peer_request *
358 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
359                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
360 {
361         struct drbd_device *device = peer_device->device;
362         struct drbd_peer_request *peer_req;
363         struct page *page = NULL;
364         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
365
366         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
367                 return NULL;
368
369         peer_req = mempool_alloc(&drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
370         if (!peer_req) {
371                 if (!(gfp_mask & __GFP_NOWARN))
372                         drbd_err(device, "%s: allocation failed\n", __func__);
373                 return NULL;
374         }
375
376         if (nr_pages) {
377                 page = drbd_alloc_pages(peer_device, nr_pages,
378                                         gfpflags_allow_blocking(gfp_mask));
379                 if (!page)
380                         goto fail;
381         }
382
383         memset(peer_req, 0, sizeof(*peer_req));
384         INIT_LIST_HEAD(&peer_req->w.list);
385         drbd_clear_interval(&peer_req->i);
386         peer_req->i.size = request_size;
387         peer_req->i.sector = sector;
388         peer_req->submit_jif = jiffies;
389         peer_req->peer_device = peer_device;
390         peer_req->pages = page;
391         /*
392          * The block_id is opaque to the receiver.  It is not endianness
393          * converted, and sent back to the sender unchanged.
394          */
395         peer_req->block_id = id;
396
397         return peer_req;
398
399  fail:
400         mempool_free(peer_req, &drbd_ee_mempool);
401         return NULL;
402 }
403
404 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
405                        int is_net)
406 {
407         might_sleep();
408         if (peer_req->flags & EE_HAS_DIGEST)
409                 kfree(peer_req->digest);
410         drbd_free_pages(device, peer_req->pages, is_net);
411         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
412         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
413         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
414                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
415                 drbd_al_complete_io(device, &peer_req->i);
416         }
417         mempool_free(peer_req, &drbd_ee_mempool);
418 }
419
420 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
421 {
422         LIST_HEAD(work_list);
423         struct drbd_peer_request *peer_req, *t;
424         int count = 0;
425         int is_net = list == &device->net_ee;
426
427         spin_lock_irq(&device->resource->req_lock);
428         list_splice_init(list, &work_list);
429         spin_unlock_irq(&device->resource->req_lock);
430
431         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
432                 __drbd_free_peer_req(device, peer_req, is_net);
433                 count++;
434         }
435         return count;
436 }
437
438 /*
439  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
440  */
441 static int drbd_finish_peer_reqs(struct drbd_device *device)
442 {
443         LIST_HEAD(work_list);
444         LIST_HEAD(reclaimed);
445         struct drbd_peer_request *peer_req, *t;
446         int err = 0;
447
448         spin_lock_irq(&device->resource->req_lock);
449         reclaim_finished_net_peer_reqs(device, &reclaimed);
450         list_splice_init(&device->done_ee, &work_list);
451         spin_unlock_irq(&device->resource->req_lock);
452
453         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
454                 drbd_free_net_peer_req(device, peer_req);
455
456         /* possible callbacks here:
457          * e_end_block, and e_end_resync_block, e_send_superseded.
458          * all ignore the last argument.
459          */
460         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
461                 int err2;
462
463                 /* list_del not necessary, next/prev members not touched */
464                 err2 = peer_req->w.cb(&peer_req->w, !!err);
465                 if (!err)
466                         err = err2;
467                 drbd_free_peer_req(device, peer_req);
468         }
469         wake_up(&device->ee_wait);
470
471         return err;
472 }
473
474 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
475                                      struct list_head *head)
476 {
477         DEFINE_WAIT(wait);
478
479         /* avoids spin_lock/unlock
480          * and calling prepare_to_wait in the fast path */
481         while (!list_empty(head)) {
482                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
483                 spin_unlock_irq(&device->resource->req_lock);
484                 io_schedule();
485                 finish_wait(&device->ee_wait, &wait);
486                 spin_lock_irq(&device->resource->req_lock);
487         }
488 }
489
490 static void drbd_wait_ee_list_empty(struct drbd_device *device,
491                                     struct list_head *head)
492 {
493         spin_lock_irq(&device->resource->req_lock);
494         _drbd_wait_ee_list_empty(device, head);
495         spin_unlock_irq(&device->resource->req_lock);
496 }
497
498 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
499 {
500         struct kvec iov = {
501                 .iov_base = buf,
502                 .iov_len = size,
503         };
504         struct msghdr msg = {
505                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
506         };
507         iov_iter_kvec(&msg.msg_iter, READ, &iov, 1, size);
508         return sock_recvmsg(sock, &msg, msg.msg_flags);
509 }
510
511 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
512 {
513         int rv;
514
515         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
516
517         if (rv < 0) {
518                 if (rv == -ECONNRESET)
519                         drbd_info(connection, "sock was reset by peer\n");
520                 else if (rv != -ERESTARTSYS)
521                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
522         } else if (rv == 0) {
523                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
524                         long t;
525                         rcu_read_lock();
526                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
527                         rcu_read_unlock();
528
529                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
530
531                         if (t)
532                                 goto out;
533                 }
534                 drbd_info(connection, "sock was shut down by peer\n");
535         }
536
537         if (rv != size)
538                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
539
540 out:
541         return rv;
542 }
543
544 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
545 {
546         int err;
547
548         err = drbd_recv(connection, buf, size);
549         if (err != size) {
550                 if (err >= 0)
551                         err = -EIO;
552         } else
553                 err = 0;
554         return err;
555 }
556
557 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
558 {
559         int err;
560
561         err = drbd_recv_all(connection, buf, size);
562         if (err && !signal_pending(current))
563                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
564         return err;
565 }
566
567 /* quoting tcp(7):
568  *   On individual connections, the socket buffer size must be set prior to the
569  *   listen(2) or connect(2) calls in order to have it take effect.
570  * This is our wrapper to do so.
571  */
572 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
573                 unsigned int rcv)
574 {
575         /* open coded SO_SNDBUF, SO_RCVBUF */
576         if (snd) {
577                 sock->sk->sk_sndbuf = snd;
578                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
579         }
580         if (rcv) {
581                 sock->sk->sk_rcvbuf = rcv;
582                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
583         }
584 }
585
/*
 * Make one attempt at an outgoing TCP connection to the peer.
 *
 * Creates a socket, binds it to the configured local address (with port 0)
 * and connects to the configured peer address.
 *
 * Returns the connected socket, or NULL.  NULL is also returned, without
 * any further action, if there is no net_conf.  On failure the socket is
 * released; errors outside the "peer not (yet) available" class are
 * logged, and C_DISCONNECTING is requested if such an error happened
 * before the connect() step.
 */
static struct socket *drbd_try_connect(struct drbd_connection *connection)
{
	const char *what;
	struct socket *sock;
	struct sockaddr_in6 src_in6;
	struct sockaddr_in6 peer_in6;
	struct net_conf *nc;
	int err, peer_addr_len, my_addr_len;
	int sndbuf_size, rcvbuf_size, connect_int;
	int disconnect_on_error = 1;

	/* copy what we need out of net_conf while under rcu protection */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	if (!nc) {
		rcu_read_unlock();
		return NULL;
	}
	sndbuf_size = nc->sndbuf_size;
	rcvbuf_size = nc->rcvbuf_size;
	connect_int = nc->connect_int;
	rcu_read_unlock();

	my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
	memcpy(&src_in6, &connection->my_addr, my_addr_len);

	if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
		src_in6.sin6_port = 0;
	else
		((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

	/* clamp against the buffer we actually copy into */
	peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(peer_in6));
	memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);

	/* "what" names the step in progress, for the error message below */
	what = "sock_create_kern";
	err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
			       SOCK_STREAM, IPPROTO_TCP, &sock);
	if (err < 0) {
		sock = NULL;
		goto out;
	}

	sock->sk->sk_rcvtimeo =
	sock->sk->sk_sndtimeo = connect_int * HZ;
	drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);

	/* explicitly bind to the configured IP as source IP
	 *  for the outgoing connections.
	 *  This is needed for multihomed hosts and to be
	 *  able to use lo: interfaces for drbd.
	 * Make sure to use 0 as port number, so linux selects
	 *  a free one dynamically.
	 */
	what = "bind before connect";
	err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
	if (err < 0)
		goto out;

	/* connect may fail, peer not yet available.
	 * stay C_WF_CONNECTION, don't go Disconnecting! */
	disconnect_on_error = 0;
	what = "connect";
	err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);

out:
	if (err < 0) {
		if (sock) {
			sock_release(sock);
			sock = NULL;
		}
		switch (-err) {
			/* timeout, busy, signal pending */
		case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
		case EINTR: case ERESTARTSYS:
			/* peer not (yet) available, network problem */
		case ECONNREFUSED: case ENETUNREACH:
		case EHOSTDOWN:    case EHOSTUNREACH:
			disconnect_on_error = 0;
			break;
		default:
			drbd_err(connection, "%s failed, err = %d\n", what, err);
		}
		if (disconnect_on_error)
			conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
	}

	return sock;
}
673
674 struct accept_wait_data {	/* shared between the listen-socket callback and the waiter */
675         struct drbd_connection *connection;
676         struct socket *s_listen;	/* listening socket, stored by prepare_listen_socket() */
677         struct completion door_bell;	/* completed by drbd_incoming_connection() on TCP_ESTABLISHED */
678         void (*original_sk_state_change)(struct sock *sk);	/* saved callback, restored by unregister_state_change() */
679
680 };
681
682 static void drbd_incoming_connection(struct sock *sk)
683 {
684         struct accept_wait_data *ad = sk->sk_user_data;
685         void (*state_change)(struct sock *sk);
686
687         state_change = ad->original_sk_state_change;
688         if (sk->sk_state == TCP_ESTABLISHED)
689                 complete(&ad->door_bell);
690         state_change(sk);
691 }
692
693 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
694 {
695         int err, sndbuf_size, rcvbuf_size, my_addr_len;
696         struct sockaddr_in6 my_addr;
697         struct socket *s_listen;
698         struct net_conf *nc;
699         const char *what;
700
701         rcu_read_lock();
702         nc = rcu_dereference(connection->net_conf);
703         if (!nc) {
704                 rcu_read_unlock();
705                 return -EIO;
706         }
707         sndbuf_size = nc->sndbuf_size;
708         rcvbuf_size = nc->rcvbuf_size;
709         rcu_read_unlock();
710
711         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
712         memcpy(&my_addr, &connection->my_addr, my_addr_len);
713
714         what = "sock_create_kern";
715         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
716                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
717         if (err) {
718                 s_listen = NULL;
719                 goto out;
720         }
721
722         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
723         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
724
725         what = "bind before listen";
726         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
727         if (err < 0)
728                 goto out;
729
730         ad->s_listen = s_listen;
731         write_lock_bh(&s_listen->sk->sk_callback_lock);
732         ad->original_sk_state_change = s_listen->sk->sk_state_change;
733         s_listen->sk->sk_state_change = drbd_incoming_connection;
734         s_listen->sk->sk_user_data = ad;
735         write_unlock_bh(&s_listen->sk->sk_callback_lock);
736
737         what = "listen";
738         err = s_listen->ops->listen(s_listen, 5);
739         if (err < 0)
740                 goto out;
741
742         return 0;
743 out:
744         if (s_listen)
745                 sock_release(s_listen);
746         if (err < 0) {
747                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
748                         drbd_err(connection, "%s failed, err = %d\n", what, err);
749                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
750                 }
751         }
752
753         return -EIO;
754 }
755
756 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
757 {
758         write_lock_bh(&sk->sk_callback_lock);
759         sk->sk_state_change = ad->original_sk_state_change;
760         sk->sk_user_data = NULL;
761         write_unlock_bh(&sk->sk_callback_lock);
762 }
763
764 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
765 {
766         int timeo, connect_int, err = 0;
767         struct socket *s_estab = NULL;
768         struct net_conf *nc;
769
770         rcu_read_lock();
771         nc = rcu_dereference(connection->net_conf);
772         if (!nc) {
773                 rcu_read_unlock();
774                 return NULL;
775         }
776         connect_int = nc->connect_int;
777         rcu_read_unlock();
778
779         timeo = connect_int * HZ;
780         /* 28.5% random jitter */
781         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
782
783         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
784         if (err <= 0)
785                 return NULL;
786
787         err = kernel_accept(ad->s_listen, &s_estab, 0);
788         if (err < 0) {
789                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
790                         drbd_err(connection, "accept failed, err = %d\n", err);
791                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
792                 }
793         }
794
795         if (s_estab)
796                 unregister_state_change(s_estab->sk, ad);
797
798         return s_estab;
799 }
800
801 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
802
803 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
804                              enum drbd_packet cmd)
805 {
806         if (!conn_prepare_command(connection, sock))
807                 return -EIO;
808         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
809 }
810
811 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
812 {
813         unsigned int header_size = drbd_header_size(connection);
814         struct packet_info pi;
815         struct net_conf *nc;
816         int err;
817
818         rcu_read_lock();
819         nc = rcu_dereference(connection->net_conf);
820         if (!nc) {
821                 rcu_read_unlock();
822                 return -EIO;
823         }
824         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
825         rcu_read_unlock();
826
827         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
828         if (err != header_size) {
829                 if (err >= 0)
830                         err = -EIO;
831                 return err;
832         }
833         err = decode_header(connection, connection->data.rbuf, &pi);
834         if (err)
835                 return err;
836         return pi.cmd;
837 }
838
839 /**
840  * drbd_socket_okay() - Free the socket if its connection is not okay
841  * @sock:       pointer to the pointer to the socket.
842  */
843 static bool drbd_socket_okay(struct socket **sock)
844 {
845         int rr;
846         char tb[4];
847
848         if (!*sock)
849                 return false;
850
851         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
852
853         if (rr > 0 || rr == -EAGAIN) {
854                 return true;
855         } else {
856                 sock_release(*sock);
857                 *sock = NULL;
858                 return false;
859         }
860 }
861
862 static bool connection_established(struct drbd_connection *connection,
863                                    struct socket **sock1,
864                                    struct socket **sock2)
865 {
866         struct net_conf *nc;
867         int timeout;
868         bool ok;
869
870         if (!*sock1 || !*sock2)
871                 return false;
872
873         rcu_read_lock();
874         nc = rcu_dereference(connection->net_conf);
875         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
876         rcu_read_unlock();
877         schedule_timeout_interruptible(timeout);
878
879         ok = drbd_socket_okay(sock1);
880         ok = drbd_socket_okay(sock2) && ok;
881
882         return ok;
883 }
884
885 /* Gets called if a connection is established, or if a new minor gets created
886    in a connection */
887 int drbd_connected(struct drbd_peer_device *peer_device)
888 {
889         struct drbd_device *device = peer_device->device;
890         int err;
891
892         atomic_set(&device->packet_seq, 0);
893         device->peer_seq = 0;
894
895         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
896                 &peer_device->connection->cstate_mutex :
897                 &device->own_state_mutex;
898
899         err = drbd_send_sync_param(peer_device);
900         if (!err)
901                 err = drbd_send_sizes(peer_device, 0, 0);
902         if (!err)
903                 err = drbd_send_uuids(peer_device);
904         if (!err)
905                 err = drbd_send_current_state(peer_device);
906         clear_bit(USE_DEGR_WFC_T, &device->flags);
907         clear_bit(RESIZE_PENDING, &device->flags);
908         atomic_set(&device->ap_in_flight, 0);
909         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
910         return err;
911 }
912
/*
 * conn_connect() - set up the data and meta sockets to the peer and run the
 * initial handshake (feature negotiation, optional authentication, protocol
 * and per-volume parameters).
 *
 * return values:
 *   1 yes, we have a valid connection
 *     (on success the positive result of drbd_do_features() is returned)
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 *     (returned when the state change to C_WF_CONNECTION is refused)
 */
static int conn_connect(struct drbd_connection *connection)
{
        struct drbd_socket sock, msock;
        struct drbd_peer_device *peer_device;
        struct net_conf *nc;
        int vnr, timeout, h;
        bool discard_my_data, ok;
        enum drbd_state_rv rv;
        struct accept_wait_data ad = {
                .connection = connection,
                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
        };

        clear_bit(DISCONNECT_SENT, &connection->flags);
        if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
                return -2;

        /* Temporary on-stack socket wrappers that borrow the connection's
         * send/receive buffers; the sockets are handed over to
         * connection->data / connection->meta once both are established. */
        mutex_init(&sock.mutex);
        sock.sbuf = connection->data.sbuf;
        sock.rbuf = connection->data.rbuf;
        sock.socket = NULL;
        mutex_init(&msock.mutex);
        msock.sbuf = connection->meta.sbuf;
        msock.rbuf = connection->meta.rbuf;
        msock.socket = NULL;

        /* Assume that the peer only understands protocol 80 until we know better.  */
        connection->agreed_pro_version = 80;

        if (prepare_listen_socket(connection, &ad))
                return 0;

        /* Each iteration tries an outgoing connect and then waits for an
         * incoming one, until both the data and the meta socket are up. */
        do {
                struct socket *s;

                s = drbd_try_connect(connection);
                if (s) {
                        /* First outgoing socket becomes the data socket,
                         * the second one the meta socket. */
                        if (!sock.socket) {
                                sock.socket = s;
                                send_first_packet(connection, &sock, P_INITIAL_DATA);
                        } else if (!msock.socket) {
                                clear_bit(RESOLVE_CONFLICTS, &connection->flags);
                                msock.socket = s;
                                send_first_packet(connection, &msock, P_INITIAL_META);
                        } else {
                                drbd_err(connection, "Logic error in conn_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (connection_established(connection, &sock.socket, &msock.socket))
                        break;

retry:
                s = drbd_wait_for_connect(connection, &ad);
                if (s) {
                        /* The peer tells us by its first packet which role
                         * the accepted socket is supposed to have. */
                        int fp = receive_first_packet(connection, s);
                        /* Drop sockets that died in the meantime (sets them to NULL). */
                        drbd_socket_okay(&sock.socket);
                        drbd_socket_okay(&msock.socket);
                        switch (fp) {
                        case P_INITIAL_DATA:
                                if (sock.socket) {
                                        /* We already have a data socket: replace
                                         * it with the accepted one. */
                                        drbd_warn(connection, "initial packet S crossed\n");
                                        sock_release(sock.socket);
                                        sock.socket = s;
                                        goto randomize;
                                }
                                sock.socket = s;
                                break;
                        case P_INITIAL_META:
                                set_bit(RESOLVE_CONFLICTS, &connection->flags);
                                if (msock.socket) {
                                        /* We already have a meta socket: replace
                                         * it with the accepted one. */
                                        drbd_warn(connection, "initial packet M crossed\n");
                                        sock_release(msock.socket);
                                        msock.socket = s;
                                        goto randomize;
                                }
                                msock.socket = s;
                                break;
                        default:
                                /* fp is a negative error code or an unexpected command */
                                drbd_warn(connection, "Error receiving initial packet\n");
                                sock_release(s);
randomize:
                                /* Coin flip: either wait for another incoming
                                 * connection right away, or fall through to the
                                 * state checks and the next outgoing attempt. */
                                if (prandom_u32() & 1)
                                        goto retry;
                        }
                }

                if (connection->cstate <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&connection->receiver) == EXITING)
                                goto out_release_sockets;
                }

                ok = connection_established(connection, &sock.socket, &msock.socket);
        } while (!ok);

        /* Both sockets are up; the listen socket is no longer needed. */
        if (ad.s_listen)
                sock_release(ad.s_listen);

        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */

        sock.socket->sk->sk_allocation = GFP_NOIO;
        msock.socket->sk->sk_allocation = GFP_NOIO;

        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_CONNECTION_FEATURES timeout,
         * which we set to 4x the configured ping_timeout. */
        rcu_read_lock();
        nc = rcu_dereference(connection->net_conf);

        sock.socket->sk->sk_sndtimeo =
        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;

        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
        timeout = nc->timeout * HZ / 10;
        /* Snapshot the flag while net_conf is protected by RCU. */
        discard_my_data = nc->discard_my_data;
        rcu_read_unlock();

        msock.socket->sk->sk_sndtimeo = timeout;

        /* we don't want delays.
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock.socket);
        drbd_tcp_nodelay(msock.socket);

        /* From here on the sockets belong to the connection object; the
         * error returns below do not release them in this function. */
        connection->data.socket = sock.socket;
        connection->meta.socket = msock.socket;
        connection->last_received = jiffies;

        h = drbd_do_features(connection);
        if (h <= 0)
                return h;

        if (connection->cram_hmac_tfm) {
                /* drbd_request_state(device, NS(conn, WFAuth)); */
                switch (drbd_do_auth(connection)) {
                case -1:
                        drbd_err(connection, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        drbd_err(connection, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        /* Handshake timeouts are done with; switch the data socket to the
         * configured send timeout and an unbounded receive timeout. */
        connection->data.socket->sk->sk_sndtimeo = timeout;
        connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        if (drbd_send_protocol(connection) == -EOPNOTSUPP)
                return -1;

        /* Prevent a race between resync-handshake and
         * being promoted to Primary.
         *
         * Grab and release the state mutex, so we know that any current
         * drbd_set_role() is finished, and any incoming drbd_set_role
         * will see the STATE_SENT flag, and wait for it to be cleared.
         */
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_lock(peer_device->device->state_mutex);

        /* avoid a race with conn_request_state( C_DISCONNECTING ) */
        spin_lock_irq(&connection->resource->req_lock);
        set_bit(STATE_SENT, &connection->flags);
        spin_unlock_irq(&connection->resource->req_lock);

        idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
                mutex_unlock(peer_device->device->state_mutex);

        /* Announce each volume to the peer.  The RCU read lock is dropped
         * around drbd_connected(); the extra kref keeps the device alive
         * meanwhile. */
        rcu_read_lock();
        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                struct drbd_device *device = peer_device->device;
                kref_get(&device->kref);
                rcu_read_unlock();

                if (discard_my_data)
                        set_bit(DISCARD_MY_DATA, &device->flags);
                else
                        clear_bit(DISCARD_MY_DATA, &device->flags);

                /* NOTE(review): the return value of drbd_connected() is ignored here. */
                drbd_connected(peer_device);
                kref_put(&device->kref, drbd_destroy_device);
                rcu_read_lock();
        }
        rcu_read_unlock();

        rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
        if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
                clear_bit(STATE_SENT, &connection->flags);
                return 0;
        }

        drbd_thread_start(&connection->ack_receiver);
        /* opencoded create_singlethread_workqueue(),
         * to be able to use format string arguments */
        connection->ack_sender =
                alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
        if (!connection->ack_sender) {
                drbd_err(connection, "Failed to create workqueue ack_sender\n");
                return 0;
        }

        mutex_lock(&connection->resource->conf_update);
        /* The discard_my_data flag is a single-shot modifier to the next
         * connection attempt, the handshake of which is now well underway.
         * No need for rcu style copying of the whole struct
         * just to clear a single value. */
        connection->net_conf->discard_my_data = 0;
        mutex_unlock(&connection->resource->conf_update);

        return h;

out_release_sockets:
        /* Only reached from inside the connect loop, i.e. before the sockets
         * were handed over to the connection object. */
        if (ad.s_listen)
                sock_release(ad.s_listen);
        if (sock.socket)
                sock_release(sock.socket);
        if (msock.socket)
                sock_release(msock.socket);
        return -1;
}
1152
1153 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1154 {
1155         unsigned int header_size = drbd_header_size(connection);
1156
1157         if (header_size == sizeof(struct p_header100) &&
1158             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1159                 struct p_header100 *h = header;
1160                 if (h->pad != 0) {
1161                         drbd_err(connection, "Header padding is not zero\n");
1162                         return -EINVAL;
1163                 }
1164                 pi->vnr = be16_to_cpu(h->volume);
1165                 pi->cmd = be16_to_cpu(h->command);
1166                 pi->size = be32_to_cpu(h->length);
1167         } else if (header_size == sizeof(struct p_header95) &&
1168                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1169                 struct p_header95 *h = header;
1170                 pi->cmd = be16_to_cpu(h->command);
1171                 pi->size = be32_to_cpu(h->length);
1172                 pi->vnr = 0;
1173         } else if (header_size == sizeof(struct p_header80) &&
1174                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1175                 struct p_header80 *h = header;
1176                 pi->cmd = be16_to_cpu(h->command);
1177                 pi->size = be16_to_cpu(h->length);
1178                 pi->vnr = 0;
1179         } else {
1180                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1181                          be32_to_cpu(*(__be32 *)header),
1182                          connection->agreed_pro_version);
1183                 return -EINVAL;
1184         }
1185         pi->data = header + header_size;
1186         return 0;
1187 }
1188
1189 static void drbd_unplug_all_devices(struct drbd_connection *connection)
1190 {
1191         if (current->plug == &connection->receiver_plug) {
1192                 blk_finish_plug(&connection->receiver_plug);
1193                 blk_start_plug(&connection->receiver_plug);
1194         } /* else: maybe just schedule() ?? */
1195 }
1196
1197 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1198 {
1199         void *buffer = connection->data.rbuf;
1200         int err;
1201
1202         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1203         if (err)
1204                 return err;
1205
1206         err = decode_header(connection, buffer, pi);
1207         connection->last_received = jiffies;
1208
1209         return err;
1210 }
1211
1212 static int drbd_recv_header_maybe_unplug(struct drbd_connection *connection, struct packet_info *pi)
1213 {
1214         void *buffer = connection->data.rbuf;
1215         unsigned int size = drbd_header_size(connection);
1216         int err;
1217
1218         err = drbd_recv_short(connection->data.socket, buffer, size, MSG_NOSIGNAL|MSG_DONTWAIT);
1219         if (err != size) {
1220                 /* If we have nothing in the receive buffer now, to reduce
1221                  * application latency, try to drain the backend queues as
1222                  * quickly as possible, and let remote TCP know what we have
1223                  * received so far. */
1224                 if (err == -EAGAIN) {
1225                         drbd_tcp_quickack(connection->data.socket);
1226                         drbd_unplug_all_devices(connection);
1227                 }
1228                 if (err > 0) {
1229                         buffer += err;
1230                         size -= err;
1231                 }
1232                 err = drbd_recv_all_warn(connection, buffer, size);
1233                 if (err)
1234                         return err;
1235         }
1236
1237         err = decode_header(connection, connection->data.rbuf, pi);
1238         connection->last_received = jiffies;
1239
1240         return err;
1241 }
/* This is blkdev_issue_flush, but asynchronous.
 * We want to submit to all component volumes in parallel,
 * then wait for all completions.
 */
/* State shared by all flush bios submitted by one drbd_flush() call. */
struct issue_flush_context {
        /* Number of flush bios in flight, plus one reference that
         * drbd_flush() holds while it is still submitting. */
        atomic_t pending;
        /* 0, or an error recorded by a failed allocation or a failed bio. */
        int error;
        /* Completed by the endio handler that drops pending to zero. */
        struct completion done;
};
/* Per-bio context, attached as bi_private and freed in one_flush_endio(). */
struct one_flush_context {
        /* Device whose backing device is being flushed. */
        struct drbd_device *device;
        /* The shared context of the drbd_flush() call this bio belongs to. */
        struct issue_flush_context *ctx;
};
1255
1256 static void one_flush_endio(struct bio *bio)
1257 {
1258         struct one_flush_context *octx = bio->bi_private;
1259         struct drbd_device *device = octx->device;
1260         struct issue_flush_context *ctx = octx->ctx;
1261
1262         if (bio->bi_status) {
1263                 ctx->error = blk_status_to_errno(bio->bi_status);
1264                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_status);
1265         }
1266         kfree(octx);
1267         bio_put(bio);
1268
1269         clear_bit(FLUSH_PENDING, &device->flags);
1270         put_ldev(device);
1271         kref_put(&device->kref, drbd_destroy_device);
1272
1273         if (atomic_dec_and_test(&ctx->pending))
1274                 complete(&ctx->done);
1275 }
1276
1277 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1278 {
1279         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1280         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1281         if (!bio || !octx) {
1282                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1283                 /* FIXME: what else can I do now?  disconnecting or detaching
1284                  * really does not help to improve the state of the world, either.
1285                  */
1286                 kfree(octx);
1287                 if (bio)
1288                         bio_put(bio);
1289
1290                 ctx->error = -ENOMEM;
1291                 put_ldev(device);
1292                 kref_put(&device->kref, drbd_destroy_device);
1293                 return;
1294         }
1295
1296         octx->device = device;
1297         octx->ctx = ctx;
1298         bio_set_dev(bio, device->ldev->backing_bdev);
1299         bio->bi_private = octx;
1300         bio->bi_end_io = one_flush_endio;
1301         bio->bi_opf = REQ_OP_FLUSH | REQ_PREFLUSH;
1302
1303         device->flush_jif = jiffies;
1304         set_bit(FLUSH_PENDING, &device->flags);
1305         atomic_inc(&ctx->pending);
1306         submit_bio(bio);
1307 }
1308
1309 static void drbd_flush(struct drbd_connection *connection)
1310 {
1311         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1312                 struct drbd_peer_device *peer_device;
1313                 struct issue_flush_context ctx;
1314                 int vnr;
1315
1316                 atomic_set(&ctx.pending, 1);
1317                 ctx.error = 0;
1318                 init_completion(&ctx.done);
1319
1320                 rcu_read_lock();
1321                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1322                         struct drbd_device *device = peer_device->device;
1323
1324                         if (!get_ldev(device))
1325                                 continue;
1326                         kref_get(&device->kref);
1327                         rcu_read_unlock();
1328
1329                         submit_one_flush(device, &ctx);
1330
1331                         rcu_read_lock();
1332                 }
1333                 rcu_read_unlock();
1334
1335                 /* Do we want to add a timeout,
1336                  * if disk-timeout is set? */
1337                 if (!atomic_dec_and_test(&ctx.pending))
1338                         wait_for_completion(&ctx.done);
1339
1340                 if (ctx.error) {
1341                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1342                          * don't try again for ANY return value != 0
1343                          * if (rv == -EOPNOTSUPP) */
1344                         /* Any error is already reported by bio_endio callback. */
1345                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1346                 }
1347         }
1348 }
1349
/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
 * @connection: DRBD connection.
 * @epoch:      Epoch object.
 * @ev:         Epoch event, optionally combined with EV_CLEANUP.
 *
 * An epoch is finished once it is non-empty, has no active requests left and
 * either its barrier number is known or EV_CLEANUP is set.  A finished epoch
 * that is not the connection's current epoch is unlinked and freed, and its
 * successor is then examined with EV_BECAME_LAST; the current epoch is reset
 * for reuse instead.
 *
 * Return: FE_STILL_LIVE if nothing was finished; otherwise what happened to
 * the first epoch that was finished, FE_DESTROYED (freed) or FE_RECYCLED
 * (reset as the current epoch).
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&connection->epoch_lock);
        do {
                next_epoch = NULL;

                /* Sampled before the event is applied. */
                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do*/
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
                        if (!(ev & EV_CLEANUP)) {
                                /* The epoch lock is dropped while the barrier
                                 * ack is sent, and re-taken afterwards. */
                                spin_unlock(&connection->epoch_lock);
                                drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
                                spin_lock(&connection->epoch_lock);
                        }
#if 0
                        /* FIXME: dec unacked on connection, once we have
                         * something to count pending connection packets in. */
                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
                                dec_unacked(epoch->connection);
#endif

                        if (connection->current_epoch != epoch) {
                                /* Not the current epoch: unlink and free it, then
                                 * continue with its successor in the list. */
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                connection->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                /* The current epoch is kept and reset for reuse. */
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&connection->epoch_lock);

        return rv;
}
1425
1426 static enum write_ordering_e
1427 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1428 {
1429         struct disk_conf *dc;
1430
1431         dc = rcu_dereference(bdev->disk_conf);
1432
1433         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1434                 wo = WO_DRAIN_IO;
1435         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1436                 wo = WO_NONE;
1437
1438         return wo;
1439 }
1440
1441 /**
1442  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1443  * @connection: DRBD connection.
1444  * @wo:         Write ordering method to try.
1445  */
1446 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1447                               enum write_ordering_e wo)
1448 {
1449         struct drbd_device *device;
1450         enum write_ordering_e pwo;
1451         int vnr;
1452         static char *write_ordering_str[] = {
1453                 [WO_NONE] = "none",
1454                 [WO_DRAIN_IO] = "drain",
1455                 [WO_BDEV_FLUSH] = "flush",
1456         };
1457
1458         pwo = resource->write_ordering;
1459         if (wo != WO_BDEV_FLUSH)
1460                 wo = min(pwo, wo);
1461         rcu_read_lock();
1462         idr_for_each_entry(&resource->devices, device, vnr) {
1463                 if (get_ldev(device)) {
1464                         wo = max_allowed_wo(device->ldev, wo);
1465                         if (device->ldev == bdev)
1466                                 bdev = NULL;
1467                         put_ldev(device);
1468                 }
1469         }
1470
1471         if (bdev)
1472                 wo = max_allowed_wo(bdev, wo);
1473
1474         rcu_read_unlock();
1475
1476         resource->write_ordering = wo;
1477         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1478                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1479 }
1480
1481 /*
1482  * Mapping "discard" to ZEROOUT with UNMAP does not work for us:
1483  * Drivers have to "announce" q->limits.max_write_zeroes_sectors, or it
1484  * will directly go to fallback mode, submitting normal writes, and
1485  * never even try to UNMAP.
1486  *
1487  * And dm-thin does not do this (yet), mostly because in general it has
1488  * to assume that "skip_block_zeroing" is set.  See also:
1489  * https://www.mail-archive.com/dm-devel%40redhat.com/msg07965.html
1490  * https://www.redhat.com/archives/dm-devel/2018-January/msg00271.html
1491  *
1492  * We *may* ignore the discard-zeroes-data setting, if so configured.
1493  *
1494  * Assumption is that this "discard_zeroes_data=0" is only because the backend
1495  * may ignore partial unaligned discards.
1496  *
1497  * LVM/DM thin as of at least
1498  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1499  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1500  *   Driver version:  4.29.0
1501  * still behaves this way.
1502  *
1503  * For unaligned (wrt. alignment and granularity) or too small discards,
1504  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1505  * but discard all the aligned full chunks.
1506  *
1507  * At least for LVM/DM thin, with skip_block_zeroing=false,
1508  * the result is effectively "discard_zeroes_data=1".
1509  */
/*
 * drbd_issue_discard_or_zero_out() - discard and/or zero out a sector range
 * on the backing device of @device.
 *
 * @start:      first sector of the range
 * @nr_sectors: number of sectors in the range
 * @flags:      EE_TRIM|EE_ZEROOUT
 *
 * If EE_ZEROOUT is set or EE_TRIM is not set, the whole range is zeroed out.
 * Otherwise the granularity-aligned part of the range is discarded and any
 * unaligned head or tail is zeroed out.
 *
 * Returns 0 if all issued operations succeeded, 1 if any of them failed.
 */
int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, int flags)
{
        struct block_device *bdev = device->ldev->backing_bdev;
        struct request_queue *q = bdev_get_queue(bdev);
        sector_t tmp, nr;
        unsigned int max_discard_sectors, granularity;
        int alignment;
        int err = 0;

        if ((flags & EE_ZEROOUT) || !(flags & EE_TRIM))
                goto zero_out;

        /* Zero-sector (unknown) and one-sector granularities are the same.  */
        granularity = max(q->limits.discard_granularity >> 9, 1U);
        alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;

        /* Cap a single discard at 1U << 22 sectors and round it down to a
         * multiple of the granularity. */
        max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
        max_discard_sectors -= max_discard_sectors % granularity;
        if (unlikely(!max_discard_sectors))
                goto zero_out;

        /* Too small to contain even one full chunk. */
        if (nr_sectors < granularity)
                goto zero_out;

        /* sector_div() divides tmp in place and returns the remainder. */
        tmp = start;
        if (sector_div(tmp, granularity) != alignment) {
                if (nr_sectors < 2*granularity)
                        goto zero_out;
                /* start + gran - (start + gran - align) % gran */
                tmp = start + granularity - alignment;
                tmp = start + granularity - sector_div(tmp, granularity);

                /* Zero out the unaligned head, up to the next aligned sector. */
                nr = tmp - start;
                /* don't flag BLKDEV_ZERO_NOUNMAP, we don't know how many
                 * layers are below us, some may have smaller granularity */
                err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
                nr_sectors -= nr;
                start = tmp;
        }
        /* Discard in pieces of at most max_discard_sectors. */
        while (nr_sectors >= max_discard_sectors) {
                err |= blkdev_issue_discard(bdev, start, max_discard_sectors, GFP_NOIO, 0);
                nr_sectors -= max_discard_sectors;
                start += max_discard_sectors;
        }
        if (nr_sectors) {
                /* max_discard_sectors is unsigned int (and a multiple of
                 * granularity, we made sure of that above already);
                 * nr is < max_discard_sectors;
                 * I don't need sector_div here, even though nr is sector_t */
                nr = nr_sectors;
                nr -= (unsigned int)nr % granularity;
                if (nr) {
                        err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
                        nr_sectors -= nr;
                        start += nr;
                }
        }
 zero_out:
        /* Whatever is left (possibly the whole range) gets zeroed out;
         * BLKDEV_ZERO_NOUNMAP is passed unless EE_TRIM was requested. */
        if (nr_sectors) {
                err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO,
                                (flags & EE_TRIM) ? 0 : BLKDEV_ZERO_NOUNMAP);
        }
        return err != 0;
}
1575
1576 static bool can_do_reliable_discards(struct drbd_device *device)
1577 {
1578         struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1579         struct disk_conf *dc;
1580         bool can_do;
1581
1582         if (!blk_queue_discard(q))
1583                 return false;
1584
1585         rcu_read_lock();
1586         dc = rcu_dereference(device->ldev->disk_conf);
1587         can_do = dc->discard_zeroes_if_aligned;
1588         rcu_read_unlock();
1589         return can_do;
1590 }
1591
1592 static void drbd_issue_peer_discard_or_zero_out(struct drbd_device *device, struct drbd_peer_request *peer_req)
1593 {
1594         /* If the backend cannot discard, or does not guarantee
1595          * read-back zeroes in discarded ranges, we fall back to
1596          * zero-out.  Unless configuration specifically requested
1597          * otherwise. */
1598         if (!can_do_reliable_discards(device))
1599                 peer_req->flags |= EE_ZEROOUT;
1600
1601         if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1602             peer_req->i.size >> 9, peer_req->flags & (EE_ZEROOUT|EE_TRIM)))
1603                 peer_req->flags |= EE_WAS_ERROR;
1604         drbd_endio_write_sec_final(peer_req);
1605 }
1606
1607 static void drbd_issue_peer_wsame(struct drbd_device *device,
1608                                   struct drbd_peer_request *peer_req)
1609 {
1610         struct block_device *bdev = device->ldev->backing_bdev;
1611         sector_t s = peer_req->i.sector;
1612         sector_t nr = peer_req->i.size >> 9;
1613         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1614                 peer_req->flags |= EE_WAS_ERROR;
1615         drbd_endio_write_sec_final(peer_req);
1616 }
1617
1618
/**
 * drbd_submit_peer_request()
 * @device:     DRBD device.
 * @peer_req:   peer request
 * @op:         operation, set on every bio via bio_set_op_attrs()
 * @op_flags:   flag field, see bio->bi_opf
 * @fault_type: handed to drbd_generic_make_request() together with each bio
 *
 * Requests flagged EE_TRIM, EE_ZEROOUT or EE_WRITE_SAME are not turned into
 * bios here.  After waiting for the active_ee lists of the connection to
 * drain, they are handed to drbd_issue_peer_discard_or_zero_out() or
 * drbd_issue_peer_wsame(), and 0 is returned.
 *
 * For all other requests:
 * May spread the pages to multiple bios,
 * depending on bio_add_page restrictions.
 *
 * Returns 0 if all bios have been submitted,
 * -ENOMEM if we could not allocate enough bios.
 *
 * NOTE(review): this comment used to also promise -ENOSPC for the case that
 * not even a single page could be added to an empty bio (reportedly observed
 * on certain Xen deployments).  No path in this function sets that value;
 * a page refused by bio_add_page() only leads to the allocation of a
 * further bio.
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_peer_request(struct drbd_device *device,
                             struct drbd_peer_request *peer_req,
                             const unsigned op, const unsigned op_flags,
                             const int fault_type)
{
        /* head of the list of bios prepared so far, linked via bi_next */
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = peer_req->pages;
        sector_t sector = peer_req->i.sector;
        unsigned data_size = peer_req->i.size;
        unsigned n_bios = 0;
        /* pages still to be placed into a bio; sizes the next bio_alloc() */
        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
        int err = -ENOMEM;

        /* TRIM/DISCARD: for now, always use the helper function
         * blkdev_issue_zeroout(..., discard=true).
         * It's synchronous, but it does the right thing wrt. bio splitting.
         * Correctness first, performance later.  Next step is to code an
         * asynchronous variant of the same.
         */
        if (peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) {
                /* wait for all pending IO completions, before we start
                 * zeroing things out. */
                conn_wait_active_ee_empty(peer_req->peer_device->connection);
                /* add it to the active list now,
                 * so we can find it to present it in debugfs */
                peer_req->submit_jif = jiffies;
                peer_req->flags |= EE_SUBMITTED;

                /* If this was a resync request from receive_rs_deallocated(),
                 * it is already on the sync_ee list */
                if (list_empty(&peer_req->w.list)) {
                        spin_lock_irq(&device->resource->req_lock);
                        list_add_tail(&peer_req->w.list, &device->active_ee);
                        spin_unlock_irq(&device->resource->req_lock);
                }

                if (peer_req->flags & (EE_TRIM|EE_ZEROOUT))
                        drbd_issue_peer_discard_or_zero_out(device, peer_req);
                else /* EE_WRITE_SAME */
                        drbd_issue_peer_wsame(device, peer_req);
                return 0;
        }

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio.
         *
         * Plain bio_alloc is good enough here, this is no DRBD internally
         * generated bio, but a bio allocated on behalf of the peer.
         */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
                goto fail;
        }
        /* > peer_req->i.sector, unless this is the first bio */
        bio->bi_iter.bi_sector = sector;
        bio_set_dev(bio, device->ldev->backing_bdev);
        bio_set_op_attrs(bio, op, op_flags);
        bio->bi_private = peer_req;
        bio->bi_end_io = drbd_peer_request_endio;

        /* push onto the front of the list: the newest bio becomes the head */
        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        /* The loop continues with the current value of page, so after a
         * jump back to next_bio the refused page is offered to the new bio. */
        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0))
                        goto next_bio;
                data_size -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(device, data_size == 0);
        D_ASSERT(device, page == NULL);

        /* set the number of bios before the first of them is submitted */
        atomic_set(&peer_req->pending_bios, n_bios);
        /* for debugfs: update timestamp, mark as submitted */
        peer_req->submit_jif = jiffies;
        peer_req->flags |= EE_SUBMITTED;
        /* pop the bios off the list one by one and submit them */
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                drbd_generic_make_request(device, fault_type, bio);
        } while (bios);
        return 0;

fail:
        /* nothing has been submitted yet: drop the bios prepared so far */
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return err;
}
1737
1738 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1739                                              struct drbd_peer_request *peer_req)
1740 {
1741         struct drbd_interval *i = &peer_req->i;
1742
1743         drbd_remove_interval(&device->write_requests, i);
1744         drbd_clear_interval(i);
1745
1746         /* Wake up any processes waiting for this peer request to complete.  */
1747         if (i->waiting)
1748                 wake_up(&device->misc_wait);
1749 }
1750
1751 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1752 {
1753         struct drbd_peer_device *peer_device;
1754         int vnr;
1755
1756         rcu_read_lock();
1757         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1758                 struct drbd_device *device = peer_device->device;
1759
1760                 kref_get(&device->kref);
1761                 rcu_read_unlock();
1762                 drbd_wait_ee_list_empty(device, &device->active_ee);
1763                 kref_put(&device->kref, drbd_destroy_device);
1764                 rcu_read_lock();
1765         }
1766         rcu_read_unlock();
1767 }
1768
1769 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1770 {
1771         int rv;
1772         struct p_barrier *p = pi->data;
1773         struct drbd_epoch *epoch;
1774
1775         /* FIXME these are unacked on connection,
1776          * not a specific (peer)device.
1777          */
1778         connection->current_epoch->barrier_nr = p->barrier;
1779         connection->current_epoch->connection = connection;
1780         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1781
1782         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1783          * the activity log, which means it would not be resynced in case the
1784          * R_PRIMARY crashes now.
1785          * Therefore we must send the barrier_ack after the barrier request was
1786          * completed. */
1787         switch (connection->resource->write_ordering) {
1788         case WO_NONE:
1789                 if (rv == FE_RECYCLED)
1790                         return 0;
1791
1792                 /* receiver context, in the writeout path of the other node.
1793                  * avoid potential distributed deadlock */
1794                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1795                 if (epoch)
1796                         break;
1797                 else
1798                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1799                         /* Fall through */
1800
1801         case WO_BDEV_FLUSH:
1802         case WO_DRAIN_IO:
1803                 conn_wait_active_ee_empty(connection);
1804                 drbd_flush(connection);
1805
1806                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1807                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1808                         if (epoch)
1809                                 break;
1810                 }
1811
1812                 return 0;
1813         default:
1814                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1815                          connection->resource->write_ordering);
1816                 return -EIO;
1817         }
1818
1819         epoch->flags = 0;
1820         atomic_set(&epoch->epoch_size, 0);
1821         atomic_set(&epoch->active, 0);
1822
1823         spin_lock(&connection->epoch_lock);
1824         if (atomic_read(&connection->current_epoch->epoch_size)) {
1825                 list_add(&epoch->list, &connection->current_epoch->list);
1826                 connection->current_epoch = epoch;
1827                 connection->epochs++;
1828         } else {
1829                 /* The current_epoch got recycled while we allocated this one... */
1830                 kfree(epoch);
1831         }
1832         spin_unlock(&connection->epoch_lock);
1833
1834         return 0;
1835 }
1836
1837 /* quick wrapper in case payload size != request_size (write same) */
1838 static void drbd_csum_ee_size(struct crypto_shash *h,
1839                               struct drbd_peer_request *r, void *d,
1840                               unsigned int payload_size)
1841 {
1842         unsigned int tmp = r->i.size;
1843         r->i.size = payload_size;
1844         drbd_csum_ee(h, r, d);
1845         r->i.size = tmp;
1846 }
1847
1848 /* used from receive_RSDataReply (recv_resync_read)
1849  * and from receive_Data.
1850  * data_size: actual payload ("data in")
1851  *      for normal writes that is bi_size.
1852  *      for discards, that is zero.
1853  *      for write same, it is logical_block_size.
1854  * both trim and write same have the bi_size ("data len to be affected")
1855  * as extra argument in the packet header.
1856  */
1857 static struct drbd_peer_request *
1858 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1859               struct packet_info *pi) __must_hold(local)
1860 {
1861         struct drbd_device *device = peer_device->device;
1862         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1863         struct drbd_peer_request *peer_req;
1864         struct page *page;
1865         int digest_size, err;
1866         unsigned int data_size = pi->size, ds;
1867         void *dig_in = peer_device->connection->int_dig_in;
1868         void *dig_vv = peer_device->connection->int_dig_vv;
1869         unsigned long *data;
1870         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1871         struct p_trim *zeroes = (pi->cmd == P_ZEROES) ? pi->data : NULL;
1872         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1873
1874         digest_size = 0;
1875         if (!trim && peer_device->connection->peer_integrity_tfm) {
1876                 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
1877                 /*
1878                  * FIXME: Receive the incoming digest into the receive buffer
1879                  *        here, together with its struct p_data?
1880                  */
1881                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1882                 if (err)
1883                         return NULL;
1884                 data_size -= digest_size;
1885         }
1886
1887         /* assume request_size == data_size, but special case trim and wsame. */
1888         ds = data_size;
1889         if (trim) {
1890                 if (!expect(data_size == 0))
1891                         return NULL;
1892                 ds = be32_to_cpu(trim->size);
1893         } else if (zeroes) {
1894                 if (!expect(data_size == 0))
1895                         return NULL;
1896                 ds = be32_to_cpu(zeroes->size);
1897         } else if (wsame) {
1898                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1899                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1900                                 data_size, queue_logical_block_size(device->rq_queue));
1901                         return NULL;
1902                 }
1903                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1904                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1905                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1906                         return NULL;
1907                 }
1908                 ds = be32_to_cpu(wsame->size);
1909         }
1910
1911         if (!expect(IS_ALIGNED(ds, 512)))
1912                 return NULL;
1913         if (trim || wsame || zeroes) {
1914                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1915                         return NULL;
1916         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1917                 return NULL;
1918
1919         /* even though we trust out peer,
1920          * we sometimes have to double check. */
1921         if (sector + (ds>>9) > capacity) {
1922                 drbd_err(device, "request from peer beyond end of local disk: "
1923                         "capacity: %llus < sector: %llus + size: %u\n",
1924                         (unsigned long long)capacity,
1925                         (unsigned long long)sector, ds);
1926                 return NULL;
1927         }
1928
1929         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1930          * "criss-cross" setup, that might cause write-out on some other DRBD,
1931          * which in turn might block on the other node at this very place.  */
1932         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1933         if (!peer_req)
1934                 return NULL;
1935
1936         peer_req->flags |= EE_WRITE;
1937         if (trim) {
1938                 peer_req->flags |= EE_TRIM;
1939                 return peer_req;
1940         }
1941         if (zeroes) {
1942                 peer_req->flags |= EE_ZEROOUT;
1943                 return peer_req;
1944         }
1945         if (wsame)
1946                 peer_req->flags |= EE_WRITE_SAME;
1947
1948         /* receive payload size bytes into page chain */
1949         ds = data_size;
1950         page = peer_req->pages;
1951         page_chain_for_each(page) {
1952                 unsigned len = min_t(int, ds, PAGE_SIZE);
1953                 data = kmap(page);
1954                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1955                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1956                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1957                         data[0] = data[0] ^ (unsigned long)-1;
1958                 }
1959                 kunmap(page);
1960                 if (err) {
1961                         drbd_free_peer_req(device, peer_req);
1962                         return NULL;
1963                 }
1964                 ds -= len;
1965         }
1966
1967         if (digest_size) {
1968                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1969                 if (memcmp(dig_in, dig_vv, digest_size)) {
1970                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1971                                 (unsigned long long)sector, data_size);
1972                         drbd_free_peer_req(device, peer_req);
1973                         return NULL;
1974                 }
1975         }
1976         device->recv_cnt += data_size >> 9;
1977         return peer_req;
1978 }
1979
1980 /* drbd_drain_block() just takes a data block
1981  * out of the socket input buffer, and discards it.
1982  */
1983 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1984 {
1985         struct page *page;
1986         int err = 0;
1987         void *data;
1988
1989         if (!data_size)
1990                 return 0;
1991
1992         page = drbd_alloc_pages(peer_device, 1, 1);
1993
1994         data = kmap(page);
1995         while (data_size) {
1996                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1997
1998                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1999                 if (err)
2000                         break;
2001                 data_size -= len;
2002         }
2003         kunmap(page);
2004         drbd_free_pages(peer_device->device, page, 0);
2005         return err;
2006 }
2007
2008 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
2009                            sector_t sector, int data_size)
2010 {
2011         struct bio_vec bvec;
2012         struct bvec_iter iter;
2013         struct bio *bio;
2014         int digest_size, err, expect;
2015         void *dig_in = peer_device->connection->int_dig_in;
2016         void *dig_vv = peer_device->connection->int_dig_vv;
2017
2018         digest_size = 0;
2019         if (peer_device->connection->peer_integrity_tfm) {
2020                 digest_size = crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
2021                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
2022                 if (err)
2023                         return err;
2024                 data_size -= digest_size;
2025         }
2026
2027         /* optimistically update recv_cnt.  if receiving fails below,
2028          * we disconnect anyways, and counters will be reset. */
2029         peer_device->device->recv_cnt += data_size>>9;
2030
2031         bio = req->master_bio;
2032         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
2033
2034         bio_for_each_segment(bvec, bio, iter) {
2035                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
2036                 expect = min_t(int, data_size, bvec.bv_len);
2037                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
2038                 kunmap(bvec.bv_page);
2039                 if (err)
2040                         return err;
2041                 data_size -= expect;
2042         }
2043
2044         if (digest_size) {
2045                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
2046                 if (memcmp(dig_in, dig_vv, digest_size)) {
2047                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
2048                         return -EINVAL;
2049                 }
2050         }
2051
2052         D_ASSERT(peer_device->device, data_size == 0);
2053         return 0;
2054 }
2055
2056 /*
2057  * e_end_resync_block() is called in ack_sender context via
2058  * drbd_finish_peer_reqs().
2059  */
2060 static int e_end_resync_block(struct drbd_work *w, int unused)
2061 {
2062         struct drbd_peer_request *peer_req =
2063                 container_of(w, struct drbd_peer_request, w);
2064         struct drbd_peer_device *peer_device = peer_req->peer_device;
2065         struct drbd_device *device = peer_device->device;
2066         sector_t sector = peer_req->i.sector;
2067         int err;
2068
2069         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2070
2071         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2072                 drbd_set_in_sync(device, sector, peer_req->i.size);
2073                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2074         } else {
2075                 /* Record failure to sync */
2076                 drbd_rs_failed_io(device, sector, peer_req->i.size);
2077
2078                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2079         }
2080         dec_unacked(device);
2081
2082         return err;
2083 }
2084
2085 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2086                             struct packet_info *pi) __releases(local)
2087 {
2088         struct drbd_device *device = peer_device->device;
2089         struct drbd_peer_request *peer_req;
2090
2091         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2092         if (!peer_req)
2093                 goto fail;
2094
2095         dec_rs_pending(device);
2096
2097         inc_unacked(device);
2098         /* corresponding dec_unacked() in e_end_resync_block()
2099          * respective _drbd_clear_done_ee */
2100
2101         peer_req->w.cb = e_end_resync_block;
2102         peer_req->submit_jif = jiffies;
2103
2104         spin_lock_irq(&device->resource->req_lock);
2105         list_add_tail(&peer_req->w.list, &device->sync_ee);
2106         spin_unlock_irq(&device->resource->req_lock);
2107
2108         atomic_add(pi->size >> 9, &device->rs_sect_ev);
2109         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2110                                      DRBD_FAULT_RS_WR) == 0)
2111                 return 0;
2112
2113         /* don't care for the reason here */
2114         drbd_err(device, "submit failed, triggering re-connect\n");
2115         spin_lock_irq(&device->resource->req_lock);
2116         list_del(&peer_req->w.list);
2117         spin_unlock_irq(&device->resource->req_lock);
2118
2119         drbd_free_peer_req(device, peer_req);
2120 fail:
2121         put_ldev(device);
2122         return -EIO;
2123 }
2124
2125 static struct drbd_request *
2126 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2127              sector_t sector, bool missing_ok, const char *func)
2128 {
2129         struct drbd_request *req;
2130
2131         /* Request object according to our peer */
2132         req = (struct drbd_request *)(unsigned long)id;
2133         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2134                 return req;
2135         if (!missing_ok) {
2136                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2137                         (unsigned long)id, (unsigned long long)sector);
2138         }
2139         return NULL;
2140 }
2141
2142 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2143 {
2144         struct drbd_peer_device *peer_device;
2145         struct drbd_device *device;
2146         struct drbd_request *req;
2147         sector_t sector;
2148         int err;
2149         struct p_data *p = pi->data;
2150
2151         peer_device = conn_peer_device(connection, pi->vnr);
2152         if (!peer_device)
2153                 return -EIO;
2154         device = peer_device->device;
2155
2156         sector = be64_to_cpu(p->sector);
2157
2158         spin_lock_irq(&device->resource->req_lock);
2159         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2160         spin_unlock_irq(&device->resource->req_lock);
2161         if (unlikely(!req))
2162                 return -EIO;
2163
2164         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2165          * special casing it there for the various failure cases.
2166          * still no race with drbd_fail_pending_reads */
2167         err = recv_dless_read(peer_device, req, sector, pi->size);
2168         if (!err)
2169                 req_mod(req, DATA_RECEIVED);
2170         /* else: nothing. handled from drbd_disconnect...
2171          * I don't think we may complete this just yet
2172          * in case we are "on-disconnect: freeze" */
2173
2174         return err;
2175 }
2176
2177 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2178 {
2179         struct drbd_peer_device *peer_device;
2180         struct drbd_device *device;
2181         sector_t sector;
2182         int err;
2183         struct p_data *p = pi->data;
2184
2185         peer_device = conn_peer_device(connection, pi->vnr);
2186         if (!peer_device)
2187                 return -EIO;
2188         device = peer_device->device;
2189
2190         sector = be64_to_cpu(p->sector);
2191         D_ASSERT(device, p->block_id == ID_SYNCER);
2192
2193         if (get_ldev(device)) {
2194                 /* data is submitted to disk within recv_resync_read.
2195                  * corresponding put_ldev done below on error,
2196                  * or in drbd_peer_request_endio. */
2197                 err = recv_resync_read(peer_device, sector, pi);
2198         } else {
2199                 if (__ratelimit(&drbd_ratelimit_state))
2200                         drbd_err(device, "Can not write resync data to local disk.\n");
2201
2202                 err = drbd_drain_block(peer_device, pi->size);
2203
2204                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2205         }
2206
2207         atomic_add(pi->size >> 9, &device->rs_sect_in);
2208
2209         return err;
2210 }
2211
2212 static void restart_conflicting_writes(struct drbd_device *device,
2213                                        sector_t sector, int size)
2214 {
2215         struct drbd_interval *i;
2216         struct drbd_request *req;
2217
2218         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2219                 if (!i->local)
2220                         continue;
2221                 req = container_of(i, struct drbd_request, i);
2222                 if (req->rq_state & RQ_LOCAL_PENDING ||
2223                     !(req->rq_state & RQ_POSTPONED))
2224                         continue;
2225                 /* as it is RQ_POSTPONED, this will cause it to
2226                  * be queued on the retry workqueue. */
2227                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2228         }
2229 }
2230
2231 /*
2232  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2233  */
2234 static int e_end_block(struct drbd_work *w, int cancel)
2235 {
2236         struct drbd_peer_request *peer_req =
2237                 container_of(w, struct drbd_peer_request, w);
2238         struct drbd_peer_device *peer_device = peer_req->peer_device;
2239         struct drbd_device *device = peer_device->device;
2240         sector_t sector = peer_req->i.sector;
2241         int err = 0, pcmd;
2242
2243         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2244                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2245                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2246                                 device->state.conn <= C_PAUSED_SYNC_T &&
2247                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2248                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2249                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2250                         if (pcmd == P_RS_WRITE_ACK)
2251                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2252                 } else {
2253                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2254                         /* we expect it to be marked out of sync anyways...
2255                          * maybe assert this?  */
2256                 }
2257                 dec_unacked(device);
2258         }
2259
2260         /* we delete from the conflict detection hash _after_ we sent out the
2261          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2262         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2263                 spin_lock_irq(&device->resource->req_lock);
2264                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2265                 drbd_remove_epoch_entry_interval(device, peer_req);
2266                 if (peer_req->flags & EE_RESTART_REQUESTS)
2267                         restart_conflicting_writes(device, sector, peer_req->i.size);
2268                 spin_unlock_irq(&device->resource->req_lock);
2269         } else
2270                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2271
2272         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2273
2274         return err;
2275 }
2276
2277 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2278 {
2279         struct drbd_peer_request *peer_req =
2280                 container_of(w, struct drbd_peer_request, w);
2281         struct drbd_peer_device *peer_device = peer_req->peer_device;
2282         int err;
2283
2284         err = drbd_send_ack(peer_device, ack, peer_req);
2285         dec_unacked(peer_device->device);
2286
2287         return err;
2288 }
2289
2290 static int e_send_superseded(struct drbd_work *w, int unused)
2291 {
2292         return e_send_ack(w, P_SUPERSEDED);
2293 }
2294
2295 static int e_send_retry_write(struct drbd_work *w, int unused)
2296 {
2297         struct drbd_peer_request *peer_req =
2298                 container_of(w, struct drbd_peer_request, w);
2299         struct drbd_connection *connection = peer_req->peer_device->connection;
2300
2301         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2302                              P_RETRY_WRITE : P_SUPERSEDED);
2303 }
2304
/* Compare two sequence numbers, allowing for wrap-around:
 * returns true if a is "after" b, that is if a - b, computed modulo 2^32
 * and read as a signed 32-bit value, is positive. */
static bool seq_greater(u32 a, u32 b)
{
        /*
         * We assume 32-bit wrap-around here.
         * For 24-bit wrap-around, we would have to shift:
         *  a <<= 8; b <<= 8;
         *
         * Subtract as unsigned, where wrap-around is well defined, and only
         * then look at the sign.  Subtracting the values as s32 overflows
         * for sequence numbers that are far apart.
         */
        return (s32)(a - b) > 0;
}
2314
2315 static u32 seq_max(u32 a, u32 b)
2316 {
2317         return seq_greater(a, b) ? a : b;
2318 }
2319
2320 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2321 {
2322         struct drbd_device *device = peer_device->device;
2323         unsigned int newest_peer_seq;
2324
2325         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2326                 spin_lock(&device->peer_seq_lock);
2327                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2328                 device->peer_seq = newest_peer_seq;
2329                 spin_unlock(&device->peer_seq_lock);
2330                 /* wake up only if we actually changed device->peer_seq */
2331                 if (peer_seq == newest_peer_seq)
2332                         wake_up(&device->seq_wait);
2333         }
2334 }
2335
/* Do the two ranges share at least one sector?
 * s1, s2 are start sectors; l1, l2 are lengths in bytes. */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
        /* each range has to end behind the start of the other one */
        return (s1 + (l1>>9) > s2) && (s2 + (l2>>9) > s1);
}
2340
2341 /* maybe change sync_ee into interval trees as well? */
2342 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2343 {
2344         struct drbd_peer_request *rs_req;
2345         bool rv = false;
2346
2347         spin_lock_irq(&device->resource->req_lock);
2348         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2349                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2350                              rs_req->i.sector, rs_req->i.size)) {
2351                         rv = true;
2352                         break;
2353                 }
2354         }
2355         spin_unlock_irq(&device->resource->req_lock);
2356
2357         return rv;
2358 }
2359
2360 /* Called from receive_Data.
2361  * Synchronize packets on sock with packets on msock.
2362  *
2363  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2364  * packet traveling on msock, they are still processed in the order they have
2365  * been sent.
2366  *
2367  * Note: we don't care for Ack packets overtaking P_DATA packets.
2368  *
2369  * In case packet_seq is larger than device->peer_seq number, there are
2370  * outstanding packets on the msock. We wait for them to arrive.
2371  * In case we are the logically next packet, we update device->peer_seq
2372  * ourselves. Correctly handles 32bit wrap around.
2373  *
2374  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2375  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2376  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2377  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2378  *
2379  * returns 0 if we may process the packet,
2380  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2381 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2382 {
2383         struct drbd_device *device = peer_device->device;
2384         DEFINE_WAIT(wait);
2385         long timeout;
2386         int ret = 0, tp;
2387
2388         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2389                 return 0;
2390
2391         spin_lock(&device->peer_seq_lock);
2392         for (;;) {
2393                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2394                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2395                         break;
2396                 }
2397
2398                 if (signal_pending(current)) {
2399                         ret = -ERESTARTSYS;
2400                         break;
2401                 }
2402
2403                 rcu_read_lock();
2404                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2405                 rcu_read_unlock();
2406
2407                 if (!tp)
2408                         break;
2409
2410                 /* Only need to wait if two_primaries is enabled */
2411                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2412                 spin_unlock(&device->peer_seq_lock);
2413                 rcu_read_lock();
2414                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2415                 rcu_read_unlock();
2416                 timeout = schedule_timeout(timeout);
2417                 spin_lock(&device->peer_seq_lock);
2418                 if (!timeout) {
2419                         ret = -ETIMEDOUT;
2420                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2421                         break;
2422                 }
2423         }
2424         spin_unlock(&device->peer_seq_lock);
2425         finish_wait(&device->seq_wait, &wait);
2426         return ret;
2427 }
2428
2429 /* see also bio_flags_to_wire()
2430  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2431  * flags and back. We may replicate to other kernel versions. */
2432 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2433 {
2434         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2435                 (dpf & DP_FUA ? REQ_FUA : 0) |
2436                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2437 }
2438
2439 static unsigned long wire_flags_to_bio_op(u32 dpf)
2440 {
2441         if (dpf & DP_ZEROES)
2442                 return REQ_OP_WRITE_ZEROES;
2443         if (dpf & DP_DISCARD)
2444                 return REQ_OP_DISCARD;
2445         if (dpf & DP_WSAME)
2446                 return REQ_OP_WRITE_SAME;
2447         else
2448                 return REQ_OP_WRITE;
2449 }
2450
2451 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2452                                     unsigned int size)
2453 {
2454         struct drbd_interval *i;
2455
2456     repeat:
2457         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2458                 struct drbd_request *req;
2459                 struct bio_and_error m;
2460
2461                 if (!i->local)
2462                         continue;
2463                 req = container_of(i, struct drbd_request, i);
2464                 if (!(req->rq_state & RQ_POSTPONED))
2465                         continue;
2466                 req->rq_state &= ~RQ_POSTPONED;
2467                 __req_mod(req, NEG_ACKED, &m);
2468                 spin_unlock_irq(&device->resource->req_lock);
2469                 if (m.bio)
2470                         complete_master_bio(device, &m);
2471                 spin_lock_irq(&device->resource->req_lock);
2472                 goto repeat;
2473         }
2474 }
2475
2476 static int handle_write_conflicts(struct drbd_device *device,
2477                                   struct drbd_peer_request *peer_req)
2478 {
2479         struct drbd_connection *connection = peer_req->peer_device->connection;
2480         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2481         sector_t sector = peer_req->i.sector;
2482         const unsigned int size = peer_req->i.size;
2483         struct drbd_interval *i;
2484         bool equal;
2485         int err;
2486
2487         /*
2488          * Inserting the peer request into the write_requests tree will prevent
2489          * new conflicting local requests from being added.
2490          */
2491         drbd_insert_interval(&device->write_requests, &peer_req->i);
2492
2493     repeat:
2494         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2495                 if (i == &peer_req->i)
2496                         continue;
2497                 if (i->completed)
2498                         continue;
2499
2500                 if (!i->local) {
2501                         /*
2502                          * Our peer has sent a conflicting remote request; this
2503                          * should not happen in a two-node setup.  Wait for the
2504                          * earlier peer request to complete.
2505                          */
2506                         err = drbd_wait_misc(device, i);
2507                         if (err)
2508                                 goto out;
2509                         goto repeat;
2510                 }
2511
2512                 equal = i->sector == sector && i->size == size;
2513                 if (resolve_conflicts) {
2514                         /*
2515                          * If the peer request is fully contained within the
2516                          * overlapping request, it can be considered overwritten
2517                          * and thus superseded; otherwise, it will be retried
2518                          * once all overlapping requests have completed.
2519                          */
2520                         bool superseded = i->sector <= sector && i->sector +
2521                                        (i->size >> 9) >= sector + (size >> 9);
2522
2523                         if (!equal)
2524                                 drbd_alert(device, "Concurrent writes detected: "
2525                                                "local=%llus +%u, remote=%llus +%u, "
2526                                                "assuming %s came first\n",
2527                                           (unsigned long long)i->sector, i->size,
2528                                           (unsigned long long)sector, size,
2529                                           superseded ? "local" : "remote");
2530
2531                         peer_req->w.cb = superseded ? e_send_superseded :
2532                                                    e_send_retry_write;
2533                         list_add_tail(&peer_req->w.list, &device->done_ee);
2534                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2535
2536                         err = -ENOENT;
2537                         goto out;
2538                 } else {
2539                         struct drbd_request *req =
2540                                 container_of(i, struct drbd_request, i);
2541
2542                         if (!equal)
2543                                 drbd_alert(device, "Concurrent writes detected: "
2544                                                "local=%llus +%u, remote=%llus +%u\n",
2545                                           (unsigned long long)i->sector, i->size,
2546                                           (unsigned long long)sector, size);
2547
2548                         if (req->rq_state & RQ_LOCAL_PENDING ||
2549                             !(req->rq_state & RQ_POSTPONED)) {
2550                                 /*
2551                                  * Wait for the node with the discard flag to
2552                                  * decide if this request has been superseded
2553                                  * or needs to be retried.
2554                                  * Requests that have been superseded will
2555                                  * disappear from the write_requests tree.
2556                                  *
2557                                  * In addition, wait for the conflicting
2558                                  * request to finish locally before submitting
2559                                  * the conflicting peer request.
2560                                  */
2561                                 err = drbd_wait_misc(device, &req->i);
2562                                 if (err) {
2563                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2564                                         fail_postponed_requests(device, sector, size);
2565                                         goto out;
2566                                 }
2567                                 goto repeat;
2568                         }
2569                         /*
2570                          * Remember to restart the conflicting requests after
2571                          * the new peer request has completed.
2572                          */
2573                         peer_req->flags |= EE_RESTART_REQUESTS;
2574                 }
2575         }
2576         err = 0;
2577
2578     out:
2579         if (err)
2580                 drbd_remove_epoch_entry_interval(device, peer_req);
2581         return err;
2582 }
2583
2584 /* mirrored write */
2585 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2586 {
2587         struct drbd_peer_device *peer_device;
2588         struct drbd_device *device;
2589         struct net_conf *nc;
2590         sector_t sector;
2591         struct drbd_peer_request *peer_req;
2592         struct p_data *p = pi->data;
2593         u32 peer_seq = be32_to_cpu(p->seq_num);
2594         int op, op_flags;
2595         u32 dp_flags;
2596         int err, tp;
2597
2598         peer_device = conn_peer_device(connection, pi->vnr);
2599         if (!peer_device)
2600                 return -EIO;
2601         device = peer_device->device;
2602
2603         if (!get_ldev(device)) {
2604                 int err2;
2605
2606                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2607                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2608                 atomic_inc(&connection->current_epoch->epoch_size);
2609                 err2 = drbd_drain_block(peer_device, pi->size);
2610                 if (!err)
2611                         err = err2;
2612                 return err;
2613         }
2614
2615         /*
2616          * Corresponding put_ldev done either below (on various errors), or in
2617          * drbd_peer_request_endio, if we successfully submit the data at the
2618          * end of this function.
2619          */
2620
2621         sector = be64_to_cpu(p->sector);
2622         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2623         if (!peer_req) {
2624                 put_ldev(device);
2625                 return -EIO;
2626         }
2627
2628         peer_req->w.cb = e_end_block;
2629         peer_req->submit_jif = jiffies;
2630         peer_req->flags |= EE_APPLICATION;
2631
2632         dp_flags = be32_to_cpu(p->dp_flags);
2633         op = wire_flags_to_bio_op(dp_flags);
2634         op_flags = wire_flags_to_bio_flags(dp_flags);
2635         if (pi->cmd == P_TRIM) {
2636                 D_ASSERT(peer_device, peer_req->i.size > 0);
2637                 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2638                 D_ASSERT(peer_device, peer_req->pages == NULL);
2639                 /* need to play safe: an older DRBD sender
2640                  * may mean zero-out while sending P_TRIM. */
2641                 if (0 == (connection->agreed_features & DRBD_FF_WZEROES))
2642                         peer_req->flags |= EE_ZEROOUT;
2643         } else if (pi->cmd == P_ZEROES) {
2644                 D_ASSERT(peer_device, peer_req->i.size > 0);
2645                 D_ASSERT(peer_device, op == REQ_OP_WRITE_ZEROES);
2646                 D_ASSERT(peer_device, peer_req->pages == NULL);
2647                 /* Do (not) pass down BLKDEV_ZERO_NOUNMAP? */
2648                 if (dp_flags & DP_DISCARD)
2649                         peer_req->flags |= EE_TRIM;
2650         } else if (peer_req->pages == NULL) {
2651                 D_ASSERT(device, peer_req->i.size == 0);
2652                 D_ASSERT(device, dp_flags & DP_FLUSH);
2653         }
2654
2655         if (dp_flags & DP_MAY_SET_IN_SYNC)
2656                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2657
2658         spin_lock(&connection->epoch_lock);
2659         peer_req->epoch = connection->current_epoch;
2660         atomic_inc(&peer_req->epoch->epoch_size);
2661         atomic_inc(&peer_req->epoch->active);
2662         spin_unlock(&connection->epoch_lock);
2663
2664         rcu_read_lock();
2665         nc = rcu_dereference(peer_device->connection->net_conf);
2666         tp = nc->two_primaries;
2667         if (peer_device->connection->agreed_pro_version < 100) {
2668                 switch (nc->wire_protocol) {
2669                 case DRBD_PROT_C:
2670                         dp_flags |= DP_SEND_WRITE_ACK;
2671                         break;
2672                 case DRBD_PROT_B:
2673                         dp_flags |= DP_SEND_RECEIVE_ACK;
2674                         break;
2675                 }
2676         }
2677         rcu_read_unlock();
2678
2679         if (dp_flags & DP_SEND_WRITE_ACK) {
2680                 peer_req->flags |= EE_SEND_WRITE_ACK;
2681                 inc_unacked(device);
2682                 /* corresponding dec_unacked() in e_end_block()
2683                  * respective _drbd_clear_done_ee */
2684         }
2685
2686         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2687                 /* I really don't like it that the receiver thread
2688                  * sends on the msock, but anyways */
2689                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2690         }
2691
2692         if (tp) {
2693                 /* two primaries implies protocol C */
2694                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2695                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2696                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2697                 if (err)
2698                         goto out_interrupted;
2699                 spin_lock_irq(&device->resource->req_lock);
2700                 err = handle_write_conflicts(device, peer_req);
2701                 if (err) {
2702                         spin_unlock_irq(&device->resource->req_lock);
2703                         if (err == -ENOENT) {
2704                                 put_ldev(device);
2705                                 return 0;
2706                         }
2707                         goto out_interrupted;
2708                 }
2709         } else {
2710                 update_peer_seq(peer_device, peer_seq);
2711                 spin_lock_irq(&device->resource->req_lock);
2712         }
2713         /* TRIM and WRITE_SAME are processed synchronously,
2714          * we wait for all pending requests, respectively wait for
2715          * active_ee to become empty in drbd_submit_peer_request();
2716          * better not add ourselves here. */
2717         if ((peer_req->flags & (EE_TRIM|EE_WRITE_SAME|EE_ZEROOUT)) == 0)
2718                 list_add_tail(&peer_req->w.list, &device->active_ee);
2719         spin_unlock_irq(&device->resource->req_lock);
2720
2721         if (device->state.conn == C_SYNC_TARGET)
2722                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2723
2724         if (device->state.pdsk < D_INCONSISTENT) {
2725                 /* In case we have the only disk of the cluster, */
2726                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2727                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2728                 drbd_al_begin_io(device, &peer_req->i);
2729                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2730         }
2731
2732         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2733                                        DRBD_FAULT_DT_WR);
2734         if (!err)
2735                 return 0;
2736
2737         /* don't care for the reason here */
2738         drbd_err(device, "submit failed, triggering re-connect\n");
2739         spin_lock_irq(&device->resource->req_lock);
2740         list_del(&peer_req->w.list);
2741         drbd_remove_epoch_entry_interval(device, peer_req);
2742         spin_unlock_irq(&device->resource->req_lock);
2743         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2744                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2745                 drbd_al_complete_io(device, &peer_req->i);
2746         }
2747
2748 out_interrupted:
2749         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2750         put_ldev(device);
2751         drbd_free_peer_req(device, peer_req);
2752         return err;
2753 }
2754
2755 /* We may throttle resync, if the lower device seems to be busy,
2756  * and current sync rate is above c_min_rate.
2757  *
2758  * To decide whether or not the lower device is busy, we use a scheme similar
2759  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2760  * (more than 64 sectors) of activity we cannot account for with our own resync
2761  * activity, it obviously is "busy".
2762  *
2763  * The current sync rate used here uses only the most recent two step marks,
2764  * to have a short time average so we can react faster.
2765  */
2766 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2767                 bool throttle_if_app_is_waiting)
2768 {
2769         struct lc_element *tmp;
2770         bool throttle = drbd_rs_c_min_rate_throttle(device);
2771
2772         if (!throttle || throttle_if_app_is_waiting)
2773                 return throttle;
2774
2775         spin_lock_irq(&device->al_lock);
2776         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2777         if (tmp) {
2778                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2779                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2780                         throttle = false;
2781                 /* Do not slow down if app IO is already waiting for this extent,
2782                  * and our progress is necessary for application IO to complete. */
2783         }
2784         spin_unlock_irq(&device->al_lock);
2785
2786         return throttle;
2787 }
2788
2789 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2790 {
2791         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2792         unsigned long db, dt, dbdt;
2793         unsigned int c_min_rate;
2794         int curr_events;
2795
2796         rcu_read_lock();
2797         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2798         rcu_read_unlock();
2799
2800         /* feature disabled? */
2801         if (c_min_rate == 0)
2802                 return false;
2803
2804         curr_events = (int)part_stat_read_accum(&disk->part0, sectors) -
2805                         atomic_read(&device->rs_sect_ev);
2806
2807         if (atomic_read(&device->ap_actlog_cnt)
2808             || curr_events - device->rs_last_events > 64) {
2809                 unsigned long rs_left;
2810                 int i;
2811
2812                 device->rs_last_events = curr_events;
2813
2814                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2815                  * approx. */
2816                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2817
2818                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2819                         rs_left = device->ov_left;
2820                 else
2821                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2822
2823                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2824                 if (!dt)
2825                         dt++;
2826                 db = device->rs_mark_left[i] - rs_left;
2827                 dbdt = Bit2KB(db/dt);
2828
2829                 if (dbdt > c_min_rate)
2830                         return true;
2831         }
2832         return false;
2833 }
2834
2835 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2836 {
2837         struct drbd_peer_device *peer_device;
2838         struct drbd_device *device;
2839         sector_t sector;
2840         sector_t capacity;
2841         struct drbd_peer_request *peer_req;
2842         struct digest_info *di = NULL;
2843         int size, verb;
2844         unsigned int fault_type;
2845         struct p_block_req *p = pi->data;
2846
2847         peer_device = conn_peer_device(connection, pi->vnr);
2848         if (!peer_device)
2849                 return -EIO;
2850         device = peer_device->device;
2851         capacity = drbd_get_capacity(device->this_bdev);
2852
2853         sector = be64_to_cpu(p->sector);
2854         size   = be32_to_cpu(p->blksize);
2855
2856         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2857                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2858                                 (unsigned long long)sector, size);
2859                 return -EINVAL;
2860         }
2861         if (sector + (size>>9) > capacity) {
2862                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2863                                 (unsigned long long)sector, size);
2864                 return -EINVAL;
2865         }
2866
2867         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2868                 verb = 1;
2869                 switch (pi->cmd) {
2870                 case P_DATA_REQUEST:
2871                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2872                         break;
2873                 case P_RS_THIN_REQ:
2874                 case P_RS_DATA_REQUEST:
2875                 case P_CSUM_RS_REQUEST:
2876                 case P_OV_REQUEST:
2877                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2878                         break;
2879                 case P_OV_REPLY:
2880                         verb = 0;
2881                         dec_rs_pending(device);
2882                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2883                         break;
2884                 default:
2885                         BUG();
2886                 }
2887                 if (verb && __ratelimit(&drbd_ratelimit_state))
2888                         drbd_err(device, "Can not satisfy peer's read request, "
2889                             "no local data.\n");
2890
2891                 /* drain possibly payload */
2892                 return drbd_drain_block(peer_device, pi->size);
2893         }
2894
2895         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2896          * "criss-cross" setup, that might cause write-out on some other DRBD,
2897          * which in turn might block on the other node at this very place.  */
2898         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2899                         size, GFP_NOIO);
2900         if (!peer_req) {
2901                 put_ldev(device);
2902                 return -ENOMEM;
2903         }
2904
2905         switch (pi->cmd) {
2906         case P_DATA_REQUEST:
2907                 peer_req->w.cb = w_e_end_data_req;
2908                 fault_type = DRBD_FAULT_DT_RD;
2909                 /* application IO, don't drbd_rs_begin_io */
2910                 peer_req->flags |= EE_APPLICATION;
2911                 goto submit;
2912
2913         case P_RS_THIN_REQ:
2914                 /* If at some point in the future we have a smart way to
2915                    find out if this data block is completely deallocated,
2916                    then we would do something smarter here than reading
2917                    the block... */
2918                 peer_req->flags |= EE_RS_THIN_REQ;
2919                 /* fall through */
2920         case P_RS_DATA_REQUEST:
2921                 peer_req->w.cb = w_e_end_rsdata_req;
2922                 fault_type = DRBD_FAULT_RS_RD;
2923                 /* used in the sector offset progress display */
2924                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2925                 break;
2926
2927         case P_OV_REPLY:
2928         case P_CSUM_RS_REQUEST:
2929                 fault_type = DRBD_FAULT_RS_RD;
2930                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2931                 if (!di)
2932                         goto out_free_e;
2933
2934                 di->digest_size = pi->size;
2935                 di->digest = (((char *)di)+sizeof(struct digest_info));
2936
2937                 peer_req->digest = di;
2938                 peer_req->flags |= EE_HAS_DIGEST;
2939
2940                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2941                         goto out_free_e;
2942
2943                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2944                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2945                         peer_req->w.cb = w_e_end_csum_rs_req;
2946                         /* used in the sector offset progress display */
2947                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2948                         /* remember to report stats in drbd_resync_finished */
2949                         device->use_csums = true;
2950                 } else if (pi->cmd == P_OV_REPLY) {
2951                         /* track progress, we may need to throttle */
2952                         atomic_add(size >> 9, &device->rs_sect_in);
2953                         peer_req->w.cb = w_e_end_ov_reply;
2954                         dec_rs_pending(device);
2955                         /* drbd_rs_begin_io done when we sent this request,
2956                          * but accounting still needs to be done. */
2957                         goto submit_for_resync;
2958                 }
2959                 break;
2960
2961         case P_OV_REQUEST:
2962                 if (device->ov_start_sector == ~(sector_t)0 &&
2963                     peer_device->connection->agreed_pro_version >= 90) {
2964                         unsigned long now = jiffies;
2965                         int i;
2966                         device->ov_start_sector = sector;
2967                         device->ov_position = sector;
2968                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2969                         device->rs_total = device->ov_left;
2970                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2971                                 device->rs_mark_left[i] = device->ov_left;
2972                                 device->rs_mark_time[i] = now;
2973                         }
2974                         drbd_info(device, "Online Verify start sector: %llu\n",
2975                                         (unsigned long long)sector);
2976                 }
2977                 peer_req->w.cb = w_e_end_ov_req;
2978                 fault_type = DRBD_FAULT_RS_RD;
2979                 break;
2980
2981         default:
2982                 BUG();
2983         }
2984
2985         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2986          * wrt the receiver, but it is not as straightforward as it may seem.
2987          * Various places in the resync start and stop logic assume resync
2988          * requests are processed in order, requeuing this on the worker thread
2989          * introduces a bunch of new code for synchronization between threads.
2990          *
2991          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2992          * "forever", throttling after drbd_rs_begin_io will lock that extent
2993          * for application writes for the same time.  For now, just throttle
2994          * here, where the rest of the code expects the receiver to sleep for
2995          * a while, anyways.
2996          */
2997
2998         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2999          * this defers syncer requests for some time, before letting at least
3000          * on request through.  The resync controller on the receiving side
3001          * will adapt to the incoming rate accordingly.
3002          *
3003          * We cannot throttle here if remote is Primary/SyncTarget:
3004          * we would also throttle its application reads.
3005          * In that case, throttling is done on the SyncTarget only.
3006          */
3007
3008         /* Even though this may be a resync request, we do add to "read_ee";
3009          * "sync_ee" is only used for resync WRITEs.
3010          * Add to list early, so debugfs can find this request
3011          * even if we have to sleep below. */
3012         spin_lock_irq(&device->resource->req_lock);
3013         list_add_tail(&peer_req->w.list, &device->read_ee);
3014         spin_unlock_irq(&device->resource->req_lock);
3015
3016         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
3017         if (device->state.peer != R_PRIMARY
3018         && drbd_rs_should_slow_down(device, sector, false))
3019                 schedule_timeout_uninterruptible(HZ/10);
3020         update_receiver_timing_details(connection, drbd_rs_begin_io);
3021         if (drbd_rs_begin_io(device, sector))
3022                 goto out_free_e;
3023
3024 submit_for_resync:
3025         atomic_add(size >> 9, &device->rs_sect_ev);
3026
3027 submit:
3028         update_receiver_timing_details(connection, drbd_submit_peer_request);
3029         inc_unacked(device);
3030         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
3031                                      fault_type) == 0)
3032                 return 0;
3033
3034         /* don't care for the reason here */
3035         drbd_err(device, "submit failed, triggering re-connect\n");
3036
3037 out_free_e:
3038         spin_lock_irq(&device->resource->req_lock);
3039         list_del(&peer_req->w.list);
3040         spin_unlock_irq(&device->resource->req_lock);
3041         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
3042
3043         put_ldev(device);
3044         drbd_free_peer_req(device, peer_req);
3045         return -EIO;
3046 }
3047
3048 /**
3049  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
3050  */
3051 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
3052 {
3053         struct drbd_device *device = peer_device->device;
3054         int self, peer, rv = -100;
3055         unsigned long ch_self, ch_peer;
3056         enum drbd_after_sb_p after_sb_0p;
3057
3058         self = device->ldev->md.uuid[UI_BITMAP] & 1;
3059         peer = device->p_uuid[UI_BITMAP] & 1;
3060
3061         ch_peer = device->p_uuid[UI_SIZE];
3062         ch_self = device->comm_bm_set;
3063
3064         rcu_read_lock();
3065         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
3066         rcu_read_unlock();
3067         switch (after_sb_0p) {
3068         case ASB_CONSENSUS:
3069         case ASB_DISCARD_SECONDARY:
3070         case ASB_CALL_HELPER:
3071         case ASB_VIOLENTLY:
3072                 drbd_err(device, "Configuration error.\n");
3073                 break;
3074         case ASB_DISCONNECT:
3075                 break;
3076         case ASB_DISCARD_YOUNGER_PRI:
3077                 if (self == 0 && peer == 1) {
3078                         rv = -1;
3079                         break;
3080                 }
3081                 if (self == 1 && peer == 0) {
3082                         rv =  1;
3083                         break;
3084                 }
3085                 /* Else fall through - to one of the other strategies... */
3086         case ASB_DISCARD_OLDER_PRI:
3087                 if (self == 0 && peer == 1) {
3088                         rv = 1;
3089                         break;
3090                 }
3091                 if (self == 1 && peer == 0) {
3092                         rv = -1;
3093                         break;
3094                 }
3095                 /* Else fall through to one of the other strategies... */
3096                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3097                      "Using discard-least-changes instead\n");
3098                 /* fall through */
3099         case ASB_DISCARD_ZERO_CHG:
3100                 if (ch_peer == 0 && ch_self == 0) {
3101                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3102                                 ? -1 : 1;
3103                         break;
3104                 } else {
3105                         if (ch_peer == 0) { rv =  1; break; }
3106                         if (ch_self == 0) { rv = -1; break; }
3107                 }
3108                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3109                         break;
3110                 /* else, fall through */
3111         case ASB_DISCARD_LEAST_CHG:
3112                 if      (ch_self < ch_peer)
3113                         rv = -1;
3114                 else if (ch_self > ch_peer)
3115                         rv =  1;
3116                 else /* ( ch_self == ch_peer ) */
3117                      /* Well, then use something else. */
3118                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3119                                 ? -1 : 1;
3120                 break;
3121         case ASB_DISCARD_LOCAL:
3122                 rv = -1;
3123                 break;
3124         case ASB_DISCARD_REMOTE:
3125                 rv =  1;
3126         }
3127
3128         return rv;
3129 }
3130
3131 /**
3132  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3133  */
3134 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3135 {
3136         struct drbd_device *device = peer_device->device;
3137         int hg, rv = -100;
3138         enum drbd_after_sb_p after_sb_1p;
3139
3140         rcu_read_lock();
3141         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3142         rcu_read_unlock();
3143         switch (after_sb_1p) {
3144         case ASB_DISCARD_YOUNGER_PRI:
3145         case ASB_DISCARD_OLDER_PRI:
3146         case ASB_DISCARD_LEAST_CHG:
3147         case ASB_DISCARD_LOCAL:
3148         case ASB_DISCARD_REMOTE:
3149         case ASB_DISCARD_ZERO_CHG:
3150                 drbd_err(device, "Configuration error.\n");
3151                 break;
3152         case ASB_DISCONNECT:
3153                 break;
3154         case ASB_CONSENSUS:
3155                 hg = drbd_asb_recover_0p(peer_device);
3156                 if (hg == -1 && device->state.role == R_SECONDARY)
3157                         rv = hg;
3158                 if (hg == 1  && device->state.role == R_PRIMARY)
3159                         rv = hg;
3160                 break;
3161         case ASB_VIOLENTLY:
3162                 rv = drbd_asb_recover_0p(peer_device);
3163                 break;
3164         case ASB_DISCARD_SECONDARY:
3165                 return device->state.role == R_PRIMARY ? 1 : -1;
3166         case ASB_CALL_HELPER:
3167                 hg = drbd_asb_recover_0p(peer_device);
3168                 if (hg == -1 && device->state.role == R_PRIMARY) {
3169                         enum drbd_state_rv rv2;
3170
3171                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3172                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3173                           * we do not need to wait for the after state change work either. */
3174                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3175                         if (rv2 != SS_SUCCESS) {
3176                                 drbd_khelper(device, "pri-lost-after-sb");
3177                         } else {
3178                                 drbd_warn(device, "Successfully gave up primary role.\n");
3179                                 rv = hg;
3180                         }
3181                 } else
3182                         rv = hg;
3183         }
3184
3185         return rv;
3186 }
3187
3188 /**
3189  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3190  */
3191 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3192 {
3193         struct drbd_device *device = peer_device->device;
3194         int hg, rv = -100;
3195         enum drbd_after_sb_p after_sb_2p;
3196
3197         rcu_read_lock();
3198         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3199         rcu_read_unlock();
3200         switch (after_sb_2p) {
3201         case ASB_DISCARD_YOUNGER_PRI:
3202         case ASB_DISCARD_OLDER_PRI:
3203         case ASB_DISCARD_LEAST_CHG:
3204         case ASB_DISCARD_LOCAL:
3205         case ASB_DISCARD_REMOTE:
3206         case ASB_CONSENSUS:
3207         case ASB_DISCARD_SECONDARY:
3208         case ASB_DISCARD_ZERO_CHG:
3209                 drbd_err(device, "Configuration error.\n");
3210                 break;
3211         case ASB_VIOLENTLY:
3212                 rv = drbd_asb_recover_0p(peer_device);
3213                 break;
3214         case ASB_DISCONNECT:
3215                 break;
3216         case ASB_CALL_HELPER:
3217                 hg = drbd_asb_recover_0p(peer_device);
3218                 if (hg == -1) {
3219                         enum drbd_state_rv rv2;
3220
3221                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3222                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3223                           * we do not need to wait for the after state change work either. */
3224                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3225                         if (rv2 != SS_SUCCESS) {
3226                                 drbd_khelper(device, "pri-lost-after-sb");
3227                         } else {
3228                                 drbd_warn(device, "Successfully gave up primary role.\n");
3229                                 rv = hg;
3230                         }
3231                 } else
3232                         rv = hg;
3233         }
3234
3235         return rv;
3236 }
3237
3238 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3239                            u64 bits, u64 flags)
3240 {
3241         if (!uuid) {
3242                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3243                 return;
3244         }
3245         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3246              text,
3247              (unsigned long long)uuid[UI_CURRENT],
3248              (unsigned long long)uuid[UI_BITMAP],
3249              (unsigned long long)uuid[UI_HISTORY_START],
3250              (unsigned long long)uuid[UI_HISTORY_END],
3251              (unsigned long long)bits,
3252              (unsigned long long)flags);
3253 }
3254
3255 /*
3256   100   after split brain try auto recover
3257     2   C_SYNC_SOURCE set BitMap
3258     1   C_SYNC_SOURCE use BitMap
3259     0   no Sync
3260    -1   C_SYNC_TARGET use BitMap
3261    -2   C_SYNC_TARGET set BitMap
3262  -100   after split brain, disconnect
3263 -1000   unrelated data
3264 -1091   requires proto 91
3265 -1096   requires proto 96
3266  */
3267
3268 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3269 {
3270         struct drbd_peer_device *const peer_device = first_peer_device(device);
3271         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3272         u64 self, peer;
3273         int i, j;
3274
3275         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3276         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3277
3278         *rule_nr = 10;
3279         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3280                 return 0;
3281
3282         *rule_nr = 20;
3283         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3284              peer != UUID_JUST_CREATED)
3285                 return -2;
3286
3287         *rule_nr = 30;
3288         if (self != UUID_JUST_CREATED &&
3289             (peer == UUID_JUST_CREATED || peer == (u64)0))
3290                 return 2;
3291
3292         if (self == peer) {
3293                 int rct, dc; /* roles at crash time */
3294
3295                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3296
3297                         if (connection->agreed_pro_version < 91)
3298                                 return -1091;
3299
3300                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3301                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3302                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3303                                 drbd_uuid_move_history(device);
3304                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3305                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3306
3307                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3308                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3309                                 *rule_nr = 34;
3310                         } else {
3311                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3312                                 *rule_nr = 36;
3313                         }
3314
3315                         return 1;
3316                 }
3317
3318                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3319
3320                         if (connection->agreed_pro_version < 91)
3321                                 return -1091;
3322
3323                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3324                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3325                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3326
3327                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3328                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3329                                 device->p_uuid[UI_BITMAP] = 0UL;
3330
3331                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3332                                 *rule_nr = 35;
3333                         } else {
3334                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3335                                 *rule_nr = 37;
3336                         }
3337
3338                         return -1;
3339                 }
3340
3341                 /* Common power [off|failure] */
3342                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3343                         (device->p_uuid[UI_FLAGS] & 2);
3344                 /* lowest bit is set when we were primary,
3345                  * next bit (weight 2) is set when peer was primary */
3346                 *rule_nr = 40;
3347
3348                 /* Neither has the "crashed primary" flag set,
3349                  * only a replication link hickup. */
3350                 if (rct == 0)
3351                         return 0;
3352
3353                 /* Current UUID equal and no bitmap uuid; does not necessarily
3354                  * mean this was a "simultaneous hard crash", maybe IO was
3355                  * frozen, so no UUID-bump happened.
3356                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3357                  * for "new-enough" peer DRBD version. */
3358                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3359                         *rule_nr = 41;
3360                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3361                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3362                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3363                         }
3364                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3365                                 /* At least one has the "crashed primary" bit set,
3366                                  * both are primary now, but neither has rotated its UUIDs?
3367                                  * "Can not happen." */
3368                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3369                                 return -100;
3370                         }
3371                         if (device->state.role == R_PRIMARY)
3372                                 return 1;
3373                         return -1;
3374                 }
3375
3376                 /* Both are secondary.
3377                  * Really looks like recovery from simultaneous hard crash.
3378                  * Check which had been primary before, and arbitrate. */
3379                 switch (rct) {
3380                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3381                 case 1: /*  self_pri && !peer_pri */ return 1;
3382                 case 2: /* !self_pri &&  peer_pri */ return -1;
3383                 case 3: /*  self_pri &&  peer_pri */
3384                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3385                         return dc ? -1 : 1;
3386                 }
3387         }
3388
3389         *rule_nr = 50;
3390         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3391         if (self == peer)
3392                 return -1;
3393
3394         *rule_nr = 51;
3395         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3396         if (self == peer) {
3397                 if (connection->agreed_pro_version < 96 ?
3398                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3399                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3400                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3401                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3402                            resync as sync source modifications of the peer's UUIDs. */
3403
3404                         if (connection->agreed_pro_version < 91)
3405                                 return -1091;
3406
3407                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3408                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3409
3410                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3411                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3412
3413                         return -1;
3414                 }
3415         }
3416
3417         *rule_nr = 60;
3418         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3419         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3420                 peer = device->p_uuid[i] & ~((u64)1);
3421                 if (self == peer)
3422                         return -2;
3423         }
3424
3425         *rule_nr = 70;
3426         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3427         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3428         if (self == peer)
3429                 return 1;
3430
3431         *rule_nr = 71;
3432         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3433         if (self == peer) {
3434                 if (connection->agreed_pro_version < 96 ?
3435                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3436                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3437                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3438                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3439                            resync as sync source modifications of our UUIDs. */
3440
3441                         if (connection->agreed_pro_version < 91)
3442                                 return -1091;
3443
3444                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3445                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3446
3447                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3448                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3449                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3450
3451                         return 1;
3452                 }
3453         }
3454
3455
3456         *rule_nr = 80;
3457         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3458         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3459                 self = device->ldev->md.uuid[i] & ~((u64)1);
3460                 if (self == peer)
3461                         return 2;
3462         }
3463
3464         *rule_nr = 90;
3465         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3466         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3467         if (self == peer && self != ((u64)0))
3468                 return 100;
3469
3470         *rule_nr = 100;
3471         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3472                 self = device->ldev->md.uuid[i] & ~((u64)1);
3473                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3474                         peer = device->p_uuid[j] & ~((u64)1);
3475                         if (self == peer)
3476                                 return -100;
3477                 }
3478         }
3479
3480         return -1000;
3481 }
3482
3483 /* drbd_sync_handshake() returns the new conn state on success, or
3484    CONN_MASK (-1) on failure.
3485  */
3486 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3487                                            enum drbd_role peer_role,
3488                                            enum drbd_disk_state peer_disk) __must_hold(local)
3489 {
3490         struct drbd_device *device = peer_device->device;
3491         enum drbd_conns rv = C_MASK;
3492         enum drbd_disk_state mydisk;
3493         struct net_conf *nc;
3494         int hg, rule_nr, rr_conflict, tentative, always_asbp;
3495
3496         mydisk = device->state.disk;
3497         if (mydisk == D_NEGOTIATING)
3498                 mydisk = device->new_state_tmp.disk;
3499
3500         drbd_info(device, "drbd_sync_handshake:\n");
3501
3502         spin_lock_irq(&device->ldev->md.uuid_lock);
3503         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3504         drbd_uuid_dump(device, "peer", device->p_uuid,
3505                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3506
3507         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3508         spin_unlock_irq(&device->ldev->md.uuid_lock);
3509
3510         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3511
3512         if (hg == -1000) {
3513                 drbd_alert(device, "Unrelated data, aborting!\n");
3514                 return C_MASK;
3515         }
3516         if (hg < -0x10000) {
3517                 int proto, fflags;
3518                 hg = -hg;
3519                 proto = hg & 0xff;
3520                 fflags = (hg >> 8) & 0xff;
3521                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3522                                         proto, fflags);
3523                 return C_MASK;
3524         }
3525         if (hg < -1000) {
3526                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3527                 return C_MASK;
3528         }
3529
3530         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3531             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3532                 int f = (hg == -100) || abs(hg) == 2;
3533                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3534                 if (f)
3535                         hg = hg*2;
3536                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3537                      hg > 0 ? "source" : "target");
3538         }
3539
3540         if (abs(hg) == 100)
3541                 drbd_khelper(device, "initial-split-brain");
3542
3543         rcu_read_lock();
3544         nc = rcu_dereference(peer_device->connection->net_conf);
3545         always_asbp = nc->always_asbp;
3546         rr_conflict = nc->rr_conflict;
3547         tentative = nc->tentative;
3548         rcu_read_unlock();
3549
3550         if (hg == 100 || (hg == -100 && always_asbp)) {
3551                 int pcount = (device->state.role == R_PRIMARY)
3552                            + (peer_role == R_PRIMARY);
3553                 int forced = (hg == -100);
3554
3555                 switch (pcount) {
3556                 case 0:
3557                         hg = drbd_asb_recover_0p(peer_device);
3558                         break;
3559                 case 1:
3560                         hg = drbd_asb_recover_1p(peer_device);
3561                         break;
3562                 case 2:
3563                         hg = drbd_asb_recover_2p(peer_device);
3564                         break;
3565                 }
3566                 if (abs(hg) < 100) {
3567                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3568                              "automatically solved. Sync from %s node\n",
3569                              pcount, (hg < 0) ? "peer" : "this");
3570                         if (forced) {
3571                                 drbd_warn(device, "Doing a full sync, since"
3572                                      " UUIDs where ambiguous.\n");
3573                                 hg = hg*2;
3574                         }
3575                 }
3576         }
3577
3578         if (hg == -100) {
3579                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3580                         hg = -1;
3581                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3582                         hg = 1;
3583
3584                 if (abs(hg) < 100)
3585                         drbd_warn(device, "Split-Brain detected, manually solved. "
3586                              "Sync from %s node\n",
3587                              (hg < 0) ? "peer" : "this");
3588         }
3589
3590         if (hg == -100) {
3591                 /* FIXME this log message is not correct if we end up here
3592                  * after an attempted attach on a diskless node.
3593                  * We just refuse to attach -- well, we drop the "connection"
3594                  * to that disk, in a way... */
3595                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3596                 drbd_khelper(device, "split-brain");
3597                 return C_MASK;
3598         }
3599
3600         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3601                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3602                 return C_MASK;
3603         }
3604
3605         if (hg < 0 && /* by intention we do not use mydisk here. */
3606             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3607                 switch (rr_conflict) {
3608                 case ASB_CALL_HELPER:
3609                         drbd_khelper(device, "pri-lost");
3610                         /* fall through */
3611                 case ASB_DISCONNECT:
3612                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3613                         return C_MASK;
3614                 case ASB_VIOLENTLY:
3615                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3616                              "assumption\n");
3617                 }
3618         }
3619
3620         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3621                 if (hg == 0)
3622                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3623                 else
3624                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3625                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3626                                  abs(hg) >= 2 ? "full" : "bit-map based");
3627                 return C_MASK;
3628         }
3629
3630         if (abs(hg) >= 2) {
3631                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3632                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3633                                         BM_LOCKED_SET_ALLOWED))
3634                         return C_MASK;
3635         }
3636
3637         if (hg > 0) { /* become sync source. */
3638                 rv = C_WF_BITMAP_S;
3639         } else if (hg < 0) { /* become sync target */
3640                 rv = C_WF_BITMAP_T;
3641         } else {
3642                 rv = C_CONNECTED;
3643                 if (drbd_bm_total_weight(device)) {
3644                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3645                              drbd_bm_total_weight(device));
3646                 }
3647         }
3648
3649         return rv;
3650 }
3651
3652 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3653 {
3654         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3655         if (peer == ASB_DISCARD_REMOTE)
3656                 return ASB_DISCARD_LOCAL;
3657
3658         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3659         if (peer == ASB_DISCARD_LOCAL)
3660                 return ASB_DISCARD_REMOTE;
3661
3662         /* everything else is valid if they are equal on both sides. */
3663         return peer;
3664 }
3665
3666 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3667 {
3668         struct p_protocol *p = pi->data;
3669         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3670         int p_proto, p_discard_my_data, p_two_primaries, cf;
3671         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3672         char integrity_alg[SHARED_SECRET_MAX] = "";
3673         struct crypto_shash *peer_integrity_tfm = NULL;
3674         void *int_dig_in = NULL, *int_dig_vv = NULL;
3675
3676         p_proto         = be32_to_cpu(p->protocol);
3677         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3678         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3679         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3680         p_two_primaries = be32_to_cpu(p->two_primaries);
3681         cf              = be32_to_cpu(p->conn_flags);
3682         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3683
3684         if (connection->agreed_pro_version >= 87) {
3685                 int err;
3686
3687                 if (pi->size > sizeof(integrity_alg))
3688                         return -EIO;
3689                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3690                 if (err)
3691                         return err;
3692                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3693         }
3694
3695         if (pi->cmd != P_PROTOCOL_UPDATE) {
3696                 clear_bit(CONN_DRY_RUN, &connection->flags);
3697
3698                 if (cf & CF_DRY_RUN)
3699                         set_bit(CONN_DRY_RUN, &connection->flags);
3700
3701                 rcu_read_lock();
3702                 nc = rcu_dereference(connection->net_conf);
3703
3704                 if (p_proto != nc->wire_protocol) {
3705                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3706                         goto disconnect_rcu_unlock;
3707                 }
3708
3709                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3710                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3711                         goto disconnect_rcu_unlock;
3712                 }
3713
3714                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3715                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3716                         goto disconnect_rcu_unlock;
3717                 }
3718
3719                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3720                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3721                         goto disconnect_rcu_unlock;
3722                 }
3723
3724                 if (p_discard_my_data && nc->discard_my_data) {
3725                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3726                         goto disconnect_rcu_unlock;
3727                 }
3728
3729                 if (p_two_primaries != nc->two_primaries) {
3730                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3731                         goto disconnect_rcu_unlock;
3732                 }
3733
3734                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3735                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3736                         goto disconnect_rcu_unlock;
3737                 }
3738
3739                 rcu_read_unlock();
3740         }
3741
3742         if (integrity_alg[0]) {
3743                 int hash_size;
3744
3745                 /*
3746                  * We can only change the peer data integrity algorithm
3747                  * here.  Changing our own data integrity algorithm
3748                  * requires that we send a P_PROTOCOL_UPDATE packet at
3749                  * the same time; otherwise, the peer has no way to
3750                  * tell between which packets the algorithm should
3751                  * change.
3752                  */
3753
3754                 peer_integrity_tfm = crypto_alloc_shash(integrity_alg, 0, 0);
3755                 if (IS_ERR(peer_integrity_tfm)) {
3756                         peer_integrity_tfm = NULL;
3757                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3758                                  integrity_alg);
3759                         goto disconnect;
3760                 }
3761
3762                 hash_size = crypto_shash_digestsize(peer_integrity_tfm);
3763                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3764                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3765                 if (!(int_dig_in && int_dig_vv)) {
3766                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3767                         goto disconnect;
3768                 }
3769         }
3770
3771         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3772         if (!new_net_conf) {
3773                 drbd_err(connection, "Allocation of new net_conf failed\n");
3774                 goto disconnect;
3775         }
3776
3777         mutex_lock(&connection->data.mutex);
3778         mutex_lock(&connection->resource->conf_update);
3779         old_net_conf = connection->net_conf;
3780         *new_net_conf = *old_net_conf;
3781
3782         new_net_conf->wire_protocol = p_proto;
3783         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3784         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3785         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3786         new_net_conf->two_primaries = p_two_primaries;
3787
3788         rcu_assign_pointer(connection->net_conf, new_net_conf);
3789         mutex_unlock(&connection->resource->conf_update);
3790         mutex_unlock(&connection->data.mutex);
3791
3792         crypto_free_shash(connection->peer_integrity_tfm);
3793         kfree(connection->int_dig_in);
3794         kfree(connection->int_dig_vv);
3795         connection->peer_integrity_tfm = peer_integrity_tfm;
3796         connection->int_dig_in = int_dig_in;
3797         connection->int_dig_vv = int_dig_vv;
3798
3799         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3800                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3801                           integrity_alg[0] ? integrity_alg : "(none)");
3802
3803         synchronize_rcu();
3804         kfree(old_net_conf);
3805         return 0;
3806
3807 disconnect_rcu_unlock:
3808         rcu_read_unlock();
3809 disconnect:
3810         crypto_free_shash(peer_integrity_tfm);
3811         kfree(int_dig_in);
3812         kfree(int_dig_vv);
3813         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3814         return -EIO;
3815 }
3816
3817 /* helper function
3818  * input: alg name, feature name
3819  * return: NULL (alg name was "")
3820  *         ERR_PTR(error) if something goes wrong
3821  *         or the crypto hash ptr, if it worked out ok. */
3822 static struct crypto_shash *drbd_crypto_alloc_digest_safe(
3823                 const struct drbd_device *device,
3824                 const char *alg, const char *name)
3825 {
3826         struct crypto_shash *tfm;
3827
3828         if (!alg[0])
3829                 return NULL;
3830
3831         tfm = crypto_alloc_shash(alg, 0, 0);
3832         if (IS_ERR(tfm)) {
3833                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3834                         alg, name, PTR_ERR(tfm));
3835                 return tfm;
3836         }
3837         return tfm;
3838 }
3839
3840 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3841 {
3842         void *buffer = connection->data.rbuf;
3843         int size = pi->size;
3844
3845         while (size) {
3846                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3847                 s = drbd_recv(connection, buffer, s);
3848                 if (s <= 0) {
3849                         if (s < 0)
3850                                 return s;
3851                         break;
3852                 }
3853                 size -= s;
3854         }
3855         if (size)
3856                 return -EIO;
3857         return 0;
3858 }
3859
3860 /*
3861  * config_unknown_volume  -  device configuration command for unknown volume
3862  *
3863  * When a device is added to an existing connection, the node on which the
3864  * device is added first will send configuration commands to its peer but the
3865  * peer will not know about the device yet.  It will warn and ignore these
3866  * commands.  Once the device is added on the second node, the second node will
3867  * send the same device configuration commands, but in the other direction.
3868  *
3869  * (We can also end up here if drbd is misconfigured.)
3870  */
3871 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3872 {
3873         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3874                   cmdname(pi->cmd), pi->vnr);
3875         return ignore_remaining_packet(connection, pi);
3876 }
3877
3878 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3879 {
3880         struct drbd_peer_device *peer_device;
3881         struct drbd_device *device;
3882         struct p_rs_param_95 *p;
3883         unsigned int header_size, data_size, exp_max_sz;
3884         struct crypto_shash *verify_tfm = NULL;
3885         struct crypto_shash *csums_tfm = NULL;
3886         struct net_conf *old_net_conf, *new_net_conf = NULL;
3887         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3888         const int apv = connection->agreed_pro_version;
3889         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3890         int fifo_size = 0;
3891         int err;
3892
3893         peer_device = conn_peer_device(connection, pi->vnr);
3894         if (!peer_device)
3895                 return config_unknown_volume(connection, pi);
3896         device = peer_device->device;
3897
3898         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3899                     : apv == 88 ? sizeof(struct p_rs_param)
3900                                         + SHARED_SECRET_MAX
3901                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3902                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3903
3904         if (pi->size > exp_max_sz) {
3905                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3906                     pi->size, exp_max_sz);
3907                 return -EIO;
3908         }
3909
3910         if (apv <= 88) {
3911                 header_size = sizeof(struct p_rs_param);
3912                 data_size = pi->size - header_size;
3913         } else if (apv <= 94) {
3914                 header_size = sizeof(struct p_rs_param_89);
3915                 data_size = pi->size - header_size;
3916                 D_ASSERT(device, data_size == 0);
3917         } else {
3918                 header_size = sizeof(struct p_rs_param_95);
3919                 data_size = pi->size - header_size;
3920                 D_ASSERT(device, data_size == 0);
3921         }
3922
3923         /* initialize verify_alg and csums_alg */
3924         p = pi->data;
3925         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3926
3927         err = drbd_recv_all(peer_device->connection, p, header_size);
3928         if (err)
3929                 return err;
3930
3931         mutex_lock(&connection->resource->conf_update);
3932         old_net_conf = peer_device->connection->net_conf;
3933         if (get_ldev(device)) {
3934                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3935                 if (!new_disk_conf) {
3936                         put_ldev(device);
3937                         mutex_unlock(&connection->resource->conf_update);
3938                         drbd_err(device, "Allocation of new disk_conf failed\n");
3939                         return -ENOMEM;
3940                 }
3941
3942                 old_disk_conf = device->ldev->disk_conf;
3943                 *new_disk_conf = *old_disk_conf;
3944
3945                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3946         }
3947
3948         if (apv >= 88) {
3949                 if (apv == 88) {
3950                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3951                                 drbd_err(device, "verify-alg of wrong size, "
3952                                         "peer wants %u, accepting only up to %u byte\n",
3953                                         data_size, SHARED_SECRET_MAX);
3954                                 err = -EIO;
3955                                 goto reconnect;
3956                         }
3957
3958                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3959                         if (err)
3960                                 goto reconnect;
3961                         /* we expect NUL terminated string */
3962                         /* but just in case someone tries to be evil */
3963                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3964                         p->verify_alg[data_size-1] = 0;
3965
3966                 } else /* apv >= 89 */ {
3967                         /* we still expect NUL terminated strings */
3968                         /* but just in case someone tries to be evil */
3969                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3970                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3971                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3972                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3973                 }
3974
3975                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3976                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3977                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3978                                     old_net_conf->verify_alg, p->verify_alg);
3979                                 goto disconnect;
3980                         }
3981                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3982                                         p->verify_alg, "verify-alg");
3983                         if (IS_ERR(verify_tfm)) {
3984                                 verify_tfm = NULL;
3985                                 goto disconnect;
3986                         }
3987                 }
3988
3989                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3990                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3991                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3992                                     old_net_conf->csums_alg, p->csums_alg);
3993                                 goto disconnect;
3994                         }
3995                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3996                                         p->csums_alg, "csums-alg");
3997                         if (IS_ERR(csums_tfm)) {
3998                                 csums_tfm = NULL;
3999                                 goto disconnect;
4000                         }
4001                 }
4002
4003                 if (apv > 94 && new_disk_conf) {
4004                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
4005                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
4006                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
4007                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
4008
4009                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
4010                         if (fifo_size != device->rs_plan_s->size) {
4011                                 new_plan = fifo_alloc(fifo_size);
4012                                 if (!new_plan) {
4013                                         drbd_err(device, "kmalloc of fifo_buffer failed");
4014                                         put_ldev(device);
4015                                         goto disconnect;
4016                                 }
4017                         }
4018                 }
4019
4020                 if (verify_tfm || csums_tfm) {
4021                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
4022                         if (!new_net_conf) {
4023                                 drbd_err(device, "Allocation of new net_conf failed\n");
4024                                 goto disconnect;
4025                         }
4026
4027                         *new_net_conf = *old_net_conf;
4028
4029                         if (verify_tfm) {
4030                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
4031                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
4032                                 crypto_free_shash(peer_device->connection->verify_tfm);
4033                                 peer_device->connection->verify_tfm = verify_tfm;
4034                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
4035                         }
4036                         if (csums_tfm) {
4037                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
4038                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
4039                                 crypto_free_shash(peer_device->connection->csums_tfm);
4040                                 peer_device->connection->csums_tfm = csums_tfm;
4041                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
4042                         }
4043                         rcu_assign_pointer(connection->net_conf, new_net_conf);
4044                 }
4045         }
4046
4047         if (new_disk_conf) {
4048                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4049                 put_ldev(device);
4050         }
4051
4052         if (new_plan) {
4053                 old_plan = device->rs_plan_s;
4054                 rcu_assign_pointer(device->rs_plan_s, new_plan);
4055         }
4056
4057         mutex_unlock(&connection->resource->conf_update);
4058         synchronize_rcu();
4059         if (new_net_conf)
4060                 kfree(old_net_conf);
4061         kfree(old_disk_conf);
4062         kfree(old_plan);
4063
4064         return 0;
4065
4066 reconnect:
4067         if (new_disk_conf) {
4068                 put_ldev(device);
4069                 kfree(new_disk_conf);
4070         }
4071         mutex_unlock(&connection->resource->conf_update);
4072         return -EIO;
4073
4074 disconnect:
4075         kfree(new_plan);
4076         if (new_disk_conf) {
4077                 put_ldev(device);
4078                 kfree(new_disk_conf);
4079         }
4080         mutex_unlock(&connection->resource->conf_update);
4081         /* just for completeness: actually not needed,
4082          * as this is not reached if csums_tfm was ok. */
4083         crypto_free_shash(csums_tfm);
4084         /* but free the verify_tfm again, if csums_tfm did not work out */
4085         crypto_free_shash(verify_tfm);
4086         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4087         return -EIO;
4088 }
4089
4090 /* warn if the arguments differ by more than 12.5% */
4091 static void warn_if_differ_considerably(struct drbd_device *device,
4092         const char *s, sector_t a, sector_t b)
4093 {
4094         sector_t d;
4095         if (a == 0 || b == 0)
4096                 return;
4097         d = (a > b) ? (a - b) : (b - a);
4098         if (d > (a>>3) || d > (b>>3))
4099                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4100                      (unsigned long long)a, (unsigned long long)b);
4101 }
4102
4103 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4104 {
4105         struct drbd_peer_device *peer_device;
4106         struct drbd_device *device;
4107         struct p_sizes *p = pi->data;
4108         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4109         enum determine_dev_size dd = DS_UNCHANGED;
4110         sector_t p_size, p_usize, p_csize, my_usize;
4111         sector_t new_size, cur_size;
4112         int ldsc = 0; /* local disk size changed */
4113         enum dds_flags ddsf;
4114
4115         peer_device = conn_peer_device(connection, pi->vnr);
4116         if (!peer_device)
4117                 return config_unknown_volume(connection, pi);
4118         device = peer_device->device;
4119         cur_size = drbd_get_capacity(device->this_bdev);
4120
4121         p_size = be64_to_cpu(p->d_size);
4122         p_usize = be64_to_cpu(p->u_size);
4123         p_csize = be64_to_cpu(p->c_size);
4124
4125         /* just store the peer's disk size for now.
4126          * we still need to figure out whether we accept that. */
4127         device->p_size = p_size;
4128
4129         if (get_ldev(device)) {
4130                 rcu_read_lock();
4131                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4132                 rcu_read_unlock();
4133
4134                 warn_if_differ_considerably(device, "lower level device sizes",
4135                            p_size, drbd_get_max_capacity(device->ldev));
4136                 warn_if_differ_considerably(device, "user requested size",
4137                                             p_usize, my_usize);
4138
4139                 /* if this is the first connect, or an otherwise expected
4140                  * param exchange, choose the minimum */
4141                 if (device->state.conn == C_WF_REPORT_PARAMS)
4142                         p_usize = min_not_zero(my_usize, p_usize);
4143
4144                 /* Never shrink a device with usable data during connect,
4145                  * or "attach" on the peer.
4146                  * But allow online shrinking if we are connected. */
4147                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4148                 if (new_size < cur_size &&
4149                     device->state.disk >= D_OUTDATED &&
4150                     (device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS)) {
4151                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4152                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4153                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4154                         put_ldev(device);
4155                         return -EIO;
4156                 }
4157
4158                 if (my_usize != p_usize) {
4159                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4160
4161                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4162                         if (!new_disk_conf) {
4163                                 drbd_err(device, "Allocation of new disk_conf failed\n");
4164                                 put_ldev(device);
4165                                 return -ENOMEM;
4166                         }
4167
4168                         mutex_lock(&connection->resource->conf_update);
4169                         old_disk_conf = device->ldev->disk_conf;
4170                         *new_disk_conf = *old_disk_conf;
4171                         new_disk_conf->disk_size = p_usize;
4172
4173                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4174                         mutex_unlock(&connection->resource->conf_update);
4175                         synchronize_rcu();
4176                         kfree(old_disk_conf);
4177
4178                         drbd_info(device, "Peer sets u_size to %lu sectors (old: %lu)\n",
4179                                  (unsigned long)p_usize, (unsigned long)my_usize);
4180                 }
4181
4182                 put_ldev(device);
4183         }
4184
4185         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4186         /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4187            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4188            drbd_reconsider_queue_parameters(), we can be sure that after
4189            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4190
4191         ddsf = be16_to_cpu(p->dds_flags);
4192         if (get_ldev(device)) {
4193                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4194                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4195                 put_ldev(device);
4196                 if (dd == DS_ERROR)
4197                         return -EIO;
4198                 drbd_md_sync(device);
4199         } else {
4200                 /*
4201                  * I am diskless, need to accept the peer's *current* size.
4202                  * I must NOT accept the peers backing disk size,
4203                  * it may have been larger than mine all along...
4204                  *
4205                  * At this point, the peer knows more about my disk, or at
4206                  * least about what we last agreed upon, than myself.
4207                  * So if his c_size is less than his d_size, the most likely
4208                  * reason is that *my* d_size was smaller last time we checked.
4209                  *
4210                  * However, if he sends a zero current size,
4211                  * take his (user-capped or) backing disk size anyways.
4212                  *
4213                  * Unless of course he does not have a disk himself.
4214                  * In which case we ignore this completely.
4215                  */
4216                 sector_t new_size = p_csize ?: p_usize ?: p_size;
4217                 drbd_reconsider_queue_parameters(device, NULL, o);
4218                 if (new_size == 0) {
4219                         /* Ignore, peer does not know nothing. */
4220                 } else if (new_size == cur_size) {
4221                         /* nothing to do */
4222                 } else if (cur_size != 0 && p_size == 0) {
4223                         drbd_warn(device, "Ignored diskless peer device size (peer:%llu != me:%llu sectors)!\n",
4224                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4225                 } else if (new_size < cur_size && device->state.role == R_PRIMARY) {
4226                         drbd_err(device, "The peer's device size is too small! (%llu < %llu sectors); demote me first!\n",
4227                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4228                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4229                         return -EIO;
4230                 } else {
4231                         /* I believe the peer, if
4232                          *  - I don't have a current size myself
4233                          *  - we agree on the size anyways
4234                          *  - I do have a current size, am Secondary,
4235                          *    and he has the only disk
4236                          *  - I do have a current size, am Primary,
4237                          *    and he has the only disk,
4238                          *    which is larger than my current size
4239                          */
4240                         drbd_set_my_capacity(device, new_size);
4241                 }
4242         }
4243
4244         if (get_ldev(device)) {
4245                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4246                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4247                         ldsc = 1;
4248                 }
4249
4250                 put_ldev(device);
4251         }
4252
4253         if (device->state.conn > C_WF_REPORT_PARAMS) {
4254                 if (be64_to_cpu(p->c_size) !=
4255                     drbd_get_capacity(device->this_bdev) || ldsc) {
4256                         /* we have different sizes, probably peer
4257                          * needs to know my new size... */
4258                         drbd_send_sizes(peer_device, 0, ddsf);
4259                 }
4260                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4261                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4262                         if (device->state.pdsk >= D_INCONSISTENT &&
4263                             device->state.disk >= D_INCONSISTENT) {
4264                                 if (ddsf & DDSF_NO_RESYNC)
4265                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4266                                 else
4267                                         resync_after_online_grow(device);
4268                         } else
4269                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4270                 }
4271         }
4272
4273         return 0;
4274 }
4275
4276 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4277 {
4278         struct drbd_peer_device *peer_device;
4279         struct drbd_device *device;
4280         struct p_uuids *p = pi->data;
4281         u64 *p_uuid;
4282         int i, updated_uuids = 0;
4283
4284         peer_device = conn_peer_device(connection, pi->vnr);
4285         if (!peer_device)
4286                 return config_unknown_volume(connection, pi);
4287         device = peer_device->device;
4288
4289         p_uuid = kmalloc_array(UI_EXTENDED_SIZE, sizeof(*p_uuid), GFP_NOIO);
4290         if (!p_uuid) {
4291                 drbd_err(device, "kmalloc of p_uuid failed\n");
4292                 return false;
4293         }
4294
4295         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4296                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4297
4298         kfree(device->p_uuid);
4299         device->p_uuid = p_uuid;
4300
4301         if ((device->state.conn < C_CONNECTED || device->state.pdsk == D_DISKLESS) &&
4302             device->state.disk < D_INCONSISTENT &&
4303             device->state.role == R_PRIMARY &&
4304             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4305                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4306                     (unsigned long long)device->ed_uuid);
4307                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4308                 return -EIO;
4309         }
4310
4311         if (get_ldev(device)) {
4312                 int skip_initial_sync =
4313                         device->state.conn == C_CONNECTED &&
4314                         peer_device->connection->agreed_pro_version >= 90 &&
4315                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4316                         (p_uuid[UI_FLAGS] & 8);
4317                 if (skip_initial_sync) {
4318                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4319                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4320                                         "clear_n_write from receive_uuids",
4321                                         BM_LOCKED_TEST_ALLOWED);
4322                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4323                         _drbd_uuid_set(device, UI_BITMAP, 0);
4324                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4325                                         CS_VERBOSE, NULL);
4326                         drbd_md_sync(device);
4327                         updated_uuids = 1;
4328                 }
4329                 put_ldev(device);
4330         } else if (device->state.disk < D_INCONSISTENT &&
4331                    device->state.role == R_PRIMARY) {
4332                 /* I am a diskless primary, the peer just created a new current UUID
4333                    for me. */
4334                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4335         }
4336
4337         /* Before we test for the disk state, we should wait until an eventually
4338            ongoing cluster wide state change is finished. That is important if
4339            we are primary and are detaching from our disk. We need to see the
4340            new disk state... */
4341         mutex_lock(device->state_mutex);
4342         mutex_unlock(device->state_mutex);
4343         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4344                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4345
4346         if (updated_uuids)
4347                 drbd_print_uuids(device, "receiver updated UUIDs to");
4348
4349         return 0;
4350 }
4351
4352 /**
4353  * convert_state() - Converts the peer's view of the cluster state to our point of view
4354  * @ps:         The state as seen by the peer.
4355  */
4356 static union drbd_state convert_state(union drbd_state ps)
4357 {
4358         union drbd_state ms;
4359
4360         static enum drbd_conns c_tab[] = {
4361                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4362                 [C_CONNECTED] = C_CONNECTED,
4363
4364                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4365                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4366                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4367                 [C_VERIFY_S]       = C_VERIFY_T,
4368                 [C_MASK]   = C_MASK,
4369         };
4370
4371         ms.i = ps.i;
4372
4373         ms.conn = c_tab[ps.conn];
4374         ms.peer = ps.role;
4375         ms.role = ps.peer;
4376         ms.pdsk = ps.disk;
4377         ms.disk = ps.pdsk;
4378         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4379
4380         return ms;
4381 }
4382
4383 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4384 {
4385         struct drbd_peer_device *peer_device;
4386         struct drbd_device *device;
4387         struct p_req_state *p = pi->data;
4388         union drbd_state mask, val;
4389         enum drbd_state_rv rv;
4390
4391         peer_device = conn_peer_device(connection, pi->vnr);
4392         if (!peer_device)
4393                 return -EIO;
4394         device = peer_device->device;
4395
4396         mask.i = be32_to_cpu(p->mask);
4397         val.i = be32_to_cpu(p->val);
4398
4399         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4400             mutex_is_locked(device->state_mutex)) {
4401                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4402                 return 0;
4403         }
4404
4405         mask = convert_state(mask);
4406         val = convert_state(val);
4407
4408         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4409         drbd_send_sr_reply(peer_device, rv);
4410
4411         drbd_md_sync(device);
4412
4413         return 0;
4414 }
4415
4416 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4417 {
4418         struct p_req_state *p = pi->data;
4419         union drbd_state mask, val;
4420         enum drbd_state_rv rv;
4421
4422         mask.i = be32_to_cpu(p->mask);
4423         val.i = be32_to_cpu(p->val);
4424
4425         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4426             mutex_is_locked(&connection->cstate_mutex)) {
4427                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4428                 return 0;
4429         }
4430
4431         mask = convert_state(mask);
4432         val = convert_state(val);
4433
4434         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4435         conn_send_sr_reply(connection, rv);
4436
4437         return 0;
4438 }
4439
4440 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4441 {
4442         struct drbd_peer_device *peer_device;
4443         struct drbd_device *device;
4444         struct p_state *p = pi->data;
4445         union drbd_state os, ns, peer_state;
4446         enum drbd_disk_state real_peer_disk;
4447         enum chg_state_flags cs_flags;
4448         int rv;
4449
4450         peer_device = conn_peer_device(connection, pi->vnr);
4451         if (!peer_device)
4452                 return config_unknown_volume(connection, pi);
4453         device = peer_device->device;
4454
4455         peer_state.i = be32_to_cpu(p->state);
4456
4457         real_peer_disk = peer_state.disk;
4458         if (peer_state.disk == D_NEGOTIATING) {
4459                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4460                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4461         }
4462
4463         spin_lock_irq(&device->resource->req_lock);
4464  retry:
4465         os = ns = drbd_read_state(device);
4466         spin_unlock_irq(&device->resource->req_lock);
4467
4468         /* If some other part of the code (ack_receiver thread, timeout)
4469          * already decided to close the connection again,
4470          * we must not "re-establish" it here. */
4471         if (os.conn <= C_TEAR_DOWN)
4472                 return -ECONNRESET;
4473
4474         /* If this is the "end of sync" confirmation, usually the peer disk
4475          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4476          * set) resync started in PausedSyncT, or if the timing of pause-/
4477          * unpause-sync events has been "just right", the peer disk may
4478          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4479          */
4480         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4481             real_peer_disk == D_UP_TO_DATE &&
4482             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4483                 /* If we are (becoming) SyncSource, but peer is still in sync
4484                  * preparation, ignore its uptodate-ness to avoid flapping, it
4485                  * will change to inconsistent once the peer reaches active
4486                  * syncing states.
4487                  * It may have changed syncer-paused flags, however, so we
4488                  * cannot ignore this completely. */
4489                 if (peer_state.conn > C_CONNECTED &&
4490                     peer_state.conn < C_SYNC_SOURCE)
4491                         real_peer_disk = D_INCONSISTENT;
4492
4493                 /* if peer_state changes to connected at the same time,
4494                  * it explicitly notifies us that it finished resync.
4495                  * Maybe we should finish it up, too? */
4496                 else if (os.conn >= C_SYNC_SOURCE &&
4497                          peer_state.conn == C_CONNECTED) {
4498                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4499                                 drbd_resync_finished(device);
4500                         return 0;
4501                 }
4502         }
4503
4504         /* explicit verify finished notification, stop sector reached. */
4505         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4506             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4507                 ov_out_of_sync_print(device);
4508                 drbd_resync_finished(device);
4509                 return 0;
4510         }
4511
4512         /* peer says his disk is inconsistent, while we think it is uptodate,
4513          * and this happens while the peer still thinks we have a sync going on,
4514          * but we think we are already done with the sync.
4515          * We ignore this to avoid flapping pdsk.
4516          * This should not happen, if the peer is a recent version of drbd. */
4517         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4518             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4519                 real_peer_disk = D_UP_TO_DATE;
4520
4521         if (ns.conn == C_WF_REPORT_PARAMS)
4522                 ns.conn = C_CONNECTED;
4523
4524         if (peer_state.conn == C_AHEAD)
4525                 ns.conn = C_BEHIND;
4526
4527         /* TODO:
4528          * if (primary and diskless and peer uuid != effective uuid)
4529          *     abort attach on peer;
4530          *
4531          * If this node does not have good data, was already connected, but
4532          * the peer did a late attach only now, trying to "negotiate" with me,
4533          * AND I am currently Primary, possibly frozen, with some specific
4534          * "effective" uuid, this should never be reached, really, because
4535          * we first send the uuids, then the current state.
4536          *
4537          * In this scenario, we already dropped the connection hard
4538          * when we received the unsuitable uuids (receive_uuids().
4539          *
4540          * Should we want to change this, that is: not drop the connection in
4541          * receive_uuids() already, then we would need to add a branch here
4542          * that aborts the attach of "unsuitable uuids" on the peer in case
4543          * this node is currently Diskless Primary.
4544          */
4545
4546         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4547             get_ldev_if_state(device, D_NEGOTIATING)) {
4548                 int cr; /* consider resync */
4549
4550                 /* if we established a new connection */
4551                 cr  = (os.conn < C_CONNECTED);
4552                 /* if we had an established connection
4553                  * and one of the nodes newly attaches a disk */
4554                 cr |= (os.conn == C_CONNECTED &&
4555                        (peer_state.disk == D_NEGOTIATING ||
4556                         os.disk == D_NEGOTIATING));
4557                 /* if we have both been inconsistent, and the peer has been
4558                  * forced to be UpToDate with --force */
4559                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4560                 /* if we had been plain connected, and the admin requested to
4561                  * start a sync by "invalidate" or "invalidate-remote" */
4562                 cr |= (os.conn == C_CONNECTED &&
4563                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4564                                  peer_state.conn <= C_WF_BITMAP_T));
4565
4566                 if (cr)
4567                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4568
4569                 put_ldev(device);
4570                 if (ns.conn == C_MASK) {
4571                         ns.conn = C_CONNECTED;
4572                         if (device->state.disk == D_NEGOTIATING) {
4573                                 drbd_force_state(device, NS(disk, D_FAILED));
4574                         } else if (peer_state.disk == D_NEGOTIATING) {
4575                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4576                                 peer_state.disk = D_DISKLESS;
4577                                 real_peer_disk = D_DISKLESS;
4578                         } else {
4579                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4580                                         return -EIO;
4581                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4582                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4583                                 return -EIO;
4584                         }
4585                 }
4586         }
4587
4588         spin_lock_irq(&device->resource->req_lock);
4589         if (os.i != drbd_read_state(device).i)
4590                 goto retry;
4591         clear_bit(CONSIDER_RESYNC, &device->flags);
4592         ns.peer = peer_state.role;
4593         ns.pdsk = real_peer_disk;
4594         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4595         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4596                 ns.disk = device->new_state_tmp.disk;
4597         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4598         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4599             test_bit(NEW_CUR_UUID, &device->flags)) {
4600                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4601                    for temporal network outages! */
4602                 spin_unlock_irq(&device->resource->req_lock);
4603                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4604                 tl_clear(peer_device->connection);
4605                 drbd_uuid_new_current(device);
4606                 clear_bit(NEW_CUR_UUID, &device->flags);
4607                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4608                 return -EIO;
4609         }
4610         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4611         ns = drbd_read_state(device);
4612         spin_unlock_irq(&device->resource->req_lock);
4613
4614         if (rv < SS_SUCCESS) {
4615                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4616                 return -EIO;
4617         }
4618
4619         if (os.conn > C_WF_REPORT_PARAMS) {
4620                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4621                     peer_state.disk != D_NEGOTIATING ) {
4622                         /* we want resync, peer has not yet decided to sync... */
4623                         /* Nowadays only used when forcing a node into primary role and
4624                            setting its disk to UpToDate with that */
4625                         drbd_send_uuids(peer_device);
4626                         drbd_send_current_state(peer_device);
4627                 }
4628         }
4629
4630         clear_bit(DISCARD_MY_DATA, &device->flags);
4631
4632         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4633
4634         return 0;
4635 }
4636
4637 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4638 {
4639         struct drbd_peer_device *peer_device;
4640         struct drbd_device *device;
4641         struct p_rs_uuid *p = pi->data;
4642
4643         peer_device = conn_peer_device(connection, pi->vnr);
4644         if (!peer_device)
4645                 return -EIO;
4646         device = peer_device->device;
4647
4648         wait_event(device->misc_wait,
4649                    device->state.conn == C_WF_SYNC_UUID ||
4650                    device->state.conn == C_BEHIND ||
4651                    device->state.conn < C_CONNECTED ||
4652                    device->state.disk < D_NEGOTIATING);
4653
4654         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4655
4656         /* Here the _drbd_uuid_ functions are right, current should
4657            _not_ be rotated into the history */
4658         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4659                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4660                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4661
4662                 drbd_print_uuids(device, "updated sync uuid");
4663                 drbd_start_resync(device, C_SYNC_TARGET);
4664
4665                 put_ldev(device);
4666         } else
4667                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4668
4669         return 0;
4670 }
4671
4672 /**
4673  * receive_bitmap_plain
4674  *
4675  * Return 0 when done, 1 when another iteration is needed, and a negative error
4676  * code upon failure.
4677  */
4678 static int
4679 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4680                      unsigned long *p, struct bm_xfer_ctx *c)
4681 {
4682         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4683                                  drbd_header_size(peer_device->connection);
4684         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4685                                        c->bm_words - c->word_offset);
4686         unsigned int want = num_words * sizeof(*p);
4687         int err;
4688
4689         if (want != size) {
4690                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4691                 return -EIO;
4692         }
4693         if (want == 0)
4694                 return 0;
4695         err = drbd_recv_all(peer_device->connection, p, want);
4696         if (err)
4697                 return err;
4698
4699         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4700
4701         c->word_offset += num_words;
4702         c->bit_offset = c->word_offset * BITS_PER_LONG;
4703         if (c->bit_offset > c->bm_bits)
4704                 c->bit_offset = c->bm_bits;
4705
4706         return 1;
4707 }
4708
4709 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4710 {
4711         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4712 }
4713
4714 static int dcbp_get_start(struct p_compressed_bm *p)
4715 {
4716         return (p->encoding & 0x80) != 0;
4717 }
4718
4719 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4720 {
4721         return (p->encoding >> 4) & 0x7;
4722 }
4723
4724 /**
4725  * recv_bm_rle_bits
4726  *
4727  * Return 0 when done, 1 when another iteration is needed, and a negative error
4728  * code upon failure.
4729  */
4730 static int
4731 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4732                 struct p_compressed_bm *p,
4733                  struct bm_xfer_ctx *c,
4734                  unsigned int len)
4735 {
4736         struct bitstream bs;
4737         u64 look_ahead;
4738         u64 rl;
4739         u64 tmp;
4740         unsigned long s = c->bit_offset;
4741         unsigned long e;
4742         int toggle = dcbp_get_start(p);
4743         int have;
4744         int bits;
4745
4746         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4747
4748         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4749         if (bits < 0)
4750                 return -EIO;
4751
4752         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4753                 bits = vli_decode_bits(&rl, look_ahead);
4754                 if (bits <= 0)
4755                         return -EIO;
4756
4757                 if (toggle) {
4758                         e = s + rl -1;
4759                         if (e >= c->bm_bits) {
4760                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4761                                 return -EIO;
4762                         }
4763                         _drbd_bm_set_bits(peer_device->device, s, e);
4764                 }
4765
4766                 if (have < bits) {
4767                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4768                                 have, bits, look_ahead,
4769                                 (unsigned int)(bs.cur.b - p->code),
4770                                 (unsigned int)bs.buf_len);
4771                         return -EIO;
4772                 }
4773                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4774                 if (likely(bits < 64))
4775                         look_ahead >>= bits;
4776                 else
4777                         look_ahead = 0;
4778                 have -= bits;
4779
4780                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4781                 if (bits < 0)
4782                         return -EIO;
4783                 look_ahead |= tmp << have;
4784                 have += bits;
4785         }
4786
4787         c->bit_offset = s;
4788         bm_xfer_ctx_bit_to_word_offset(c);
4789
4790         return (s != c->bm_bits);
4791 }
4792
4793 /**
4794  * decode_bitmap_c
4795  *
4796  * Return 0 when done, 1 when another iteration is needed, and a negative error
4797  * code upon failure.
4798  */
4799 static int
4800 decode_bitmap_c(struct drbd_peer_device *peer_device,
4801                 struct p_compressed_bm *p,
4802                 struct bm_xfer_ctx *c,
4803                 unsigned int len)
4804 {
4805         if (dcbp_get_code(p) == RLE_VLI_Bits)
4806                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4807
4808         /* other variants had been implemented for evaluation,
4809          * but have been dropped as this one turned out to be "best"
4810          * during all our tests. */
4811
4812         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4813         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4814         return -EIO;
4815 }
4816
4817 void INFO_bm_xfer_stats(struct drbd_device *device,
4818                 const char *direction, struct bm_xfer_ctx *c)
4819 {
4820         /* what would it take to transfer it "plaintext" */
4821         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4822         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4823         unsigned int plain =
4824                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4825                 c->bm_words * sizeof(unsigned long);
4826         unsigned int total = c->bytes[0] + c->bytes[1];
4827         unsigned int r;
4828
4829         /* total can not be zero. but just in case: */
4830         if (total == 0)
4831                 return;
4832
4833         /* don't report if not compressed */
4834         if (total >= plain)
4835                 return;
4836
4837         /* total < plain. check for overflow, still */
4838         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4839                                     : (1000 * total / plain);
4840
4841         if (r > 1000)
4842                 r = 1000;
4843
4844         r = 1000 - r;
4845         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4846              "total %u; compression: %u.%u%%\n",
4847                         direction,
4848                         c->bytes[1], c->packets[1],
4849                         c->bytes[0], c->packets[0],
4850                         total, r/10, r % 10);
4851 }
4852
4853 /* Since we are processing the bitfield from lower addresses to higher,
4854    it does not matter if the process it in 32 bit chunks or 64 bit
4855    chunks as long as it is little endian. (Understand it as byte stream,
4856    beginning with the lowest byte...) If we would use big endian
4857    we would need to process it from the highest address to the lowest,
4858    in order to be agnostic to the 32 vs 64 bits issue.
4859
4860    returns 0 on failure, 1 if we successfully received it. */
4861 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4862 {
4863         struct drbd_peer_device *peer_device;
4864         struct drbd_device *device;
4865         struct bm_xfer_ctx c;
4866         int err;
4867
4868         peer_device = conn_peer_device(connection, pi->vnr);
4869         if (!peer_device)
4870                 return -EIO;
4871         device = peer_device->device;
4872
4873         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4874         /* you are supposed to send additional out-of-sync information
4875          * if you actually set bits during this phase */
4876
4877         c = (struct bm_xfer_ctx) {
4878                 .bm_bits = drbd_bm_bits(device),
4879                 .bm_words = drbd_bm_words(device),
4880         };
4881
4882         for(;;) {
4883                 if (pi->cmd == P_BITMAP)
4884                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4885                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4886                         /* MAYBE: sanity check that we speak proto >= 90,
4887                          * and the feature is enabled! */
4888                         struct p_compressed_bm *p = pi->data;
4889
4890                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4891                                 drbd_err(device, "ReportCBitmap packet too large\n");
4892                                 err = -EIO;
4893                                 goto out;
4894                         }
4895                         if (pi->size <= sizeof(*p)) {
4896                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4897                                 err = -EIO;
4898                                 goto out;
4899                         }
4900                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4901                         if (err)
4902                                goto out;
4903                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4904                 } else {
4905                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4906                         err = -EIO;
4907                         goto out;
4908                 }
4909
4910                 c.packets[pi->cmd == P_BITMAP]++;
4911                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4912
4913                 if (err <= 0) {
4914                         if (err < 0)
4915                                 goto out;
4916                         break;
4917                 }
4918                 err = drbd_recv_header(peer_device->connection, pi);
4919                 if (err)
4920                         goto out;
4921         }
4922
4923         INFO_bm_xfer_stats(device, "receive", &c);
4924
4925         if (device->state.conn == C_WF_BITMAP_T) {
4926                 enum drbd_state_rv rv;
4927
4928                 err = drbd_send_bitmap(device);
4929                 if (err)
4930                         goto out;
4931                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4932                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4933                 D_ASSERT(device, rv == SS_SUCCESS);
4934         } else if (device->state.conn != C_WF_BITMAP_S) {
4935                 /* admin may have requested C_DISCONNECTING,
4936                  * other threads may have noticed network errors */
4937                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4938                     drbd_conn_str(device->state.conn));
4939         }
4940         err = 0;
4941
4942  out:
4943         drbd_bm_unlock(device);
4944         if (!err && device->state.conn == C_WF_BITMAP_S)
4945                 drbd_start_resync(device, C_SYNC_SOURCE);
4946         return err;
4947 }
4948
4949 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4950 {
4951         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4952                  pi->cmd, pi->size);
4953
4954         return ignore_remaining_packet(connection, pi);
4955 }
4956
4957 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4958 {
4959         /* Make sure we've acked all the TCP data associated
4960          * with the data requests being unplugged */
4961         drbd_tcp_quickack(connection->data.socket);
4962
4963         return 0;
4964 }
4965
4966 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4967 {
4968         struct drbd_peer_device *peer_device;
4969         struct drbd_device *device;
4970         struct p_block_desc *p = pi->data;
4971
4972         peer_device = conn_peer_device(connection, pi->vnr);
4973         if (!peer_device)
4974                 return -EIO;
4975         device = peer_device->device;
4976
4977         switch (device->state.conn) {
4978         case C_WF_SYNC_UUID:
4979         case C_WF_BITMAP_T:
4980         case C_BEHIND:
4981                         break;
4982         default:
4983                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4984                                 drbd_conn_str(device->state.conn));
4985         }
4986
4987         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4988
4989         return 0;
4990 }
4991
4992 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4993 {
4994         struct drbd_peer_device *peer_device;
4995         struct p_block_desc *p = pi->data;
4996         struct drbd_device *device;
4997         sector_t sector;
4998         int size, err = 0;
4999
5000         peer_device = conn_peer_device(connection, pi->vnr);
5001         if (!peer_device)
5002                 return -EIO;
5003         device = peer_device->device;
5004
5005         sector = be64_to_cpu(p->sector);
5006         size = be32_to_cpu(p->blksize);
5007
5008         dec_rs_pending(device);
5009
5010         if (get_ldev(device)) {
5011                 struct drbd_peer_request *peer_req;
5012                 const int op = REQ_OP_WRITE_ZEROES;
5013
5014                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
5015                                                size, 0, GFP_NOIO);
5016                 if (!peer_req) {
5017                         put_ldev(device);
5018                         return -ENOMEM;
5019                 }
5020
5021                 peer_req->w.cb = e_end_resync_block;
5022                 peer_req->submit_jif = jiffies;
5023                 peer_req->flags |= EE_TRIM;
5024
5025                 spin_lock_irq(&device->resource->req_lock);
5026                 list_add_tail(&peer_req->w.list, &device->sync_ee);
5027                 spin_unlock_irq(&device->resource->req_lock);
5028
5029                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
5030                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
5031
5032                 if (err) {
5033                         spin_lock_irq(&device->resource->req_lock);
5034                         list_del(&peer_req->w.list);
5035                         spin_unlock_irq(&device->resource->req_lock);
5036
5037                         drbd_free_peer_req(device, peer_req);
5038                         put_ldev(device);
5039                         err = 0;
5040                         goto fail;
5041                 }
5042
5043                 inc_unacked(device);
5044
5045                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
5046                    as well as drbd_rs_complete_io() */
5047         } else {
5048         fail:
5049                 drbd_rs_complete_io(device, sector);
5050                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
5051         }
5052
5053         atomic_add(size >> 9, &device->rs_sect_in);
5054
5055         return err;
5056 }
5057
/*
 * One entry of the receiver's dispatch table, describing how a packet type
 * is to be received.  The table initialises entries positionally, so the
 * order of the members must stay as it is.
 */
struct data_cmd {
	/* non-zero: the packet may be longer than its fixed sub header */
	int expect_payload;
	/* size of the fixed sub header that is read before calling fn */
	unsigned int pkt_size;
	/* handler; a non-zero return value means failure */
	int (*fn)(struct drbd_connection *, struct packet_info *);
};
5063
5064 static struct data_cmd drbd_cmd_handler[] = {
5065         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
5066         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
5067         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
5068         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
5069         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
5070         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
5071         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
5072         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
5073         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
5074         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
5075         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
5076         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
5077         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
5078         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
5079         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
5080         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
5081         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
5082         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
5083         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
5084         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
5085         [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
5086         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
5087         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
5088         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
5089         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
5090         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
5091         [P_ZEROES]          = { 0, sizeof(struct p_trim), receive_Data },
5092         [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
5093         [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
5094 };
5095
5096 static void drbdd(struct drbd_connection *connection)
5097 {
5098         struct packet_info pi;
5099         size_t shs; /* sub header size */
5100         int err;
5101
5102         while (get_t_state(&connection->receiver) == RUNNING) {
5103                 struct data_cmd const *cmd;
5104
5105                 drbd_thread_current_set_cpu(&connection->receiver);
5106                 update_receiver_timing_details(connection, drbd_recv_header_maybe_unplug);
5107                 if (drbd_recv_header_maybe_unplug(connection, &pi))
5108                         goto err_out;
5109
5110                 cmd = &drbd_cmd_handler[pi.cmd];
5111                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
5112                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
5113                                  cmdname(pi.cmd), pi.cmd);
5114                         goto err_out;
5115                 }
5116
5117                 shs = cmd->pkt_size;
5118                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
5119                         shs += sizeof(struct o_qlim);
5120                 if (pi.size > shs && !cmd->expect_payload) {
5121                         drbd_err(connection, "No payload expected %s l:%d\n",
5122                                  cmdname(pi.cmd), pi.size);
5123                         goto err_out;
5124                 }
5125                 if (pi.size < shs) {
5126                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5127                                  cmdname(pi.cmd), (int)shs, pi.size);
5128                         goto err_out;
5129                 }
5130
5131                 if (shs) {
5132                         update_receiver_timing_details(connection, drbd_recv_all_warn);
5133                         err = drbd_recv_all_warn(connection, pi.data, shs);
5134                         if (err)
5135                                 goto err_out;
5136                         pi.size -= shs;
5137                 }
5138
5139                 update_receiver_timing_details(connection, cmd->fn);
5140                 err = cmd->fn(connection, &pi);
5141                 if (err) {
5142                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5143                                  cmdname(pi.cmd), err, pi.size);
5144                         goto err_out;
5145                 }
5146         }
5147         return;
5148
5149     err_out:
5150         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5151 }
5152
5153 static void conn_disconnect(struct drbd_connection *connection)
5154 {
5155         struct drbd_peer_device *peer_device;
5156         enum drbd_conns oc;
5157         int vnr;
5158
5159         if (connection->cstate == C_STANDALONE)
5160                 return;
5161
5162         /* We are about to start the cleanup after connection loss.
5163          * Make sure drbd_make_request knows about that.
5164          * Usually we should be in some network failure state already,
5165          * but just in case we are not, we fix it up here.
5166          */
5167         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5168
5169         /* ack_receiver does not clean up anything. it must not interfere, either */
5170         drbd_thread_stop(&connection->ack_receiver);
5171         if (connection->ack_sender) {
5172                 destroy_workqueue(connection->ack_sender);
5173                 connection->ack_sender = NULL;
5174         }
5175         drbd_free_sock(connection);
5176
5177         rcu_read_lock();
5178         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5179                 struct drbd_device *device = peer_device->device;
5180                 kref_get(&device->kref);
5181                 rcu_read_unlock();
5182                 drbd_disconnected(peer_device);
5183                 kref_put(&device->kref, drbd_destroy_device);
5184                 rcu_read_lock();
5185         }
5186         rcu_read_unlock();
5187
5188         if (!list_empty(&connection->current_epoch->list))
5189                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5190         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5191         atomic_set(&connection->current_epoch->epoch_size, 0);
5192         connection->send.seen_any_write_yet = false;
5193
5194         drbd_info(connection, "Connection closed\n");
5195
5196         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5197                 conn_try_outdate_peer_async(connection);
5198
5199         spin_lock_irq(&connection->resource->req_lock);
5200         oc = connection->cstate;
5201         if (oc >= C_UNCONNECTED)
5202                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5203
5204         spin_unlock_irq(&connection->resource->req_lock);
5205
5206         if (oc == C_DISCONNECTING)
5207                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5208 }
5209
5210 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5211 {
5212         struct drbd_device *device = peer_device->device;
5213         unsigned int i;
5214
5215         /* wait for current activity to cease. */
5216         spin_lock_irq(&device->resource->req_lock);
5217         _drbd_wait_ee_list_empty(device, &device->active_ee);
5218         _drbd_wait_ee_list_empty(device, &device->sync_ee);
5219         _drbd_wait_ee_list_empty(device, &device->read_ee);
5220         spin_unlock_irq(&device->resource->req_lock);
5221
5222         /* We do not have data structures that would allow us to
5223          * get the rs_pending_cnt down to 0 again.
5224          *  * On C_SYNC_TARGET we do not have any data structures describing
5225          *    the pending RSDataRequest's we have sent.
5226          *  * On C_SYNC_SOURCE there is no data structure that tracks
5227          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5228          *  And no, it is not the sum of the reference counts in the
5229          *  resync_LRU. The resync_LRU tracks the whole operation including
5230          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5231          *  on the fly. */
5232         drbd_rs_cancel_all(device);
5233         device->rs_total = 0;
5234         device->rs_failed = 0;
5235         atomic_set(&device->rs_pending_cnt, 0);
5236         wake_up(&device->misc_wait);
5237
5238         del_timer_sync(&device->resync_timer);
5239         resync_timer_fn(&device->resync_timer);
5240
5241         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5242          * w_make_resync_request etc. which may still be on the worker queue
5243          * to be "canceled" */
5244         drbd_flush_workqueue(&peer_device->connection->sender_work);
5245
5246         drbd_finish_peer_reqs(device);
5247
5248         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5249            might have issued a work again. The one before drbd_finish_peer_reqs() is
5250            necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5251         drbd_flush_workqueue(&peer_device->connection->sender_work);
5252
5253         /* need to do it again, drbd_finish_peer_reqs() may have populated it
5254          * again via drbd_try_clear_on_disk_bm(). */
5255         drbd_rs_cancel_all(device);
5256
5257         kfree(device->p_uuid);
5258         device->p_uuid = NULL;
5259
5260         if (!drbd_suspended(device))
5261                 tl_clear(peer_device->connection);
5262
5263         drbd_md_sync(device);
5264
5265         if (get_ldev(device)) {
5266                 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5267                                 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5268                 put_ldev(device);
5269         }
5270
5271         /* tcp_close and release of sendpage pages can be deferred.  I don't
5272          * want to use SO_LINGER, because apparently it can be deferred for
5273          * more than 20 seconds (longest time I checked).
5274          *
5275          * Actually we don't care for exactly when the network stack does its
5276          * put_page(), but release our reference on these pages right here.
5277          */
5278         i = drbd_free_peer_reqs(device, &device->net_ee);
5279         if (i)
5280                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5281         i = atomic_read(&device->pp_in_use_by_net);
5282         if (i)
5283                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5284         i = atomic_read(&device->pp_in_use);
5285         if (i)
5286                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5287
5288         D_ASSERT(device, list_empty(&device->read_ee));
5289         D_ASSERT(device, list_empty(&device->active_ee));
5290         D_ASSERT(device, list_empty(&device->sync_ee));
5291         D_ASSERT(device, list_empty(&device->done_ee));
5292
5293         return 0;
5294 }
5295
5296 /*
5297  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5298  * we can agree on is stored in agreed_pro_version.
5299  *
5300  * feature flags and the reserved array should be enough room for future
5301  * enhancements of the handshake protocol, and possible plugins...
5302  *
5303  * for now, they are expected to be zero, but ignored.
5304  */
5305 static int drbd_send_features(struct drbd_connection *connection)
5306 {
5307         struct drbd_socket *sock;
5308         struct p_connection_features *p;
5309
5310         sock = &connection->data;
5311         p = conn_prepare_command(connection, sock);
5312         if (!p)
5313                 return -EIO;
5314         memset(p, 0, sizeof(*p));
5315         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5316         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5317         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5318         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5319 }
5320
5321 /*
5322  * return values:
5323  *   1 yes, we have a valid connection
5324  *   0 oops, did not work out, please try again
5325  *  -1 peer talks different language,
5326  *     no point in trying again, please go standalone.
5327  */
5328 static int drbd_do_features(struct drbd_connection *connection)
5329 {
5330         /* ASSERT current == connection->receiver ... */
5331         struct p_connection_features *p;
5332         const int expect = sizeof(struct p_connection_features);
5333         struct packet_info pi;
5334         int err;
5335
5336         err = drbd_send_features(connection);
5337         if (err)
5338                 return 0;
5339
5340         err = drbd_recv_header(connection, &pi);
5341         if (err)
5342                 return 0;
5343
5344         if (pi.cmd != P_CONNECTION_FEATURES) {
5345                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5346                          cmdname(pi.cmd), pi.cmd);
5347                 return -1;
5348         }
5349
5350         if (pi.size != expect) {
5351                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5352                      expect, pi.size);
5353                 return -1;
5354         }
5355
5356         p = pi.data;
5357         err = drbd_recv_all_warn(connection, p, expect);
5358         if (err)
5359                 return 0;
5360
5361         p->protocol_min = be32_to_cpu(p->protocol_min);
5362         p->protocol_max = be32_to_cpu(p->protocol_max);
5363         if (p->protocol_max == 0)
5364                 p->protocol_max = p->protocol_min;
5365
5366         if (PRO_VERSION_MAX < p->protocol_min ||
5367             PRO_VERSION_MIN > p->protocol_max)
5368                 goto incompat;
5369
5370         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5371         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5372
5373         drbd_info(connection, "Handshake successful: "
5374              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5375
5376         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s%s.\n",
5377                   connection->agreed_features,
5378                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5379                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5380                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" : "",
5381                   connection->agreed_features & DRBD_FF_WZEROES ? " WRITE_ZEROES" :
5382                   connection->agreed_features ? "" : " none");
5383
5384         return 1;
5385
5386  incompat:
5387         drbd_err(connection, "incompatible DRBD dialects: "
5388             "I support %d-%d, peer supports %d-%d\n",
5389             PRO_VERSION_MIN, PRO_VERSION_MAX,
5390             p->protocol_min, p->protocol_max);
5391         return -1;
5392 }
5393
5394 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * Variant used when neither CONFIG_CRYPTO_HMAC nor
 * CONFIG_CRYPTO_HMAC_MODULE is defined: authentication cannot be done,
 * so log why and return -1.
 */
static int drbd_do_auth(struct drbd_connection *connection)
{
	const int auth_failed = -1;

	drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
	drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");

	return auth_failed;
}
5401 #else
5402 #define CHALLENGE_LEN 64
5403
5404 /* Return value:
5405         1 - auth succeeded,
5406         0 - failed, try again (network error),
5407         -1 - auth failed, don't try again.
5408 */
5409
5410 static int drbd_do_auth(struct drbd_connection *connection)
5411 {
5412         struct drbd_socket *sock;
5413         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5414         char *response = NULL;
5415         char *right_response = NULL;
5416         char *peers_ch = NULL;
5417         unsigned int key_len;
5418         char secret[SHARED_SECRET_MAX]; /* 64 byte */
5419         unsigned int resp_size;
5420         struct shash_desc *desc;
5421         struct packet_info pi;
5422         struct net_conf *nc;
5423         int err, rv;
5424
5425         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5426
5427         rcu_read_lock();
5428         nc = rcu_dereference(connection->net_conf);
5429         key_len = strlen(nc->shared_secret);
5430         memcpy(secret, nc->shared_secret, key_len);
5431         rcu_read_unlock();
5432
5433         desc = kmalloc(sizeof(struct shash_desc) +
5434                        crypto_shash_descsize(connection->cram_hmac_tfm),
5435                        GFP_KERNEL);
5436         if (!desc) {
5437                 rv = -1;
5438                 goto fail;
5439         }
5440         desc->tfm = connection->cram_hmac_tfm;
5441
5442         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5443         if (rv) {
5444                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5445                 rv = -1;
5446                 goto fail;
5447         }
5448
5449         get_random_bytes(my_challenge, CHALLENGE_LEN);
5450
5451         sock = &connection->data;
5452         if (!conn_prepare_command(connection, sock)) {
5453                 rv = 0;
5454                 goto fail;
5455         }
5456         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5457                                 my_challenge, CHALLENGE_LEN);
5458         if (!rv)
5459                 goto fail;
5460
5461         err = drbd_recv_header(connection, &pi);
5462         if (err) {
5463                 rv = 0;
5464                 goto fail;
5465         }
5466
5467         if (pi.cmd != P_AUTH_CHALLENGE) {
5468                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5469                          cmdname(pi.cmd), pi.cmd);
5470                 rv = -1;
5471                 goto fail;
5472         }
5473
5474         if (pi.size > CHALLENGE_LEN * 2) {
5475                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5476                 rv = -1;
5477                 goto fail;
5478         }
5479
5480         if (pi.size < CHALLENGE_LEN) {
5481                 drbd_err(connection, "AuthChallenge payload too small.\n");
5482                 rv = -1;
5483                 goto fail;
5484         }
5485
5486         peers_ch = kmalloc(pi.size, GFP_NOIO);
5487         if (peers_ch == NULL) {
5488                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5489                 rv = -1;
5490                 goto fail;
5491         }
5492
5493         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5494         if (err) {
5495                 rv = 0;
5496                 goto fail;
5497         }
5498
5499         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5500                 drbd_err(connection, "Peer presented the same challenge!\n");
5501                 rv = -1;
5502                 goto fail;
5503         }
5504
5505         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5506         response = kmalloc(resp_size, GFP_NOIO);
5507         if (response == NULL) {
5508                 drbd_err(connection, "kmalloc of response failed\n");
5509                 rv = -1;
5510                 goto fail;
5511         }
5512
5513         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5514         if (rv) {
5515                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5516                 rv = -1;
5517                 goto fail;
5518         }
5519
5520         if (!conn_prepare_command(connection, sock)) {
5521                 rv = 0;
5522                 goto fail;
5523         }
5524         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5525                                 response, resp_size);
5526         if (!rv)
5527                 goto fail;
5528
5529         err = drbd_recv_header(connection, &pi);
5530         if (err) {
5531                 rv = 0;
5532                 goto fail;
5533         }
5534
5535         if (pi.cmd != P_AUTH_RESPONSE) {
5536                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5537                          cmdname(pi.cmd), pi.cmd);
5538                 rv = 0;
5539                 goto fail;
5540         }
5541
5542         if (pi.size != resp_size) {
5543                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5544                 rv = 0;
5545                 goto fail;
5546         }
5547
5548         err = drbd_recv_all_warn(connection, response , resp_size);
5549         if (err) {
5550                 rv = 0;
5551                 goto fail;
5552         }
5553
5554         right_response = kmalloc(resp_size, GFP_NOIO);
5555         if (right_response == NULL) {
5556                 drbd_err(connection, "kmalloc of right_response failed\n");
5557                 rv = -1;
5558                 goto fail;
5559         }
5560
5561         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5562                                  right_response);
5563         if (rv) {
5564                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5565                 rv = -1;
5566                 goto fail;
5567         }
5568
5569         rv = !memcmp(response, right_response, resp_size);
5570
5571         if (rv)
5572                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5573                      resp_size);
5574         else
5575                 rv = -1;
5576
5577  fail:
5578         kfree(peers_ch);
5579         kfree(response);
5580         kfree(right_response);
5581         if (desc) {
5582                 shash_desc_zero(desc);
5583                 kfree(desc);
5584         }
5585
5586         return rv;
5587 }
5588 #endif
5589
5590 int drbd_receiver(struct drbd_thread *thi)
5591 {
5592         struct drbd_connection *connection = thi->connection;
5593         int h;
5594
5595         drbd_info(connection, "receiver (re)started\n");
5596
5597         do {
5598                 h = conn_connect(connection);
5599                 if (h == 0) {
5600                         conn_disconnect(connection);
5601                         schedule_timeout_interruptible(HZ);
5602                 }
5603                 if (h == -1) {
5604                         drbd_warn(connection, "Discarding network configuration.\n");
5605                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5606                 }
5607         } while (h == 0);
5608
5609         if (h > 0) {
5610                 blk_start_plug(&connection->receiver_plug);
5611                 drbdd(connection);
5612                 blk_finish_plug(&connection->receiver_plug);
5613         }
5614
5615         conn_disconnect(connection);
5616
5617         drbd_info(connection, "receiver terminated\n");
5618         return 0;
5619 }
5620
5621 /* ********* acknowledge sender ******** */
5622
5623 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5624 {
5625         struct p_req_state_reply *p = pi->data;
5626         int retcode = be32_to_cpu(p->retcode);
5627
5628         if (retcode >= SS_SUCCESS) {
5629                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5630         } else {
5631                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5632                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5633                          drbd_set_st_err_str(retcode), retcode);
5634         }
5635         wake_up(&connection->ping_wait);
5636
5637         return 0;
5638 }
5639
5640 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5641 {
5642         struct drbd_peer_device *peer_device;
5643         struct drbd_device *device;
5644         struct p_req_state_reply *p = pi->data;
5645         int retcode = be32_to_cpu(p->retcode);
5646
5647         peer_device = conn_peer_device(connection, pi->vnr);
5648         if (!peer_device)
5649                 return -EIO;
5650         device = peer_device->device;
5651
5652         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5653                 D_ASSERT(device, connection->agreed_pro_version < 100);
5654                 return got_conn_RqSReply(connection, pi);
5655         }
5656
5657         if (retcode >= SS_SUCCESS) {
5658                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5659         } else {
5660                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5661                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5662                         drbd_set_st_err_str(retcode), retcode);
5663         }
5664         wake_up(&device->state_wait);
5665
5666         return 0;
5667 }
5668
/* Answer a ping: returns whatever drbd_send_ping_ack() returns. */
static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(connection);

	return err;
}
5674
5675 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5676 {
5677         /* restore idle timeout */
5678         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5679         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5680                 wake_up(&connection->ping_wait);
5681
5682         return 0;
5683 }
5684
5685 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5686 {
5687         struct drbd_peer_device *peer_device;
5688         struct drbd_device *device;
5689         struct p_block_ack *p = pi->data;
5690         sector_t sector = be64_to_cpu(p->sector);
5691         int blksize = be32_to_cpu(p->blksize);
5692
5693         peer_device = conn_peer_device(connection, pi->vnr);
5694         if (!peer_device)
5695                 return -EIO;
5696         device = peer_device->device;
5697
5698         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5699
5700         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5701
5702         if (get_ldev(device)) {
5703                 drbd_rs_complete_io(device, sector);
5704                 drbd_set_in_sync(device, sector, blksize);
5705                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5706                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5707                 put_ldev(device);
5708         }
5709         dec_rs_pending(device);
5710         atomic_add(blksize >> 9, &device->rs_sect_in);
5711
5712         return 0;
5713 }
5714
5715 static int
5716 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5717                               struct rb_root *root, const char *func,
5718                               enum drbd_req_event what, bool missing_ok)
5719 {
5720         struct drbd_request *req;
5721         struct bio_and_error m;
5722
5723         spin_lock_irq(&device->resource->req_lock);
5724         req = find_request(device, root, id, sector, missing_ok, func);
5725         if (unlikely(!req)) {
5726                 spin_unlock_irq(&device->resource->req_lock);
5727                 return -EIO;
5728         }
5729         __req_mod(req, what, &m);
5730         spin_unlock_irq(&device->resource->req_lock);
5731
5732         if (m.bio)
5733                 complete_master_bio(device, &m);
5734         return 0;
5735 }
5736
5737 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5738 {
5739         struct drbd_peer_device *peer_device;
5740         struct drbd_device *device;
5741         struct p_block_ack *p = pi->data;
5742         sector_t sector = be64_to_cpu(p->sector);
5743         int blksize = be32_to_cpu(p->blksize);
5744         enum drbd_req_event what;
5745
5746         peer_device = conn_peer_device(connection, pi->vnr);
5747         if (!peer_device)
5748                 return -EIO;
5749         device = peer_device->device;
5750
5751         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5752
5753         if (p->block_id == ID_SYNCER) {
5754                 drbd_set_in_sync(device, sector, blksize);
5755                 dec_rs_pending(device);
5756                 return 0;
5757         }
5758         switch (pi->cmd) {
5759         case P_RS_WRITE_ACK:
5760                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5761                 break;
5762         case P_WRITE_ACK:
5763                 what = WRITE_ACKED_BY_PEER;
5764                 break;
5765         case P_RECV_ACK:
5766                 what = RECV_ACKED_BY_PEER;
5767                 break;
5768         case P_SUPERSEDED:
5769                 what = CONFLICT_RESOLVED;
5770                 break;
5771         case P_RETRY_WRITE:
5772                 what = POSTPONE_WRITE;
5773                 break;
5774         default:
5775                 BUG();
5776         }
5777
5778         return validate_req_change_req_state(device, p->block_id, sector,
5779                                              &device->write_requests, __func__,
5780                                              what, false);
5781 }
5782
5783 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5784 {
5785         struct drbd_peer_device *peer_device;
5786         struct drbd_device *device;
5787         struct p_block_ack *p = pi->data;
5788         sector_t sector = be64_to_cpu(p->sector);
5789         int size = be32_to_cpu(p->blksize);
5790         int err;
5791
5792         peer_device = conn_peer_device(connection, pi->vnr);
5793         if (!peer_device)
5794                 return -EIO;
5795         device = peer_device->device;
5796
5797         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5798
5799         if (p->block_id == ID_SYNCER) {
5800                 dec_rs_pending(device);
5801                 drbd_rs_failed_io(device, sector, size);
5802                 return 0;
5803         }
5804
5805         err = validate_req_change_req_state(device, p->block_id, sector,
5806                                             &device->write_requests, __func__,
5807                                             NEG_ACKED, true);
5808         if (err) {
5809                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5810                    The master bio might already be completed, therefore the
5811                    request is no longer in the collision hash. */
5812                 /* In Protocol B we might already have got a P_RECV_ACK
5813                    but then get a P_NEG_ACK afterwards. */
5814                 drbd_set_out_of_sync(device, sector, size);
5815         }
5816         return 0;
5817 }
5818
5819 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5820 {
5821         struct drbd_peer_device *peer_device;
5822         struct drbd_device *device;
5823         struct p_block_ack *p = pi->data;
5824         sector_t sector = be64_to_cpu(p->sector);
5825
5826         peer_device = conn_peer_device(connection, pi->vnr);
5827         if (!peer_device)
5828                 return -EIO;
5829         device = peer_device->device;
5830
5831         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5832
5833         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5834             (unsigned long long)sector, be32_to_cpu(p->blksize));
5835
5836         return validate_req_change_req_state(device, p->block_id, sector,
5837                                              &device->read_requests, __func__,
5838                                              NEG_ACKED, false);
5839 }
5840
5841 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5842 {
5843         struct drbd_peer_device *peer_device;
5844         struct drbd_device *device;
5845         sector_t sector;
5846         int size;
5847         struct p_block_ack *p = pi->data;
5848
5849         peer_device = conn_peer_device(connection, pi->vnr);
5850         if (!peer_device)
5851                 return -EIO;
5852         device = peer_device->device;
5853
5854         sector = be64_to_cpu(p->sector);
5855         size = be32_to_cpu(p->blksize);
5856
5857         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5858
5859         dec_rs_pending(device);
5860
5861         if (get_ldev_if_state(device, D_FAILED)) {
5862                 drbd_rs_complete_io(device, sector);
5863                 switch (pi->cmd) {
5864                 case P_NEG_RS_DREPLY:
5865                         drbd_rs_failed_io(device, sector, size);
5866                 case P_RS_CANCEL:
5867                         break;
5868                 default:
5869                         BUG();
5870                 }
5871                 put_ldev(device);
5872         }
5873
5874         return 0;
5875 }
5876
5877 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5878 {
5879         struct p_barrier_ack *p = pi->data;
5880         struct drbd_peer_device *peer_device;
5881         int vnr;
5882
5883         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5884
5885         rcu_read_lock();
5886         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5887                 struct drbd_device *device = peer_device->device;
5888
5889                 if (device->state.conn == C_AHEAD &&
5890                     atomic_read(&device->ap_in_flight) == 0 &&
5891                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5892                         device->start_resync_timer.expires = jiffies + HZ;
5893                         add_timer(&device->start_resync_timer);
5894                 }
5895         }
5896         rcu_read_unlock();
5897
5898         return 0;
5899 }
5900
5901 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5902 {
5903         struct drbd_peer_device *peer_device;
5904         struct drbd_device *device;
5905         struct p_block_ack *p = pi->data;
5906         struct drbd_device_work *dw;
5907         sector_t sector;
5908         int size;
5909
5910         peer_device = conn_peer_device(connection, pi->vnr);
5911         if (!peer_device)
5912                 return -EIO;
5913         device = peer_device->device;
5914
5915         sector = be64_to_cpu(p->sector);
5916         size = be32_to_cpu(p->blksize);
5917
5918         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5919
5920         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5921                 drbd_ov_out_of_sync_found(device, sector, size);
5922         else
5923                 ov_out_of_sync_print(device);
5924
5925         if (!get_ldev(device))
5926                 return 0;
5927
5928         drbd_rs_complete_io(device, sector);
5929         dec_rs_pending(device);
5930
5931         --device->ov_left;
5932
5933         /* let's advance progress step marks only for every other megabyte */
5934         if ((device->ov_left & 0x200) == 0x200)
5935                 drbd_advance_rs_marks(device, device->ov_left);
5936
5937         if (device->ov_left == 0) {
5938                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5939                 if (dw) {
5940                         dw->w.cb = w_ov_finished;
5941                         dw->device = device;
5942                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5943                 } else {
5944                         drbd_err(device, "kmalloc(dw) failed.");
5945                         ov_out_of_sync_print(device);
5946                         drbd_resync_finished(device);
5947                 }
5948         }
5949         put_ldev(device);
5950         return 0;
5951 }
5952
/*
 * got_skip() - meta socket handler that ignores the packet.
 * @connection: unused
 * @pi:         unused
 *
 * Neither argument is looked at; the packet is accepted as-is.
 * Always returns 0.
 */
static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
{
        return 0;
}
5957
5958 struct meta_sock_cmd {
5959         size_t pkt_size;
5960         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5961 };
5962
5963 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5964 {
5965         long t;
5966         struct net_conf *nc;
5967
5968         rcu_read_lock();
5969         nc = rcu_dereference(connection->net_conf);
5970         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5971         rcu_read_unlock();
5972
5973         t *= HZ;
5974         if (ping_timeout)
5975                 t /= 10;
5976
5977         connection->meta.socket->sk->sk_rcvtimeo = t;
5978 }
5979
5980 static void set_ping_timeout(struct drbd_connection *connection)
5981 {
5982         set_rcvtimeo(connection, 1);
5983 }
5984
5985 static void set_idle_timeout(struct drbd_connection *connection)
5986 {
5987         set_rcvtimeo(connection, 0);
5988 }
5989
5990 static struct meta_sock_cmd ack_receiver_tbl[] = {
5991         [P_PING]            = { 0, got_Ping },
5992         [P_PING_ACK]        = { 0, got_PingAck },
5993         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5994         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5995         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5996         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5997         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5998         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5999         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
6000         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
6001         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
6002         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
6003         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
6004         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
6005         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
6006         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
6007         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
6008 };
6009
6010 int drbd_ack_receiver(struct drbd_thread *thi)
6011 {
6012         struct drbd_connection *connection = thi->connection;
6013         struct meta_sock_cmd *cmd = NULL;
6014         struct packet_info pi;
6015         unsigned long pre_recv_jif;
6016         int rv;
6017         void *buf    = connection->meta.rbuf;
6018         int received = 0;
6019         unsigned int header_size = drbd_header_size(connection);
6020         int expect   = header_size;
6021         bool ping_timeout_active = false;
6022         struct sched_param param = { .sched_priority = 2 };
6023
6024         rv = sched_setscheduler(current, SCHED_RR, &param);
6025         if (rv < 0)
6026                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
6027
6028         while (get_t_state(thi) == RUNNING) {
6029                 drbd_thread_current_set_cpu(thi);
6030
6031                 conn_reclaim_net_peer_reqs(connection);
6032
6033                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
6034                         if (drbd_send_ping(connection)) {
6035                                 drbd_err(connection, "drbd_send_ping has failed\n");
6036                                 goto reconnect;
6037                         }
6038                         set_ping_timeout(connection);
6039                         ping_timeout_active = true;
6040                 }
6041
6042                 pre_recv_jif = jiffies;
6043                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
6044
6045                 /* Note:
6046                  * -EINTR        (on meta) we got a signal
6047                  * -EAGAIN       (on meta) rcvtimeo expired
6048                  * -ECONNRESET   other side closed the connection
6049                  * -ERESTARTSYS  (on data) we got a signal
6050                  * rv <  0       other than above: unexpected error!
6051                  * rv == expected: full header or command
6052                  * rv <  expected: "woken" by signal during receive
6053                  * rv == 0       : "connection shut down by peer"
6054                  */
6055                 if (likely(rv > 0)) {
6056                         received += rv;
6057                         buf      += rv;
6058                 } else if (rv == 0) {
6059                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
6060                                 long t;
6061                                 rcu_read_lock();
6062                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
6063                                 rcu_read_unlock();
6064
6065                                 t = wait_event_timeout(connection->ping_wait,
6066                                                        connection->cstate < C_WF_REPORT_PARAMS,
6067                                                        t);
6068                                 if (t)
6069                                         break;
6070                         }
6071                         drbd_err(connection, "meta connection shut down by peer.\n");
6072                         goto reconnect;
6073                 } else if (rv == -EAGAIN) {
6074                         /* If the data socket received something meanwhile,
6075                          * that is good enough: peer is still alive. */
6076                         if (time_after(connection->last_received, pre_recv_jif))
6077                                 continue;
6078                         if (ping_timeout_active) {
6079                                 drbd_err(connection, "PingAck did not arrive in time.\n");
6080                                 goto reconnect;
6081                         }
6082                         set_bit(SEND_PING, &connection->flags);
6083                         continue;
6084                 } else if (rv == -EINTR) {
6085                         /* maybe drbd_thread_stop(): the while condition will notice.
6086                          * maybe woken for send_ping: we'll send a ping above,
6087                          * and change the rcvtimeo */
6088                         flush_signals(current);
6089                         continue;
6090                 } else {
6091                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
6092                         goto reconnect;
6093                 }
6094
6095                 if (received == expect && cmd == NULL) {
6096                         if (decode_header(connection, connection->meta.rbuf, &pi))
6097                                 goto reconnect;
6098                         cmd = &ack_receiver_tbl[pi.cmd];
6099                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
6100                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
6101                                          cmdname(pi.cmd), pi.cmd);
6102                                 goto disconnect;
6103                         }
6104                         expect = header_size + cmd->pkt_size;
6105                         if (pi.size != expect - header_size) {
6106                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
6107                                         pi.cmd, pi.size);
6108                                 goto reconnect;
6109                         }
6110                 }
6111                 if (received == expect) {
6112                         bool err;
6113
6114                         err = cmd->fn(connection, &pi);
6115                         if (err) {
6116                                 drbd_err(connection, "%ps failed\n", cmd->fn);
6117                                 goto reconnect;
6118                         }
6119
6120                         connection->last_received = jiffies;
6121
6122                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
6123                                 set_idle_timeout(connection);
6124                                 ping_timeout_active = false;
6125                         }
6126
6127                         buf      = connection->meta.rbuf;
6128                         received = 0;
6129                         expect   = header_size;
6130                         cmd      = NULL;
6131                 }
6132         }
6133
6134         if (0) {
6135 reconnect:
6136                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6137                 conn_md_sync(connection);
6138         }
6139         if (0) {
6140 disconnect:
6141                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6142         }
6143
6144         drbd_info(connection, "ack_receiver terminated\n");
6145
6146         return 0;
6147 }
6148
6149 void drbd_send_acks_wf(struct work_struct *ws)
6150 {
6151         struct drbd_peer_device *peer_device =
6152                 container_of(ws, struct drbd_peer_device, send_acks_work);
6153         struct drbd_connection *connection = peer_device->connection;
6154         struct drbd_device *device = peer_device->device;
6155         struct net_conf *nc;
6156         int tcp_cork, err;
6157
6158         rcu_read_lock();
6159         nc = rcu_dereference(connection->net_conf);
6160         tcp_cork = nc->tcp_cork;
6161         rcu_read_unlock();
6162
6163         if (tcp_cork)
6164                 drbd_tcp_cork(connection->meta.socket);
6165
6166         err = drbd_finish_peer_reqs(device);
6167         kref_put(&device->kref, drbd_destroy_device);
6168         /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6169            struct work_struct send_acks_work alive, which is in the peer_device object */
6170
6171         if (err) {
6172                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6173                 return;
6174         }
6175
6176         if (tcp_cork)
6177                 drbd_tcp_uncork(connection->meta.socket);
6178
6179         return;
6180 }