Merge tag 'for-v4.8' of git://git.kernel.org/pub/scm/linux/kernel/git/sre/linux-power...
[sfrench/cifs-2.6.git] / drivers / block / drbd / drbd_receiver.c
1 /*
2    drbd_receiver.c
3
4    This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
5
6    Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
7    Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
8    Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
9
10    drbd is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2, or (at your option)
13    any later version.
14
15    drbd is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19
20    You should have received a copy of the GNU General Public License
21    along with drbd; see the file COPYING.  If not, write to
22    the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25
26 #include <linux/module.h>
27
28 #include <linux/uaccess.h>
29 #include <net/sock.h>
30
31 #include <linux/drbd.h>
32 #include <linux/fs.h>
33 #include <linux/file.h>
34 #include <linux/in.h>
35 #include <linux/mm.h>
36 #include <linux/memcontrol.h>
37 #include <linux/mm_inline.h>
38 #include <linux/slab.h>
39 #include <linux/pkt_sched.h>
40 #define __KERNEL_SYSCALLS__
41 #include <linux/unistd.h>
42 #include <linux/vmalloc.h>
43 #include <linux/random.h>
44 #include <linux/string.h>
45 #include <linux/scatterlist.h>
46 #include "drbd_int.h"
47 #include "drbd_protocol.h"
48 #include "drbd_req.h"
49 #include "drbd_vli.h"
50
51 #define PRO_FEATURES (DRBD_FF_TRIM|DRBD_FF_THIN_RESYNC|DRBD_FF_WSAME)
52
53 struct packet_info {
54         enum drbd_packet cmd;
55         unsigned int size;
56         unsigned int vnr;
57         void *data;
58 };
59
60 enum finish_epoch {
61         FE_STILL_LIVE,
62         FE_DESTROYED,
63         FE_RECYCLED,
64 };
65
66 static int drbd_do_features(struct drbd_connection *connection);
67 static int drbd_do_auth(struct drbd_connection *connection);
68 static int drbd_disconnected(struct drbd_peer_device *);
69 static void conn_wait_active_ee_empty(struct drbd_connection *connection);
70 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *, struct drbd_epoch *, enum epoch_event);
71 static int e_end_block(struct drbd_work *, int);
72
73
74 #define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
75
76 /*
77  * some helper functions to deal with single linked page lists,
78  * page->private being our "next" pointer.
79  */
80
81 /* If at least n pages are linked at head, get n pages off.
82  * Otherwise, don't modify head, and return NULL.
83  * Locking is the responsibility of the caller.
84  */
85 static struct page *page_chain_del(struct page **head, int n)
86 {
87         struct page *page;
88         struct page *tmp;
89
90         BUG_ON(!n);
91         BUG_ON(!head);
92
93         page = *head;
94
95         if (!page)
96                 return NULL;
97
98         while (page) {
99                 tmp = page_chain_next(page);
100                 if (--n == 0)
101                         break; /* found sufficient pages */
102                 if (tmp == NULL)
103                         /* insufficient pages, don't use any of them. */
104                         return NULL;
105                 page = tmp;
106         }
107
108         /* add end of list marker for the returned list */
109         set_page_private(page, 0);
110         /* actual return value, and adjustment of head */
111         page = *head;
112         *head = tmp;
113         return page;
114 }
115
116 /* may be used outside of locks to find the tail of a (usually short)
117  * "private" page chain, before adding it back to a global chain head
118  * with page_chain_add() under a spinlock. */
119 static struct page *page_chain_tail(struct page *page, int *len)
120 {
121         struct page *tmp;
122         int i = 1;
123         while ((tmp = page_chain_next(page)))
124                 ++i, page = tmp;
125         if (len)
126                 *len = i;
127         return page;
128 }
129
130 static int page_chain_free(struct page *page)
131 {
132         struct page *tmp;
133         int i = 0;
134         page_chain_for_each_safe(page, tmp) {
135                 put_page(page);
136                 ++i;
137         }
138         return i;
139 }
140
141 static void page_chain_add(struct page **head,
142                 struct page *chain_first, struct page *chain_last)
143 {
144 #if 1
145         struct page *tmp;
146         tmp = page_chain_tail(chain_first, NULL);
147         BUG_ON(tmp != chain_last);
148 #endif
149
150         /* add chain to head */
151         set_page_private(chain_last, (unsigned long)*head);
152         *head = chain_first;
153 }
154
155 static struct page *__drbd_alloc_pages(struct drbd_device *device,
156                                        unsigned int number)
157 {
158         struct page *page = NULL;
159         struct page *tmp = NULL;
160         unsigned int i = 0;
161
162         /* Yes, testing drbd_pp_vacant outside the lock is racy.
163          * So what. It saves a spin_lock. */
164         if (drbd_pp_vacant >= number) {
165                 spin_lock(&drbd_pp_lock);
166                 page = page_chain_del(&drbd_pp_pool, number);
167                 if (page)
168                         drbd_pp_vacant -= number;
169                 spin_unlock(&drbd_pp_lock);
170                 if (page)
171                         return page;
172         }
173
174         /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
175          * "criss-cross" setup, that might cause write-out on some other DRBD,
176          * which in turn might block on the other node at this very place.  */
177         for (i = 0; i < number; i++) {
178                 tmp = alloc_page(GFP_TRY);
179                 if (!tmp)
180                         break;
181                 set_page_private(tmp, (unsigned long)page);
182                 page = tmp;
183         }
184
185         if (i == number)
186                 return page;
187
188         /* Not enough pages immediately available this time.
189          * No need to jump around here, drbd_alloc_pages will retry this
190          * function "soon". */
191         if (page) {
192                 tmp = page_chain_tail(page, NULL);
193                 spin_lock(&drbd_pp_lock);
194                 page_chain_add(&drbd_pp_pool, page, tmp);
195                 drbd_pp_vacant += i;
196                 spin_unlock(&drbd_pp_lock);
197         }
198         return NULL;
199 }
200
201 static void reclaim_finished_net_peer_reqs(struct drbd_device *device,
202                                            struct list_head *to_be_freed)
203 {
204         struct drbd_peer_request *peer_req, *tmp;
205
206         /* The EEs are always appended to the end of the list. Since
207            they are sent in order over the wire, they have to finish
208            in order. As soon as we see the first not finished we can
209            stop to examine the list... */
210
211         list_for_each_entry_safe(peer_req, tmp, &device->net_ee, w.list) {
212                 if (drbd_peer_req_has_active_page(peer_req))
213                         break;
214                 list_move(&peer_req->w.list, to_be_freed);
215         }
216 }
217
218 static void drbd_reclaim_net_peer_reqs(struct drbd_device *device)
219 {
220         LIST_HEAD(reclaimed);
221         struct drbd_peer_request *peer_req, *t;
222
223         spin_lock_irq(&device->resource->req_lock);
224         reclaim_finished_net_peer_reqs(device, &reclaimed);
225         spin_unlock_irq(&device->resource->req_lock);
226         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
227                 drbd_free_net_peer_req(device, peer_req);
228 }
229
230 static void conn_reclaim_net_peer_reqs(struct drbd_connection *connection)
231 {
232         struct drbd_peer_device *peer_device;
233         int vnr;
234
235         rcu_read_lock();
236         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
237                 struct drbd_device *device = peer_device->device;
238                 if (!atomic_read(&device->pp_in_use_by_net))
239                         continue;
240
241                 kref_get(&device->kref);
242                 rcu_read_unlock();
243                 drbd_reclaim_net_peer_reqs(device);
244                 kref_put(&device->kref, drbd_destroy_device);
245                 rcu_read_lock();
246         }
247         rcu_read_unlock();
248 }
249
250 /**
251  * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
252  * @device:     DRBD device.
253  * @number:     number of pages requested
254  * @retry:      whether to retry, if not enough pages are available right now
255  *
256  * Tries to allocate number pages, first from our own page pool, then from
257  * the kernel.
258  * Possibly retry until DRBD frees sufficient pages somewhere else.
259  *
260  * If this allocation would exceed the max_buffers setting, we throttle
261  * allocation (schedule_timeout) to give the system some room to breathe.
262  *
263  * We do not use max-buffers as hard limit, because it could lead to
264  * congestion and further to a distributed deadlock during online-verify or
265  * (checksum based) resync, if the max-buffers, socket buffer sizes and
266  * resync-rate settings are mis-configured.
267  *
268  * Returns a page chain linked via page->private.
269  */
270 struct page *drbd_alloc_pages(struct drbd_peer_device *peer_device, unsigned int number,
271                               bool retry)
272 {
273         struct drbd_device *device = peer_device->device;
274         struct page *page = NULL;
275         struct net_conf *nc;
276         DEFINE_WAIT(wait);
277         unsigned int mxb;
278
279         rcu_read_lock();
280         nc = rcu_dereference(peer_device->connection->net_conf);
281         mxb = nc ? nc->max_buffers : 1000000;
282         rcu_read_unlock();
283
284         if (atomic_read(&device->pp_in_use) < mxb)
285                 page = __drbd_alloc_pages(device, number);
286
287         /* Try to keep the fast path fast, but occasionally we need
288          * to reclaim the pages we lended to the network stack. */
289         if (page && atomic_read(&device->pp_in_use_by_net) > 512)
290                 drbd_reclaim_net_peer_reqs(device);
291
292         while (page == NULL) {
293                 prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
294
295                 drbd_reclaim_net_peer_reqs(device);
296
297                 if (atomic_read(&device->pp_in_use) < mxb) {
298                         page = __drbd_alloc_pages(device, number);
299                         if (page)
300                                 break;
301                 }
302
303                 if (!retry)
304                         break;
305
306                 if (signal_pending(current)) {
307                         drbd_warn(device, "drbd_alloc_pages interrupted!\n");
308                         break;
309                 }
310
311                 if (schedule_timeout(HZ/10) == 0)
312                         mxb = UINT_MAX;
313         }
314         finish_wait(&drbd_pp_wait, &wait);
315
316         if (page)
317                 atomic_add(number, &device->pp_in_use);
318         return page;
319 }
320
321 /* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
322  * Is also used from inside an other spin_lock_irq(&resource->req_lock);
323  * Either links the page chain back to the global pool,
324  * or returns all pages to the system. */
325 static void drbd_free_pages(struct drbd_device *device, struct page *page, int is_net)
326 {
327         atomic_t *a = is_net ? &device->pp_in_use_by_net : &device->pp_in_use;
328         int i;
329
330         if (page == NULL)
331                 return;
332
333         if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
334                 i = page_chain_free(page);
335         else {
336                 struct page *tmp;
337                 tmp = page_chain_tail(page, &i);
338                 spin_lock(&drbd_pp_lock);
339                 page_chain_add(&drbd_pp_pool, page, tmp);
340                 drbd_pp_vacant += i;
341                 spin_unlock(&drbd_pp_lock);
342         }
343         i = atomic_sub_return(i, a);
344         if (i < 0)
345                 drbd_warn(device, "ASSERTION FAILED: %s: %d < 0\n",
346                         is_net ? "pp_in_use_by_net" : "pp_in_use", i);
347         wake_up(&drbd_pp_wait);
348 }
349
350 /*
351 You need to hold the req_lock:
352  _drbd_wait_ee_list_empty()
353
354 You must not have the req_lock:
355  drbd_free_peer_req()
356  drbd_alloc_peer_req()
357  drbd_free_peer_reqs()
358  drbd_ee_fix_bhs()
359  drbd_finish_peer_reqs()
360  drbd_clear_done_ee()
361  drbd_wait_ee_list_empty()
362 */
363
364 /* normal: payload_size == request size (bi_size)
365  * w_same: payload_size == logical_block_size
366  * trim: payload_size == 0 */
367 struct drbd_peer_request *
368 drbd_alloc_peer_req(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
369                     unsigned int request_size, unsigned int payload_size, gfp_t gfp_mask) __must_hold(local)
370 {
371         struct drbd_device *device = peer_device->device;
372         struct drbd_peer_request *peer_req;
373         struct page *page = NULL;
374         unsigned nr_pages = (payload_size + PAGE_SIZE -1) >> PAGE_SHIFT;
375
376         if (drbd_insert_fault(device, DRBD_FAULT_AL_EE))
377                 return NULL;
378
379         peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
380         if (!peer_req) {
381                 if (!(gfp_mask & __GFP_NOWARN))
382                         drbd_err(device, "%s: allocation failed\n", __func__);
383                 return NULL;
384         }
385
386         if (nr_pages) {
387                 page = drbd_alloc_pages(peer_device, nr_pages,
388                                         gfpflags_allow_blocking(gfp_mask));
389                 if (!page)
390                         goto fail;
391         }
392
393         memset(peer_req, 0, sizeof(*peer_req));
394         INIT_LIST_HEAD(&peer_req->w.list);
395         drbd_clear_interval(&peer_req->i);
396         peer_req->i.size = request_size;
397         peer_req->i.sector = sector;
398         peer_req->submit_jif = jiffies;
399         peer_req->peer_device = peer_device;
400         peer_req->pages = page;
401         /*
402          * The block_id is opaque to the receiver.  It is not endianness
403          * converted, and sent back to the sender unchanged.
404          */
405         peer_req->block_id = id;
406
407         return peer_req;
408
409  fail:
410         mempool_free(peer_req, drbd_ee_mempool);
411         return NULL;
412 }
413
414 void __drbd_free_peer_req(struct drbd_device *device, struct drbd_peer_request *peer_req,
415                        int is_net)
416 {
417         might_sleep();
418         if (peer_req->flags & EE_HAS_DIGEST)
419                 kfree(peer_req->digest);
420         drbd_free_pages(device, peer_req->pages, is_net);
421         D_ASSERT(device, atomic_read(&peer_req->pending_bios) == 0);
422         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
423         if (!expect(!(peer_req->flags & EE_CALL_AL_COMPLETE_IO))) {
424                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
425                 drbd_al_complete_io(device, &peer_req->i);
426         }
427         mempool_free(peer_req, drbd_ee_mempool);
428 }
429
430 int drbd_free_peer_reqs(struct drbd_device *device, struct list_head *list)
431 {
432         LIST_HEAD(work_list);
433         struct drbd_peer_request *peer_req, *t;
434         int count = 0;
435         int is_net = list == &device->net_ee;
436
437         spin_lock_irq(&device->resource->req_lock);
438         list_splice_init(list, &work_list);
439         spin_unlock_irq(&device->resource->req_lock);
440
441         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
442                 __drbd_free_peer_req(device, peer_req, is_net);
443                 count++;
444         }
445         return count;
446 }
447
448 /*
449  * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
450  */
451 static int drbd_finish_peer_reqs(struct drbd_device *device)
452 {
453         LIST_HEAD(work_list);
454         LIST_HEAD(reclaimed);
455         struct drbd_peer_request *peer_req, *t;
456         int err = 0;
457
458         spin_lock_irq(&device->resource->req_lock);
459         reclaim_finished_net_peer_reqs(device, &reclaimed);
460         list_splice_init(&device->done_ee, &work_list);
461         spin_unlock_irq(&device->resource->req_lock);
462
463         list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
464                 drbd_free_net_peer_req(device, peer_req);
465
466         /* possible callbacks here:
467          * e_end_block, and e_end_resync_block, e_send_superseded.
468          * all ignore the last argument.
469          */
470         list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
471                 int err2;
472
473                 /* list_del not necessary, next/prev members not touched */
474                 err2 = peer_req->w.cb(&peer_req->w, !!err);
475                 if (!err)
476                         err = err2;
477                 drbd_free_peer_req(device, peer_req);
478         }
479         wake_up(&device->ee_wait);
480
481         return err;
482 }
483
484 static void _drbd_wait_ee_list_empty(struct drbd_device *device,
485                                      struct list_head *head)
486 {
487         DEFINE_WAIT(wait);
488
489         /* avoids spin_lock/unlock
490          * and calling prepare_to_wait in the fast path */
491         while (!list_empty(head)) {
492                 prepare_to_wait(&device->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
493                 spin_unlock_irq(&device->resource->req_lock);
494                 io_schedule();
495                 finish_wait(&device->ee_wait, &wait);
496                 spin_lock_irq(&device->resource->req_lock);
497         }
498 }
499
500 static void drbd_wait_ee_list_empty(struct drbd_device *device,
501                                     struct list_head *head)
502 {
503         spin_lock_irq(&device->resource->req_lock);
504         _drbd_wait_ee_list_empty(device, head);
505         spin_unlock_irq(&device->resource->req_lock);
506 }
507
508 static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
509 {
510         struct kvec iov = {
511                 .iov_base = buf,
512                 .iov_len = size,
513         };
514         struct msghdr msg = {
515                 .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
516         };
517         return kernel_recvmsg(sock, &msg, &iov, 1, size, msg.msg_flags);
518 }
519
520 static int drbd_recv(struct drbd_connection *connection, void *buf, size_t size)
521 {
522         int rv;
523
524         rv = drbd_recv_short(connection->data.socket, buf, size, 0);
525
526         if (rv < 0) {
527                 if (rv == -ECONNRESET)
528                         drbd_info(connection, "sock was reset by peer\n");
529                 else if (rv != -ERESTARTSYS)
530                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
531         } else if (rv == 0) {
532                 if (test_bit(DISCONNECT_SENT, &connection->flags)) {
533                         long t;
534                         rcu_read_lock();
535                         t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
536                         rcu_read_unlock();
537
538                         t = wait_event_timeout(connection->ping_wait, connection->cstate < C_WF_REPORT_PARAMS, t);
539
540                         if (t)
541                                 goto out;
542                 }
543                 drbd_info(connection, "sock was shut down by peer\n");
544         }
545
546         if (rv != size)
547                 conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
548
549 out:
550         return rv;
551 }
552
553 static int drbd_recv_all(struct drbd_connection *connection, void *buf, size_t size)
554 {
555         int err;
556
557         err = drbd_recv(connection, buf, size);
558         if (err != size) {
559                 if (err >= 0)
560                         err = -EIO;
561         } else
562                 err = 0;
563         return err;
564 }
565
566 static int drbd_recv_all_warn(struct drbd_connection *connection, void *buf, size_t size)
567 {
568         int err;
569
570         err = drbd_recv_all(connection, buf, size);
571         if (err && !signal_pending(current))
572                 drbd_warn(connection, "short read (expected size %d)\n", (int)size);
573         return err;
574 }
575
576 /* quoting tcp(7):
577  *   On individual connections, the socket buffer size must be set prior to the
578  *   listen(2) or connect(2) calls in order to have it take effect.
579  * This is our wrapper to do so.
580  */
581 static void drbd_setbufsize(struct socket *sock, unsigned int snd,
582                 unsigned int rcv)
583 {
584         /* open coded SO_SNDBUF, SO_RCVBUF */
585         if (snd) {
586                 sock->sk->sk_sndbuf = snd;
587                 sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
588         }
589         if (rcv) {
590                 sock->sk->sk_rcvbuf = rcv;
591                 sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
592         }
593 }
594
595 static struct socket *drbd_try_connect(struct drbd_connection *connection)
596 {
597         const char *what;
598         struct socket *sock;
599         struct sockaddr_in6 src_in6;
600         struct sockaddr_in6 peer_in6;
601         struct net_conf *nc;
602         int err, peer_addr_len, my_addr_len;
603         int sndbuf_size, rcvbuf_size, connect_int;
604         int disconnect_on_error = 1;
605
606         rcu_read_lock();
607         nc = rcu_dereference(connection->net_conf);
608         if (!nc) {
609                 rcu_read_unlock();
610                 return NULL;
611         }
612         sndbuf_size = nc->sndbuf_size;
613         rcvbuf_size = nc->rcvbuf_size;
614         connect_int = nc->connect_int;
615         rcu_read_unlock();
616
617         my_addr_len = min_t(int, connection->my_addr_len, sizeof(src_in6));
618         memcpy(&src_in6, &connection->my_addr, my_addr_len);
619
620         if (((struct sockaddr *)&connection->my_addr)->sa_family == AF_INET6)
621                 src_in6.sin6_port = 0;
622         else
623                 ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
624
625         peer_addr_len = min_t(int, connection->peer_addr_len, sizeof(src_in6));
626         memcpy(&peer_in6, &connection->peer_addr, peer_addr_len);
627
628         what = "sock_create_kern";
629         err = sock_create_kern(&init_net, ((struct sockaddr *)&src_in6)->sa_family,
630                                SOCK_STREAM, IPPROTO_TCP, &sock);
631         if (err < 0) {
632                 sock = NULL;
633                 goto out;
634         }
635
636         sock->sk->sk_rcvtimeo =
637         sock->sk->sk_sndtimeo = connect_int * HZ;
638         drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
639
640        /* explicitly bind to the configured IP as source IP
641         *  for the outgoing connections.
642         *  This is needed for multihomed hosts and to be
643         *  able to use lo: interfaces for drbd.
644         * Make sure to use 0 as port number, so linux selects
645         *  a free one dynamically.
646         */
647         what = "bind before connect";
648         err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
649         if (err < 0)
650                 goto out;
651
652         /* connect may fail, peer not yet available.
653          * stay C_WF_CONNECTION, don't go Disconnecting! */
654         disconnect_on_error = 0;
655         what = "connect";
656         err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
657
658 out:
659         if (err < 0) {
660                 if (sock) {
661                         sock_release(sock);
662                         sock = NULL;
663                 }
664                 switch (-err) {
665                         /* timeout, busy, signal pending */
666                 case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
667                 case EINTR: case ERESTARTSYS:
668                         /* peer not (yet) available, network problem */
669                 case ECONNREFUSED: case ENETUNREACH:
670                 case EHOSTDOWN:    case EHOSTUNREACH:
671                         disconnect_on_error = 0;
672                         break;
673                 default:
674                         drbd_err(connection, "%s failed, err = %d\n", what, err);
675                 }
676                 if (disconnect_on_error)
677                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
678         }
679
680         return sock;
681 }
682
683 struct accept_wait_data {
684         struct drbd_connection *connection;
685         struct socket *s_listen;
686         struct completion door_bell;
687         void (*original_sk_state_change)(struct sock *sk);
688
689 };
690
691 static void drbd_incoming_connection(struct sock *sk)
692 {
693         struct accept_wait_data *ad = sk->sk_user_data;
694         void (*state_change)(struct sock *sk);
695
696         state_change = ad->original_sk_state_change;
697         if (sk->sk_state == TCP_ESTABLISHED)
698                 complete(&ad->door_bell);
699         state_change(sk);
700 }
701
702 static int prepare_listen_socket(struct drbd_connection *connection, struct accept_wait_data *ad)
703 {
704         int err, sndbuf_size, rcvbuf_size, my_addr_len;
705         struct sockaddr_in6 my_addr;
706         struct socket *s_listen;
707         struct net_conf *nc;
708         const char *what;
709
710         rcu_read_lock();
711         nc = rcu_dereference(connection->net_conf);
712         if (!nc) {
713                 rcu_read_unlock();
714                 return -EIO;
715         }
716         sndbuf_size = nc->sndbuf_size;
717         rcvbuf_size = nc->rcvbuf_size;
718         rcu_read_unlock();
719
720         my_addr_len = min_t(int, connection->my_addr_len, sizeof(struct sockaddr_in6));
721         memcpy(&my_addr, &connection->my_addr, my_addr_len);
722
723         what = "sock_create_kern";
724         err = sock_create_kern(&init_net, ((struct sockaddr *)&my_addr)->sa_family,
725                                SOCK_STREAM, IPPROTO_TCP, &s_listen);
726         if (err) {
727                 s_listen = NULL;
728                 goto out;
729         }
730
731         s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
732         drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
733
734         what = "bind before listen";
735         err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
736         if (err < 0)
737                 goto out;
738
739         ad->s_listen = s_listen;
740         write_lock_bh(&s_listen->sk->sk_callback_lock);
741         ad->original_sk_state_change = s_listen->sk->sk_state_change;
742         s_listen->sk->sk_state_change = drbd_incoming_connection;
743         s_listen->sk->sk_user_data = ad;
744         write_unlock_bh(&s_listen->sk->sk_callback_lock);
745
746         what = "listen";
747         err = s_listen->ops->listen(s_listen, 5);
748         if (err < 0)
749                 goto out;
750
751         return 0;
752 out:
753         if (s_listen)
754                 sock_release(s_listen);
755         if (err < 0) {
756                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
757                         drbd_err(connection, "%s failed, err = %d\n", what, err);
758                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
759                 }
760         }
761
762         return -EIO;
763 }
764
765 static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
766 {
767         write_lock_bh(&sk->sk_callback_lock);
768         sk->sk_state_change = ad->original_sk_state_change;
769         sk->sk_user_data = NULL;
770         write_unlock_bh(&sk->sk_callback_lock);
771 }
772
773 static struct socket *drbd_wait_for_connect(struct drbd_connection *connection, struct accept_wait_data *ad)
774 {
775         int timeo, connect_int, err = 0;
776         struct socket *s_estab = NULL;
777         struct net_conf *nc;
778
779         rcu_read_lock();
780         nc = rcu_dereference(connection->net_conf);
781         if (!nc) {
782                 rcu_read_unlock();
783                 return NULL;
784         }
785         connect_int = nc->connect_int;
786         rcu_read_unlock();
787
788         timeo = connect_int * HZ;
789         /* 28.5% random jitter */
790         timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
791
792         err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
793         if (err <= 0)
794                 return NULL;
795
796         err = kernel_accept(ad->s_listen, &s_estab, 0);
797         if (err < 0) {
798                 if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
799                         drbd_err(connection, "accept failed, err = %d\n", err);
800                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
801                 }
802         }
803
804         if (s_estab)
805                 unregister_state_change(s_estab->sk, ad);
806
807         return s_estab;
808 }
809
810 static int decode_header(struct drbd_connection *, void *, struct packet_info *);
811
812 static int send_first_packet(struct drbd_connection *connection, struct drbd_socket *sock,
813                              enum drbd_packet cmd)
814 {
815         if (!conn_prepare_command(connection, sock))
816                 return -EIO;
817         return conn_send_command(connection, sock, cmd, 0, NULL, 0);
818 }
819
820 static int receive_first_packet(struct drbd_connection *connection, struct socket *sock)
821 {
822         unsigned int header_size = drbd_header_size(connection);
823         struct packet_info pi;
824         struct net_conf *nc;
825         int err;
826
827         rcu_read_lock();
828         nc = rcu_dereference(connection->net_conf);
829         if (!nc) {
830                 rcu_read_unlock();
831                 return -EIO;
832         }
833         sock->sk->sk_rcvtimeo = nc->ping_timeo * 4 * HZ / 10;
834         rcu_read_unlock();
835
836         err = drbd_recv_short(sock, connection->data.rbuf, header_size, 0);
837         if (err != header_size) {
838                 if (err >= 0)
839                         err = -EIO;
840                 return err;
841         }
842         err = decode_header(connection, connection->data.rbuf, &pi);
843         if (err)
844                 return err;
845         return pi.cmd;
846 }
847
848 /**
849  * drbd_socket_okay() - Free the socket if its connection is not okay
850  * @sock:       pointer to the pointer to the socket.
851  */
852 static bool drbd_socket_okay(struct socket **sock)
853 {
854         int rr;
855         char tb[4];
856
857         if (!*sock)
858                 return false;
859
860         rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
861
862         if (rr > 0 || rr == -EAGAIN) {
863                 return true;
864         } else {
865                 sock_release(*sock);
866                 *sock = NULL;
867                 return false;
868         }
869 }
870
871 static bool connection_established(struct drbd_connection *connection,
872                                    struct socket **sock1,
873                                    struct socket **sock2)
874 {
875         struct net_conf *nc;
876         int timeout;
877         bool ok;
878
879         if (!*sock1 || !*sock2)
880                 return false;
881
882         rcu_read_lock();
883         nc = rcu_dereference(connection->net_conf);
884         timeout = (nc->sock_check_timeo ?: nc->ping_timeo) * HZ / 10;
885         rcu_read_unlock();
886         schedule_timeout_interruptible(timeout);
887
888         ok = drbd_socket_okay(sock1);
889         ok = drbd_socket_okay(sock2) && ok;
890
891         return ok;
892 }
893
894 /* Gets called if a connection is established, or if a new minor gets created
895    in a connection */
896 int drbd_connected(struct drbd_peer_device *peer_device)
897 {
898         struct drbd_device *device = peer_device->device;
899         int err;
900
901         atomic_set(&device->packet_seq, 0);
902         device->peer_seq = 0;
903
904         device->state_mutex = peer_device->connection->agreed_pro_version < 100 ?
905                 &peer_device->connection->cstate_mutex :
906                 &device->own_state_mutex;
907
908         err = drbd_send_sync_param(peer_device);
909         if (!err)
910                 err = drbd_send_sizes(peer_device, 0, 0);
911         if (!err)
912                 err = drbd_send_uuids(peer_device);
913         if (!err)
914                 err = drbd_send_current_state(peer_device);
915         clear_bit(USE_DEGR_WFC_T, &device->flags);
916         clear_bit(RESIZE_PENDING, &device->flags);
917         atomic_set(&device->ap_in_flight, 0);
918         mod_timer(&device->request_timer, jiffies + HZ); /* just start it here. */
919         return err;
920 }
921
922 /*
923  * return values:
924  *   1 yes, we have a valid connection
925  *   0 oops, did not work out, please try again
926  *  -1 peer talks different language,
927  *     no point in trying again, please go standalone.
928  *  -2 We do not have a network config...
929  */
930 static int conn_connect(struct drbd_connection *connection)
931 {
932         struct drbd_socket sock, msock;
933         struct drbd_peer_device *peer_device;
934         struct net_conf *nc;
935         int vnr, timeout, h;
936         bool discard_my_data, ok;
937         enum drbd_state_rv rv;
938         struct accept_wait_data ad = {
939                 .connection = connection,
940                 .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
941         };
942
943         clear_bit(DISCONNECT_SENT, &connection->flags);
944         if (conn_request_state(connection, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
945                 return -2;
946
947         mutex_init(&sock.mutex);
948         sock.sbuf = connection->data.sbuf;
949         sock.rbuf = connection->data.rbuf;
950         sock.socket = NULL;
951         mutex_init(&msock.mutex);
952         msock.sbuf = connection->meta.sbuf;
953         msock.rbuf = connection->meta.rbuf;
954         msock.socket = NULL;
955
956         /* Assume that the peer only understands protocol 80 until we know better.  */
957         connection->agreed_pro_version = 80;
958
959         if (prepare_listen_socket(connection, &ad))
960                 return 0;
961
962         do {
963                 struct socket *s;
964
965                 s = drbd_try_connect(connection);
966                 if (s) {
967                         if (!sock.socket) {
968                                 sock.socket = s;
969                                 send_first_packet(connection, &sock, P_INITIAL_DATA);
970                         } else if (!msock.socket) {
971                                 clear_bit(RESOLVE_CONFLICTS, &connection->flags);
972                                 msock.socket = s;
973                                 send_first_packet(connection, &msock, P_INITIAL_META);
974                         } else {
975                                 drbd_err(connection, "Logic error in conn_connect()\n");
976                                 goto out_release_sockets;
977                         }
978                 }
979
980                 if (connection_established(connection, &sock.socket, &msock.socket))
981                         break;
982
983 retry:
984                 s = drbd_wait_for_connect(connection, &ad);
985                 if (s) {
986                         int fp = receive_first_packet(connection, s);
987                         drbd_socket_okay(&sock.socket);
988                         drbd_socket_okay(&msock.socket);
989                         switch (fp) {
990                         case P_INITIAL_DATA:
991                                 if (sock.socket) {
992                                         drbd_warn(connection, "initial packet S crossed\n");
993                                         sock_release(sock.socket);
994                                         sock.socket = s;
995                                         goto randomize;
996                                 }
997                                 sock.socket = s;
998                                 break;
999                         case P_INITIAL_META:
1000                                 set_bit(RESOLVE_CONFLICTS, &connection->flags);
1001                                 if (msock.socket) {
1002                                         drbd_warn(connection, "initial packet M crossed\n");
1003                                         sock_release(msock.socket);
1004                                         msock.socket = s;
1005                                         goto randomize;
1006                                 }
1007                                 msock.socket = s;
1008                                 break;
1009                         default:
1010                                 drbd_warn(connection, "Error receiving initial packet\n");
1011                                 sock_release(s);
1012 randomize:
1013                                 if (prandom_u32() & 1)
1014                                         goto retry;
1015                         }
1016                 }
1017
1018                 if (connection->cstate <= C_DISCONNECTING)
1019                         goto out_release_sockets;
1020                 if (signal_pending(current)) {
1021                         flush_signals(current);
1022                         smp_rmb();
1023                         if (get_t_state(&connection->receiver) == EXITING)
1024                                 goto out_release_sockets;
1025                 }
1026
1027                 ok = connection_established(connection, &sock.socket, &msock.socket);
1028         } while (!ok);
1029
1030         if (ad.s_listen)
1031                 sock_release(ad.s_listen);
1032
1033         sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1034         msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
1035
1036         sock.socket->sk->sk_allocation = GFP_NOIO;
1037         msock.socket->sk->sk_allocation = GFP_NOIO;
1038
1039         sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
1040         msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
1041
1042         /* NOT YET ...
1043          * sock.socket->sk->sk_sndtimeo = connection->net_conf->timeout*HZ/10;
1044          * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1045          * first set it to the P_CONNECTION_FEATURES timeout,
1046          * which we set to 4x the configured ping_timeout. */
1047         rcu_read_lock();
1048         nc = rcu_dereference(connection->net_conf);
1049
1050         sock.socket->sk->sk_sndtimeo =
1051         sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
1052
1053         msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1054         timeout = nc->timeout * HZ / 10;
1055         discard_my_data = nc->discard_my_data;
1056         rcu_read_unlock();
1057
1058         msock.socket->sk->sk_sndtimeo = timeout;
1059
1060         /* we don't want delays.
1061          * we use TCP_CORK where appropriate, though */
1062         drbd_tcp_nodelay(sock.socket);
1063         drbd_tcp_nodelay(msock.socket);
1064
1065         connection->data.socket = sock.socket;
1066         connection->meta.socket = msock.socket;
1067         connection->last_received = jiffies;
1068
1069         h = drbd_do_features(connection);
1070         if (h <= 0)
1071                 return h;
1072
1073         if (connection->cram_hmac_tfm) {
1074                 /* drbd_request_state(device, NS(conn, WFAuth)); */
1075                 switch (drbd_do_auth(connection)) {
1076                 case -1:
1077                         drbd_err(connection, "Authentication of peer failed\n");
1078                         return -1;
1079                 case 0:
1080                         drbd_err(connection, "Authentication of peer failed, trying again.\n");
1081                         return 0;
1082                 }
1083         }
1084
1085         connection->data.socket->sk->sk_sndtimeo = timeout;
1086         connection->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1087
1088         if (drbd_send_protocol(connection) == -EOPNOTSUPP)
1089                 return -1;
1090
1091         /* Prevent a race between resync-handshake and
1092          * being promoted to Primary.
1093          *
1094          * Grab and release the state mutex, so we know that any current
1095          * drbd_set_role() is finished, and any incoming drbd_set_role
1096          * will see the STATE_SENT flag, and wait for it to be cleared.
1097          */
1098         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1099                 mutex_lock(peer_device->device->state_mutex);
1100
1101         set_bit(STATE_SENT, &connection->flags);
1102
1103         idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
1104                 mutex_unlock(peer_device->device->state_mutex);
1105
1106         rcu_read_lock();
1107         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1108                 struct drbd_device *device = peer_device->device;
1109                 kref_get(&device->kref);
1110                 rcu_read_unlock();
1111
1112                 if (discard_my_data)
1113                         set_bit(DISCARD_MY_DATA, &device->flags);
1114                 else
1115                         clear_bit(DISCARD_MY_DATA, &device->flags);
1116
1117                 drbd_connected(peer_device);
1118                 kref_put(&device->kref, drbd_destroy_device);
1119                 rcu_read_lock();
1120         }
1121         rcu_read_unlock();
1122
1123         rv = conn_request_state(connection, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1124         if (rv < SS_SUCCESS || connection->cstate != C_WF_REPORT_PARAMS) {
1125                 clear_bit(STATE_SENT, &connection->flags);
1126                 return 0;
1127         }
1128
1129         drbd_thread_start(&connection->ack_receiver);
1130         /* opencoded create_singlethread_workqueue(),
1131          * to be able to use format string arguments */
1132         connection->ack_sender =
1133                 alloc_ordered_workqueue("drbd_as_%s", WQ_MEM_RECLAIM, connection->resource->name);
1134         if (!connection->ack_sender) {
1135                 drbd_err(connection, "Failed to create workqueue ack_sender\n");
1136                 return 0;
1137         }
1138
1139         mutex_lock(&connection->resource->conf_update);
1140         /* The discard_my_data flag is a single-shot modifier to the next
1141          * connection attempt, the handshake of which is now well underway.
1142          * No need for rcu style copying of the whole struct
1143          * just to clear a single value. */
1144         connection->net_conf->discard_my_data = 0;
1145         mutex_unlock(&connection->resource->conf_update);
1146
1147         return h;
1148
1149 out_release_sockets:
1150         if (ad.s_listen)
1151                 sock_release(ad.s_listen);
1152         if (sock.socket)
1153                 sock_release(sock.socket);
1154         if (msock.socket)
1155                 sock_release(msock.socket);
1156         return -1;
1157 }
1158
1159 static int decode_header(struct drbd_connection *connection, void *header, struct packet_info *pi)
1160 {
1161         unsigned int header_size = drbd_header_size(connection);
1162
1163         if (header_size == sizeof(struct p_header100) &&
1164             *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1165                 struct p_header100 *h = header;
1166                 if (h->pad != 0) {
1167                         drbd_err(connection, "Header padding is not zero\n");
1168                         return -EINVAL;
1169                 }
1170                 pi->vnr = be16_to_cpu(h->volume);
1171                 pi->cmd = be16_to_cpu(h->command);
1172                 pi->size = be32_to_cpu(h->length);
1173         } else if (header_size == sizeof(struct p_header95) &&
1174                    *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1175                 struct p_header95 *h = header;
1176                 pi->cmd = be16_to_cpu(h->command);
1177                 pi->size = be32_to_cpu(h->length);
1178                 pi->vnr = 0;
1179         } else if (header_size == sizeof(struct p_header80) &&
1180                    *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1181                 struct p_header80 *h = header;
1182                 pi->cmd = be16_to_cpu(h->command);
1183                 pi->size = be16_to_cpu(h->length);
1184                 pi->vnr = 0;
1185         } else {
1186                 drbd_err(connection, "Wrong magic value 0x%08x in protocol version %d\n",
1187                          be32_to_cpu(*(__be32 *)header),
1188                          connection->agreed_pro_version);
1189                 return -EINVAL;
1190         }
1191         pi->data = header + header_size;
1192         return 0;
1193 }
1194
1195 static int drbd_recv_header(struct drbd_connection *connection, struct packet_info *pi)
1196 {
1197         void *buffer = connection->data.rbuf;
1198         int err;
1199
1200         err = drbd_recv_all_warn(connection, buffer, drbd_header_size(connection));
1201         if (err)
1202                 return err;
1203
1204         err = decode_header(connection, buffer, pi);
1205         connection->last_received = jiffies;
1206
1207         return err;
1208 }
1209
1210 /* This is blkdev_issue_flush, but asynchronous.
1211  * We want to submit to all component volumes in parallel,
1212  * then wait for all completions.
1213  */
1214 struct issue_flush_context {
1215         atomic_t pending;
1216         int error;
1217         struct completion done;
1218 };
1219 struct one_flush_context {
1220         struct drbd_device *device;
1221         struct issue_flush_context *ctx;
1222 };
1223
1224 void one_flush_endio(struct bio *bio)
1225 {
1226         struct one_flush_context *octx = bio->bi_private;
1227         struct drbd_device *device = octx->device;
1228         struct issue_flush_context *ctx = octx->ctx;
1229
1230         if (bio->bi_error) {
1231                 ctx->error = bio->bi_error;
1232                 drbd_info(device, "local disk FLUSH FAILED with status %d\n", bio->bi_error);
1233         }
1234         kfree(octx);
1235         bio_put(bio);
1236
1237         clear_bit(FLUSH_PENDING, &device->flags);
1238         put_ldev(device);
1239         kref_put(&device->kref, drbd_destroy_device);
1240
1241         if (atomic_dec_and_test(&ctx->pending))
1242                 complete(&ctx->done);
1243 }
1244
1245 static void submit_one_flush(struct drbd_device *device, struct issue_flush_context *ctx)
1246 {
1247         struct bio *bio = bio_alloc(GFP_NOIO, 0);
1248         struct one_flush_context *octx = kmalloc(sizeof(*octx), GFP_NOIO);
1249         if (!bio || !octx) {
1250                 drbd_warn(device, "Could not allocate a bio, CANNOT ISSUE FLUSH\n");
1251                 /* FIXME: what else can I do now?  disconnecting or detaching
1252                  * really does not help to improve the state of the world, either.
1253                  */
1254                 kfree(octx);
1255                 if (bio)
1256                         bio_put(bio);
1257
1258                 ctx->error = -ENOMEM;
1259                 put_ldev(device);
1260                 kref_put(&device->kref, drbd_destroy_device);
1261                 return;
1262         }
1263
1264         octx->device = device;
1265         octx->ctx = ctx;
1266         bio->bi_bdev = device->ldev->backing_bdev;
1267         bio->bi_private = octx;
1268         bio->bi_end_io = one_flush_endio;
1269         bio_set_op_attrs(bio, REQ_OP_FLUSH, WRITE_FLUSH);
1270
1271         device->flush_jif = jiffies;
1272         set_bit(FLUSH_PENDING, &device->flags);
1273         atomic_inc(&ctx->pending);
1274         submit_bio(bio);
1275 }
1276
1277 static void drbd_flush(struct drbd_connection *connection)
1278 {
1279         if (connection->resource->write_ordering >= WO_BDEV_FLUSH) {
1280                 struct drbd_peer_device *peer_device;
1281                 struct issue_flush_context ctx;
1282                 int vnr;
1283
1284                 atomic_set(&ctx.pending, 1);
1285                 ctx.error = 0;
1286                 init_completion(&ctx.done);
1287
1288                 rcu_read_lock();
1289                 idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1290                         struct drbd_device *device = peer_device->device;
1291
1292                         if (!get_ldev(device))
1293                                 continue;
1294                         kref_get(&device->kref);
1295                         rcu_read_unlock();
1296
1297                         submit_one_flush(device, &ctx);
1298
1299                         rcu_read_lock();
1300                 }
1301                 rcu_read_unlock();
1302
1303                 /* Do we want to add a timeout,
1304                  * if disk-timeout is set? */
1305                 if (!atomic_dec_and_test(&ctx.pending))
1306                         wait_for_completion(&ctx.done);
1307
1308                 if (ctx.error) {
1309                         /* would rather check on EOPNOTSUPP, but that is not reliable.
1310                          * don't try again for ANY return value != 0
1311                          * if (rv == -EOPNOTSUPP) */
1312                         /* Any error is already reported by bio_endio callback. */
1313                         drbd_bump_write_ordering(connection->resource, NULL, WO_DRAIN_IO);
1314                 }
1315         }
1316 }
1317
1318 /**
1319  * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1320  * @device:     DRBD device.
1321  * @epoch:      Epoch object.
1322  * @ev:         Epoch event.
1323  */
1324 static enum finish_epoch drbd_may_finish_epoch(struct drbd_connection *connection,
1325                                                struct drbd_epoch *epoch,
1326                                                enum epoch_event ev)
1327 {
1328         int epoch_size;
1329         struct drbd_epoch *next_epoch;
1330         enum finish_epoch rv = FE_STILL_LIVE;
1331
1332         spin_lock(&connection->epoch_lock);
1333         do {
1334                 next_epoch = NULL;
1335
1336                 epoch_size = atomic_read(&epoch->epoch_size);
1337
1338                 switch (ev & ~EV_CLEANUP) {
1339                 case EV_PUT:
1340                         atomic_dec(&epoch->active);
1341                         break;
1342                 case EV_GOT_BARRIER_NR:
1343                         set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1344                         break;
1345                 case EV_BECAME_LAST:
1346                         /* nothing to do*/
1347                         break;
1348                 }
1349
1350                 if (epoch_size != 0 &&
1351                     atomic_read(&epoch->active) == 0 &&
1352                     (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1353                         if (!(ev & EV_CLEANUP)) {
1354                                 spin_unlock(&connection->epoch_lock);
1355                                 drbd_send_b_ack(epoch->connection, epoch->barrier_nr, epoch_size);
1356                                 spin_lock(&connection->epoch_lock);
1357                         }
1358 #if 0
1359                         /* FIXME: dec unacked on connection, once we have
1360                          * something to count pending connection packets in. */
1361                         if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1362                                 dec_unacked(epoch->connection);
1363 #endif
1364
1365                         if (connection->current_epoch != epoch) {
1366                                 next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1367                                 list_del(&epoch->list);
1368                                 ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1369                                 connection->epochs--;
1370                                 kfree(epoch);
1371
1372                                 if (rv == FE_STILL_LIVE)
1373                                         rv = FE_DESTROYED;
1374                         } else {
1375                                 epoch->flags = 0;
1376                                 atomic_set(&epoch->epoch_size, 0);
1377                                 /* atomic_set(&epoch->active, 0); is already zero */
1378                                 if (rv == FE_STILL_LIVE)
1379                                         rv = FE_RECYCLED;
1380                         }
1381                 }
1382
1383                 if (!next_epoch)
1384                         break;
1385
1386                 epoch = next_epoch;
1387         } while (1);
1388
1389         spin_unlock(&connection->epoch_lock);
1390
1391         return rv;
1392 }
1393
1394 static enum write_ordering_e
1395 max_allowed_wo(struct drbd_backing_dev *bdev, enum write_ordering_e wo)
1396 {
1397         struct disk_conf *dc;
1398
1399         dc = rcu_dereference(bdev->disk_conf);
1400
1401         if (wo == WO_BDEV_FLUSH && !dc->disk_flushes)
1402                 wo = WO_DRAIN_IO;
1403         if (wo == WO_DRAIN_IO && !dc->disk_drain)
1404                 wo = WO_NONE;
1405
1406         return wo;
1407 }
1408
1409 /**
1410  * drbd_bump_write_ordering() - Fall back to an other write ordering method
1411  * @connection: DRBD connection.
1412  * @wo:         Write ordering method to try.
1413  */
1414 void drbd_bump_write_ordering(struct drbd_resource *resource, struct drbd_backing_dev *bdev,
1415                               enum write_ordering_e wo)
1416 {
1417         struct drbd_device *device;
1418         enum write_ordering_e pwo;
1419         int vnr;
1420         static char *write_ordering_str[] = {
1421                 [WO_NONE] = "none",
1422                 [WO_DRAIN_IO] = "drain",
1423                 [WO_BDEV_FLUSH] = "flush",
1424         };
1425
1426         pwo = resource->write_ordering;
1427         if (wo != WO_BDEV_FLUSH)
1428                 wo = min(pwo, wo);
1429         rcu_read_lock();
1430         idr_for_each_entry(&resource->devices, device, vnr) {
1431                 if (get_ldev(device)) {
1432                         wo = max_allowed_wo(device->ldev, wo);
1433                         if (device->ldev == bdev)
1434                                 bdev = NULL;
1435                         put_ldev(device);
1436                 }
1437         }
1438
1439         if (bdev)
1440                 wo = max_allowed_wo(bdev, wo);
1441
1442         rcu_read_unlock();
1443
1444         resource->write_ordering = wo;
1445         if (pwo != resource->write_ordering || wo == WO_BDEV_FLUSH)
1446                 drbd_info(resource, "Method to ensure write ordering: %s\n", write_ordering_str[resource->write_ordering]);
1447 }
1448
1449 /*
1450  * We *may* ignore the discard-zeroes-data setting, if so configured.
1451  *
1452  * Assumption is that it "discard_zeroes_data=0" is only because the backend
1453  * may ignore partial unaligned discards.
1454  *
1455  * LVM/DM thin as of at least
1456  *   LVM version:     2.02.115(2)-RHEL7 (2015-01-28)
1457  *   Library version: 1.02.93-RHEL7 (2015-01-28)
1458  *   Driver version:  4.29.0
1459  * still behaves this way.
1460  *
1461  * For unaligned (wrt. alignment and granularity) or too small discards,
1462  * we zero-out the initial (and/or) trailing unaligned partial chunks,
1463  * but discard all the aligned full chunks.
1464  *
1465  * At least for LVM/DM thin, the result is effectively "discard_zeroes_data=1".
1466  */
1467 int drbd_issue_discard_or_zero_out(struct drbd_device *device, sector_t start, unsigned int nr_sectors, bool discard)
1468 {
1469         struct block_device *bdev = device->ldev->backing_bdev;
1470         struct request_queue *q = bdev_get_queue(bdev);
1471         sector_t tmp, nr;
1472         unsigned int max_discard_sectors, granularity;
1473         int alignment;
1474         int err = 0;
1475
1476         if (!discard)
1477                 goto zero_out;
1478
1479         /* Zero-sector (unknown) and one-sector granularities are the same.  */
1480         granularity = max(q->limits.discard_granularity >> 9, 1U);
1481         alignment = (bdev_discard_alignment(bdev) >> 9) % granularity;
1482
1483         max_discard_sectors = min(q->limits.max_discard_sectors, (1U << 22));
1484         max_discard_sectors -= max_discard_sectors % granularity;
1485         if (unlikely(!max_discard_sectors))
1486                 goto zero_out;
1487
1488         if (nr_sectors < granularity)
1489                 goto zero_out;
1490
1491         tmp = start;
1492         if (sector_div(tmp, granularity) != alignment) {
1493                 if (nr_sectors < 2*granularity)
1494                         goto zero_out;
1495                 /* start + gran - (start + gran - align) % gran */
1496                 tmp = start + granularity - alignment;
1497                 tmp = start + granularity - sector_div(tmp, granularity);
1498
1499                 nr = tmp - start;
1500                 err |= blkdev_issue_zeroout(bdev, start, nr, GFP_NOIO, 0);
1501                 nr_sectors -= nr;
1502                 start = tmp;
1503         }
1504         while (nr_sectors >= granularity) {
1505                 nr = min_t(sector_t, nr_sectors, max_discard_sectors);
1506                 err |= blkdev_issue_discard(bdev, start, nr, GFP_NOIO, 0);
1507                 nr_sectors -= nr;
1508                 start += nr;
1509         }
1510  zero_out:
1511         if (nr_sectors) {
1512                 err |= blkdev_issue_zeroout(bdev, start, nr_sectors, GFP_NOIO, 0);
1513         }
1514         return err != 0;
1515 }
1516
1517 static bool can_do_reliable_discards(struct drbd_device *device)
1518 {
1519         struct request_queue *q = bdev_get_queue(device->ldev->backing_bdev);
1520         struct disk_conf *dc;
1521         bool can_do;
1522
1523         if (!blk_queue_discard(q))
1524                 return false;
1525
1526         if (q->limits.discard_zeroes_data)
1527                 return true;
1528
1529         rcu_read_lock();
1530         dc = rcu_dereference(device->ldev->disk_conf);
1531         can_do = dc->discard_zeroes_if_aligned;
1532         rcu_read_unlock();
1533         return can_do;
1534 }
1535
1536 static void drbd_issue_peer_discard(struct drbd_device *device, struct drbd_peer_request *peer_req)
1537 {
1538         /* If the backend cannot discard, or does not guarantee
1539          * read-back zeroes in discarded ranges, we fall back to
1540          * zero-out.  Unless configuration specifically requested
1541          * otherwise. */
1542         if (!can_do_reliable_discards(device))
1543                 peer_req->flags |= EE_IS_TRIM_USE_ZEROOUT;
1544
1545         if (drbd_issue_discard_or_zero_out(device, peer_req->i.sector,
1546             peer_req->i.size >> 9, !(peer_req->flags & EE_IS_TRIM_USE_ZEROOUT)))
1547                 peer_req->flags |= EE_WAS_ERROR;
1548         drbd_endio_write_sec_final(peer_req);
1549 }
1550
1551 static void drbd_issue_peer_wsame(struct drbd_device *device,
1552                                   struct drbd_peer_request *peer_req)
1553 {
1554         struct block_device *bdev = device->ldev->backing_bdev;
1555         sector_t s = peer_req->i.sector;
1556         sector_t nr = peer_req->i.size >> 9;
1557         if (blkdev_issue_write_same(bdev, s, nr, GFP_NOIO, peer_req->pages))
1558                 peer_req->flags |= EE_WAS_ERROR;
1559         drbd_endio_write_sec_final(peer_req);
1560 }
1561
1562
1563 /**
1564  * drbd_submit_peer_request()
1565  * @device:     DRBD device.
1566  * @peer_req:   peer request
1567  * @rw:         flag field, see bio->bi_rw
1568  *
1569  * May spread the pages to multiple bios,
1570  * depending on bio_add_page restrictions.
1571  *
1572  * Returns 0 if all bios have been submitted,
1573  * -ENOMEM if we could not allocate enough bios,
1574  * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1575  *  single page to an empty bio (which should never happen and likely indicates
1576  *  that the lower level IO stack is in some way broken). This has been observed
1577  *  on certain Xen deployments.
1578  */
1579 /* TODO allocate from our own bio_set. */
1580 int drbd_submit_peer_request(struct drbd_device *device,
1581                              struct drbd_peer_request *peer_req,
1582                              const unsigned op, const unsigned op_flags,
1583                              const int fault_type)
1584 {
1585         struct bio *bios = NULL;
1586         struct bio *bio;
1587         struct page *page = peer_req->pages;
1588         sector_t sector = peer_req->i.sector;
1589         unsigned data_size = peer_req->i.size;
1590         unsigned n_bios = 0;
1591         unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
1592         int err = -ENOMEM;
1593
1594         /* TRIM/DISCARD: for now, always use the helper function
1595          * blkdev_issue_zeroout(..., discard=true).
1596          * It's synchronous, but it does the right thing wrt. bio splitting.
1597          * Correctness first, performance later.  Next step is to code an
1598          * asynchronous variant of the same.
1599          */
1600         if (peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) {
1601                 /* wait for all pending IO completions, before we start
1602                  * zeroing things out. */
1603                 conn_wait_active_ee_empty(peer_req->peer_device->connection);
1604                 /* add it to the active list now,
1605                  * so we can find it to present it in debugfs */
1606                 peer_req->submit_jif = jiffies;
1607                 peer_req->flags |= EE_SUBMITTED;
1608
1609                 /* If this was a resync request from receive_rs_deallocated(),
1610                  * it is already on the sync_ee list */
1611                 if (list_empty(&peer_req->w.list)) {
1612                         spin_lock_irq(&device->resource->req_lock);
1613                         list_add_tail(&peer_req->w.list, &device->active_ee);
1614                         spin_unlock_irq(&device->resource->req_lock);
1615                 }
1616
1617                 if (peer_req->flags & EE_IS_TRIM)
1618                         drbd_issue_peer_discard(device, peer_req);
1619                 else /* EE_WRITE_SAME */
1620                         drbd_issue_peer_wsame(device, peer_req);
1621                 return 0;
1622         }
1623
1624         /* In most cases, we will only need one bio.  But in case the lower
1625          * level restrictions happen to be different at this offset on this
1626          * side than those of the sending peer, we may need to submit the
1627          * request in more than one bio.
1628          *
1629          * Plain bio_alloc is good enough here, this is no DRBD internally
1630          * generated bio, but a bio allocated on behalf of the peer.
1631          */
1632 next_bio:
1633         bio = bio_alloc(GFP_NOIO, nr_pages);
1634         if (!bio) {
1635                 drbd_err(device, "submit_ee: Allocation of a bio failed (nr_pages=%u)\n", nr_pages);
1636                 goto fail;
1637         }
1638         /* > peer_req->i.sector, unless this is the first bio */
1639         bio->bi_iter.bi_sector = sector;
1640         bio->bi_bdev = device->ldev->backing_bdev;
1641         bio_set_op_attrs(bio, op, op_flags);
1642         bio->bi_private = peer_req;
1643         bio->bi_end_io = drbd_peer_request_endio;
1644
1645         bio->bi_next = bios;
1646         bios = bio;
1647         ++n_bios;
1648
1649         page_chain_for_each(page) {
1650                 unsigned len = min_t(unsigned, data_size, PAGE_SIZE);
1651                 if (!bio_add_page(bio, page, len, 0)) {
1652                         /* A single page must always be possible!
1653                          * But in case it fails anyways,
1654                          * we deal with it, and complain (below). */
1655                         if (bio->bi_vcnt == 0) {
1656                                 drbd_err(device,
1657                                         "bio_add_page failed for len=%u, "
1658                                         "bi_vcnt=0 (bi_sector=%llu)\n",
1659                                         len, (uint64_t)bio->bi_iter.bi_sector);
1660                                 err = -ENOSPC;
1661                                 goto fail;
1662                         }
1663                         goto next_bio;
1664                 }
1665                 data_size -= len;
1666                 sector += len >> 9;
1667                 --nr_pages;
1668         }
1669         D_ASSERT(device, data_size == 0);
1670         D_ASSERT(device, page == NULL);
1671
1672         atomic_set(&peer_req->pending_bios, n_bios);
1673         /* for debugfs: update timestamp, mark as submitted */
1674         peer_req->submit_jif = jiffies;
1675         peer_req->flags |= EE_SUBMITTED;
1676         do {
1677                 bio = bios;
1678                 bios = bios->bi_next;
1679                 bio->bi_next = NULL;
1680
1681                 drbd_generic_make_request(device, fault_type, bio);
1682         } while (bios);
1683         return 0;
1684
1685 fail:
1686         while (bios) {
1687                 bio = bios;
1688                 bios = bios->bi_next;
1689                 bio_put(bio);
1690         }
1691         return err;
1692 }
1693
1694 static void drbd_remove_epoch_entry_interval(struct drbd_device *device,
1695                                              struct drbd_peer_request *peer_req)
1696 {
1697         struct drbd_interval *i = &peer_req->i;
1698
1699         drbd_remove_interval(&device->write_requests, i);
1700         drbd_clear_interval(i);
1701
1702         /* Wake up any processes waiting for this peer request to complete.  */
1703         if (i->waiting)
1704                 wake_up(&device->misc_wait);
1705 }
1706
1707 static void conn_wait_active_ee_empty(struct drbd_connection *connection)
1708 {
1709         struct drbd_peer_device *peer_device;
1710         int vnr;
1711
1712         rcu_read_lock();
1713         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1714                 struct drbd_device *device = peer_device->device;
1715
1716                 kref_get(&device->kref);
1717                 rcu_read_unlock();
1718                 drbd_wait_ee_list_empty(device, &device->active_ee);
1719                 kref_put(&device->kref, drbd_destroy_device);
1720                 rcu_read_lock();
1721         }
1722         rcu_read_unlock();
1723 }
1724
1725 static int receive_Barrier(struct drbd_connection *connection, struct packet_info *pi)
1726 {
1727         int rv;
1728         struct p_barrier *p = pi->data;
1729         struct drbd_epoch *epoch;
1730
1731         /* FIXME these are unacked on connection,
1732          * not a specific (peer)device.
1733          */
1734         connection->current_epoch->barrier_nr = p->barrier;
1735         connection->current_epoch->connection = connection;
1736         rv = drbd_may_finish_epoch(connection, connection->current_epoch, EV_GOT_BARRIER_NR);
1737
1738         /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1739          * the activity log, which means it would not be resynced in case the
1740          * R_PRIMARY crashes now.
1741          * Therefore we must send the barrier_ack after the barrier request was
1742          * completed. */
1743         switch (connection->resource->write_ordering) {
1744         case WO_NONE:
1745                 if (rv == FE_RECYCLED)
1746                         return 0;
1747
1748                 /* receiver context, in the writeout path of the other node.
1749                  * avoid potential distributed deadlock */
1750                 epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1751                 if (epoch)
1752                         break;
1753                 else
1754                         drbd_warn(connection, "Allocation of an epoch failed, slowing down\n");
1755                         /* Fall through */
1756
1757         case WO_BDEV_FLUSH:
1758         case WO_DRAIN_IO:
1759                 conn_wait_active_ee_empty(connection);
1760                 drbd_flush(connection);
1761
1762                 if (atomic_read(&connection->current_epoch->epoch_size)) {
1763                         epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1764                         if (epoch)
1765                                 break;
1766                 }
1767
1768                 return 0;
1769         default:
1770                 drbd_err(connection, "Strangeness in connection->write_ordering %d\n",
1771                          connection->resource->write_ordering);
1772                 return -EIO;
1773         }
1774
1775         epoch->flags = 0;
1776         atomic_set(&epoch->epoch_size, 0);
1777         atomic_set(&epoch->active, 0);
1778
1779         spin_lock(&connection->epoch_lock);
1780         if (atomic_read(&connection->current_epoch->epoch_size)) {
1781                 list_add(&epoch->list, &connection->current_epoch->list);
1782                 connection->current_epoch = epoch;
1783                 connection->epochs++;
1784         } else {
1785                 /* The current_epoch got recycled while we allocated this one... */
1786                 kfree(epoch);
1787         }
1788         spin_unlock(&connection->epoch_lock);
1789
1790         return 0;
1791 }
1792
1793 /* quick wrapper in case payload size != request_size (write same) */
1794 static void drbd_csum_ee_size(struct crypto_ahash *h,
1795                               struct drbd_peer_request *r, void *d,
1796                               unsigned int payload_size)
1797 {
1798         unsigned int tmp = r->i.size;
1799         r->i.size = payload_size;
1800         drbd_csum_ee(h, r, d);
1801         r->i.size = tmp;
1802 }
1803
1804 /* used from receive_RSDataReply (recv_resync_read)
1805  * and from receive_Data.
1806  * data_size: actual payload ("data in")
1807  *      for normal writes that is bi_size.
1808  *      for discards, that is zero.
1809  *      for write same, it is logical_block_size.
1810  * both trim and write same have the bi_size ("data len to be affected")
1811  * as extra argument in the packet header.
1812  */
1813 static struct drbd_peer_request *
1814 read_in_block(struct drbd_peer_device *peer_device, u64 id, sector_t sector,
1815               struct packet_info *pi) __must_hold(local)
1816 {
1817         struct drbd_device *device = peer_device->device;
1818         const sector_t capacity = drbd_get_capacity(device->this_bdev);
1819         struct drbd_peer_request *peer_req;
1820         struct page *page;
1821         int digest_size, err;
1822         unsigned int data_size = pi->size, ds;
1823         void *dig_in = peer_device->connection->int_dig_in;
1824         void *dig_vv = peer_device->connection->int_dig_vv;
1825         unsigned long *data;
1826         struct p_trim *trim = (pi->cmd == P_TRIM) ? pi->data : NULL;
1827         struct p_trim *wsame = (pi->cmd == P_WSAME) ? pi->data : NULL;
1828
1829         digest_size = 0;
1830         if (!trim && peer_device->connection->peer_integrity_tfm) {
1831                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1832                 /*
1833                  * FIXME: Receive the incoming digest into the receive buffer
1834                  *        here, together with its struct p_data?
1835                  */
1836                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1837                 if (err)
1838                         return NULL;
1839                 data_size -= digest_size;
1840         }
1841
1842         /* assume request_size == data_size, but special case trim and wsame. */
1843         ds = data_size;
1844         if (trim) {
1845                 if (!expect(data_size == 0))
1846                         return NULL;
1847                 ds = be32_to_cpu(trim->size);
1848         } else if (wsame) {
1849                 if (data_size != queue_logical_block_size(device->rq_queue)) {
1850                         drbd_err(peer_device, "data size (%u) != drbd logical block size (%u)\n",
1851                                 data_size, queue_logical_block_size(device->rq_queue));
1852                         return NULL;
1853                 }
1854                 if (data_size != bdev_logical_block_size(device->ldev->backing_bdev)) {
1855                         drbd_err(peer_device, "data size (%u) != backend logical block size (%u)\n",
1856                                 data_size, bdev_logical_block_size(device->ldev->backing_bdev));
1857                         return NULL;
1858                 }
1859                 ds = be32_to_cpu(wsame->size);
1860         }
1861
1862         if (!expect(IS_ALIGNED(ds, 512)))
1863                 return NULL;
1864         if (trim || wsame) {
1865                 if (!expect(ds <= (DRBD_MAX_BBIO_SECTORS << 9)))
1866                         return NULL;
1867         } else if (!expect(ds <= DRBD_MAX_BIO_SIZE))
1868                 return NULL;
1869
1870         /* even though we trust out peer,
1871          * we sometimes have to double check. */
1872         if (sector + (ds>>9) > capacity) {
1873                 drbd_err(device, "request from peer beyond end of local disk: "
1874                         "capacity: %llus < sector: %llus + size: %u\n",
1875                         (unsigned long long)capacity,
1876                         (unsigned long long)sector, ds);
1877                 return NULL;
1878         }
1879
1880         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1881          * "criss-cross" setup, that might cause write-out on some other DRBD,
1882          * which in turn might block on the other node at this very place.  */
1883         peer_req = drbd_alloc_peer_req(peer_device, id, sector, ds, data_size, GFP_NOIO);
1884         if (!peer_req)
1885                 return NULL;
1886
1887         peer_req->flags |= EE_WRITE;
1888         if (trim) {
1889                 peer_req->flags |= EE_IS_TRIM;
1890                 return peer_req;
1891         }
1892         if (wsame)
1893                 peer_req->flags |= EE_WRITE_SAME;
1894
1895         /* receive payload size bytes into page chain */
1896         ds = data_size;
1897         page = peer_req->pages;
1898         page_chain_for_each(page) {
1899                 unsigned len = min_t(int, ds, PAGE_SIZE);
1900                 data = kmap(page);
1901                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1902                 if (drbd_insert_fault(device, DRBD_FAULT_RECEIVE)) {
1903                         drbd_err(device, "Fault injection: Corrupting data on receive\n");
1904                         data[0] = data[0] ^ (unsigned long)-1;
1905                 }
1906                 kunmap(page);
1907                 if (err) {
1908                         drbd_free_peer_req(device, peer_req);
1909                         return NULL;
1910                 }
1911                 ds -= len;
1912         }
1913
1914         if (digest_size) {
1915                 drbd_csum_ee_size(peer_device->connection->peer_integrity_tfm, peer_req, dig_vv, data_size);
1916                 if (memcmp(dig_in, dig_vv, digest_size)) {
1917                         drbd_err(device, "Digest integrity check FAILED: %llus +%u\n",
1918                                 (unsigned long long)sector, data_size);
1919                         drbd_free_peer_req(device, peer_req);
1920                         return NULL;
1921                 }
1922         }
1923         device->recv_cnt += data_size >> 9;
1924         return peer_req;
1925 }
1926
1927 /* drbd_drain_block() just takes a data block
1928  * out of the socket input buffer, and discards it.
1929  */
1930 static int drbd_drain_block(struct drbd_peer_device *peer_device, int data_size)
1931 {
1932         struct page *page;
1933         int err = 0;
1934         void *data;
1935
1936         if (!data_size)
1937                 return 0;
1938
1939         page = drbd_alloc_pages(peer_device, 1, 1);
1940
1941         data = kmap(page);
1942         while (data_size) {
1943                 unsigned int len = min_t(int, data_size, PAGE_SIZE);
1944
1945                 err = drbd_recv_all_warn(peer_device->connection, data, len);
1946                 if (err)
1947                         break;
1948                 data_size -= len;
1949         }
1950         kunmap(page);
1951         drbd_free_pages(peer_device->device, page, 0);
1952         return err;
1953 }
1954
1955 static int recv_dless_read(struct drbd_peer_device *peer_device, struct drbd_request *req,
1956                            sector_t sector, int data_size)
1957 {
1958         struct bio_vec bvec;
1959         struct bvec_iter iter;
1960         struct bio *bio;
1961         int digest_size, err, expect;
1962         void *dig_in = peer_device->connection->int_dig_in;
1963         void *dig_vv = peer_device->connection->int_dig_vv;
1964
1965         digest_size = 0;
1966         if (peer_device->connection->peer_integrity_tfm) {
1967                 digest_size = crypto_ahash_digestsize(peer_device->connection->peer_integrity_tfm);
1968                 err = drbd_recv_all_warn(peer_device->connection, dig_in, digest_size);
1969                 if (err)
1970                         return err;
1971                 data_size -= digest_size;
1972         }
1973
1974         /* optimistically update recv_cnt.  if receiving fails below,
1975          * we disconnect anyways, and counters will be reset. */
1976         peer_device->device->recv_cnt += data_size>>9;
1977
1978         bio = req->master_bio;
1979         D_ASSERT(peer_device->device, sector == bio->bi_iter.bi_sector);
1980
1981         bio_for_each_segment(bvec, bio, iter) {
1982                 void *mapped = kmap(bvec.bv_page) + bvec.bv_offset;
1983                 expect = min_t(int, data_size, bvec.bv_len);
1984                 err = drbd_recv_all_warn(peer_device->connection, mapped, expect);
1985                 kunmap(bvec.bv_page);
1986                 if (err)
1987                         return err;
1988                 data_size -= expect;
1989         }
1990
1991         if (digest_size) {
1992                 drbd_csum_bio(peer_device->connection->peer_integrity_tfm, bio, dig_vv);
1993                 if (memcmp(dig_in, dig_vv, digest_size)) {
1994                         drbd_err(peer_device, "Digest integrity check FAILED. Broken NICs?\n");
1995                         return -EINVAL;
1996                 }
1997         }
1998
1999         D_ASSERT(peer_device->device, data_size == 0);
2000         return 0;
2001 }
2002
2003 /*
2004  * e_end_resync_block() is called in ack_sender context via
2005  * drbd_finish_peer_reqs().
2006  */
2007 static int e_end_resync_block(struct drbd_work *w, int unused)
2008 {
2009         struct drbd_peer_request *peer_req =
2010                 container_of(w, struct drbd_peer_request, w);
2011         struct drbd_peer_device *peer_device = peer_req->peer_device;
2012         struct drbd_device *device = peer_device->device;
2013         sector_t sector = peer_req->i.sector;
2014         int err;
2015
2016         D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2017
2018         if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2019                 drbd_set_in_sync(device, sector, peer_req->i.size);
2020                 err = drbd_send_ack(peer_device, P_RS_WRITE_ACK, peer_req);
2021         } else {
2022                 /* Record failure to sync */
2023                 drbd_rs_failed_io(device, sector, peer_req->i.size);
2024
2025                 err  = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2026         }
2027         dec_unacked(device);
2028
2029         return err;
2030 }
2031
2032 static int recv_resync_read(struct drbd_peer_device *peer_device, sector_t sector,
2033                             struct packet_info *pi) __releases(local)
2034 {
2035         struct drbd_device *device = peer_device->device;
2036         struct drbd_peer_request *peer_req;
2037
2038         peer_req = read_in_block(peer_device, ID_SYNCER, sector, pi);
2039         if (!peer_req)
2040                 goto fail;
2041
2042         dec_rs_pending(device);
2043
2044         inc_unacked(device);
2045         /* corresponding dec_unacked() in e_end_resync_block()
2046          * respective _drbd_clear_done_ee */
2047
2048         peer_req->w.cb = e_end_resync_block;
2049         peer_req->submit_jif = jiffies;
2050
2051         spin_lock_irq(&device->resource->req_lock);
2052         list_add_tail(&peer_req->w.list, &device->sync_ee);
2053         spin_unlock_irq(&device->resource->req_lock);
2054
2055         atomic_add(pi->size >> 9, &device->rs_sect_ev);
2056         if (drbd_submit_peer_request(device, peer_req, REQ_OP_WRITE, 0,
2057                                      DRBD_FAULT_RS_WR) == 0)
2058                 return 0;
2059
2060         /* don't care for the reason here */
2061         drbd_err(device, "submit failed, triggering re-connect\n");
2062         spin_lock_irq(&device->resource->req_lock);
2063         list_del(&peer_req->w.list);
2064         spin_unlock_irq(&device->resource->req_lock);
2065
2066         drbd_free_peer_req(device, peer_req);
2067 fail:
2068         put_ldev(device);
2069         return -EIO;
2070 }
2071
2072 static struct drbd_request *
2073 find_request(struct drbd_device *device, struct rb_root *root, u64 id,
2074              sector_t sector, bool missing_ok, const char *func)
2075 {
2076         struct drbd_request *req;
2077
2078         /* Request object according to our peer */
2079         req = (struct drbd_request *)(unsigned long)id;
2080         if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
2081                 return req;
2082         if (!missing_ok) {
2083                 drbd_err(device, "%s: failed to find request 0x%lx, sector %llus\n", func,
2084                         (unsigned long)id, (unsigned long long)sector);
2085         }
2086         return NULL;
2087 }
2088
2089 static int receive_DataReply(struct drbd_connection *connection, struct packet_info *pi)
2090 {
2091         struct drbd_peer_device *peer_device;
2092         struct drbd_device *device;
2093         struct drbd_request *req;
2094         sector_t sector;
2095         int err;
2096         struct p_data *p = pi->data;
2097
2098         peer_device = conn_peer_device(connection, pi->vnr);
2099         if (!peer_device)
2100                 return -EIO;
2101         device = peer_device->device;
2102
2103         sector = be64_to_cpu(p->sector);
2104
2105         spin_lock_irq(&device->resource->req_lock);
2106         req = find_request(device, &device->read_requests, p->block_id, sector, false, __func__);
2107         spin_unlock_irq(&device->resource->req_lock);
2108         if (unlikely(!req))
2109                 return -EIO;
2110
2111         /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
2112          * special casing it there for the various failure cases.
2113          * still no race with drbd_fail_pending_reads */
2114         err = recv_dless_read(peer_device, req, sector, pi->size);
2115         if (!err)
2116                 req_mod(req, DATA_RECEIVED);
2117         /* else: nothing. handled from drbd_disconnect...
2118          * I don't think we may complete this just yet
2119          * in case we are "on-disconnect: freeze" */
2120
2121         return err;
2122 }
2123
2124 static int receive_RSDataReply(struct drbd_connection *connection, struct packet_info *pi)
2125 {
2126         struct drbd_peer_device *peer_device;
2127         struct drbd_device *device;
2128         sector_t sector;
2129         int err;
2130         struct p_data *p = pi->data;
2131
2132         peer_device = conn_peer_device(connection, pi->vnr);
2133         if (!peer_device)
2134                 return -EIO;
2135         device = peer_device->device;
2136
2137         sector = be64_to_cpu(p->sector);
2138         D_ASSERT(device, p->block_id == ID_SYNCER);
2139
2140         if (get_ldev(device)) {
2141                 /* data is submitted to disk within recv_resync_read.
2142                  * corresponding put_ldev done below on error,
2143                  * or in drbd_peer_request_endio. */
2144                 err = recv_resync_read(peer_device, sector, pi);
2145         } else {
2146                 if (__ratelimit(&drbd_ratelimit_state))
2147                         drbd_err(device, "Can not write resync data to local disk.\n");
2148
2149                 err = drbd_drain_block(peer_device, pi->size);
2150
2151                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2152         }
2153
2154         atomic_add(pi->size >> 9, &device->rs_sect_in);
2155
2156         return err;
2157 }
2158
2159 static void restart_conflicting_writes(struct drbd_device *device,
2160                                        sector_t sector, int size)
2161 {
2162         struct drbd_interval *i;
2163         struct drbd_request *req;
2164
2165         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2166                 if (!i->local)
2167                         continue;
2168                 req = container_of(i, struct drbd_request, i);
2169                 if (req->rq_state & RQ_LOCAL_PENDING ||
2170                     !(req->rq_state & RQ_POSTPONED))
2171                         continue;
2172                 /* as it is RQ_POSTPONED, this will cause it to
2173                  * be queued on the retry workqueue. */
2174                 __req_mod(req, CONFLICT_RESOLVED, NULL);
2175         }
2176 }
2177
2178 /*
2179  * e_end_block() is called in ack_sender context via drbd_finish_peer_reqs().
2180  */
2181 static int e_end_block(struct drbd_work *w, int cancel)
2182 {
2183         struct drbd_peer_request *peer_req =
2184                 container_of(w, struct drbd_peer_request, w);
2185         struct drbd_peer_device *peer_device = peer_req->peer_device;
2186         struct drbd_device *device = peer_device->device;
2187         sector_t sector = peer_req->i.sector;
2188         int err = 0, pcmd;
2189
2190         if (peer_req->flags & EE_SEND_WRITE_ACK) {
2191                 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
2192                         pcmd = (device->state.conn >= C_SYNC_SOURCE &&
2193                                 device->state.conn <= C_PAUSED_SYNC_T &&
2194                                 peer_req->flags & EE_MAY_SET_IN_SYNC) ?
2195                                 P_RS_WRITE_ACK : P_WRITE_ACK;
2196                         err = drbd_send_ack(peer_device, pcmd, peer_req);
2197                         if (pcmd == P_RS_WRITE_ACK)
2198                                 drbd_set_in_sync(device, sector, peer_req->i.size);
2199                 } else {
2200                         err = drbd_send_ack(peer_device, P_NEG_ACK, peer_req);
2201                         /* we expect it to be marked out of sync anyways...
2202                          * maybe assert this?  */
2203                 }
2204                 dec_unacked(device);
2205         }
2206
2207         /* we delete from the conflict detection hash _after_ we sent out the
2208          * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
2209         if (peer_req->flags & EE_IN_INTERVAL_TREE) {
2210                 spin_lock_irq(&device->resource->req_lock);
2211                 D_ASSERT(device, !drbd_interval_empty(&peer_req->i));
2212                 drbd_remove_epoch_entry_interval(device, peer_req);
2213                 if (peer_req->flags & EE_RESTART_REQUESTS)
2214                         restart_conflicting_writes(device, sector, peer_req->i.size);
2215                 spin_unlock_irq(&device->resource->req_lock);
2216         } else
2217                 D_ASSERT(device, drbd_interval_empty(&peer_req->i));
2218
2219         drbd_may_finish_epoch(peer_device->connection, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
2220
2221         return err;
2222 }
2223
2224 static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
2225 {
2226         struct drbd_peer_request *peer_req =
2227                 container_of(w, struct drbd_peer_request, w);
2228         struct drbd_peer_device *peer_device = peer_req->peer_device;
2229         int err;
2230
2231         err = drbd_send_ack(peer_device, ack, peer_req);
2232         dec_unacked(peer_device->device);
2233
2234         return err;
2235 }
2236
2237 static int e_send_superseded(struct drbd_work *w, int unused)
2238 {
2239         return e_send_ack(w, P_SUPERSEDED);
2240 }
2241
2242 static int e_send_retry_write(struct drbd_work *w, int unused)
2243 {
2244         struct drbd_peer_request *peer_req =
2245                 container_of(w, struct drbd_peer_request, w);
2246         struct drbd_connection *connection = peer_req->peer_device->connection;
2247
2248         return e_send_ack(w, connection->agreed_pro_version >= 100 ?
2249                              P_RETRY_WRITE : P_SUPERSEDED);
2250 }
2251
2252 static bool seq_greater(u32 a, u32 b)
2253 {
2254         /*
2255          * We assume 32-bit wrap-around here.
2256          * For 24-bit wrap-around, we would have to shift:
2257          *  a <<= 8; b <<= 8;
2258          */
2259         return (s32)a - (s32)b > 0;
2260 }
2261
2262 static u32 seq_max(u32 a, u32 b)
2263 {
2264         return seq_greater(a, b) ? a : b;
2265 }
2266
2267 static void update_peer_seq(struct drbd_peer_device *peer_device, unsigned int peer_seq)
2268 {
2269         struct drbd_device *device = peer_device->device;
2270         unsigned int newest_peer_seq;
2271
2272         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)) {
2273                 spin_lock(&device->peer_seq_lock);
2274                 newest_peer_seq = seq_max(device->peer_seq, peer_seq);
2275                 device->peer_seq = newest_peer_seq;
2276                 spin_unlock(&device->peer_seq_lock);
2277                 /* wake up only if we actually changed device->peer_seq */
2278                 if (peer_seq == newest_peer_seq)
2279                         wake_up(&device->seq_wait);
2280         }
2281 }
2282
2283 static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
2284 {
2285         return !((s1 + (l1>>9) <= s2) || (s1 >= s2 + (l2>>9)));
2286 }
2287
2288 /* maybe change sync_ee into interval trees as well? */
2289 static bool overlapping_resync_write(struct drbd_device *device, struct drbd_peer_request *peer_req)
2290 {
2291         struct drbd_peer_request *rs_req;
2292         bool rv = false;
2293
2294         spin_lock_irq(&device->resource->req_lock);
2295         list_for_each_entry(rs_req, &device->sync_ee, w.list) {
2296                 if (overlaps(peer_req->i.sector, peer_req->i.size,
2297                              rs_req->i.sector, rs_req->i.size)) {
2298                         rv = true;
2299                         break;
2300                 }
2301         }
2302         spin_unlock_irq(&device->resource->req_lock);
2303
2304         return rv;
2305 }
2306
2307 /* Called from receive_Data.
2308  * Synchronize packets on sock with packets on msock.
2309  *
2310  * This is here so even when a P_DATA packet traveling via sock overtook an Ack
2311  * packet traveling on msock, they are still processed in the order they have
2312  * been sent.
2313  *
2314  * Note: we don't care for Ack packets overtaking P_DATA packets.
2315  *
2316  * In case packet_seq is larger than device->peer_seq number, there are
2317  * outstanding packets on the msock. We wait for them to arrive.
2318  * In case we are the logically next packet, we update device->peer_seq
2319  * ourselves. Correctly handles 32bit wrap around.
2320  *
2321  * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
2322  * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
2323  * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
2324  * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
2325  *
2326  * returns 0 if we may process the packet,
2327  * -ERESTARTSYS if we were interrupted (by disconnect signal). */
2328 static int wait_for_and_update_peer_seq(struct drbd_peer_device *peer_device, const u32 peer_seq)
2329 {
2330         struct drbd_device *device = peer_device->device;
2331         DEFINE_WAIT(wait);
2332         long timeout;
2333         int ret = 0, tp;
2334
2335         if (!test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags))
2336                 return 0;
2337
2338         spin_lock(&device->peer_seq_lock);
2339         for (;;) {
2340                 if (!seq_greater(peer_seq - 1, device->peer_seq)) {
2341                         device->peer_seq = seq_max(device->peer_seq, peer_seq);
2342                         break;
2343                 }
2344
2345                 if (signal_pending(current)) {
2346                         ret = -ERESTARTSYS;
2347                         break;
2348                 }
2349
2350                 rcu_read_lock();
2351                 tp = rcu_dereference(peer_device->connection->net_conf)->two_primaries;
2352                 rcu_read_unlock();
2353
2354                 if (!tp)
2355                         break;
2356
2357                 /* Only need to wait if two_primaries is enabled */
2358                 prepare_to_wait(&device->seq_wait, &wait, TASK_INTERRUPTIBLE);
2359                 spin_unlock(&device->peer_seq_lock);
2360                 rcu_read_lock();
2361                 timeout = rcu_dereference(peer_device->connection->net_conf)->ping_timeo*HZ/10;
2362                 rcu_read_unlock();
2363                 timeout = schedule_timeout(timeout);
2364                 spin_lock(&device->peer_seq_lock);
2365                 if (!timeout) {
2366                         ret = -ETIMEDOUT;
2367                         drbd_err(device, "Timed out waiting for missing ack packets; disconnecting\n");
2368                         break;
2369                 }
2370         }
2371         spin_unlock(&device->peer_seq_lock);
2372         finish_wait(&device->seq_wait, &wait);
2373         return ret;
2374 }
2375
2376 /* see also bio_flags_to_wire()
2377  * DRBD_REQ_*, because we need to semantically map the flags to data packet
2378  * flags and back. We may replicate to other kernel versions. */
2379 static unsigned long wire_flags_to_bio_flags(u32 dpf)
2380 {
2381         return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2382                 (dpf & DP_FUA ? REQ_FUA : 0) |
2383                 (dpf & DP_FLUSH ? REQ_PREFLUSH : 0);
2384 }
2385
2386 static unsigned long wire_flags_to_bio_op(u32 dpf)
2387 {
2388         if (dpf & DP_DISCARD)
2389                 return REQ_OP_DISCARD;
2390         else
2391                 return REQ_OP_WRITE;
2392 }
2393
2394 static void fail_postponed_requests(struct drbd_device *device, sector_t sector,
2395                                     unsigned int size)
2396 {
2397         struct drbd_interval *i;
2398
2399     repeat:
2400         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2401                 struct drbd_request *req;
2402                 struct bio_and_error m;
2403
2404                 if (!i->local)
2405                         continue;
2406                 req = container_of(i, struct drbd_request, i);
2407                 if (!(req->rq_state & RQ_POSTPONED))
2408                         continue;
2409                 req->rq_state &= ~RQ_POSTPONED;
2410                 __req_mod(req, NEG_ACKED, &m);
2411                 spin_unlock_irq(&device->resource->req_lock);
2412                 if (m.bio)
2413                         complete_master_bio(device, &m);
2414                 spin_lock_irq(&device->resource->req_lock);
2415                 goto repeat;
2416         }
2417 }
2418
2419 static int handle_write_conflicts(struct drbd_device *device,
2420                                   struct drbd_peer_request *peer_req)
2421 {
2422         struct drbd_connection *connection = peer_req->peer_device->connection;
2423         bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &connection->flags);
2424         sector_t sector = peer_req->i.sector;
2425         const unsigned int size = peer_req->i.size;
2426         struct drbd_interval *i;
2427         bool equal;
2428         int err;
2429
2430         /*
2431          * Inserting the peer request into the write_requests tree will prevent
2432          * new conflicting local requests from being added.
2433          */
2434         drbd_insert_interval(&device->write_requests, &peer_req->i);
2435
2436     repeat:
2437         drbd_for_each_overlap(i, &device->write_requests, sector, size) {
2438                 if (i == &peer_req->i)
2439                         continue;
2440                 if (i->completed)
2441                         continue;
2442
2443                 if (!i->local) {
2444                         /*
2445                          * Our peer has sent a conflicting remote request; this
2446                          * should not happen in a two-node setup.  Wait for the
2447                          * earlier peer request to complete.
2448                          */
2449                         err = drbd_wait_misc(device, i);
2450                         if (err)
2451                                 goto out;
2452                         goto repeat;
2453                 }
2454
2455                 equal = i->sector == sector && i->size == size;
2456                 if (resolve_conflicts) {
2457                         /*
2458                          * If the peer request is fully contained within the
2459                          * overlapping request, it can be considered overwritten
2460                          * and thus superseded; otherwise, it will be retried
2461                          * once all overlapping requests have completed.
2462                          */
2463                         bool superseded = i->sector <= sector && i->sector +
2464                                        (i->size >> 9) >= sector + (size >> 9);
2465
2466                         if (!equal)
2467                                 drbd_alert(device, "Concurrent writes detected: "
2468                                                "local=%llus +%u, remote=%llus +%u, "
2469                                                "assuming %s came first\n",
2470                                           (unsigned long long)i->sector, i->size,
2471                                           (unsigned long long)sector, size,
2472                                           superseded ? "local" : "remote");
2473
2474                         peer_req->w.cb = superseded ? e_send_superseded :
2475                                                    e_send_retry_write;
2476                         list_add_tail(&peer_req->w.list, &device->done_ee);
2477                         queue_work(connection->ack_sender, &peer_req->peer_device->send_acks_work);
2478
2479                         err = -ENOENT;
2480                         goto out;
2481                 } else {
2482                         struct drbd_request *req =
2483                                 container_of(i, struct drbd_request, i);
2484
2485                         if (!equal)
2486                                 drbd_alert(device, "Concurrent writes detected: "
2487                                                "local=%llus +%u, remote=%llus +%u\n",
2488                                           (unsigned long long)i->sector, i->size,
2489                                           (unsigned long long)sector, size);
2490
2491                         if (req->rq_state & RQ_LOCAL_PENDING ||
2492                             !(req->rq_state & RQ_POSTPONED)) {
2493                                 /*
2494                                  * Wait for the node with the discard flag to
2495                                  * decide if this request has been superseded
2496                                  * or needs to be retried.
2497                                  * Requests that have been superseded will
2498                                  * disappear from the write_requests tree.
2499                                  *
2500                                  * In addition, wait for the conflicting
2501                                  * request to finish locally before submitting
2502                                  * the conflicting peer request.
2503                                  */
2504                                 err = drbd_wait_misc(device, &req->i);
2505                                 if (err) {
2506                                         _conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
2507                                         fail_postponed_requests(device, sector, size);
2508                                         goto out;
2509                                 }
2510                                 goto repeat;
2511                         }
2512                         /*
2513                          * Remember to restart the conflicting requests after
2514                          * the new peer request has completed.
2515                          */
2516                         peer_req->flags |= EE_RESTART_REQUESTS;
2517                 }
2518         }
2519         err = 0;
2520
2521     out:
2522         if (err)
2523                 drbd_remove_epoch_entry_interval(device, peer_req);
2524         return err;
2525 }
2526
2527 /* mirrored write */
2528 static int receive_Data(struct drbd_connection *connection, struct packet_info *pi)
2529 {
2530         struct drbd_peer_device *peer_device;
2531         struct drbd_device *device;
2532         struct net_conf *nc;
2533         sector_t sector;
2534         struct drbd_peer_request *peer_req;
2535         struct p_data *p = pi->data;
2536         u32 peer_seq = be32_to_cpu(p->seq_num);
2537         int op, op_flags;
2538         u32 dp_flags;
2539         int err, tp;
2540
2541         peer_device = conn_peer_device(connection, pi->vnr);
2542         if (!peer_device)
2543                 return -EIO;
2544         device = peer_device->device;
2545
2546         if (!get_ldev(device)) {
2547                 int err2;
2548
2549                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2550                 drbd_send_ack_dp(peer_device, P_NEG_ACK, p, pi->size);
2551                 atomic_inc(&connection->current_epoch->epoch_size);
2552                 err2 = drbd_drain_block(peer_device, pi->size);
2553                 if (!err)
2554                         err = err2;
2555                 return err;
2556         }
2557
2558         /*
2559          * Corresponding put_ldev done either below (on various errors), or in
2560          * drbd_peer_request_endio, if we successfully submit the data at the
2561          * end of this function.
2562          */
2563
2564         sector = be64_to_cpu(p->sector);
2565         peer_req = read_in_block(peer_device, p->block_id, sector, pi);
2566         if (!peer_req) {
2567                 put_ldev(device);
2568                 return -EIO;
2569         }
2570
2571         peer_req->w.cb = e_end_block;
2572         peer_req->submit_jif = jiffies;
2573         peer_req->flags |= EE_APPLICATION;
2574
2575         dp_flags = be32_to_cpu(p->dp_flags);
2576         op = wire_flags_to_bio_op(dp_flags);
2577         op_flags = wire_flags_to_bio_flags(dp_flags);
2578         if (pi->cmd == P_TRIM) {
2579                 D_ASSERT(peer_device, peer_req->i.size > 0);
2580                 D_ASSERT(peer_device, op == REQ_OP_DISCARD);
2581                 D_ASSERT(peer_device, peer_req->pages == NULL);
2582         } else if (peer_req->pages == NULL) {
2583                 D_ASSERT(device, peer_req->i.size == 0);
2584                 D_ASSERT(device, dp_flags & DP_FLUSH);
2585         }
2586
2587         if (dp_flags & DP_MAY_SET_IN_SYNC)
2588                 peer_req->flags |= EE_MAY_SET_IN_SYNC;
2589
2590         spin_lock(&connection->epoch_lock);
2591         peer_req->epoch = connection->current_epoch;
2592         atomic_inc(&peer_req->epoch->epoch_size);
2593         atomic_inc(&peer_req->epoch->active);
2594         spin_unlock(&connection->epoch_lock);
2595
2596         rcu_read_lock();
2597         nc = rcu_dereference(peer_device->connection->net_conf);
2598         tp = nc->two_primaries;
2599         if (peer_device->connection->agreed_pro_version < 100) {
2600                 switch (nc->wire_protocol) {
2601                 case DRBD_PROT_C:
2602                         dp_flags |= DP_SEND_WRITE_ACK;
2603                         break;
2604                 case DRBD_PROT_B:
2605                         dp_flags |= DP_SEND_RECEIVE_ACK;
2606                         break;
2607                 }
2608         }
2609         rcu_read_unlock();
2610
2611         if (dp_flags & DP_SEND_WRITE_ACK) {
2612                 peer_req->flags |= EE_SEND_WRITE_ACK;
2613                 inc_unacked(device);
2614                 /* corresponding dec_unacked() in e_end_block()
2615                  * respective _drbd_clear_done_ee */
2616         }
2617
2618         if (dp_flags & DP_SEND_RECEIVE_ACK) {
2619                 /* I really don't like it that the receiver thread
2620                  * sends on the msock, but anyways */
2621                 drbd_send_ack(peer_device, P_RECV_ACK, peer_req);
2622         }
2623
2624         if (tp) {
2625                 /* two primaries implies protocol C */
2626                 D_ASSERT(device, dp_flags & DP_SEND_WRITE_ACK);
2627                 peer_req->flags |= EE_IN_INTERVAL_TREE;
2628                 err = wait_for_and_update_peer_seq(peer_device, peer_seq);
2629                 if (err)
2630                         goto out_interrupted;
2631                 spin_lock_irq(&device->resource->req_lock);
2632                 err = handle_write_conflicts(device, peer_req);
2633                 if (err) {
2634                         spin_unlock_irq(&device->resource->req_lock);
2635                         if (err == -ENOENT) {
2636                                 put_ldev(device);
2637                                 return 0;
2638                         }
2639                         goto out_interrupted;
2640                 }
2641         } else {
2642                 update_peer_seq(peer_device, peer_seq);
2643                 spin_lock_irq(&device->resource->req_lock);
2644         }
2645         /* TRIM and WRITE_SAME are processed synchronously,
2646          * we wait for all pending requests, respectively wait for
2647          * active_ee to become empty in drbd_submit_peer_request();
2648          * better not add ourselves here. */
2649         if ((peer_req->flags & (EE_IS_TRIM|EE_WRITE_SAME)) == 0)
2650                 list_add_tail(&peer_req->w.list, &device->active_ee);
2651         spin_unlock_irq(&device->resource->req_lock);
2652
2653         if (device->state.conn == C_SYNC_TARGET)
2654                 wait_event(device->ee_wait, !overlapping_resync_write(device, peer_req));
2655
2656         if (device->state.pdsk < D_INCONSISTENT) {
2657                 /* In case we have the only disk of the cluster, */
2658                 drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
2659                 peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2660                 drbd_al_begin_io(device, &peer_req->i);
2661                 peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2662         }
2663
2664         err = drbd_submit_peer_request(device, peer_req, op, op_flags,
2665                                        DRBD_FAULT_DT_WR);
2666         if (!err)
2667                 return 0;
2668
2669         /* don't care for the reason here */
2670         drbd_err(device, "submit failed, triggering re-connect\n");
2671         spin_lock_irq(&device->resource->req_lock);
2672         list_del(&peer_req->w.list);
2673         drbd_remove_epoch_entry_interval(device, peer_req);
2674         spin_unlock_irq(&device->resource->req_lock);
2675         if (peer_req->flags & EE_CALL_AL_COMPLETE_IO) {
2676                 peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
2677                 drbd_al_complete_io(device, &peer_req->i);
2678         }
2679
2680 out_interrupted:
2681         drbd_may_finish_epoch(connection, peer_req->epoch, EV_PUT | EV_CLEANUP);
2682         put_ldev(device);
2683         drbd_free_peer_req(device, peer_req);
2684         return err;
2685 }
2686
2687 /* We may throttle resync, if the lower device seems to be busy,
2688  * and current sync rate is above c_min_rate.
2689  *
2690  * To decide whether or not the lower device is busy, we use a scheme similar
2691  * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2692  * (more than 64 sectors) of activity we cannot account for with our own resync
2693  * activity, it obviously is "busy".
2694  *
2695  * The current sync rate used here uses only the most recent two step marks,
2696  * to have a short time average so we can react faster.
2697  */
2698 bool drbd_rs_should_slow_down(struct drbd_device *device, sector_t sector,
2699                 bool throttle_if_app_is_waiting)
2700 {
2701         struct lc_element *tmp;
2702         bool throttle = drbd_rs_c_min_rate_throttle(device);
2703
2704         if (!throttle || throttle_if_app_is_waiting)
2705                 return throttle;
2706
2707         spin_lock_irq(&device->al_lock);
2708         tmp = lc_find(device->resync, BM_SECT_TO_EXT(sector));
2709         if (tmp) {
2710                 struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2711                 if (test_bit(BME_PRIORITY, &bm_ext->flags))
2712                         throttle = false;
2713                 /* Do not slow down if app IO is already waiting for this extent,
2714                  * and our progress is necessary for application IO to complete. */
2715         }
2716         spin_unlock_irq(&device->al_lock);
2717
2718         return throttle;
2719 }
2720
2721 bool drbd_rs_c_min_rate_throttle(struct drbd_device *device)
2722 {
2723         struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
2724         unsigned long db, dt, dbdt;
2725         unsigned int c_min_rate;
2726         int curr_events;
2727
2728         rcu_read_lock();
2729         c_min_rate = rcu_dereference(device->ldev->disk_conf)->c_min_rate;
2730         rcu_read_unlock();
2731
2732         /* feature disabled? */
2733         if (c_min_rate == 0)
2734                 return false;
2735
2736         curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2737                       (int)part_stat_read(&disk->part0, sectors[1]) -
2738                         atomic_read(&device->rs_sect_ev);
2739
2740         if (atomic_read(&device->ap_actlog_cnt)
2741             || curr_events - device->rs_last_events > 64) {
2742                 unsigned long rs_left;
2743                 int i;
2744
2745                 device->rs_last_events = curr_events;
2746
2747                 /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2748                  * approx. */
2749                 i = (device->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2750
2751                 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
2752                         rs_left = device->ov_left;
2753                 else
2754                         rs_left = drbd_bm_total_weight(device) - device->rs_failed;
2755
2756                 dt = ((long)jiffies - (long)device->rs_mark_time[i]) / HZ;
2757                 if (!dt)
2758                         dt++;
2759                 db = device->rs_mark_left[i] - rs_left;
2760                 dbdt = Bit2KB(db/dt);
2761
2762                 if (dbdt > c_min_rate)
2763                         return true;
2764         }
2765         return false;
2766 }
2767
2768 static int receive_DataRequest(struct drbd_connection *connection, struct packet_info *pi)
2769 {
2770         struct drbd_peer_device *peer_device;
2771         struct drbd_device *device;
2772         sector_t sector;
2773         sector_t capacity;
2774         struct drbd_peer_request *peer_req;
2775         struct digest_info *di = NULL;
2776         int size, verb;
2777         unsigned int fault_type;
2778         struct p_block_req *p = pi->data;
2779
2780         peer_device = conn_peer_device(connection, pi->vnr);
2781         if (!peer_device)
2782                 return -EIO;
2783         device = peer_device->device;
2784         capacity = drbd_get_capacity(device->this_bdev);
2785
2786         sector = be64_to_cpu(p->sector);
2787         size   = be32_to_cpu(p->blksize);
2788
2789         if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2790                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2791                                 (unsigned long long)sector, size);
2792                 return -EINVAL;
2793         }
2794         if (sector + (size>>9) > capacity) {
2795                 drbd_err(device, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2796                                 (unsigned long long)sector, size);
2797                 return -EINVAL;
2798         }
2799
2800         if (!get_ldev_if_state(device, D_UP_TO_DATE)) {
2801                 verb = 1;
2802                 switch (pi->cmd) {
2803                 case P_DATA_REQUEST:
2804                         drbd_send_ack_rp(peer_device, P_NEG_DREPLY, p);
2805                         break;
2806                 case P_RS_THIN_REQ:
2807                 case P_RS_DATA_REQUEST:
2808                 case P_CSUM_RS_REQUEST:
2809                 case P_OV_REQUEST:
2810                         drbd_send_ack_rp(peer_device, P_NEG_RS_DREPLY , p);
2811                         break;
2812                 case P_OV_REPLY:
2813                         verb = 0;
2814                         dec_rs_pending(device);
2815                         drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, ID_IN_SYNC);
2816                         break;
2817                 default:
2818                         BUG();
2819                 }
2820                 if (verb && __ratelimit(&drbd_ratelimit_state))
2821                         drbd_err(device, "Can not satisfy peer's read request, "
2822                             "no local data.\n");
2823
2824                 /* drain possibly payload */
2825                 return drbd_drain_block(peer_device, pi->size);
2826         }
2827
2828         /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2829          * "criss-cross" setup, that might cause write-out on some other DRBD,
2830          * which in turn might block on the other node at this very place.  */
2831         peer_req = drbd_alloc_peer_req(peer_device, p->block_id, sector, size,
2832                         size, GFP_NOIO);
2833         if (!peer_req) {
2834                 put_ldev(device);
2835                 return -ENOMEM;
2836         }
2837
2838         switch (pi->cmd) {
2839         case P_DATA_REQUEST:
2840                 peer_req->w.cb = w_e_end_data_req;
2841                 fault_type = DRBD_FAULT_DT_RD;
2842                 /* application IO, don't drbd_rs_begin_io */
2843                 peer_req->flags |= EE_APPLICATION;
2844                 goto submit;
2845
2846         case P_RS_THIN_REQ:
2847                 /* If at some point in the future we have a smart way to
2848                    find out if this data block is completely deallocated,
2849                    then we would do something smarter here than reading
2850                    the block... */
2851                 peer_req->flags |= EE_RS_THIN_REQ;
2852         case P_RS_DATA_REQUEST:
2853                 peer_req->w.cb = w_e_end_rsdata_req;
2854                 fault_type = DRBD_FAULT_RS_RD;
2855                 /* used in the sector offset progress display */
2856                 device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2857                 break;
2858
2859         case P_OV_REPLY:
2860         case P_CSUM_RS_REQUEST:
2861                 fault_type = DRBD_FAULT_RS_RD;
2862                 di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2863                 if (!di)
2864                         goto out_free_e;
2865
2866                 di->digest_size = pi->size;
2867                 di->digest = (((char *)di)+sizeof(struct digest_info));
2868
2869                 peer_req->digest = di;
2870                 peer_req->flags |= EE_HAS_DIGEST;
2871
2872                 if (drbd_recv_all(peer_device->connection, di->digest, pi->size))
2873                         goto out_free_e;
2874
2875                 if (pi->cmd == P_CSUM_RS_REQUEST) {
2876                         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
2877                         peer_req->w.cb = w_e_end_csum_rs_req;
2878                         /* used in the sector offset progress display */
2879                         device->bm_resync_fo = BM_SECT_TO_BIT(sector);
2880                         /* remember to report stats in drbd_resync_finished */
2881                         device->use_csums = true;
2882                 } else if (pi->cmd == P_OV_REPLY) {
2883                         /* track progress, we may need to throttle */
2884                         atomic_add(size >> 9, &device->rs_sect_in);
2885                         peer_req->w.cb = w_e_end_ov_reply;
2886                         dec_rs_pending(device);
2887                         /* drbd_rs_begin_io done when we sent this request,
2888                          * but accounting still needs to be done. */
2889                         goto submit_for_resync;
2890                 }
2891                 break;
2892
2893         case P_OV_REQUEST:
2894                 if (device->ov_start_sector == ~(sector_t)0 &&
2895                     peer_device->connection->agreed_pro_version >= 90) {
2896                         unsigned long now = jiffies;
2897                         int i;
2898                         device->ov_start_sector = sector;
2899                         device->ov_position = sector;
2900                         device->ov_left = drbd_bm_bits(device) - BM_SECT_TO_BIT(sector);
2901                         device->rs_total = device->ov_left;
2902                         for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2903                                 device->rs_mark_left[i] = device->ov_left;
2904                                 device->rs_mark_time[i] = now;
2905                         }
2906                         drbd_info(device, "Online Verify start sector: %llu\n",
2907                                         (unsigned long long)sector);
2908                 }
2909                 peer_req->w.cb = w_e_end_ov_req;
2910                 fault_type = DRBD_FAULT_RS_RD;
2911                 break;
2912
2913         default:
2914                 BUG();
2915         }
2916
2917         /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2918          * wrt the receiver, but it is not as straightforward as it may seem.
2919          * Various places in the resync start and stop logic assume resync
2920          * requests are processed in order, requeuing this on the worker thread
2921          * introduces a bunch of new code for synchronization between threads.
2922          *
2923          * Unlimited throttling before drbd_rs_begin_io may stall the resync
2924          * "forever", throttling after drbd_rs_begin_io will lock that extent
2925          * for application writes for the same time.  For now, just throttle
2926          * here, where the rest of the code expects the receiver to sleep for
2927          * a while, anyways.
2928          */
2929
2930         /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2931          * this defers syncer requests for some time, before letting at least
2932          * on request through.  The resync controller on the receiving side
2933          * will adapt to the incoming rate accordingly.
2934          *
2935          * We cannot throttle here if remote is Primary/SyncTarget:
2936          * we would also throttle its application reads.
2937          * In that case, throttling is done on the SyncTarget only.
2938          */
2939
2940         /* Even though this may be a resync request, we do add to "read_ee";
2941          * "sync_ee" is only used for resync WRITEs.
2942          * Add to list early, so debugfs can find this request
2943          * even if we have to sleep below. */
2944         spin_lock_irq(&device->resource->req_lock);
2945         list_add_tail(&peer_req->w.list, &device->read_ee);
2946         spin_unlock_irq(&device->resource->req_lock);
2947
2948         update_receiver_timing_details(connection, drbd_rs_should_slow_down);
2949         if (device->state.peer != R_PRIMARY
2950         && drbd_rs_should_slow_down(device, sector, false))
2951                 schedule_timeout_uninterruptible(HZ/10);
2952         update_receiver_timing_details(connection, drbd_rs_begin_io);
2953         if (drbd_rs_begin_io(device, sector))
2954                 goto out_free_e;
2955
2956 submit_for_resync:
2957         atomic_add(size >> 9, &device->rs_sect_ev);
2958
2959 submit:
2960         update_receiver_timing_details(connection, drbd_submit_peer_request);
2961         inc_unacked(device);
2962         if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
2963                                      fault_type) == 0)
2964                 return 0;
2965
2966         /* don't care for the reason here */
2967         drbd_err(device, "submit failed, triggering re-connect\n");
2968
2969 out_free_e:
2970         spin_lock_irq(&device->resource->req_lock);
2971         list_del(&peer_req->w.list);
2972         spin_unlock_irq(&device->resource->req_lock);
2973         /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2974
2975         put_ldev(device);
2976         drbd_free_peer_req(device, peer_req);
2977         return -EIO;
2978 }
2979
2980 /**
2981  * drbd_asb_recover_0p  -  Recover after split-brain with no remaining primaries
2982  */
2983 static int drbd_asb_recover_0p(struct drbd_peer_device *peer_device) __must_hold(local)
2984 {
2985         struct drbd_device *device = peer_device->device;
2986         int self, peer, rv = -100;
2987         unsigned long ch_self, ch_peer;
2988         enum drbd_after_sb_p after_sb_0p;
2989
2990         self = device->ldev->md.uuid[UI_BITMAP] & 1;
2991         peer = device->p_uuid[UI_BITMAP] & 1;
2992
2993         ch_peer = device->p_uuid[UI_SIZE];
2994         ch_self = device->comm_bm_set;
2995
2996         rcu_read_lock();
2997         after_sb_0p = rcu_dereference(peer_device->connection->net_conf)->after_sb_0p;
2998         rcu_read_unlock();
2999         switch (after_sb_0p) {
3000         case ASB_CONSENSUS:
3001         case ASB_DISCARD_SECONDARY:
3002         case ASB_CALL_HELPER:
3003         case ASB_VIOLENTLY:
3004                 drbd_err(device, "Configuration error.\n");
3005                 break;
3006         case ASB_DISCONNECT:
3007                 break;
3008         case ASB_DISCARD_YOUNGER_PRI:
3009                 if (self == 0 && peer == 1) {
3010                         rv = -1;
3011                         break;
3012                 }
3013                 if (self == 1 && peer == 0) {
3014                         rv =  1;
3015                         break;
3016                 }
3017                 /* Else fall through to one of the other strategies... */
3018         case ASB_DISCARD_OLDER_PRI:
3019                 if (self == 0 && peer == 1) {
3020                         rv = 1;
3021                         break;
3022                 }
3023                 if (self == 1 && peer == 0) {
3024                         rv = -1;
3025                         break;
3026                 }
3027                 /* Else fall through to one of the other strategies... */
3028                 drbd_warn(device, "Discard younger/older primary did not find a decision\n"
3029                      "Using discard-least-changes instead\n");
3030         case ASB_DISCARD_ZERO_CHG:
3031                 if (ch_peer == 0 && ch_self == 0) {
3032                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3033                                 ? -1 : 1;
3034                         break;
3035                 } else {
3036                         if (ch_peer == 0) { rv =  1; break; }
3037                         if (ch_self == 0) { rv = -1; break; }
3038                 }
3039                 if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
3040                         break;
3041         case ASB_DISCARD_LEAST_CHG:
3042                 if      (ch_self < ch_peer)
3043                         rv = -1;
3044                 else if (ch_self > ch_peer)
3045                         rv =  1;
3046                 else /* ( ch_self == ch_peer ) */
3047                      /* Well, then use something else. */
3048                         rv = test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags)
3049                                 ? -1 : 1;
3050                 break;
3051         case ASB_DISCARD_LOCAL:
3052                 rv = -1;
3053                 break;
3054         case ASB_DISCARD_REMOTE:
3055                 rv =  1;
3056         }
3057
3058         return rv;
3059 }
3060
3061 /**
3062  * drbd_asb_recover_1p  -  Recover after split-brain with one remaining primary
3063  */
3064 static int drbd_asb_recover_1p(struct drbd_peer_device *peer_device) __must_hold(local)
3065 {
3066         struct drbd_device *device = peer_device->device;
3067         int hg, rv = -100;
3068         enum drbd_after_sb_p after_sb_1p;
3069
3070         rcu_read_lock();
3071         after_sb_1p = rcu_dereference(peer_device->connection->net_conf)->after_sb_1p;
3072         rcu_read_unlock();
3073         switch (after_sb_1p) {
3074         case ASB_DISCARD_YOUNGER_PRI:
3075         case ASB_DISCARD_OLDER_PRI:
3076         case ASB_DISCARD_LEAST_CHG:
3077         case ASB_DISCARD_LOCAL:
3078         case ASB_DISCARD_REMOTE:
3079         case ASB_DISCARD_ZERO_CHG:
3080                 drbd_err(device, "Configuration error.\n");
3081                 break;
3082         case ASB_DISCONNECT:
3083                 break;
3084         case ASB_CONSENSUS:
3085                 hg = drbd_asb_recover_0p(peer_device);
3086                 if (hg == -1 && device->state.role == R_SECONDARY)
3087                         rv = hg;
3088                 if (hg == 1  && device->state.role == R_PRIMARY)
3089                         rv = hg;
3090                 break;
3091         case ASB_VIOLENTLY:
3092                 rv = drbd_asb_recover_0p(peer_device);
3093                 break;
3094         case ASB_DISCARD_SECONDARY:
3095                 return device->state.role == R_PRIMARY ? 1 : -1;
3096         case ASB_CALL_HELPER:
3097                 hg = drbd_asb_recover_0p(peer_device);
3098                 if (hg == -1 && device->state.role == R_PRIMARY) {
3099                         enum drbd_state_rv rv2;
3100
3101                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3102                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3103                           * we do not need to wait for the after state change work either. */
3104                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3105                         if (rv2 != SS_SUCCESS) {
3106                                 drbd_khelper(device, "pri-lost-after-sb");
3107                         } else {
3108                                 drbd_warn(device, "Successfully gave up primary role.\n");
3109                                 rv = hg;
3110                         }
3111                 } else
3112                         rv = hg;
3113         }
3114
3115         return rv;
3116 }
3117
3118 /**
3119  * drbd_asb_recover_2p  -  Recover after split-brain with two remaining primaries
3120  */
3121 static int drbd_asb_recover_2p(struct drbd_peer_device *peer_device) __must_hold(local)
3122 {
3123         struct drbd_device *device = peer_device->device;
3124         int hg, rv = -100;
3125         enum drbd_after_sb_p after_sb_2p;
3126
3127         rcu_read_lock();
3128         after_sb_2p = rcu_dereference(peer_device->connection->net_conf)->after_sb_2p;
3129         rcu_read_unlock();
3130         switch (after_sb_2p) {
3131         case ASB_DISCARD_YOUNGER_PRI:
3132         case ASB_DISCARD_OLDER_PRI:
3133         case ASB_DISCARD_LEAST_CHG:
3134         case ASB_DISCARD_LOCAL:
3135         case ASB_DISCARD_REMOTE:
3136         case ASB_CONSENSUS:
3137         case ASB_DISCARD_SECONDARY:
3138         case ASB_DISCARD_ZERO_CHG:
3139                 drbd_err(device, "Configuration error.\n");
3140                 break;
3141         case ASB_VIOLENTLY:
3142                 rv = drbd_asb_recover_0p(peer_device);
3143                 break;
3144         case ASB_DISCONNECT:
3145                 break;
3146         case ASB_CALL_HELPER:
3147                 hg = drbd_asb_recover_0p(peer_device);
3148                 if (hg == -1) {
3149                         enum drbd_state_rv rv2;
3150
3151                          /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
3152                           * we might be here in C_WF_REPORT_PARAMS which is transient.
3153                           * we do not need to wait for the after state change work either. */
3154                         rv2 = drbd_change_state(device, CS_VERBOSE, NS(role, R_SECONDARY));
3155                         if (rv2 != SS_SUCCESS) {
3156                                 drbd_khelper(device, "pri-lost-after-sb");
3157                         } else {
3158                                 drbd_warn(device, "Successfully gave up primary role.\n");
3159                                 rv = hg;
3160                         }
3161                 } else
3162                         rv = hg;
3163         }
3164
3165         return rv;
3166 }
3167
3168 static void drbd_uuid_dump(struct drbd_device *device, char *text, u64 *uuid,
3169                            u64 bits, u64 flags)
3170 {
3171         if (!uuid) {
3172                 drbd_info(device, "%s uuid info vanished while I was looking!\n", text);
3173                 return;
3174         }
3175         drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
3176              text,
3177              (unsigned long long)uuid[UI_CURRENT],
3178              (unsigned long long)uuid[UI_BITMAP],
3179              (unsigned long long)uuid[UI_HISTORY_START],
3180              (unsigned long long)uuid[UI_HISTORY_END],
3181              (unsigned long long)bits,
3182              (unsigned long long)flags);
3183 }
3184
3185 /*
3186   100   after split brain try auto recover
3187     2   C_SYNC_SOURCE set BitMap
3188     1   C_SYNC_SOURCE use BitMap
3189     0   no Sync
3190    -1   C_SYNC_TARGET use BitMap
3191    -2   C_SYNC_TARGET set BitMap
3192  -100   after split brain, disconnect
3193 -1000   unrelated data
3194 -1091   requires proto 91
3195 -1096   requires proto 96
3196  */
3197
3198 static int drbd_uuid_compare(struct drbd_device *const device, enum drbd_role const peer_role, int *rule_nr) __must_hold(local)
3199 {
3200         struct drbd_peer_device *const peer_device = first_peer_device(device);
3201         struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
3202         u64 self, peer;
3203         int i, j;
3204
3205         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3206         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3207
3208         *rule_nr = 10;
3209         if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
3210                 return 0;
3211
3212         *rule_nr = 20;
3213         if ((self == UUID_JUST_CREATED || self == (u64)0) &&
3214              peer != UUID_JUST_CREATED)
3215                 return -2;
3216
3217         *rule_nr = 30;
3218         if (self != UUID_JUST_CREATED &&
3219             (peer == UUID_JUST_CREATED || peer == (u64)0))
3220                 return 2;
3221
3222         if (self == peer) {
3223                 int rct, dc; /* roles at crash time */
3224
3225                 if (device->p_uuid[UI_BITMAP] == (u64)0 && device->ldev->md.uuid[UI_BITMAP] != (u64)0) {
3226
3227                         if (connection->agreed_pro_version < 91)
3228                                 return -1091;
3229
3230                         if ((device->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
3231                             (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
3232                                 drbd_info(device, "was SyncSource, missed the resync finished event, corrected myself:\n");
3233                                 drbd_uuid_move_history(device);
3234                                 device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
3235                                 device->ldev->md.uuid[UI_BITMAP] = 0;
3236
3237                                 drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3238                                                device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3239                                 *rule_nr = 34;
3240                         } else {
3241                                 drbd_info(device, "was SyncSource (peer failed to write sync_uuid)\n");
3242                                 *rule_nr = 36;
3243                         }
3244
3245                         return 1;
3246                 }
3247
3248                 if (device->ldev->md.uuid[UI_BITMAP] == (u64)0 && device->p_uuid[UI_BITMAP] != (u64)0) {
3249
3250                         if (connection->agreed_pro_version < 91)
3251                                 return -1091;
3252
3253                         if ((device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (device->p_uuid[UI_BITMAP] & ~((u64)1)) &&
3254                             (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (device->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
3255                                 drbd_info(device, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
3256
3257                                 device->p_uuid[UI_HISTORY_START + 1] = device->p_uuid[UI_HISTORY_START];
3258                                 device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_BITMAP];
3259                                 device->p_uuid[UI_BITMAP] = 0UL;
3260
3261                                 drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3262                                 *rule_nr = 35;
3263                         } else {
3264                                 drbd_info(device, "was SyncTarget (failed to write sync_uuid)\n");
3265                                 *rule_nr = 37;
3266                         }
3267
3268                         return -1;
3269                 }
3270
3271                 /* Common power [off|failure] */
3272                 rct = (test_bit(CRASHED_PRIMARY, &device->flags) ? 1 : 0) +
3273                         (device->p_uuid[UI_FLAGS] & 2);
3274                 /* lowest bit is set when we were primary,
3275                  * next bit (weight 2) is set when peer was primary */
3276                 *rule_nr = 40;
3277
3278                 /* Neither has the "crashed primary" flag set,
3279                  * only a replication link hickup. */
3280                 if (rct == 0)
3281                         return 0;
3282
3283                 /* Current UUID equal and no bitmap uuid; does not necessarily
3284                  * mean this was a "simultaneous hard crash", maybe IO was
3285                  * frozen, so no UUID-bump happened.
3286                  * This is a protocol change, overload DRBD_FF_WSAME as flag
3287                  * for "new-enough" peer DRBD version. */
3288                 if (device->state.role == R_PRIMARY || peer_role == R_PRIMARY) {
3289                         *rule_nr = 41;
3290                         if (!(connection->agreed_features & DRBD_FF_WSAME)) {
3291                                 drbd_warn(peer_device, "Equivalent unrotated UUIDs, but current primary present.\n");
3292                                 return -(0x10000 | PRO_VERSION_MAX | (DRBD_FF_WSAME << 8));
3293                         }
3294                         if (device->state.role == R_PRIMARY && peer_role == R_PRIMARY) {
3295                                 /* At least one has the "crashed primary" bit set,
3296                                  * both are primary now, but neither has rotated its UUIDs?
3297                                  * "Can not happen." */
3298                                 drbd_err(peer_device, "Equivalent unrotated UUIDs, but both are primary. Can not resolve this.\n");
3299                                 return -100;
3300                         }
3301                         if (device->state.role == R_PRIMARY)
3302                                 return 1;
3303                         return -1;
3304                 }
3305
3306                 /* Both are secondary.
3307                  * Really looks like recovery from simultaneous hard crash.
3308                  * Check which had been primary before, and arbitrate. */
3309                 switch (rct) {
3310                 case 0: /* !self_pri && !peer_pri */ return 0; /* already handled */
3311                 case 1: /*  self_pri && !peer_pri */ return 1;
3312                 case 2: /* !self_pri &&  peer_pri */ return -1;
3313                 case 3: /*  self_pri &&  peer_pri */
3314                         dc = test_bit(RESOLVE_CONFLICTS, &connection->flags);
3315                         return dc ? -1 : 1;
3316                 }
3317         }
3318
3319         *rule_nr = 50;
3320         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3321         if (self == peer)
3322                 return -1;
3323
3324         *rule_nr = 51;
3325         peer = device->p_uuid[UI_HISTORY_START] & ~((u64)1);
3326         if (self == peer) {
3327                 if (connection->agreed_pro_version < 96 ?
3328                     (device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
3329                     (device->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
3330                     peer + UUID_NEW_BM_OFFSET == (device->p_uuid[UI_BITMAP] & ~((u64)1))) {
3331                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3332                            resync as sync source modifications of the peer's UUIDs. */
3333
3334                         if (connection->agreed_pro_version < 91)
3335                                 return -1091;
3336
3337                         device->p_uuid[UI_BITMAP] = device->p_uuid[UI_HISTORY_START];
3338                         device->p_uuid[UI_HISTORY_START] = device->p_uuid[UI_HISTORY_START + 1];
3339
3340                         drbd_info(device, "Lost last syncUUID packet, corrected:\n");
3341                         drbd_uuid_dump(device, "peer", device->p_uuid, device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3342
3343                         return -1;
3344                 }
3345         }
3346
3347         *rule_nr = 60;
3348         self = device->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
3349         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3350                 peer = device->p_uuid[i] & ~((u64)1);
3351                 if (self == peer)
3352                         return -2;
3353         }
3354
3355         *rule_nr = 70;
3356         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3357         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3358         if (self == peer)
3359                 return 1;
3360
3361         *rule_nr = 71;
3362         self = device->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
3363         if (self == peer) {
3364                 if (connection->agreed_pro_version < 96 ?
3365                     (device->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
3366                     (device->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
3367                     self + UUID_NEW_BM_OFFSET == (device->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
3368                         /* The last P_SYNC_UUID did not get though. Undo the last start of
3369                            resync as sync source modifications of our UUIDs. */
3370
3371                         if (connection->agreed_pro_version < 91)
3372                                 return -1091;
3373
3374                         __drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_HISTORY_START]);
3375                         __drbd_uuid_set(device, UI_HISTORY_START, device->ldev->md.uuid[UI_HISTORY_START + 1]);
3376
3377                         drbd_info(device, "Last syncUUID did not get through, corrected:\n");
3378                         drbd_uuid_dump(device, "self", device->ldev->md.uuid,
3379                                        device->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(device) : 0, 0);
3380
3381                         return 1;
3382                 }
3383         }
3384
3385
3386         *rule_nr = 80;
3387         peer = device->p_uuid[UI_CURRENT] & ~((u64)1);
3388         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3389                 self = device->ldev->md.uuid[i] & ~((u64)1);
3390                 if (self == peer)
3391                         return 2;
3392         }
3393
3394         *rule_nr = 90;
3395         self = device->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
3396         peer = device->p_uuid[UI_BITMAP] & ~((u64)1);
3397         if (self == peer && self != ((u64)0))
3398                 return 100;
3399
3400         *rule_nr = 100;
3401         for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
3402                 self = device->ldev->md.uuid[i] & ~((u64)1);
3403                 for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
3404                         peer = device->p_uuid[j] & ~((u64)1);
3405                         if (self == peer)
3406                                 return -100;
3407                 }
3408         }
3409
3410         return -1000;
3411 }
3412
3413 /* drbd_sync_handshake() returns the new conn state on success, or
3414    CONN_MASK (-1) on failure.
3415  */
3416 static enum drbd_conns drbd_sync_handshake(struct drbd_peer_device *peer_device,
3417                                            enum drbd_role peer_role,
3418                                            enum drbd_disk_state peer_disk) __must_hold(local)
3419 {
3420         struct drbd_device *device = peer_device->device;
3421         enum drbd_conns rv = C_MASK;
3422         enum drbd_disk_state mydisk;
3423         struct net_conf *nc;
3424         int hg, rule_nr, rr_conflict, tentative;
3425
3426         mydisk = device->state.disk;
3427         if (mydisk == D_NEGOTIATING)
3428                 mydisk = device->new_state_tmp.disk;
3429
3430         drbd_info(device, "drbd_sync_handshake:\n");
3431
3432         spin_lock_irq(&device->ldev->md.uuid_lock);
3433         drbd_uuid_dump(device, "self", device->ldev->md.uuid, device->comm_bm_set, 0);
3434         drbd_uuid_dump(device, "peer", device->p_uuid,
3435                        device->p_uuid[UI_SIZE], device->p_uuid[UI_FLAGS]);
3436
3437         hg = drbd_uuid_compare(device, peer_role, &rule_nr);
3438         spin_unlock_irq(&device->ldev->md.uuid_lock);
3439
3440         drbd_info(device, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
3441
3442         if (hg == -1000) {
3443                 drbd_alert(device, "Unrelated data, aborting!\n");
3444                 return C_MASK;
3445         }
3446         if (hg < -0x10000) {
3447                 int proto, fflags;
3448                 hg = -hg;
3449                 proto = hg & 0xff;
3450                 fflags = (hg >> 8) & 0xff;
3451                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d and feature flags 0x%x\n",
3452                                         proto, fflags);
3453                 return C_MASK;
3454         }
3455         if (hg < -1000) {
3456                 drbd_alert(device, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
3457                 return C_MASK;
3458         }
3459
3460         if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
3461             (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
3462                 int f = (hg == -100) || abs(hg) == 2;
3463                 hg = mydisk > D_INCONSISTENT ? 1 : -1;
3464                 if (f)
3465                         hg = hg*2;
3466                 drbd_info(device, "Becoming sync %s due to disk states.\n",
3467                      hg > 0 ? "source" : "target");
3468         }
3469
3470         if (abs(hg) == 100)
3471                 drbd_khelper(device, "initial-split-brain");
3472
3473         rcu_read_lock();
3474         nc = rcu_dereference(peer_device->connection->net_conf);
3475
3476         if (hg == 100 || (hg == -100 && nc->always_asbp)) {
3477                 int pcount = (device->state.role == R_PRIMARY)
3478                            + (peer_role == R_PRIMARY);
3479                 int forced = (hg == -100);
3480
3481                 switch (pcount) {
3482                 case 0:
3483                         hg = drbd_asb_recover_0p(peer_device);
3484                         break;
3485                 case 1:
3486                         hg = drbd_asb_recover_1p(peer_device);
3487                         break;
3488                 case 2:
3489                         hg = drbd_asb_recover_2p(peer_device);
3490                         break;
3491                 }
3492                 if (abs(hg) < 100) {
3493                         drbd_warn(device, "Split-Brain detected, %d primaries, "
3494                              "automatically solved. Sync from %s node\n",
3495                              pcount, (hg < 0) ? "peer" : "this");
3496                         if (forced) {
3497                                 drbd_warn(device, "Doing a full sync, since"
3498                                      " UUIDs where ambiguous.\n");
3499                                 hg = hg*2;
3500                         }
3501                 }
3502         }
3503
3504         if (hg == -100) {
3505                 if (test_bit(DISCARD_MY_DATA, &device->flags) && !(device->p_uuid[UI_FLAGS]&1))
3506                         hg = -1;
3507                 if (!test_bit(DISCARD_MY_DATA, &device->flags) && (device->p_uuid[UI_FLAGS]&1))
3508                         hg = 1;
3509
3510                 if (abs(hg) < 100)
3511                         drbd_warn(device, "Split-Brain detected, manually solved. "
3512                              "Sync from %s node\n",
3513                              (hg < 0) ? "peer" : "this");
3514         }
3515         rr_conflict = nc->rr_conflict;
3516         tentative = nc->tentative;
3517         rcu_read_unlock();
3518
3519         if (hg == -100) {
3520                 /* FIXME this log message is not correct if we end up here
3521                  * after an attempted attach on a diskless node.
3522                  * We just refuse to attach -- well, we drop the "connection"
3523                  * to that disk, in a way... */
3524                 drbd_alert(device, "Split-Brain detected but unresolved, dropping connection!\n");
3525                 drbd_khelper(device, "split-brain");
3526                 return C_MASK;
3527         }
3528
3529         if (hg > 0 && mydisk <= D_INCONSISTENT) {
3530                 drbd_err(device, "I shall become SyncSource, but I am inconsistent!\n");
3531                 return C_MASK;
3532         }
3533
3534         if (hg < 0 && /* by intention we do not use mydisk here. */
3535             device->state.role == R_PRIMARY && device->state.disk >= D_CONSISTENT) {
3536                 switch (rr_conflict) {
3537                 case ASB_CALL_HELPER:
3538                         drbd_khelper(device, "pri-lost");
3539                         /* fall through */
3540                 case ASB_DISCONNECT:
3541                         drbd_err(device, "I shall become SyncTarget, but I am primary!\n");
3542                         return C_MASK;
3543                 case ASB_VIOLENTLY:
3544                         drbd_warn(device, "Becoming SyncTarget, violating the stable-data"
3545                              "assumption\n");
3546                 }
3547         }
3548
3549         if (tentative || test_bit(CONN_DRY_RUN, &peer_device->connection->flags)) {
3550                 if (hg == 0)
3551                         drbd_info(device, "dry-run connect: No resync, would become Connected immediately.\n");
3552                 else
3553                         drbd_info(device, "dry-run connect: Would become %s, doing a %s resync.",
3554                                  drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3555                                  abs(hg) >= 2 ? "full" : "bit-map based");
3556                 return C_MASK;
3557         }
3558
3559         if (abs(hg) >= 2) {
3560                 drbd_info(device, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3561                 if (drbd_bitmap_io(device, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3562                                         BM_LOCKED_SET_ALLOWED))
3563                         return C_MASK;
3564         }
3565
3566         if (hg > 0) { /* become sync source. */
3567                 rv = C_WF_BITMAP_S;
3568         } else if (hg < 0) { /* become sync target */
3569                 rv = C_WF_BITMAP_T;
3570         } else {
3571                 rv = C_CONNECTED;
3572                 if (drbd_bm_total_weight(device)) {
3573                         drbd_info(device, "No resync, but %lu bits in bitmap!\n",
3574                              drbd_bm_total_weight(device));
3575                 }
3576         }
3577
3578         return rv;
3579 }
3580
3581 static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3582 {
3583         /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3584         if (peer == ASB_DISCARD_REMOTE)
3585                 return ASB_DISCARD_LOCAL;
3586
3587         /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3588         if (peer == ASB_DISCARD_LOCAL)
3589                 return ASB_DISCARD_REMOTE;
3590
3591         /* everything else is valid if they are equal on both sides. */
3592         return peer;
3593 }
3594
3595 static int receive_protocol(struct drbd_connection *connection, struct packet_info *pi)
3596 {
3597         struct p_protocol *p = pi->data;
3598         enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3599         int p_proto, p_discard_my_data, p_two_primaries, cf;
3600         struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3601         char integrity_alg[SHARED_SECRET_MAX] = "";
3602         struct crypto_ahash *peer_integrity_tfm = NULL;
3603         void *int_dig_in = NULL, *int_dig_vv = NULL;
3604
3605         p_proto         = be32_to_cpu(p->protocol);
3606         p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3607         p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3608         p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3609         p_two_primaries = be32_to_cpu(p->two_primaries);
3610         cf              = be32_to_cpu(p->conn_flags);
3611         p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3612
3613         if (connection->agreed_pro_version >= 87) {
3614                 int err;
3615
3616                 if (pi->size > sizeof(integrity_alg))
3617                         return -EIO;
3618                 err = drbd_recv_all(connection, integrity_alg, pi->size);
3619                 if (err)
3620                         return err;
3621                 integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3622         }
3623
3624         if (pi->cmd != P_PROTOCOL_UPDATE) {
3625                 clear_bit(CONN_DRY_RUN, &connection->flags);
3626
3627                 if (cf & CF_DRY_RUN)
3628                         set_bit(CONN_DRY_RUN, &connection->flags);
3629
3630                 rcu_read_lock();
3631                 nc = rcu_dereference(connection->net_conf);
3632
3633                 if (p_proto != nc->wire_protocol) {
3634                         drbd_err(connection, "incompatible %s settings\n", "protocol");
3635                         goto disconnect_rcu_unlock;
3636                 }
3637
3638                 if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3639                         drbd_err(connection, "incompatible %s settings\n", "after-sb-0pri");
3640                         goto disconnect_rcu_unlock;
3641                 }
3642
3643                 if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3644                         drbd_err(connection, "incompatible %s settings\n", "after-sb-1pri");
3645                         goto disconnect_rcu_unlock;
3646                 }
3647
3648                 if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3649                         drbd_err(connection, "incompatible %s settings\n", "after-sb-2pri");
3650                         goto disconnect_rcu_unlock;
3651                 }
3652
3653                 if (p_discard_my_data && nc->discard_my_data) {
3654                         drbd_err(connection, "incompatible %s settings\n", "discard-my-data");
3655                         goto disconnect_rcu_unlock;
3656                 }
3657
3658                 if (p_two_primaries != nc->two_primaries) {
3659                         drbd_err(connection, "incompatible %s settings\n", "allow-two-primaries");
3660                         goto disconnect_rcu_unlock;
3661                 }
3662
3663                 if (strcmp(integrity_alg, nc->integrity_alg)) {
3664                         drbd_err(connection, "incompatible %s settings\n", "data-integrity-alg");
3665                         goto disconnect_rcu_unlock;
3666                 }
3667
3668                 rcu_read_unlock();
3669         }
3670
3671         if (integrity_alg[0]) {
3672                 int hash_size;
3673
3674                 /*
3675                  * We can only change the peer data integrity algorithm
3676                  * here.  Changing our own data integrity algorithm
3677                  * requires that we send a P_PROTOCOL_UPDATE packet at
3678                  * the same time; otherwise, the peer has no way to
3679                  * tell between which packets the algorithm should
3680                  * change.
3681                  */
3682
3683                 peer_integrity_tfm = crypto_alloc_ahash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3684                 if (IS_ERR(peer_integrity_tfm)) {
3685                         peer_integrity_tfm = NULL;
3686                         drbd_err(connection, "peer data-integrity-alg %s not supported\n",
3687                                  integrity_alg);
3688                         goto disconnect;
3689                 }
3690
3691                 hash_size = crypto_ahash_digestsize(peer_integrity_tfm);
3692                 int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3693                 int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3694                 if (!(int_dig_in && int_dig_vv)) {
3695                         drbd_err(connection, "Allocation of buffers for data integrity checking failed\n");
3696                         goto disconnect;
3697                 }
3698         }
3699
3700         new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3701         if (!new_net_conf) {
3702                 drbd_err(connection, "Allocation of new net_conf failed\n");
3703                 goto disconnect;
3704         }
3705
3706         mutex_lock(&connection->data.mutex);
3707         mutex_lock(&connection->resource->conf_update);
3708         old_net_conf = connection->net_conf;
3709         *new_net_conf = *old_net_conf;
3710
3711         new_net_conf->wire_protocol = p_proto;
3712         new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3713         new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3714         new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3715         new_net_conf->two_primaries = p_two_primaries;
3716
3717         rcu_assign_pointer(connection->net_conf, new_net_conf);
3718         mutex_unlock(&connection->resource->conf_update);
3719         mutex_unlock(&connection->data.mutex);
3720
3721         crypto_free_ahash(connection->peer_integrity_tfm);
3722         kfree(connection->int_dig_in);
3723         kfree(connection->int_dig_vv);
3724         connection->peer_integrity_tfm = peer_integrity_tfm;
3725         connection->int_dig_in = int_dig_in;
3726         connection->int_dig_vv = int_dig_vv;
3727
3728         if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3729                 drbd_info(connection, "peer data-integrity-alg: %s\n",
3730                           integrity_alg[0] ? integrity_alg : "(none)");
3731
3732         synchronize_rcu();
3733         kfree(old_net_conf);
3734         return 0;
3735
3736 disconnect_rcu_unlock:
3737         rcu_read_unlock();
3738 disconnect:
3739         crypto_free_ahash(peer_integrity_tfm);
3740         kfree(int_dig_in);
3741         kfree(int_dig_vv);
3742         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
3743         return -EIO;
3744 }
3745
3746 /* helper function
3747  * input: alg name, feature name
3748  * return: NULL (alg name was "")
3749  *         ERR_PTR(error) if something goes wrong
3750  *         or the crypto hash ptr, if it worked out ok. */
3751 static struct crypto_ahash *drbd_crypto_alloc_digest_safe(const struct drbd_device *device,
3752                 const char *alg, const char *name)
3753 {
3754         struct crypto_ahash *tfm;
3755
3756         if (!alg[0])
3757                 return NULL;
3758
3759         tfm = crypto_alloc_ahash(alg, 0, CRYPTO_ALG_ASYNC);
3760         if (IS_ERR(tfm)) {
3761                 drbd_err(device, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3762                         alg, name, PTR_ERR(tfm));
3763                 return tfm;
3764         }
3765         return tfm;
3766 }
3767
3768 static int ignore_remaining_packet(struct drbd_connection *connection, struct packet_info *pi)
3769 {
3770         void *buffer = connection->data.rbuf;
3771         int size = pi->size;
3772
3773         while (size) {
3774                 int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3775                 s = drbd_recv(connection, buffer, s);
3776                 if (s <= 0) {
3777                         if (s < 0)
3778                                 return s;
3779                         break;
3780                 }
3781                 size -= s;
3782         }
3783         if (size)
3784                 return -EIO;
3785         return 0;
3786 }
3787
3788 /*
3789  * config_unknown_volume  -  device configuration command for unknown volume
3790  *
3791  * When a device is added to an existing connection, the node on which the
3792  * device is added first will send configuration commands to its peer but the
3793  * peer will not know about the device yet.  It will warn and ignore these
3794  * commands.  Once the device is added on the second node, the second node will
3795  * send the same device configuration commands, but in the other direction.
3796  *
3797  * (We can also end up here if drbd is misconfigured.)
3798  */
3799 static int config_unknown_volume(struct drbd_connection *connection, struct packet_info *pi)
3800 {
3801         drbd_warn(connection, "%s packet received for volume %u, which is not configured locally\n",
3802                   cmdname(pi->cmd), pi->vnr);
3803         return ignore_remaining_packet(connection, pi);
3804 }
3805
3806 static int receive_SyncParam(struct drbd_connection *connection, struct packet_info *pi)
3807 {
3808         struct drbd_peer_device *peer_device;
3809         struct drbd_device *device;
3810         struct p_rs_param_95 *p;
3811         unsigned int header_size, data_size, exp_max_sz;
3812         struct crypto_ahash *verify_tfm = NULL;
3813         struct crypto_ahash *csums_tfm = NULL;
3814         struct net_conf *old_net_conf, *new_net_conf = NULL;
3815         struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3816         const int apv = connection->agreed_pro_version;
3817         struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3818         int fifo_size = 0;
3819         int err;
3820
3821         peer_device = conn_peer_device(connection, pi->vnr);
3822         if (!peer_device)
3823                 return config_unknown_volume(connection, pi);
3824         device = peer_device->device;
3825
3826         exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3827                     : apv == 88 ? sizeof(struct p_rs_param)
3828                                         + SHARED_SECRET_MAX
3829                     : apv <= 94 ? sizeof(struct p_rs_param_89)
3830                     : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3831
3832         if (pi->size > exp_max_sz) {
3833                 drbd_err(device, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3834                     pi->size, exp_max_sz);
3835                 return -EIO;
3836         }
3837
3838         if (apv <= 88) {
3839                 header_size = sizeof(struct p_rs_param);
3840                 data_size = pi->size - header_size;
3841         } else if (apv <= 94) {
3842                 header_size = sizeof(struct p_rs_param_89);
3843                 data_size = pi->size - header_size;
3844                 D_ASSERT(device, data_size == 0);
3845         } else {
3846                 header_size = sizeof(struct p_rs_param_95);
3847                 data_size = pi->size - header_size;
3848                 D_ASSERT(device, data_size == 0);
3849         }
3850
3851         /* initialize verify_alg and csums_alg */
3852         p = pi->data;
3853         memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3854
3855         err = drbd_recv_all(peer_device->connection, p, header_size);
3856         if (err)
3857                 return err;
3858
3859         mutex_lock(&connection->resource->conf_update);
3860         old_net_conf = peer_device->connection->net_conf;
3861         if (get_ldev(device)) {
3862                 new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3863                 if (!new_disk_conf) {
3864                         put_ldev(device);
3865                         mutex_unlock(&connection->resource->conf_update);
3866                         drbd_err(device, "Allocation of new disk_conf failed\n");
3867                         return -ENOMEM;
3868                 }
3869
3870                 old_disk_conf = device->ldev->disk_conf;
3871                 *new_disk_conf = *old_disk_conf;
3872
3873                 new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3874         }
3875
3876         if (apv >= 88) {
3877                 if (apv == 88) {
3878                         if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3879                                 drbd_err(device, "verify-alg of wrong size, "
3880                                         "peer wants %u, accepting only up to %u byte\n",
3881                                         data_size, SHARED_SECRET_MAX);
3882                                 err = -EIO;
3883                                 goto reconnect;
3884                         }
3885
3886                         err = drbd_recv_all(peer_device->connection, p->verify_alg, data_size);
3887                         if (err)
3888                                 goto reconnect;
3889                         /* we expect NUL terminated string */
3890                         /* but just in case someone tries to be evil */
3891                         D_ASSERT(device, p->verify_alg[data_size-1] == 0);
3892                         p->verify_alg[data_size-1] = 0;
3893
3894                 } else /* apv >= 89 */ {
3895                         /* we still expect NUL terminated strings */
3896                         /* but just in case someone tries to be evil */
3897                         D_ASSERT(device, p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3898                         D_ASSERT(device, p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3899                         p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3900                         p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3901                 }
3902
3903                 if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3904                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3905                                 drbd_err(device, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3906                                     old_net_conf->verify_alg, p->verify_alg);
3907                                 goto disconnect;
3908                         }
3909                         verify_tfm = drbd_crypto_alloc_digest_safe(device,
3910                                         p->verify_alg, "verify-alg");
3911                         if (IS_ERR(verify_tfm)) {
3912                                 verify_tfm = NULL;
3913                                 goto disconnect;
3914                         }
3915                 }
3916
3917                 if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3918                         if (device->state.conn == C_WF_REPORT_PARAMS) {
3919                                 drbd_err(device, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3920                                     old_net_conf->csums_alg, p->csums_alg);
3921                                 goto disconnect;
3922                         }
3923                         csums_tfm = drbd_crypto_alloc_digest_safe(device,
3924                                         p->csums_alg, "csums-alg");
3925                         if (IS_ERR(csums_tfm)) {
3926                                 csums_tfm = NULL;
3927                                 goto disconnect;
3928                         }
3929                 }
3930
3931                 if (apv > 94 && new_disk_conf) {
3932                         new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3933                         new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3934                         new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3935                         new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3936
3937                         fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3938                         if (fifo_size != device->rs_plan_s->size) {
3939                                 new_plan = fifo_alloc(fifo_size);
3940                                 if (!new_plan) {
3941                                         drbd_err(device, "kmalloc of fifo_buffer failed");
3942                                         put_ldev(device);
3943                                         goto disconnect;
3944                                 }
3945                         }
3946                 }
3947
3948                 if (verify_tfm || csums_tfm) {
3949                         new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3950                         if (!new_net_conf) {
3951                                 drbd_err(device, "Allocation of new net_conf failed\n");
3952                                 goto disconnect;
3953                         }
3954
3955                         *new_net_conf = *old_net_conf;
3956
3957                         if (verify_tfm) {
3958                                 strcpy(new_net_conf->verify_alg, p->verify_alg);
3959                                 new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3960                                 crypto_free_ahash(peer_device->connection->verify_tfm);
3961                                 peer_device->connection->verify_tfm = verify_tfm;
3962                                 drbd_info(device, "using verify-alg: \"%s\"\n", p->verify_alg);
3963                         }
3964                         if (csums_tfm) {
3965                                 strcpy(new_net_conf->csums_alg, p->csums_alg);
3966                                 new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3967                                 crypto_free_ahash(peer_device->connection->csums_tfm);
3968                                 peer_device->connection->csums_tfm = csums_tfm;
3969                                 drbd_info(device, "using csums-alg: \"%s\"\n", p->csums_alg);
3970                         }
3971                         rcu_assign_pointer(connection->net_conf, new_net_conf);
3972                 }
3973         }
3974
3975         if (new_disk_conf) {
3976                 rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
3977                 put_ldev(device);
3978         }
3979
3980         if (new_plan) {
3981                 old_plan = device->rs_plan_s;
3982                 rcu_assign_pointer(device->rs_plan_s, new_plan);
3983         }
3984
3985         mutex_unlock(&connection->resource->conf_update);
3986         synchronize_rcu();
3987         if (new_net_conf)
3988                 kfree(old_net_conf);
3989         kfree(old_disk_conf);
3990         kfree(old_plan);
3991
3992         return 0;
3993
3994 reconnect:
3995         if (new_disk_conf) {
3996                 put_ldev(device);
3997                 kfree(new_disk_conf);
3998         }
3999         mutex_unlock(&connection->resource->conf_update);
4000         return -EIO;
4001
4002 disconnect:
4003         kfree(new_plan);
4004         if (new_disk_conf) {
4005                 put_ldev(device);
4006                 kfree(new_disk_conf);
4007         }
4008         mutex_unlock(&connection->resource->conf_update);
4009         /* just for completeness: actually not needed,
4010          * as this is not reached if csums_tfm was ok. */
4011         crypto_free_ahash(csums_tfm);
4012         /* but free the verify_tfm again, if csums_tfm did not work out */
4013         crypto_free_ahash(verify_tfm);
4014         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4015         return -EIO;
4016 }
4017
4018 /* warn if the arguments differ by more than 12.5% */
4019 static void warn_if_differ_considerably(struct drbd_device *device,
4020         const char *s, sector_t a, sector_t b)
4021 {
4022         sector_t d;
4023         if (a == 0 || b == 0)
4024                 return;
4025         d = (a > b) ? (a - b) : (b - a);
4026         if (d > (a>>3) || d > (b>>3))
4027                 drbd_warn(device, "Considerable difference in %s: %llus vs. %llus\n", s,
4028                      (unsigned long long)a, (unsigned long long)b);
4029 }
4030
4031 static int receive_sizes(struct drbd_connection *connection, struct packet_info *pi)
4032 {
4033         struct drbd_peer_device *peer_device;
4034         struct drbd_device *device;
4035         struct p_sizes *p = pi->data;
4036         struct o_qlim *o = (connection->agreed_features & DRBD_FF_WSAME) ? p->qlim : NULL;
4037         enum determine_dev_size dd = DS_UNCHANGED;
4038         sector_t p_size, p_usize, p_csize, my_usize;
4039         int ldsc = 0; /* local disk size changed */
4040         enum dds_flags ddsf;
4041
4042         peer_device = conn_peer_device(connection, pi->vnr);
4043         if (!peer_device)
4044                 return config_unknown_volume(connection, pi);
4045         device = peer_device->device;
4046
4047         p_size = be64_to_cpu(p->d_size);
4048         p_usize = be64_to_cpu(p->u_size);
4049         p_csize = be64_to_cpu(p->c_size);
4050
4051         /* just store the peer's disk size for now.
4052          * we still need to figure out whether we accept that. */
4053         device->p_size = p_size;
4054
4055         if (get_ldev(device)) {
4056                 sector_t new_size, cur_size;
4057                 rcu_read_lock();
4058                 my_usize = rcu_dereference(device->ldev->disk_conf)->disk_size;
4059                 rcu_read_unlock();
4060
4061                 warn_if_differ_considerably(device, "lower level device sizes",
4062                            p_size, drbd_get_max_capacity(device->ldev));
4063                 warn_if_differ_considerably(device, "user requested size",
4064                                             p_usize, my_usize);
4065
4066                 /* if this is the first connect, or an otherwise expected
4067                  * param exchange, choose the minimum */
4068                 if (device->state.conn == C_WF_REPORT_PARAMS)
4069                         p_usize = min_not_zero(my_usize, p_usize);
4070
4071                 /* Never shrink a device with usable data during connect.
4072                    But allow online shrinking if we are connected. */
4073                 new_size = drbd_new_dev_size(device, device->ldev, p_usize, 0);
4074                 cur_size = drbd_get_capacity(device->this_bdev);
4075                 if (new_size < cur_size &&
4076                     device->state.disk >= D_OUTDATED &&
4077                     device->state.conn < C_CONNECTED) {
4078                         drbd_err(device, "The peer's disk size is too small! (%llu < %llu sectors)\n",
4079                                         (unsigned long long)new_size, (unsigned long long)cur_size);
4080                         conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4081                         put_ldev(device);
4082                         return -EIO;
4083                 }
4084
4085                 if (my_usize != p_usize) {
4086                         struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
4087
4088                         new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
4089                         if (!new_disk_conf) {
4090                                 drbd_err(device, "Allocation of new disk_conf failed\n");
4091                                 put_ldev(device);
4092                                 return -ENOMEM;
4093                         }
4094
4095                         mutex_lock(&connection->resource->conf_update);
4096                         old_disk_conf = device->ldev->disk_conf;
4097                         *new_disk_conf = *old_disk_conf;
4098                         new_disk_conf->disk_size = p_usize;
4099
4100                         rcu_assign_pointer(device->ldev->disk_conf, new_disk_conf);
4101                         mutex_unlock(&connection->resource->conf_update);
4102                         synchronize_rcu();
4103                         kfree(old_disk_conf);
4104
4105                         drbd_info(device, "Peer sets u_size to %lu sectors\n",
4106                                  (unsigned long)my_usize);
4107                 }
4108
4109                 put_ldev(device);
4110         }
4111
4112         device->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
4113         /* Leave drbd_reconsider_queue_parameters() before drbd_determine_dev_size().
4114            In case we cleared the QUEUE_FLAG_DISCARD from our queue in
4115            drbd_reconsider_queue_parameters(), we can be sure that after
4116            drbd_determine_dev_size() no REQ_DISCARDs are in the queue. */
4117
4118         ddsf = be16_to_cpu(p->dds_flags);
4119         if (get_ldev(device)) {
4120                 drbd_reconsider_queue_parameters(device, device->ldev, o);
4121                 dd = drbd_determine_dev_size(device, ddsf, NULL);
4122                 put_ldev(device);
4123                 if (dd == DS_ERROR)
4124                         return -EIO;
4125                 drbd_md_sync(device);
4126         } else {
4127                 /*
4128                  * I am diskless, need to accept the peer's *current* size.
4129                  * I must NOT accept the peers backing disk size,
4130                  * it may have been larger than mine all along...
4131                  *
4132                  * At this point, the peer knows more about my disk, or at
4133                  * least about what we last agreed upon, than myself.
4134                  * So if his c_size is less than his d_size, the most likely
4135                  * reason is that *my* d_size was smaller last time we checked.
4136                  *
4137                  * However, if he sends a zero current size,
4138                  * take his (user-capped or) backing disk size anyways.
4139                  */
4140                 drbd_reconsider_queue_parameters(device, NULL, o);
4141                 drbd_set_my_capacity(device, p_csize ?: p_usize ?: p_size);
4142         }
4143
4144         if (get_ldev(device)) {
4145                 if (device->ldev->known_size != drbd_get_capacity(device->ldev->backing_bdev)) {
4146                         device->ldev->known_size = drbd_get_capacity(device->ldev->backing_bdev);
4147                         ldsc = 1;
4148                 }
4149
4150                 put_ldev(device);
4151         }
4152
4153         if (device->state.conn > C_WF_REPORT_PARAMS) {
4154                 if (be64_to_cpu(p->c_size) !=
4155                     drbd_get_capacity(device->this_bdev) || ldsc) {
4156                         /* we have different sizes, probably peer
4157                          * needs to know my new size... */
4158                         drbd_send_sizes(peer_device, 0, ddsf);
4159                 }
4160                 if (test_and_clear_bit(RESIZE_PENDING, &device->flags) ||
4161                     (dd == DS_GREW && device->state.conn == C_CONNECTED)) {
4162                         if (device->state.pdsk >= D_INCONSISTENT &&
4163                             device->state.disk >= D_INCONSISTENT) {
4164                                 if (ddsf & DDSF_NO_RESYNC)
4165                                         drbd_info(device, "Resync of new storage suppressed with --assume-clean\n");
4166                                 else
4167                                         resync_after_online_grow(device);
4168                         } else
4169                                 set_bit(RESYNC_AFTER_NEG, &device->flags);
4170                 }
4171         }
4172
4173         return 0;
4174 }
4175
4176 static int receive_uuids(struct drbd_connection *connection, struct packet_info *pi)
4177 {
4178         struct drbd_peer_device *peer_device;
4179         struct drbd_device *device;
4180         struct p_uuids *p = pi->data;
4181         u64 *p_uuid;
4182         int i, updated_uuids = 0;
4183
4184         peer_device = conn_peer_device(connection, pi->vnr);
4185         if (!peer_device)
4186                 return config_unknown_volume(connection, pi);
4187         device = peer_device->device;
4188
4189         p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
4190         if (!p_uuid) {
4191                 drbd_err(device, "kmalloc of p_uuid failed\n");
4192                 return false;
4193         }
4194
4195         for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
4196                 p_uuid[i] = be64_to_cpu(p->uuid[i]);
4197
4198         kfree(device->p_uuid);
4199         device->p_uuid = p_uuid;
4200
4201         if (device->state.conn < C_CONNECTED &&
4202             device->state.disk < D_INCONSISTENT &&
4203             device->state.role == R_PRIMARY &&
4204             (device->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
4205                 drbd_err(device, "Can only connect to data with current UUID=%016llX\n",
4206                     (unsigned long long)device->ed_uuid);
4207                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4208                 return -EIO;
4209         }
4210
4211         if (get_ldev(device)) {
4212                 int skip_initial_sync =
4213                         device->state.conn == C_CONNECTED &&
4214                         peer_device->connection->agreed_pro_version >= 90 &&
4215                         device->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
4216                         (p_uuid[UI_FLAGS] & 8);
4217                 if (skip_initial_sync) {
4218                         drbd_info(device, "Accepted new current UUID, preparing to skip initial sync\n");
4219                         drbd_bitmap_io(device, &drbd_bmio_clear_n_write,
4220                                         "clear_n_write from receive_uuids",
4221                                         BM_LOCKED_TEST_ALLOWED);
4222                         _drbd_uuid_set(device, UI_CURRENT, p_uuid[UI_CURRENT]);
4223                         _drbd_uuid_set(device, UI_BITMAP, 0);
4224                         _drbd_set_state(_NS2(device, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
4225                                         CS_VERBOSE, NULL);
4226                         drbd_md_sync(device);
4227                         updated_uuids = 1;
4228                 }
4229                 put_ldev(device);
4230         } else if (device->state.disk < D_INCONSISTENT &&
4231                    device->state.role == R_PRIMARY) {
4232                 /* I am a diskless primary, the peer just created a new current UUID
4233                    for me. */
4234                 updated_uuids = drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4235         }
4236
4237         /* Before we test for the disk state, we should wait until an eventually
4238            ongoing cluster wide state change is finished. That is important if
4239            we are primary and are detaching from our disk. We need to see the
4240            new disk state... */
4241         mutex_lock(device->state_mutex);
4242         mutex_unlock(device->state_mutex);
4243         if (device->state.conn >= C_CONNECTED && device->state.disk < D_INCONSISTENT)
4244                 updated_uuids |= drbd_set_ed_uuid(device, p_uuid[UI_CURRENT]);
4245
4246         if (updated_uuids)
4247                 drbd_print_uuids(device, "receiver updated UUIDs to");
4248
4249         return 0;
4250 }
4251
4252 /**
4253  * convert_state() - Converts the peer's view of the cluster state to our point of view
4254  * @ps:         The state as seen by the peer.
4255  */
4256 static union drbd_state convert_state(union drbd_state ps)
4257 {
4258         union drbd_state ms;
4259
4260         static enum drbd_conns c_tab[] = {
4261                 [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
4262                 [C_CONNECTED] = C_CONNECTED,
4263
4264                 [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
4265                 [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
4266                 [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
4267                 [C_VERIFY_S]       = C_VERIFY_T,
4268                 [C_MASK]   = C_MASK,
4269         };
4270
4271         ms.i = ps.i;
4272
4273         ms.conn = c_tab[ps.conn];
4274         ms.peer = ps.role;
4275         ms.role = ps.peer;
4276         ms.pdsk = ps.disk;
4277         ms.disk = ps.pdsk;
4278         ms.peer_isp = (ps.aftr_isp | ps.user_isp);
4279
4280         return ms;
4281 }
4282
4283 static int receive_req_state(struct drbd_connection *connection, struct packet_info *pi)
4284 {
4285         struct drbd_peer_device *peer_device;
4286         struct drbd_device *device;
4287         struct p_req_state *p = pi->data;
4288         union drbd_state mask, val;
4289         enum drbd_state_rv rv;
4290
4291         peer_device = conn_peer_device(connection, pi->vnr);
4292         if (!peer_device)
4293                 return -EIO;
4294         device = peer_device->device;
4295
4296         mask.i = be32_to_cpu(p->mask);
4297         val.i = be32_to_cpu(p->val);
4298
4299         if (test_bit(RESOLVE_CONFLICTS, &peer_device->connection->flags) &&
4300             mutex_is_locked(device->state_mutex)) {
4301                 drbd_send_sr_reply(peer_device, SS_CONCURRENT_ST_CHG);
4302                 return 0;
4303         }
4304
4305         mask = convert_state(mask);
4306         val = convert_state(val);
4307
4308         rv = drbd_change_state(device, CS_VERBOSE, mask, val);
4309         drbd_send_sr_reply(peer_device, rv);
4310
4311         drbd_md_sync(device);
4312
4313         return 0;
4314 }
4315
4316 static int receive_req_conn_state(struct drbd_connection *connection, struct packet_info *pi)
4317 {
4318         struct p_req_state *p = pi->data;
4319         union drbd_state mask, val;
4320         enum drbd_state_rv rv;
4321
4322         mask.i = be32_to_cpu(p->mask);
4323         val.i = be32_to_cpu(p->val);
4324
4325         if (test_bit(RESOLVE_CONFLICTS, &connection->flags) &&
4326             mutex_is_locked(&connection->cstate_mutex)) {
4327                 conn_send_sr_reply(connection, SS_CONCURRENT_ST_CHG);
4328                 return 0;
4329         }
4330
4331         mask = convert_state(mask);
4332         val = convert_state(val);
4333
4334         rv = conn_request_state(connection, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
4335         conn_send_sr_reply(connection, rv);
4336
4337         return 0;
4338 }
4339
4340 static int receive_state(struct drbd_connection *connection, struct packet_info *pi)
4341 {
4342         struct drbd_peer_device *peer_device;
4343         struct drbd_device *device;
4344         struct p_state *p = pi->data;
4345         union drbd_state os, ns, peer_state;
4346         enum drbd_disk_state real_peer_disk;
4347         enum chg_state_flags cs_flags;
4348         int rv;
4349
4350         peer_device = conn_peer_device(connection, pi->vnr);
4351         if (!peer_device)
4352                 return config_unknown_volume(connection, pi);
4353         device = peer_device->device;
4354
4355         peer_state.i = be32_to_cpu(p->state);
4356
4357         real_peer_disk = peer_state.disk;
4358         if (peer_state.disk == D_NEGOTIATING) {
4359                 real_peer_disk = device->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
4360                 drbd_info(device, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
4361         }
4362
4363         spin_lock_irq(&device->resource->req_lock);
4364  retry:
4365         os = ns = drbd_read_state(device);
4366         spin_unlock_irq(&device->resource->req_lock);
4367
4368         /* If some other part of the code (ack_receiver thread, timeout)
4369          * already decided to close the connection again,
4370          * we must not "re-establish" it here. */
4371         if (os.conn <= C_TEAR_DOWN)
4372                 return -ECONNRESET;
4373
4374         /* If this is the "end of sync" confirmation, usually the peer disk
4375          * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
4376          * set) resync started in PausedSyncT, or if the timing of pause-/
4377          * unpause-sync events has been "just right", the peer disk may
4378          * transition from D_CONSISTENT to D_UP_TO_DATE as well.
4379          */
4380         if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
4381             real_peer_disk == D_UP_TO_DATE &&
4382             os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
4383                 /* If we are (becoming) SyncSource, but peer is still in sync
4384                  * preparation, ignore its uptodate-ness to avoid flapping, it
4385                  * will change to inconsistent once the peer reaches active
4386                  * syncing states.
4387                  * It may have changed syncer-paused flags, however, so we
4388                  * cannot ignore this completely. */
4389                 if (peer_state.conn > C_CONNECTED &&
4390                     peer_state.conn < C_SYNC_SOURCE)
4391                         real_peer_disk = D_INCONSISTENT;
4392
4393                 /* if peer_state changes to connected at the same time,
4394                  * it explicitly notifies us that it finished resync.
4395                  * Maybe we should finish it up, too? */
4396                 else if (os.conn >= C_SYNC_SOURCE &&
4397                          peer_state.conn == C_CONNECTED) {
4398                         if (drbd_bm_total_weight(device) <= device->rs_failed)
4399                                 drbd_resync_finished(device);
4400                         return 0;
4401                 }
4402         }
4403
4404         /* explicit verify finished notification, stop sector reached. */
4405         if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
4406             peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
4407                 ov_out_of_sync_print(device);
4408                 drbd_resync_finished(device);
4409                 return 0;
4410         }
4411
4412         /* peer says his disk is inconsistent, while we think it is uptodate,
4413          * and this happens while the peer still thinks we have a sync going on,
4414          * but we think we are already done with the sync.
4415          * We ignore this to avoid flapping pdsk.
4416          * This should not happen, if the peer is a recent version of drbd. */
4417         if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
4418             os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
4419                 real_peer_disk = D_UP_TO_DATE;
4420
4421         if (ns.conn == C_WF_REPORT_PARAMS)
4422                 ns.conn = C_CONNECTED;
4423
4424         if (peer_state.conn == C_AHEAD)
4425                 ns.conn = C_BEHIND;
4426
4427         if (device->p_uuid && peer_state.disk >= D_NEGOTIATING &&
4428             get_ldev_if_state(device, D_NEGOTIATING)) {
4429                 int cr; /* consider resync */
4430
4431                 /* if we established a new connection */
4432                 cr  = (os.conn < C_CONNECTED);
4433                 /* if we had an established connection
4434                  * and one of the nodes newly attaches a disk */
4435                 cr |= (os.conn == C_CONNECTED &&
4436                        (peer_state.disk == D_NEGOTIATING ||
4437                         os.disk == D_NEGOTIATING));
4438                 /* if we have both been inconsistent, and the peer has been
4439                  * forced to be UpToDate with --overwrite-data */
4440                 cr |= test_bit(CONSIDER_RESYNC, &device->flags);
4441                 /* if we had been plain connected, and the admin requested to
4442                  * start a sync by "invalidate" or "invalidate-remote" */
4443                 cr |= (os.conn == C_CONNECTED &&
4444                                 (peer_state.conn >= C_STARTING_SYNC_S &&
4445                                  peer_state.conn <= C_WF_BITMAP_T));
4446
4447                 if (cr)
4448                         ns.conn = drbd_sync_handshake(peer_device, peer_state.role, real_peer_disk);
4449
4450                 put_ldev(device);
4451                 if (ns.conn == C_MASK) {
4452                         ns.conn = C_CONNECTED;
4453                         if (device->state.disk == D_NEGOTIATING) {
4454                                 drbd_force_state(device, NS(disk, D_FAILED));
4455                         } else if (peer_state.disk == D_NEGOTIATING) {
4456                                 drbd_err(device, "Disk attach process on the peer node was aborted.\n");
4457                                 peer_state.disk = D_DISKLESS;
4458                                 real_peer_disk = D_DISKLESS;
4459                         } else {
4460                                 if (test_and_clear_bit(CONN_DRY_RUN, &peer_device->connection->flags))
4461                                         return -EIO;
4462                                 D_ASSERT(device, os.conn == C_WF_REPORT_PARAMS);
4463                                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4464                                 return -EIO;
4465                         }
4466                 }
4467         }
4468
4469         spin_lock_irq(&device->resource->req_lock);
4470         if (os.i != drbd_read_state(device).i)
4471                 goto retry;
4472         clear_bit(CONSIDER_RESYNC, &device->flags);
4473         ns.peer = peer_state.role;
4474         ns.pdsk = real_peer_disk;
4475         ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
4476         if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
4477                 ns.disk = device->new_state_tmp.disk;
4478         cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
4479         if (ns.pdsk == D_CONSISTENT && drbd_suspended(device) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
4480             test_bit(NEW_CUR_UUID, &device->flags)) {
4481                 /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
4482                    for temporal network outages! */
4483                 spin_unlock_irq(&device->resource->req_lock);
4484                 drbd_err(device, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
4485                 tl_clear(peer_device->connection);
4486                 drbd_uuid_new_current(device);
4487                 clear_bit(NEW_CUR_UUID, &device->flags);
4488                 conn_request_state(peer_device->connection, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
4489                 return -EIO;
4490         }
4491         rv = _drbd_set_state(device, ns, cs_flags, NULL);
4492         ns = drbd_read_state(device);
4493         spin_unlock_irq(&device->resource->req_lock);
4494
4495         if (rv < SS_SUCCESS) {
4496                 conn_request_state(peer_device->connection, NS(conn, C_DISCONNECTING), CS_HARD);
4497                 return -EIO;
4498         }
4499
4500         if (os.conn > C_WF_REPORT_PARAMS) {
4501                 if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
4502                     peer_state.disk != D_NEGOTIATING ) {
4503                         /* we want resync, peer has not yet decided to sync... */
4504                         /* Nowadays only used when forcing a node into primary role and
4505                            setting its disk to UpToDate with that */
4506                         drbd_send_uuids(peer_device);
4507                         drbd_send_current_state(peer_device);
4508                 }
4509         }
4510
4511         clear_bit(DISCARD_MY_DATA, &device->flags);
4512
4513         drbd_md_sync(device); /* update connected indicator, la_size_sect, ... */
4514
4515         return 0;
4516 }
4517
4518 static int receive_sync_uuid(struct drbd_connection *connection, struct packet_info *pi)
4519 {
4520         struct drbd_peer_device *peer_device;
4521         struct drbd_device *device;
4522         struct p_rs_uuid *p = pi->data;
4523
4524         peer_device = conn_peer_device(connection, pi->vnr);
4525         if (!peer_device)
4526                 return -EIO;
4527         device = peer_device->device;
4528
4529         wait_event(device->misc_wait,
4530                    device->state.conn == C_WF_SYNC_UUID ||
4531                    device->state.conn == C_BEHIND ||
4532                    device->state.conn < C_CONNECTED ||
4533                    device->state.disk < D_NEGOTIATING);
4534
4535         /* D_ASSERT(device,  device->state.conn == C_WF_SYNC_UUID ); */
4536
4537         /* Here the _drbd_uuid_ functions are right, current should
4538            _not_ be rotated into the history */
4539         if (get_ldev_if_state(device, D_NEGOTIATING)) {
4540                 _drbd_uuid_set(device, UI_CURRENT, be64_to_cpu(p->uuid));
4541                 _drbd_uuid_set(device, UI_BITMAP, 0UL);
4542
4543                 drbd_print_uuids(device, "updated sync uuid");
4544                 drbd_start_resync(device, C_SYNC_TARGET);
4545
4546                 put_ldev(device);
4547         } else
4548                 drbd_err(device, "Ignoring SyncUUID packet!\n");
4549
4550         return 0;
4551 }
4552
4553 /**
4554  * receive_bitmap_plain
4555  *
4556  * Return 0 when done, 1 when another iteration is needed, and a negative error
4557  * code upon failure.
4558  */
4559 static int
4560 receive_bitmap_plain(struct drbd_peer_device *peer_device, unsigned int size,
4561                      unsigned long *p, struct bm_xfer_ctx *c)
4562 {
4563         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4564                                  drbd_header_size(peer_device->connection);
4565         unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4566                                        c->bm_words - c->word_offset);
4567         unsigned int want = num_words * sizeof(*p);
4568         int err;
4569
4570         if (want != size) {
4571                 drbd_err(peer_device, "%s:want (%u) != size (%u)\n", __func__, want, size);
4572                 return -EIO;
4573         }
4574         if (want == 0)
4575                 return 0;
4576         err = drbd_recv_all(peer_device->connection, p, want);
4577         if (err)
4578                 return err;
4579
4580         drbd_bm_merge_lel(peer_device->device, c->word_offset, num_words, p);
4581
4582         c->word_offset += num_words;
4583         c->bit_offset = c->word_offset * BITS_PER_LONG;
4584         if (c->bit_offset > c->bm_bits)
4585                 c->bit_offset = c->bm_bits;
4586
4587         return 1;
4588 }
4589
4590 static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4591 {
4592         return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4593 }
4594
4595 static int dcbp_get_start(struct p_compressed_bm *p)
4596 {
4597         return (p->encoding & 0x80) != 0;
4598 }
4599
4600 static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4601 {
4602         return (p->encoding >> 4) & 0x7;
4603 }
4604
4605 /**
4606  * recv_bm_rle_bits
4607  *
4608  * Return 0 when done, 1 when another iteration is needed, and a negative error
4609  * code upon failure.
4610  */
4611 static int
4612 recv_bm_rle_bits(struct drbd_peer_device *peer_device,
4613                 struct p_compressed_bm *p,
4614                  struct bm_xfer_ctx *c,
4615                  unsigned int len)
4616 {
4617         struct bitstream bs;
4618         u64 look_ahead;
4619         u64 rl;
4620         u64 tmp;
4621         unsigned long s = c->bit_offset;
4622         unsigned long e;
4623         int toggle = dcbp_get_start(p);
4624         int have;
4625         int bits;
4626
4627         bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4628
4629         bits = bitstream_get_bits(&bs, &look_ahead, 64);
4630         if (bits < 0)
4631                 return -EIO;
4632
4633         for (have = bits; have > 0; s += rl, toggle = !toggle) {
4634                 bits = vli_decode_bits(&rl, look_ahead);
4635                 if (bits <= 0)
4636                         return -EIO;
4637
4638                 if (toggle) {
4639                         e = s + rl -1;
4640                         if (e >= c->bm_bits) {
4641                                 drbd_err(peer_device, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4642                                 return -EIO;
4643                         }
4644                         _drbd_bm_set_bits(peer_device->device, s, e);
4645                 }
4646
4647                 if (have < bits) {
4648                         drbd_err(peer_device, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4649                                 have, bits, look_ahead,
4650                                 (unsigned int)(bs.cur.b - p->code),
4651                                 (unsigned int)bs.buf_len);
4652                         return -EIO;
4653                 }
4654                 /* if we consumed all 64 bits, assign 0; >> 64 is "undefined"; */
4655                 if (likely(bits < 64))
4656                         look_ahead >>= bits;
4657                 else
4658                         look_ahead = 0;
4659                 have -= bits;
4660
4661                 bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4662                 if (bits < 0)
4663                         return -EIO;
4664                 look_ahead |= tmp << have;
4665                 have += bits;
4666         }
4667
4668         c->bit_offset = s;
4669         bm_xfer_ctx_bit_to_word_offset(c);
4670
4671         return (s != c->bm_bits);
4672 }
4673
4674 /**
4675  * decode_bitmap_c
4676  *
4677  * Return 0 when done, 1 when another iteration is needed, and a negative error
4678  * code upon failure.
4679  */
4680 static int
4681 decode_bitmap_c(struct drbd_peer_device *peer_device,
4682                 struct p_compressed_bm *p,
4683                 struct bm_xfer_ctx *c,
4684                 unsigned int len)
4685 {
4686         if (dcbp_get_code(p) == RLE_VLI_Bits)
4687                 return recv_bm_rle_bits(peer_device, p, c, len - sizeof(*p));
4688
4689         /* other variants had been implemented for evaluation,
4690          * but have been dropped as this one turned out to be "best"
4691          * during all our tests. */
4692
4693         drbd_err(peer_device, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4694         conn_request_state(peer_device->connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4695         return -EIO;
4696 }
4697
4698 void INFO_bm_xfer_stats(struct drbd_device *device,
4699                 const char *direction, struct bm_xfer_ctx *c)
4700 {
4701         /* what would it take to transfer it "plaintext" */
4702         unsigned int header_size = drbd_header_size(first_peer_device(device)->connection);
4703         unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4704         unsigned int plain =
4705                 header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4706                 c->bm_words * sizeof(unsigned long);
4707         unsigned int total = c->bytes[0] + c->bytes[1];
4708         unsigned int r;
4709
4710         /* total can not be zero. but just in case: */
4711         if (total == 0)
4712                 return;
4713
4714         /* don't report if not compressed */
4715         if (total >= plain)
4716                 return;
4717
4718         /* total < plain. check for overflow, still */
4719         r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4720                                     : (1000 * total / plain);
4721
4722         if (r > 1000)
4723                 r = 1000;
4724
4725         r = 1000 - r;
4726         drbd_info(device, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4727              "total %u; compression: %u.%u%%\n",
4728                         direction,
4729                         c->bytes[1], c->packets[1],
4730                         c->bytes[0], c->packets[0],
4731                         total, r/10, r % 10);
4732 }
4733
4734 /* Since we are processing the bitfield from lower addresses to higher,
4735    it does not matter if the process it in 32 bit chunks or 64 bit
4736    chunks as long as it is little endian. (Understand it as byte stream,
4737    beginning with the lowest byte...) If we would use big endian
4738    we would need to process it from the highest address to the lowest,
4739    in order to be agnostic to the 32 vs 64 bits issue.
4740
4741    returns 0 on failure, 1 if we successfully received it. */
4742 static int receive_bitmap(struct drbd_connection *connection, struct packet_info *pi)
4743 {
4744         struct drbd_peer_device *peer_device;
4745         struct drbd_device *device;
4746         struct bm_xfer_ctx c;
4747         int err;
4748
4749         peer_device = conn_peer_device(connection, pi->vnr);
4750         if (!peer_device)
4751                 return -EIO;
4752         device = peer_device->device;
4753
4754         drbd_bm_lock(device, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4755         /* you are supposed to send additional out-of-sync information
4756          * if you actually set bits during this phase */
4757
4758         c = (struct bm_xfer_ctx) {
4759                 .bm_bits = drbd_bm_bits(device),
4760                 .bm_words = drbd_bm_words(device),
4761         };
4762
4763         for(;;) {
4764                 if (pi->cmd == P_BITMAP)
4765                         err = receive_bitmap_plain(peer_device, pi->size, pi->data, &c);
4766                 else if (pi->cmd == P_COMPRESSED_BITMAP) {
4767                         /* MAYBE: sanity check that we speak proto >= 90,
4768                          * and the feature is enabled! */
4769                         struct p_compressed_bm *p = pi->data;
4770
4771                         if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(connection)) {
4772                                 drbd_err(device, "ReportCBitmap packet too large\n");
4773                                 err = -EIO;
4774                                 goto out;
4775                         }
4776                         if (pi->size <= sizeof(*p)) {
4777                                 drbd_err(device, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4778                                 err = -EIO;
4779                                 goto out;
4780                         }
4781                         err = drbd_recv_all(peer_device->connection, p, pi->size);
4782                         if (err)
4783                                goto out;
4784                         err = decode_bitmap_c(peer_device, p, &c, pi->size);
4785                 } else {
4786                         drbd_warn(device, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4787                         err = -EIO;
4788                         goto out;
4789                 }
4790
4791                 c.packets[pi->cmd == P_BITMAP]++;
4792                 c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(connection) + pi->size;
4793
4794                 if (err <= 0) {
4795                         if (err < 0)
4796                                 goto out;
4797                         break;
4798                 }
4799                 err = drbd_recv_header(peer_device->connection, pi);
4800                 if (err)
4801                         goto out;
4802         }
4803
4804         INFO_bm_xfer_stats(device, "receive", &c);
4805
4806         if (device->state.conn == C_WF_BITMAP_T) {
4807                 enum drbd_state_rv rv;
4808
4809                 err = drbd_send_bitmap(device);
4810                 if (err)
4811                         goto out;
4812                 /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4813                 rv = _drbd_request_state(device, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4814                 D_ASSERT(device, rv == SS_SUCCESS);
4815         } else if (device->state.conn != C_WF_BITMAP_S) {
4816                 /* admin may have requested C_DISCONNECTING,
4817                  * other threads may have noticed network errors */
4818                 drbd_info(device, "unexpected cstate (%s) in receive_bitmap\n",
4819                     drbd_conn_str(device->state.conn));
4820         }
4821         err = 0;
4822
4823  out:
4824         drbd_bm_unlock(device);
4825         if (!err && device->state.conn == C_WF_BITMAP_S)
4826                 drbd_start_resync(device, C_SYNC_SOURCE);
4827         return err;
4828 }
4829
4830 static int receive_skip(struct drbd_connection *connection, struct packet_info *pi)
4831 {
4832         drbd_warn(connection, "skipping unknown optional packet type %d, l: %d!\n",
4833                  pi->cmd, pi->size);
4834
4835         return ignore_remaining_packet(connection, pi);
4836 }
4837
4838 static int receive_UnplugRemote(struct drbd_connection *connection, struct packet_info *pi)
4839 {
4840         /* Make sure we've acked all the TCP data associated
4841          * with the data requests being unplugged */
4842         drbd_tcp_quickack(connection->data.socket);
4843
4844         return 0;
4845 }
4846
4847 static int receive_out_of_sync(struct drbd_connection *connection, struct packet_info *pi)
4848 {
4849         struct drbd_peer_device *peer_device;
4850         struct drbd_device *device;
4851         struct p_block_desc *p = pi->data;
4852
4853         peer_device = conn_peer_device(connection, pi->vnr);
4854         if (!peer_device)
4855                 return -EIO;
4856         device = peer_device->device;
4857
4858         switch (device->state.conn) {
4859         case C_WF_SYNC_UUID:
4860         case C_WF_BITMAP_T:
4861         case C_BEHIND:
4862                         break;
4863         default:
4864                 drbd_err(device, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4865                                 drbd_conn_str(device->state.conn));
4866         }
4867
4868         drbd_set_out_of_sync(device, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4869
4870         return 0;
4871 }
4872
4873 static int receive_rs_deallocated(struct drbd_connection *connection, struct packet_info *pi)
4874 {
4875         struct drbd_peer_device *peer_device;
4876         struct p_block_desc *p = pi->data;
4877         struct drbd_device *device;
4878         sector_t sector;
4879         int size, err = 0;
4880
4881         peer_device = conn_peer_device(connection, pi->vnr);
4882         if (!peer_device)
4883                 return -EIO;
4884         device = peer_device->device;
4885
4886         sector = be64_to_cpu(p->sector);
4887         size = be32_to_cpu(p->blksize);
4888
4889         dec_rs_pending(device);
4890
4891         if (get_ldev(device)) {
4892                 struct drbd_peer_request *peer_req;
4893                 const int op = REQ_OP_DISCARD;
4894
4895                 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER, sector,
4896                                                size, 0, GFP_NOIO);
4897                 if (!peer_req) {
4898                         put_ldev(device);
4899                         return -ENOMEM;
4900                 }
4901
4902                 peer_req->w.cb = e_end_resync_block;
4903                 peer_req->submit_jif = jiffies;
4904                 peer_req->flags |= EE_IS_TRIM;
4905
4906                 spin_lock_irq(&device->resource->req_lock);
4907                 list_add_tail(&peer_req->w.list, &device->sync_ee);
4908                 spin_unlock_irq(&device->resource->req_lock);
4909
4910                 atomic_add(pi->size >> 9, &device->rs_sect_ev);
4911                 err = drbd_submit_peer_request(device, peer_req, op, 0, DRBD_FAULT_RS_WR);
4912
4913                 if (err) {
4914                         spin_lock_irq(&device->resource->req_lock);
4915                         list_del(&peer_req->w.list);
4916                         spin_unlock_irq(&device->resource->req_lock);
4917
4918                         drbd_free_peer_req(device, peer_req);
4919                         put_ldev(device);
4920                         err = 0;
4921                         goto fail;
4922                 }
4923
4924                 inc_unacked(device);
4925
4926                 /* No put_ldev() here. Gets called in drbd_endio_write_sec_final(),
4927                    as well as drbd_rs_complete_io() */
4928         } else {
4929         fail:
4930                 drbd_rs_complete_io(device, sector);
4931                 drbd_send_ack_ex(peer_device, P_NEG_ACK, sector, size, ID_SYNCER);
4932         }
4933
4934         atomic_add(size >> 9, &device->rs_sect_in);
4935
4936         return err;
4937 }
4938
4939 struct data_cmd {
4940         int expect_payload;
4941         unsigned int pkt_size;
4942         int (*fn)(struct drbd_connection *, struct packet_info *);
4943 };
4944
4945 static struct data_cmd drbd_cmd_handler[] = {
4946         [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4947         [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4948         [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4949         [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4950         [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4951         [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4952         [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4953         [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4954         [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4955         [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4956         [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4957         [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4958         [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4959         [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4960         [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4961         [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4962         [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4963         [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4964         [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4965         [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4966         [P_RS_THIN_REQ]     = { 0, sizeof(struct p_block_req), receive_DataRequest },
4967         [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4968         [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4969         [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4970         [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4971         [P_TRIM]            = { 0, sizeof(struct p_trim), receive_Data },
4972         [P_RS_DEALLOCATED]  = { 0, sizeof(struct p_block_desc), receive_rs_deallocated },
4973         [P_WSAME]           = { 1, sizeof(struct p_wsame), receive_Data },
4974 };
4975
4976 static void drbdd(struct drbd_connection *connection)
4977 {
4978         struct packet_info pi;
4979         size_t shs; /* sub header size */
4980         int err;
4981
4982         while (get_t_state(&connection->receiver) == RUNNING) {
4983                 struct data_cmd const *cmd;
4984
4985                 drbd_thread_current_set_cpu(&connection->receiver);
4986                 update_receiver_timing_details(connection, drbd_recv_header);
4987                 if (drbd_recv_header(connection, &pi))
4988                         goto err_out;
4989
4990                 cmd = &drbd_cmd_handler[pi.cmd];
4991                 if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4992                         drbd_err(connection, "Unexpected data packet %s (0x%04x)",
4993                                  cmdname(pi.cmd), pi.cmd);
4994                         goto err_out;
4995                 }
4996
4997                 shs = cmd->pkt_size;
4998                 if (pi.cmd == P_SIZES && connection->agreed_features & DRBD_FF_WSAME)
4999                         shs += sizeof(struct o_qlim);
5000                 if (pi.size > shs && !cmd->expect_payload) {
5001                         drbd_err(connection, "No payload expected %s l:%d\n",
5002                                  cmdname(pi.cmd), pi.size);
5003                         goto err_out;
5004                 }
5005                 if (pi.size < shs) {
5006                         drbd_err(connection, "%s: unexpected packet size, expected:%d received:%d\n",
5007                                  cmdname(pi.cmd), (int)shs, pi.size);
5008                         goto err_out;
5009                 }
5010
5011                 if (shs) {
5012                         update_receiver_timing_details(connection, drbd_recv_all_warn);
5013                         err = drbd_recv_all_warn(connection, pi.data, shs);
5014                         if (err)
5015                                 goto err_out;
5016                         pi.size -= shs;
5017                 }
5018
5019                 update_receiver_timing_details(connection, cmd->fn);
5020                 err = cmd->fn(connection, &pi);
5021                 if (err) {
5022                         drbd_err(connection, "error receiving %s, e: %d l: %d!\n",
5023                                  cmdname(pi.cmd), err, pi.size);
5024                         goto err_out;
5025                 }
5026         }
5027         return;
5028
5029     err_out:
5030         conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
5031 }
5032
5033 static void conn_disconnect(struct drbd_connection *connection)
5034 {
5035         struct drbd_peer_device *peer_device;
5036         enum drbd_conns oc;
5037         int vnr;
5038
5039         if (connection->cstate == C_STANDALONE)
5040                 return;
5041
5042         /* We are about to start the cleanup after connection loss.
5043          * Make sure drbd_make_request knows about that.
5044          * Usually we should be in some network failure state already,
5045          * but just in case we are not, we fix it up here.
5046          */
5047         conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5048
5049         /* ack_receiver does not clean up anything. it must not interfere, either */
5050         drbd_thread_stop(&connection->ack_receiver);
5051         if (connection->ack_sender) {
5052                 destroy_workqueue(connection->ack_sender);
5053                 connection->ack_sender = NULL;
5054         }
5055         drbd_free_sock(connection);
5056
5057         rcu_read_lock();
5058         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5059                 struct drbd_device *device = peer_device->device;
5060                 kref_get(&device->kref);
5061                 rcu_read_unlock();
5062                 drbd_disconnected(peer_device);
5063                 kref_put(&device->kref, drbd_destroy_device);
5064                 rcu_read_lock();
5065         }
5066         rcu_read_unlock();
5067
5068         if (!list_empty(&connection->current_epoch->list))
5069                 drbd_err(connection, "ASSERTION FAILED: connection->current_epoch->list not empty\n");
5070         /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
5071         atomic_set(&connection->current_epoch->epoch_size, 0);
5072         connection->send.seen_any_write_yet = false;
5073
5074         drbd_info(connection, "Connection closed\n");
5075
5076         if (conn_highest_role(connection) == R_PRIMARY && conn_highest_pdsk(connection) >= D_UNKNOWN)
5077                 conn_try_outdate_peer_async(connection);
5078
5079         spin_lock_irq(&connection->resource->req_lock);
5080         oc = connection->cstate;
5081         if (oc >= C_UNCONNECTED)
5082                 _conn_request_state(connection, NS(conn, C_UNCONNECTED), CS_VERBOSE);
5083
5084         spin_unlock_irq(&connection->resource->req_lock);
5085
5086         if (oc == C_DISCONNECTING)
5087                 conn_request_state(connection, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
5088 }
5089
5090 static int drbd_disconnected(struct drbd_peer_device *peer_device)
5091 {
5092         struct drbd_device *device = peer_device->device;
5093         unsigned int i;
5094
5095         /* wait for current activity to cease. */
5096         spin_lock_irq(&device->resource->req_lock);
5097         _drbd_wait_ee_list_empty(device, &device->active_ee);
5098         _drbd_wait_ee_list_empty(device, &device->sync_ee);
5099         _drbd_wait_ee_list_empty(device, &device->read_ee);
5100         spin_unlock_irq(&device->resource->req_lock);
5101
5102         /* We do not have data structures that would allow us to
5103          * get the rs_pending_cnt down to 0 again.
5104          *  * On C_SYNC_TARGET we do not have any data structures describing
5105          *    the pending RSDataRequest's we have sent.
5106          *  * On C_SYNC_SOURCE there is no data structure that tracks
5107          *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
5108          *  And no, it is not the sum of the reference counts in the
5109          *  resync_LRU. The resync_LRU tracks the whole operation including
5110          *  the disk-IO, while the rs_pending_cnt only tracks the blocks
5111          *  on the fly. */
5112         drbd_rs_cancel_all(device);
5113         device->rs_total = 0;
5114         device->rs_failed = 0;
5115         atomic_set(&device->rs_pending_cnt, 0);
5116         wake_up(&device->misc_wait);
5117
5118         del_timer_sync(&device->resync_timer);
5119         resync_timer_fn((unsigned long)device);
5120
5121         /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
5122          * w_make_resync_request etc. which may still be on the worker queue
5123          * to be "canceled" */
5124         drbd_flush_workqueue(&peer_device->connection->sender_work);
5125
5126         drbd_finish_peer_reqs(device);
5127
5128         /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
5129            might have issued a work again. The one before drbd_finish_peer_reqs() is
5130            necessary to reclain net_ee in drbd_finish_peer_reqs(). */
5131         drbd_flush_workqueue(&peer_device->connection->sender_work);
5132
5133         /* need to do it again, drbd_finish_peer_reqs() may have populated it
5134          * again via drbd_try_clear_on_disk_bm(). */
5135         drbd_rs_cancel_all(device);
5136
5137         kfree(device->p_uuid);
5138         device->p_uuid = NULL;
5139
5140         if (!drbd_suspended(device))
5141                 tl_clear(peer_device->connection);
5142
5143         drbd_md_sync(device);
5144
5145         if (get_ldev(device)) {
5146                 drbd_bitmap_io(device, &drbd_bm_write_copy_pages,
5147                                 "write from disconnected", BM_LOCKED_CHANGE_ALLOWED);
5148                 put_ldev(device);
5149         }
5150
5151         /* tcp_close and release of sendpage pages can be deferred.  I don't
5152          * want to use SO_LINGER, because apparently it can be deferred for
5153          * more than 20 seconds (longest time I checked).
5154          *
5155          * Actually we don't care for exactly when the network stack does its
5156          * put_page(), but release our reference on these pages right here.
5157          */
5158         i = drbd_free_peer_reqs(device, &device->net_ee);
5159         if (i)
5160                 drbd_info(device, "net_ee not empty, killed %u entries\n", i);
5161         i = atomic_read(&device->pp_in_use_by_net);
5162         if (i)
5163                 drbd_info(device, "pp_in_use_by_net = %d, expected 0\n", i);
5164         i = atomic_read(&device->pp_in_use);
5165         if (i)
5166                 drbd_info(device, "pp_in_use = %d, expected 0\n", i);
5167
5168         D_ASSERT(device, list_empty(&device->read_ee));
5169         D_ASSERT(device, list_empty(&device->active_ee));
5170         D_ASSERT(device, list_empty(&device->sync_ee));
5171         D_ASSERT(device, list_empty(&device->done_ee));
5172
5173         return 0;
5174 }
5175
5176 /*
5177  * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
5178  * we can agree on is stored in agreed_pro_version.
5179  *
5180  * feature flags and the reserved array should be enough room for future
5181  * enhancements of the handshake protocol, and possible plugins...
5182  *
5183  * for now, they are expected to be zero, but ignored.
5184  */
5185 static int drbd_send_features(struct drbd_connection *connection)
5186 {
5187         struct drbd_socket *sock;
5188         struct p_connection_features *p;
5189
5190         sock = &connection->data;
5191         p = conn_prepare_command(connection, sock);
5192         if (!p)
5193                 return -EIO;
5194         memset(p, 0, sizeof(*p));
5195         p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
5196         p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
5197         p->feature_flags = cpu_to_be32(PRO_FEATURES);
5198         return conn_send_command(connection, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
5199 }
5200
5201 /*
5202  * return values:
5203  *   1 yes, we have a valid connection
5204  *   0 oops, did not work out, please try again
5205  *  -1 peer talks different language,
5206  *     no point in trying again, please go standalone.
5207  */
5208 static int drbd_do_features(struct drbd_connection *connection)
5209 {
5210         /* ASSERT current == connection->receiver ... */
5211         struct p_connection_features *p;
5212         const int expect = sizeof(struct p_connection_features);
5213         struct packet_info pi;
5214         int err;
5215
5216         err = drbd_send_features(connection);
5217         if (err)
5218                 return 0;
5219
5220         err = drbd_recv_header(connection, &pi);
5221         if (err)
5222                 return 0;
5223
5224         if (pi.cmd != P_CONNECTION_FEATURES) {
5225                 drbd_err(connection, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
5226                          cmdname(pi.cmd), pi.cmd);
5227                 return -1;
5228         }
5229
5230         if (pi.size != expect) {
5231                 drbd_err(connection, "expected ConnectionFeatures length: %u, received: %u\n",
5232                      expect, pi.size);
5233                 return -1;
5234         }
5235
5236         p = pi.data;
5237         err = drbd_recv_all_warn(connection, p, expect);
5238         if (err)
5239                 return 0;
5240
5241         p->protocol_min = be32_to_cpu(p->protocol_min);
5242         p->protocol_max = be32_to_cpu(p->protocol_max);
5243         if (p->protocol_max == 0)
5244                 p->protocol_max = p->protocol_min;
5245
5246         if (PRO_VERSION_MAX < p->protocol_min ||
5247             PRO_VERSION_MIN > p->protocol_max)
5248                 goto incompat;
5249
5250         connection->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
5251         connection->agreed_features = PRO_FEATURES & be32_to_cpu(p->feature_flags);
5252
5253         drbd_info(connection, "Handshake successful: "
5254              "Agreed network protocol version %d\n", connection->agreed_pro_version);
5255
5256         drbd_info(connection, "Feature flags enabled on protocol level: 0x%x%s%s%s.\n",
5257                   connection->agreed_features,
5258                   connection->agreed_features & DRBD_FF_TRIM ? " TRIM" : "",
5259                   connection->agreed_features & DRBD_FF_THIN_RESYNC ? " THIN_RESYNC" : "",
5260                   connection->agreed_features & DRBD_FF_WSAME ? " WRITE_SAME" :
5261                   connection->agreed_features ? "" : " none");
5262
5263         return 1;
5264
5265  incompat:
5266         drbd_err(connection, "incompatible DRBD dialects: "
5267             "I support %d-%d, peer supports %d-%d\n",
5268             PRO_VERSION_MIN, PRO_VERSION_MAX,
5269             p->protocol_min, p->protocol_max);
5270         return -1;
5271 }
5272
5273 #if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
5274 static int drbd_do_auth(struct drbd_connection *connection)
5275 {
5276         drbd_err(connection, "This kernel was build without CONFIG_CRYPTO_HMAC.\n");
5277         drbd_err(connection, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
5278         return -1;
5279 }
5280 #else
5281 #define CHALLENGE_LEN 64
5282
5283 /* Return value:
5284         1 - auth succeeded,
5285         0 - failed, try again (network error),
5286         -1 - auth failed, don't try again.
5287 */
5288
5289 static int drbd_do_auth(struct drbd_connection *connection)
5290 {
5291         struct drbd_socket *sock;
5292         char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
5293         char *response = NULL;
5294         char *right_response = NULL;
5295         char *peers_ch = NULL;
5296         unsigned int key_len;
5297         char secret[SHARED_SECRET_MAX]; /* 64 byte */
5298         unsigned int resp_size;
5299         SHASH_DESC_ON_STACK(desc, connection->cram_hmac_tfm);
5300         struct packet_info pi;
5301         struct net_conf *nc;
5302         int err, rv;
5303
5304         /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
5305
5306         rcu_read_lock();
5307         nc = rcu_dereference(connection->net_conf);
5308         key_len = strlen(nc->shared_secret);
5309         memcpy(secret, nc->shared_secret, key_len);
5310         rcu_read_unlock();
5311
5312         desc->tfm = connection->cram_hmac_tfm;
5313         desc->flags = 0;
5314
5315         rv = crypto_shash_setkey(connection->cram_hmac_tfm, (u8 *)secret, key_len);
5316         if (rv) {
5317                 drbd_err(connection, "crypto_shash_setkey() failed with %d\n", rv);
5318                 rv = -1;
5319                 goto fail;
5320         }
5321
5322         get_random_bytes(my_challenge, CHALLENGE_LEN);
5323
5324         sock = &connection->data;
5325         if (!conn_prepare_command(connection, sock)) {
5326                 rv = 0;
5327                 goto fail;
5328         }
5329         rv = !conn_send_command(connection, sock, P_AUTH_CHALLENGE, 0,
5330                                 my_challenge, CHALLENGE_LEN);
5331         if (!rv)
5332                 goto fail;
5333
5334         err = drbd_recv_header(connection, &pi);
5335         if (err) {
5336                 rv = 0;
5337                 goto fail;
5338         }
5339
5340         if (pi.cmd != P_AUTH_CHALLENGE) {
5341                 drbd_err(connection, "expected AuthChallenge packet, received: %s (0x%04x)\n",
5342                          cmdname(pi.cmd), pi.cmd);
5343                 rv = 0;
5344                 goto fail;
5345         }
5346
5347         if (pi.size > CHALLENGE_LEN * 2) {
5348                 drbd_err(connection, "expected AuthChallenge payload too big.\n");
5349                 rv = -1;
5350                 goto fail;
5351         }
5352
5353         if (pi.size < CHALLENGE_LEN) {
5354                 drbd_err(connection, "AuthChallenge payload too small.\n");
5355                 rv = -1;
5356                 goto fail;
5357         }
5358
5359         peers_ch = kmalloc(pi.size, GFP_NOIO);
5360         if (peers_ch == NULL) {
5361                 drbd_err(connection, "kmalloc of peers_ch failed\n");
5362                 rv = -1;
5363                 goto fail;
5364         }
5365
5366         err = drbd_recv_all_warn(connection, peers_ch, pi.size);
5367         if (err) {
5368                 rv = 0;
5369                 goto fail;
5370         }
5371
5372         if (!memcmp(my_challenge, peers_ch, CHALLENGE_LEN)) {
5373                 drbd_err(connection, "Peer presented the same challenge!\n");
5374                 rv = -1;
5375                 goto fail;
5376         }
5377
5378         resp_size = crypto_shash_digestsize(connection->cram_hmac_tfm);
5379         response = kmalloc(resp_size, GFP_NOIO);
5380         if (response == NULL) {
5381                 drbd_err(connection, "kmalloc of response failed\n");
5382                 rv = -1;
5383                 goto fail;
5384         }
5385
5386         rv = crypto_shash_digest(desc, peers_ch, pi.size, response);
5387         if (rv) {
5388                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5389                 rv = -1;
5390                 goto fail;
5391         }
5392
5393         if (!conn_prepare_command(connection, sock)) {
5394                 rv = 0;
5395                 goto fail;
5396         }
5397         rv = !conn_send_command(connection, sock, P_AUTH_RESPONSE, 0,
5398                                 response, resp_size);
5399         if (!rv)
5400                 goto fail;
5401
5402         err = drbd_recv_header(connection, &pi);
5403         if (err) {
5404                 rv = 0;
5405                 goto fail;
5406         }
5407
5408         if (pi.cmd != P_AUTH_RESPONSE) {
5409                 drbd_err(connection, "expected AuthResponse packet, received: %s (0x%04x)\n",
5410                          cmdname(pi.cmd), pi.cmd);
5411                 rv = 0;
5412                 goto fail;
5413         }
5414
5415         if (pi.size != resp_size) {
5416                 drbd_err(connection, "expected AuthResponse payload of wrong size\n");
5417                 rv = 0;
5418                 goto fail;
5419         }
5420
5421         err = drbd_recv_all_warn(connection, response , resp_size);
5422         if (err) {
5423                 rv = 0;
5424                 goto fail;
5425         }
5426
5427         right_response = kmalloc(resp_size, GFP_NOIO);
5428         if (right_response == NULL) {
5429                 drbd_err(connection, "kmalloc of right_response failed\n");
5430                 rv = -1;
5431                 goto fail;
5432         }
5433
5434         rv = crypto_shash_digest(desc, my_challenge, CHALLENGE_LEN,
5435                                  right_response);
5436         if (rv) {
5437                 drbd_err(connection, "crypto_hash_digest() failed with %d\n", rv);
5438                 rv = -1;
5439                 goto fail;
5440         }
5441
5442         rv = !memcmp(response, right_response, resp_size);
5443
5444         if (rv)
5445                 drbd_info(connection, "Peer authenticated using %d bytes HMAC\n",
5446                      resp_size);
5447         else
5448                 rv = -1;
5449
5450  fail:
5451         kfree(peers_ch);
5452         kfree(response);
5453         kfree(right_response);
5454         shash_desc_zero(desc);
5455
5456         return rv;
5457 }
5458 #endif
5459
5460 int drbd_receiver(struct drbd_thread *thi)
5461 {
5462         struct drbd_connection *connection = thi->connection;
5463         int h;
5464
5465         drbd_info(connection, "receiver (re)started\n");
5466
5467         do {
5468                 h = conn_connect(connection);
5469                 if (h == 0) {
5470                         conn_disconnect(connection);
5471                         schedule_timeout_interruptible(HZ);
5472                 }
5473                 if (h == -1) {
5474                         drbd_warn(connection, "Discarding network configuration.\n");
5475                         conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
5476                 }
5477         } while (h == 0);
5478
5479         if (h > 0)
5480                 drbdd(connection);
5481
5482         conn_disconnect(connection);
5483
5484         drbd_info(connection, "receiver terminated\n");
5485         return 0;
5486 }
5487
5488 /* ********* acknowledge sender ******** */
5489
5490 static int got_conn_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5491 {
5492         struct p_req_state_reply *p = pi->data;
5493         int retcode = be32_to_cpu(p->retcode);
5494
5495         if (retcode >= SS_SUCCESS) {
5496                 set_bit(CONN_WD_ST_CHG_OKAY, &connection->flags);
5497         } else {
5498                 set_bit(CONN_WD_ST_CHG_FAIL, &connection->flags);
5499                 drbd_err(connection, "Requested state change failed by peer: %s (%d)\n",
5500                          drbd_set_st_err_str(retcode), retcode);
5501         }
5502         wake_up(&connection->ping_wait);
5503
5504         return 0;
5505 }
5506
5507 static int got_RqSReply(struct drbd_connection *connection, struct packet_info *pi)
5508 {
5509         struct drbd_peer_device *peer_device;
5510         struct drbd_device *device;
5511         struct p_req_state_reply *p = pi->data;
5512         int retcode = be32_to_cpu(p->retcode);
5513
5514         peer_device = conn_peer_device(connection, pi->vnr);
5515         if (!peer_device)
5516                 return -EIO;
5517         device = peer_device->device;
5518
5519         if (test_bit(CONN_WD_ST_CHG_REQ, &connection->flags)) {
5520                 D_ASSERT(device, connection->agreed_pro_version < 100);
5521                 return got_conn_RqSReply(connection, pi);
5522         }
5523
5524         if (retcode >= SS_SUCCESS) {
5525                 set_bit(CL_ST_CHG_SUCCESS, &device->flags);
5526         } else {
5527                 set_bit(CL_ST_CHG_FAIL, &device->flags);
5528                 drbd_err(device, "Requested state change failed by peer: %s (%d)\n",
5529                         drbd_set_st_err_str(retcode), retcode);
5530         }
5531         wake_up(&device->state_wait);
5532
5533         return 0;
5534 }
5535
5536 static int got_Ping(struct drbd_connection *connection, struct packet_info *pi)
5537 {
5538         return drbd_send_ping_ack(connection);
5539
5540 }
5541
5542 static int got_PingAck(struct drbd_connection *connection, struct packet_info *pi)
5543 {
5544         /* restore idle timeout */
5545         connection->meta.socket->sk->sk_rcvtimeo = connection->net_conf->ping_int*HZ;
5546         if (!test_and_set_bit(GOT_PING_ACK, &connection->flags))
5547                 wake_up(&connection->ping_wait);
5548
5549         return 0;
5550 }
5551
5552 static int got_IsInSync(struct drbd_connection *connection, struct packet_info *pi)
5553 {
5554         struct drbd_peer_device *peer_device;
5555         struct drbd_device *device;
5556         struct p_block_ack *p = pi->data;
5557         sector_t sector = be64_to_cpu(p->sector);
5558         int blksize = be32_to_cpu(p->blksize);
5559
5560         peer_device = conn_peer_device(connection, pi->vnr);
5561         if (!peer_device)
5562                 return -EIO;
5563         device = peer_device->device;
5564
5565         D_ASSERT(device, peer_device->connection->agreed_pro_version >= 89);
5566
5567         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5568
5569         if (get_ldev(device)) {
5570                 drbd_rs_complete_io(device, sector);
5571                 drbd_set_in_sync(device, sector, blksize);
5572                 /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
5573                 device->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
5574                 put_ldev(device);
5575         }
5576         dec_rs_pending(device);
5577         atomic_add(blksize >> 9, &device->rs_sect_in);
5578
5579         return 0;
5580 }
5581
5582 static int
5583 validate_req_change_req_state(struct drbd_device *device, u64 id, sector_t sector,
5584                               struct rb_root *root, const char *func,
5585                               enum drbd_req_event what, bool missing_ok)
5586 {
5587         struct drbd_request *req;
5588         struct bio_and_error m;
5589
5590         spin_lock_irq(&device->resource->req_lock);
5591         req = find_request(device, root, id, sector, missing_ok, func);
5592         if (unlikely(!req)) {
5593                 spin_unlock_irq(&device->resource->req_lock);
5594                 return -EIO;
5595         }
5596         __req_mod(req, what, &m);
5597         spin_unlock_irq(&device->resource->req_lock);
5598
5599         if (m.bio)
5600                 complete_master_bio(device, &m);
5601         return 0;
5602 }
5603
5604 static int got_BlockAck(struct drbd_connection *connection, struct packet_info *pi)
5605 {
5606         struct drbd_peer_device *peer_device;
5607         struct drbd_device *device;
5608         struct p_block_ack *p = pi->data;
5609         sector_t sector = be64_to_cpu(p->sector);
5610         int blksize = be32_to_cpu(p->blksize);
5611         enum drbd_req_event what;
5612
5613         peer_device = conn_peer_device(connection, pi->vnr);
5614         if (!peer_device)
5615                 return -EIO;
5616         device = peer_device->device;
5617
5618         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5619
5620         if (p->block_id == ID_SYNCER) {
5621                 drbd_set_in_sync(device, sector, blksize);
5622                 dec_rs_pending(device);
5623                 return 0;
5624         }
5625         switch (pi->cmd) {
5626         case P_RS_WRITE_ACK:
5627                 what = WRITE_ACKED_BY_PEER_AND_SIS;
5628                 break;
5629         case P_WRITE_ACK:
5630                 what = WRITE_ACKED_BY_PEER;
5631                 break;
5632         case P_RECV_ACK:
5633                 what = RECV_ACKED_BY_PEER;
5634                 break;
5635         case P_SUPERSEDED:
5636                 what = CONFLICT_RESOLVED;
5637                 break;
5638         case P_RETRY_WRITE:
5639                 what = POSTPONE_WRITE;
5640                 break;
5641         default:
5642                 BUG();
5643         }
5644
5645         return validate_req_change_req_state(device, p->block_id, sector,
5646                                              &device->write_requests, __func__,
5647                                              what, false);
5648 }
5649
5650 static int got_NegAck(struct drbd_connection *connection, struct packet_info *pi)
5651 {
5652         struct drbd_peer_device *peer_device;
5653         struct drbd_device *device;
5654         struct p_block_ack *p = pi->data;
5655         sector_t sector = be64_to_cpu(p->sector);
5656         int size = be32_to_cpu(p->blksize);
5657         int err;
5658
5659         peer_device = conn_peer_device(connection, pi->vnr);
5660         if (!peer_device)
5661                 return -EIO;
5662         device = peer_device->device;
5663
5664         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5665
5666         if (p->block_id == ID_SYNCER) {
5667                 dec_rs_pending(device);
5668                 drbd_rs_failed_io(device, sector, size);
5669                 return 0;
5670         }
5671
5672         err = validate_req_change_req_state(device, p->block_id, sector,
5673                                             &device->write_requests, __func__,
5674                                             NEG_ACKED, true);
5675         if (err) {
5676                 /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5677                    The master bio might already be completed, therefore the
5678                    request is no longer in the collision hash. */
5679                 /* In Protocol B we might already have got a P_RECV_ACK
5680                    but then get a P_NEG_ACK afterwards. */
5681                 drbd_set_out_of_sync(device, sector, size);
5682         }
5683         return 0;
5684 }
5685
5686 static int got_NegDReply(struct drbd_connection *connection, struct packet_info *pi)
5687 {
5688         struct drbd_peer_device *peer_device;
5689         struct drbd_device *device;
5690         struct p_block_ack *p = pi->data;
5691         sector_t sector = be64_to_cpu(p->sector);
5692
5693         peer_device = conn_peer_device(connection, pi->vnr);
5694         if (!peer_device)
5695                 return -EIO;
5696         device = peer_device->device;
5697
5698         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5699
5700         drbd_err(device, "Got NegDReply; Sector %llus, len %u.\n",
5701             (unsigned long long)sector, be32_to_cpu(p->blksize));
5702
5703         return validate_req_change_req_state(device, p->block_id, sector,
5704                                              &device->read_requests, __func__,
5705                                              NEG_ACKED, false);
5706 }
5707
5708 static int got_NegRSDReply(struct drbd_connection *connection, struct packet_info *pi)
5709 {
5710         struct drbd_peer_device *peer_device;
5711         struct drbd_device *device;
5712         sector_t sector;
5713         int size;
5714         struct p_block_ack *p = pi->data;
5715
5716         peer_device = conn_peer_device(connection, pi->vnr);
5717         if (!peer_device)
5718                 return -EIO;
5719         device = peer_device->device;
5720
5721         sector = be64_to_cpu(p->sector);
5722         size = be32_to_cpu(p->blksize);
5723
5724         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5725
5726         dec_rs_pending(device);
5727
5728         if (get_ldev_if_state(device, D_FAILED)) {
5729                 drbd_rs_complete_io(device, sector);
5730                 switch (pi->cmd) {
5731                 case P_NEG_RS_DREPLY:
5732                         drbd_rs_failed_io(device, sector, size);
5733                 case P_RS_CANCEL:
5734                         break;
5735                 default:
5736                         BUG();
5737                 }
5738                 put_ldev(device);
5739         }
5740
5741         return 0;
5742 }
5743
5744 static int got_BarrierAck(struct drbd_connection *connection, struct packet_info *pi)
5745 {
5746         struct p_barrier_ack *p = pi->data;
5747         struct drbd_peer_device *peer_device;
5748         int vnr;
5749
5750         tl_release(connection, p->barrier, be32_to_cpu(p->set_size));
5751
5752         rcu_read_lock();
5753         idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
5754                 struct drbd_device *device = peer_device->device;
5755
5756                 if (device->state.conn == C_AHEAD &&
5757                     atomic_read(&device->ap_in_flight) == 0 &&
5758                     !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &device->flags)) {
5759                         device->start_resync_timer.expires = jiffies + HZ;
5760                         add_timer(&device->start_resync_timer);
5761                 }
5762         }
5763         rcu_read_unlock();
5764
5765         return 0;
5766 }
5767
5768 static int got_OVResult(struct drbd_connection *connection, struct packet_info *pi)
5769 {
5770         struct drbd_peer_device *peer_device;
5771         struct drbd_device *device;
5772         struct p_block_ack *p = pi->data;
5773         struct drbd_device_work *dw;
5774         sector_t sector;
5775         int size;
5776
5777         peer_device = conn_peer_device(connection, pi->vnr);
5778         if (!peer_device)
5779                 return -EIO;
5780         device = peer_device->device;
5781
5782         sector = be64_to_cpu(p->sector);
5783         size = be32_to_cpu(p->blksize);
5784
5785         update_peer_seq(peer_device, be32_to_cpu(p->seq_num));
5786
5787         if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5788                 drbd_ov_out_of_sync_found(device, sector, size);
5789         else
5790                 ov_out_of_sync_print(device);
5791
5792         if (!get_ldev(device))
5793                 return 0;
5794
5795         drbd_rs_complete_io(device, sector);
5796         dec_rs_pending(device);
5797
5798         --device->ov_left;
5799
5800         /* let's advance progress step marks only for every other megabyte */
5801         if ((device->ov_left & 0x200) == 0x200)
5802                 drbd_advance_rs_marks(device, device->ov_left);
5803
5804         if (device->ov_left == 0) {
5805                 dw = kmalloc(sizeof(*dw), GFP_NOIO);
5806                 if (dw) {
5807                         dw->w.cb = w_ov_finished;
5808                         dw->device = device;
5809                         drbd_queue_work(&peer_device->connection->sender_work, &dw->w);
5810                 } else {
5811                         drbd_err(device, "kmalloc(dw) failed.");
5812                         ov_out_of_sync_print(device);
5813                         drbd_resync_finished(device);
5814                 }
5815         }
5816         put_ldev(device);
5817         return 0;
5818 }
5819
5820 static int got_skip(struct drbd_connection *connection, struct packet_info *pi)
5821 {
5822         return 0;
5823 }
5824
5825 struct meta_sock_cmd {
5826         size_t pkt_size;
5827         int (*fn)(struct drbd_connection *connection, struct packet_info *);
5828 };
5829
5830 static void set_rcvtimeo(struct drbd_connection *connection, bool ping_timeout)
5831 {
5832         long t;
5833         struct net_conf *nc;
5834
5835         rcu_read_lock();
5836         nc = rcu_dereference(connection->net_conf);
5837         t = ping_timeout ? nc->ping_timeo : nc->ping_int;
5838         rcu_read_unlock();
5839
5840         t *= HZ;
5841         if (ping_timeout)
5842                 t /= 10;
5843
5844         connection->meta.socket->sk->sk_rcvtimeo = t;
5845 }
5846
5847 static void set_ping_timeout(struct drbd_connection *connection)
5848 {
5849         set_rcvtimeo(connection, 1);
5850 }
5851
5852 static void set_idle_timeout(struct drbd_connection *connection)
5853 {
5854         set_rcvtimeo(connection, 0);
5855 }
5856
5857 static struct meta_sock_cmd ack_receiver_tbl[] = {
5858         [P_PING]            = { 0, got_Ping },
5859         [P_PING_ACK]        = { 0, got_PingAck },
5860         [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5861         [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5862         [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5863         [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5864         [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5865         [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5866         [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5867         [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5868         [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5869         [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5870         [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5871         [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5872         [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5873         [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5874         [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5875 };
5876
5877 int drbd_ack_receiver(struct drbd_thread *thi)
5878 {
5879         struct drbd_connection *connection = thi->connection;
5880         struct meta_sock_cmd *cmd = NULL;
5881         struct packet_info pi;
5882         unsigned long pre_recv_jif;
5883         int rv;
5884         void *buf    = connection->meta.rbuf;
5885         int received = 0;
5886         unsigned int header_size = drbd_header_size(connection);
5887         int expect   = header_size;
5888         bool ping_timeout_active = false;
5889         struct sched_param param = { .sched_priority = 2 };
5890
5891         rv = sched_setscheduler(current, SCHED_RR, &param);
5892         if (rv < 0)
5893                 drbd_err(connection, "drbd_ack_receiver: ERROR set priority, ret=%d\n", rv);
5894
5895         while (get_t_state(thi) == RUNNING) {
5896                 drbd_thread_current_set_cpu(thi);
5897
5898                 conn_reclaim_net_peer_reqs(connection);
5899
5900                 if (test_and_clear_bit(SEND_PING, &connection->flags)) {
5901                         if (drbd_send_ping(connection)) {
5902                                 drbd_err(connection, "drbd_send_ping has failed\n");
5903                                 goto reconnect;
5904                         }
5905                         set_ping_timeout(connection);
5906                         ping_timeout_active = true;
5907                 }
5908
5909                 pre_recv_jif = jiffies;
5910                 rv = drbd_recv_short(connection->meta.socket, buf, expect-received, 0);
5911
5912                 /* Note:
5913                  * -EINTR        (on meta) we got a signal
5914                  * -EAGAIN       (on meta) rcvtimeo expired
5915                  * -ECONNRESET   other side closed the connection
5916                  * -ERESTARTSYS  (on data) we got a signal
5917                  * rv <  0       other than above: unexpected error!
5918                  * rv == expected: full header or command
5919                  * rv <  expected: "woken" by signal during receive
5920                  * rv == 0       : "connection shut down by peer"
5921                  */
5922                 if (likely(rv > 0)) {
5923                         received += rv;
5924                         buf      += rv;
5925                 } else if (rv == 0) {
5926                         if (test_bit(DISCONNECT_SENT, &connection->flags)) {
5927                                 long t;
5928                                 rcu_read_lock();
5929                                 t = rcu_dereference(connection->net_conf)->ping_timeo * HZ/10;
5930                                 rcu_read_unlock();
5931
5932                                 t = wait_event_timeout(connection->ping_wait,
5933                                                        connection->cstate < C_WF_REPORT_PARAMS,
5934                                                        t);
5935                                 if (t)
5936                                         break;
5937                         }
5938                         drbd_err(connection, "meta connection shut down by peer.\n");
5939                         goto reconnect;
5940                 } else if (rv == -EAGAIN) {
5941                         /* If the data socket received something meanwhile,
5942                          * that is good enough: peer is still alive. */
5943                         if (time_after(connection->last_received, pre_recv_jif))
5944                                 continue;
5945                         if (ping_timeout_active) {
5946                                 drbd_err(connection, "PingAck did not arrive in time.\n");
5947                                 goto reconnect;
5948                         }
5949                         set_bit(SEND_PING, &connection->flags);
5950                         continue;
5951                 } else if (rv == -EINTR) {
5952                         /* maybe drbd_thread_stop(): the while condition will notice.
5953                          * maybe woken for send_ping: we'll send a ping above,
5954                          * and change the rcvtimeo */
5955                         flush_signals(current);
5956                         continue;
5957                 } else {
5958                         drbd_err(connection, "sock_recvmsg returned %d\n", rv);
5959                         goto reconnect;
5960                 }
5961
5962                 if (received == expect && cmd == NULL) {
5963                         if (decode_header(connection, connection->meta.rbuf, &pi))
5964                                 goto reconnect;
5965                         cmd = &ack_receiver_tbl[pi.cmd];
5966                         if (pi.cmd >= ARRAY_SIZE(ack_receiver_tbl) || !cmd->fn) {
5967                                 drbd_err(connection, "Unexpected meta packet %s (0x%04x)\n",
5968                                          cmdname(pi.cmd), pi.cmd);
5969                                 goto disconnect;
5970                         }
5971                         expect = header_size + cmd->pkt_size;
5972                         if (pi.size != expect - header_size) {
5973                                 drbd_err(connection, "Wrong packet size on meta (c: %d, l: %d)\n",
5974                                         pi.cmd, pi.size);
5975                                 goto reconnect;
5976                         }
5977                 }
5978                 if (received == expect) {
5979                         bool err;
5980
5981                         err = cmd->fn(connection, &pi);
5982                         if (err) {
5983                                 drbd_err(connection, "%pf failed\n", cmd->fn);
5984                                 goto reconnect;
5985                         }
5986
5987                         connection->last_received = jiffies;
5988
5989                         if (cmd == &ack_receiver_tbl[P_PING_ACK]) {
5990                                 set_idle_timeout(connection);
5991                                 ping_timeout_active = false;
5992                         }
5993
5994                         buf      = connection->meta.rbuf;
5995                         received = 0;
5996                         expect   = header_size;
5997                         cmd      = NULL;
5998                 }
5999         }
6000
6001         if (0) {
6002 reconnect:
6003                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6004                 conn_md_sync(connection);
6005         }
6006         if (0) {
6007 disconnect:
6008                 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
6009         }
6010
6011         drbd_info(connection, "ack_receiver terminated\n");
6012
6013         return 0;
6014 }
6015
6016 void drbd_send_acks_wf(struct work_struct *ws)
6017 {
6018         struct drbd_peer_device *peer_device =
6019                 container_of(ws, struct drbd_peer_device, send_acks_work);
6020         struct drbd_connection *connection = peer_device->connection;
6021         struct drbd_device *device = peer_device->device;
6022         struct net_conf *nc;
6023         int tcp_cork, err;
6024
6025         rcu_read_lock();
6026         nc = rcu_dereference(connection->net_conf);
6027         tcp_cork = nc->tcp_cork;
6028         rcu_read_unlock();
6029
6030         if (tcp_cork)
6031                 drbd_tcp_cork(connection->meta.socket);
6032
6033         err = drbd_finish_peer_reqs(device);
6034         kref_put(&device->kref, drbd_destroy_device);
6035         /* get is in drbd_endio_write_sec_final(). That is necessary to keep the
6036            struct work_struct send_acks_work alive, which is in the peer_device object */
6037
6038         if (err) {
6039                 conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
6040                 return;
6041         }
6042
6043         if (tcp_cork)
6044                 drbd_tcp_uncork(connection->meta.socket);
6045
6046         return;
6047 }