net/ceph/messenger_v1.c
1 // SPDX-License-Identifier: GPL-2.0
2 #include <linux/ceph/ceph_debug.h>
3
4 #include <linux/bvec.h>
5 #include <linux/crc32c.h>
6 #include <linux/net.h>
7 #include <linux/socket.h>
8 #include <net/sock.h>
9
10 #include <linux/ceph/ceph_features.h>
11 #include <linux/ceph/decode.h>
12 #include <linux/ceph/libceph.h>
13 #include <linux/ceph/messenger.h>
14
15 /* static tag bytes (protocol control messages) */
16 static char tag_msg = CEPH_MSGR_TAG_MSG;
17 static char tag_ack = CEPH_MSGR_TAG_ACK;
18 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
19 static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
20
21 /*
22  * If @buf is NULL, discard up to @len bytes.
23  */
24 static int ceph_tcp_recvmsg(struct socket *sock, void *buf, size_t len)
25 {
26         struct kvec iov = {buf, len};
27         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
28         int r;
29
30         if (!buf)
31                 msg.msg_flags |= MSG_TRUNC;
32
33         iov_iter_kvec(&msg.msg_iter, ITER_DEST, &iov, 1, len);
34         r = sock_recvmsg(sock, &msg, msg.msg_flags);
35         if (r == -EAGAIN)
36                 r = 0;
37         return r;
38 }
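/*
 * The NULL-@buf / MSG_TRUNC path above is how the messenger drains bytes
 * it has decided to skip.  For example, ceph_con_v1_try_read() below does
 * roughly the following once in_base_pos has gone negative:
 *
 *	ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
 *
 * so a skipped or revoked incoming message is consumed from the socket
 * without ever being copied into a receive buffer.
 */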
39
40 static int ceph_tcp_recvpage(struct socket *sock, struct page *page,
41                      int page_offset, size_t length)
42 {
43         struct bio_vec bvec;
44         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
45         int r;
46
47         BUG_ON(page_offset + length > PAGE_SIZE);
48         bvec_set_page(&bvec, page, length, page_offset);
49         iov_iter_bvec(&msg.msg_iter, ITER_DEST, &bvec, 1, length);
50         r = sock_recvmsg(sock, &msg, msg.msg_flags);
51         if (r == -EAGAIN)
52                 r = 0;
53         return r;
54 }
55
56 /*
57  * write something.  @more is true if the caller will be sending more data
58  * shortly.
59  */
60 static int ceph_tcp_sendmsg(struct socket *sock, struct kvec *iov,
61                             size_t kvlen, size_t len, bool more)
62 {
63         struct msghdr msg = { .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL };
64         int r;
65
66         if (more)
67                 msg.msg_flags |= MSG_MORE;
68         else
69                 msg.msg_flags |= MSG_EOR;  /* superfluous, but what the hell */
70
71         r = kernel_sendmsg(sock, &msg, iov, kvlen, len);
72         if (r == -EAGAIN)
73                 r = 0;
74         return r;
75 }
76
77 /*
78  * @more: MSG_MORE or 0.
79  */
80 static int ceph_tcp_sendpage(struct socket *sock, struct page *page,
81                              int offset, size_t size, int more)
82 {
83         struct msghdr msg = {
84                 .msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL | more,
85         };
86         struct bio_vec bvec;
87         int ret;
88
89         /*
90          * MSG_SPLICE_PAGES cannot properly handle pages with page_count == 0,
91          * so we need to fall back to sendmsg in that case.
92          *
93          * Same goes for slab pages: skb_can_coalesce() allows
94          * coalescing neighboring slab objects into a single frag which
95          * triggers one of hardened usercopy checks.
96          */
97         if (sendpage_ok(page))
98                 msg.msg_flags |= MSG_SPLICE_PAGES;
99
100         bvec_set_page(&bvec, page, size, offset);
101         iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, size);
102
103         ret = sock_sendmsg(sock, &msg);
104         if (ret == -EAGAIN)
105                 ret = 0;
106
107         return ret;
108 }
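/*
 * When sendpage_ok() allows it, MSG_SPLICE_PAGES lets the network stack
 * take its own references on @page and transmit it without copying;
 * otherwise sock_sendmsg() copies the bvec contents into the socket
 * buffers, which is the safe path for the slab and zero-refcount pages
 * described above.
 */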
109
110 static void con_out_kvec_reset(struct ceph_connection *con)
111 {
112         BUG_ON(con->v1.out_skip);
113
114         con->v1.out_kvec_left = 0;
115         con->v1.out_kvec_bytes = 0;
116         con->v1.out_kvec_cur = &con->v1.out_kvec[0];
117 }
118
119 static void con_out_kvec_add(struct ceph_connection *con,
120                                 size_t size, void *data)
121 {
122         int index = con->v1.out_kvec_left;
123
124         BUG_ON(con->v1.out_skip);
125         BUG_ON(index >= ARRAY_SIZE(con->v1.out_kvec));
126
127         con->v1.out_kvec[index].iov_len = size;
128         con->v1.out_kvec[index].iov_base = data;
129         con->v1.out_kvec_left++;
130         con->v1.out_kvec_bytes += size;
131 }
132
133 /*
134  * Chop off a kvec from the end.  Return residual number of bytes for
135  * that kvec, i.e. how many bytes would have been written if the kvec
136  * hadn't been nuked.
137  */
138 static int con_out_kvec_skip(struct ceph_connection *con)
139 {
140         int skip = 0;
141
142         if (con->v1.out_kvec_bytes > 0) {
143                 skip = con->v1.out_kvec_cur[con->v1.out_kvec_left - 1].iov_len;
144                 BUG_ON(con->v1.out_kvec_bytes < skip);
145                 BUG_ON(!con->v1.out_kvec_left);
146                 con->v1.out_kvec_bytes -= skip;
147                 con->v1.out_kvec_left--;
148         }
149
150         return skip;
151 }
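/*
 * con_out_kvec_skip() works together with con->v1.out_skip: when an
 * outgoing message is revoked mid-send (see ceph_con_v1_revoke() at the
 * bottom of this file), the kvecs that were already queued are chopped
 * off and their residual byte counts accumulate in out_skip, which
 * write_partial_skip() then pads onto the wire as zeros from
 * ceph_zero_page so the stream stays in sync with what the header
 * already advertised.
 */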
152
153 static size_t sizeof_footer(struct ceph_connection *con)
154 {
155         return (con->peer_features & CEPH_FEATURE_MSG_AUTH) ?
156             sizeof(struct ceph_msg_footer) :
157             sizeof(struct ceph_msg_footer_old);
158 }
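/*
 * There are two on-wire footer layouts: peers that advertise
 * CEPH_FEATURE_MSG_AUTH get struct ceph_msg_footer, which additionally
 * carries a 64-bit message signature (filled in by ->sign_message() in
 * prepare_write_message_footer()); older peers get the shorter
 * struct ceph_msg_footer_old and the signature is dropped.
 */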
159
160 static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
161 {
162         /* Initialize the data cursor; sparse reads span sparse_read_total bytes */
163         u64 len = msg->sparse_read_total ? : data_len;
164
165         ceph_msg_data_cursor_init(&msg->cursor, msg, len);
166 }
167
168 /*
169  * Prepare footer for currently outgoing message, and finish things
170  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
171  */
172 static void prepare_write_message_footer(struct ceph_connection *con)
173 {
174         struct ceph_msg *m = con->out_msg;
175
176         m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
177
178         dout("prepare_write_message_footer %p\n", con);
179         con_out_kvec_add(con, sizeof_footer(con), &m->footer);
180         if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
181                 if (con->ops->sign_message)
182                         con->ops->sign_message(m);
183                 else
184                         m->footer.sig = 0;
185         } else {
186                 m->old_footer.flags = m->footer.flags;
187         }
188         con->v1.out_more = m->more_to_follow;
189         con->v1.out_msg_done = true;
190 }
191
192 /*
193  * Prepare headers for the next outgoing message.
194  */
195 static void prepare_write_message(struct ceph_connection *con)
196 {
197         struct ceph_msg *m;
198         u32 crc;
199
200         con_out_kvec_reset(con);
201         con->v1.out_msg_done = false;
202
203         /* Sneak an ack in there first?  If we can get it into the same
204          * TCP packet, that's a good thing. */
205         if (con->in_seq > con->in_seq_acked) {
206                 con->in_seq_acked = con->in_seq;
207                 con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
208                 con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
209                 con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
210                         &con->v1.out_temp_ack);
211         }
212
213         ceph_con_get_out_msg(con);
214         m = con->out_msg;
215
216         dout("prepare_write_message %p seq %lld type %d len %d+%d+%zd\n",
217              m, con->out_seq, le16_to_cpu(m->hdr.type),
218              le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
219              m->data_length);
220         WARN_ON(m->front.iov_len != le32_to_cpu(m->hdr.front_len));
221         WARN_ON(m->data_length != le32_to_cpu(m->hdr.data_len));
222
223         /* tag + hdr + front + middle */
224         con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
225         con_out_kvec_add(con, sizeof(con->v1.out_hdr), &con->v1.out_hdr);
226         con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
227
228         if (m->middle)
229                 con_out_kvec_add(con, m->middle->vec.iov_len,
230                         m->middle->vec.iov_base);
231
232         /* fill in hdr crc and finalize hdr */
233         crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
234         con->out_msg->hdr.crc = cpu_to_le32(crc);
235         memcpy(&con->v1.out_hdr, &con->out_msg->hdr, sizeof(con->v1.out_hdr));
236
237         /* fill in front and middle crc, footer */
238         crc = crc32c(0, m->front.iov_base, m->front.iov_len);
239         con->out_msg->footer.front_crc = cpu_to_le32(crc);
240         if (m->middle) {
241                 crc = crc32c(0, m->middle->vec.iov_base,
242                                 m->middle->vec.iov_len);
243                 con->out_msg->footer.middle_crc = cpu_to_le32(crc);
244         } else
245                 con->out_msg->footer.middle_crc = 0;
246         dout("%s front_crc %u middle_crc %u\n", __func__,
247              le32_to_cpu(con->out_msg->footer.front_crc),
248              le32_to_cpu(con->out_msg->footer.middle_crc));
249         con->out_msg->footer.flags = 0;
250
251         /* is there a data payload? */
252         con->out_msg->footer.data_crc = 0;
253         if (m->data_length) {
254                 prepare_message_data(con->out_msg, m->data_length);
255                 con->v1.out_more = 1;  /* data + footer will follow */
256         } else {
257                 /* no, queue up footer too and be done */
258                 prepare_write_message_footer(con);
259         }
260
261         ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
262 }
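/*
 * Putting the pieces above together, a queued outgoing message hits the
 * wire in this order (optionally preceded by a piggybacked ACK tag and
 * sequence number):
 *
 *	CEPH_MSGR_TAG_MSG                 (1 byte)
 *	struct ceph_msg_header            (crc over the fields before .crc)
 *	front                             (front_len bytes, footer.front_crc)
 *	middle, if any                    (middle_len bytes, footer.middle_crc)
 *	data, if any                      (data_len bytes, footer.data_crc,
 *	                                   sent page by page later)
 *	struct ceph_msg_footer (or _old)
 */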
263
264 /*
265  * Prepare an ack.
266  */
267 static void prepare_write_ack(struct ceph_connection *con)
268 {
269         dout("prepare_write_ack %p %llu -> %llu\n", con,
270              con->in_seq_acked, con->in_seq);
271         con->in_seq_acked = con->in_seq;
272
273         con_out_kvec_reset(con);
274
275         con_out_kvec_add(con, sizeof (tag_ack), &tag_ack);
276
277         con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
278         con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
279                          &con->v1.out_temp_ack);
280
281         con->v1.out_more = 1;  /* more will follow.. eventually.. */
282         ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
283 }
284
285 /*
286  * Prepare to share the seq during handshake
287  */
288 static void prepare_write_seq(struct ceph_connection *con)
289 {
290         dout("prepare_write_seq %p %llu -> %llu\n", con,
291              con->in_seq_acked, con->in_seq);
292         con->in_seq_acked = con->in_seq;
293
294         con_out_kvec_reset(con);
295
296         con->v1.out_temp_ack = cpu_to_le64(con->in_seq_acked);
297         con_out_kvec_add(con, sizeof(con->v1.out_temp_ack),
298                          &con->v1.out_temp_ack);
299
300         ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
301 }
302
303 /*
304  * Prepare to write keepalive byte.
305  */
306 static void prepare_write_keepalive(struct ceph_connection *con)
307 {
308         dout("prepare_write_keepalive %p\n", con);
309         con_out_kvec_reset(con);
310         if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
311                 struct timespec64 now;
312
313                 ktime_get_real_ts64(&now);
314                 con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
315                 ceph_encode_timespec64(&con->v1.out_temp_keepalive2, &now);
316                 con_out_kvec_add(con, sizeof(con->v1.out_temp_keepalive2),
317                                  &con->v1.out_temp_keepalive2);
318         } else {
319                 con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
320         }
321         ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
322 }
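/*
 * With CEPH_FEATURE_MSGR_KEEPALIVE2 the keepalive carries the current
 * wall-clock time; the peer answers with CEPH_MSGR_TAG_KEEPALIVE2_ACK
 * followed by a timestamp, which read_keepalive_ack() decodes into
 * con->last_keepalive_ack.
 */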
323
324 /*
325  * Connection negotiation.
326  */
327
328 static int get_connect_authorizer(struct ceph_connection *con)
329 {
330         struct ceph_auth_handshake *auth;
331         int auth_proto;
332
333         if (!con->ops->get_authorizer) {
334                 con->v1.auth = NULL;
335                 con->v1.out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
336                 con->v1.out_connect.authorizer_len = 0;
337                 return 0;
338         }
339
340         auth = con->ops->get_authorizer(con, &auth_proto, con->v1.auth_retry);
341         if (IS_ERR(auth))
342                 return PTR_ERR(auth);
343
344         con->v1.auth = auth;
345         con->v1.out_connect.authorizer_protocol = cpu_to_le32(auth_proto);
346         con->v1.out_connect.authorizer_len =
347                 cpu_to_le32(auth->authorizer_buf_len);
348         return 0;
349 }
350
351 /*
352  * We connected to a peer and are saying hello.
353  */
354 static void prepare_write_banner(struct ceph_connection *con)
355 {
356         con_out_kvec_add(con, strlen(CEPH_BANNER), CEPH_BANNER);
357         con_out_kvec_add(con, sizeof (con->msgr->my_enc_addr),
358                                         &con->msgr->my_enc_addr);
359
360         con->v1.out_more = 0;
361         ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
362 }
363
364 static void __prepare_write_connect(struct ceph_connection *con)
365 {
366         con_out_kvec_add(con, sizeof(con->v1.out_connect),
367                          &con->v1.out_connect);
368         if (con->v1.auth)
369                 con_out_kvec_add(con, con->v1.auth->authorizer_buf_len,
370                                  con->v1.auth->authorizer_buf);
371
372         con->v1.out_more = 0;
373         ceph_con_flag_set(con, CEPH_CON_F_WRITE_PENDING);
374 }
375
376 static int prepare_write_connect(struct ceph_connection *con)
377 {
378         unsigned int global_seq = ceph_get_global_seq(con->msgr, 0);
379         int proto;
380         int ret;
381
382         switch (con->peer_name.type) {
383         case CEPH_ENTITY_TYPE_MON:
384                 proto = CEPH_MONC_PROTOCOL;
385                 break;
386         case CEPH_ENTITY_TYPE_OSD:
387                 proto = CEPH_OSDC_PROTOCOL;
388                 break;
389         case CEPH_ENTITY_TYPE_MDS:
390                 proto = CEPH_MDSC_PROTOCOL;
391                 break;
392         default:
393                 BUG();
394         }
395
396         dout("prepare_write_connect %p cseq=%d gseq=%d proto=%d\n", con,
397              con->v1.connect_seq, global_seq, proto);
398
399         con->v1.out_connect.features =
400                 cpu_to_le64(from_msgr(con->msgr)->supported_features);
401         con->v1.out_connect.host_type = cpu_to_le32(CEPH_ENTITY_TYPE_CLIENT);
402         con->v1.out_connect.connect_seq = cpu_to_le32(con->v1.connect_seq);
403         con->v1.out_connect.global_seq = cpu_to_le32(global_seq);
404         con->v1.out_connect.protocol_version = cpu_to_le32(proto);
405         con->v1.out_connect.flags = 0;
406
407         ret = get_connect_authorizer(con);
408         if (ret)
409                 return ret;
410
411         __prepare_write_connect(con);
412         return 0;
413 }
414
415 /*
416  * Write as much of the pending kvec data to the socket as we can.
417  *  1 -> done
418  *  0 -> socket full, but more to do
419  * <0 -> error
420  */
421 static int write_partial_kvec(struct ceph_connection *con)
422 {
423         int ret;
424
425         dout("write_partial_kvec %p %d left\n", con, con->v1.out_kvec_bytes);
426         while (con->v1.out_kvec_bytes > 0) {
427                 ret = ceph_tcp_sendmsg(con->sock, con->v1.out_kvec_cur,
428                                        con->v1.out_kvec_left,
429                                        con->v1.out_kvec_bytes,
430                                        con->v1.out_more);
431                 if (ret <= 0)
432                         goto out;
433                 con->v1.out_kvec_bytes -= ret;
434                 if (!con->v1.out_kvec_bytes)
435                         break;            /* done */
436
437                 /* account for full iov entries consumed */
438                 while (ret >= con->v1.out_kvec_cur->iov_len) {
439                         BUG_ON(!con->v1.out_kvec_left);
440                         ret -= con->v1.out_kvec_cur->iov_len;
441                         con->v1.out_kvec_cur++;
442                         con->v1.out_kvec_left--;
443                 }
444                 /* and for a partially-consumed entry */
445                 if (ret) {
446                         con->v1.out_kvec_cur->iov_len -= ret;
447                         con->v1.out_kvec_cur->iov_base += ret;
448                 }
449         }
450         con->v1.out_kvec_left = 0;
451         ret = 1;
452 out:
453         dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
454              con->v1.out_kvec_bytes, con->v1.out_kvec_left, ret);
455         return ret;  /* done! */
456 }
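/*
 * A worked example of the accounting above: with three queued kvecs of
 * 9, 16 and 7 bytes and a sendmsg() return of 20, the inner loop retires
 * the 9-byte entry, then the partial-consumption fixup leaves
 * out_kvec_cur pointing at the 16-byte entry with iov_len = 5 and
 * iov_base advanced by 11, and out_kvec_bytes = 12 for the next pass.
 */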
457
458 /*
459  * Write as much message data payload as we can.  If we finish, queue
460  * up the footer.
461  *  1 -> done, footer is now queued in out_kvec[].
462  *  0 -> socket full, but more to do
463  * <0 -> error
464  */
465 static int write_partial_message_data(struct ceph_connection *con)
466 {
467         struct ceph_msg *msg = con->out_msg;
468         struct ceph_msg_data_cursor *cursor = &msg->cursor;
469         bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
470         u32 crc;
471
472         dout("%s %p msg %p\n", __func__, con, msg);
473
474         if (!msg->num_data_items)
475                 return -EINVAL;
476
477         /*
478          * Iterate through each page that contains data to be
479          * written, and send as much as possible for each.
480          *
481          * If we are calculating the data crc (the default), we will
482          * need to map the page.  If we have no pages, they have
483          * been revoked, so use the zero page.
484          */
485         crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
486         while (cursor->total_resid) {
487                 struct page *page;
488                 size_t page_offset;
489                 size_t length;
490                 int ret;
491
492                 if (!cursor->resid) {
493                         ceph_msg_data_advance(cursor, 0);
494                         continue;
495                 }
496
497                 page = ceph_msg_data_next(cursor, &page_offset, &length);
498                 ret = ceph_tcp_sendpage(con->sock, page, page_offset, length,
499                                         MSG_MORE);
500                 if (ret <= 0) {
501                         if (do_datacrc)
502                                 msg->footer.data_crc = cpu_to_le32(crc);
503
504                         return ret;
505                 }
506                 if (do_datacrc && cursor->need_crc)
507                         crc = ceph_crc32c_page(crc, page, page_offset, length);
508                 ceph_msg_data_advance(cursor, (size_t)ret);
509         }
510
511         dout("%s %p msg %p done\n", __func__, con, msg);
512
513         /* prepare and queue up footer, too */
514         if (do_datacrc)
515                 msg->footer.data_crc = cpu_to_le32(crc);
516         else
517                 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
518         con_out_kvec_reset(con);
519         prepare_write_message_footer(con);
520
521         return 1;       /* must return > 0 to indicate success */
522 }
523
524 /*
525  * write some zeros
526  */
527 static int write_partial_skip(struct ceph_connection *con)
528 {
529         int ret;
530
531         dout("%s %p %d left\n", __func__, con, con->v1.out_skip);
532         while (con->v1.out_skip > 0) {
533                 size_t size = min(con->v1.out_skip, (int)PAGE_SIZE);
534
535                 ret = ceph_tcp_sendpage(con->sock, ceph_zero_page, 0, size,
536                                         MSG_MORE);
537                 if (ret <= 0)
538                         goto out;
539                 con->v1.out_skip -= ret;
540         }
541         ret = 1;
542 out:
543         return ret;
544 }
545
546 /*
547  * Prepare to read connection handshake, or an ack.
548  */
549 static void prepare_read_banner(struct ceph_connection *con)
550 {
551         dout("prepare_read_banner %p\n", con);
552         con->v1.in_base_pos = 0;
553 }
554
555 static void prepare_read_connect(struct ceph_connection *con)
556 {
557         dout("prepare_read_connect %p\n", con);
558         con->v1.in_base_pos = 0;
559 }
560
561 static void prepare_read_ack(struct ceph_connection *con)
562 {
563         dout("prepare_read_ack %p\n", con);
564         con->v1.in_base_pos = 0;
565 }
566
567 static void prepare_read_seq(struct ceph_connection *con)
568 {
569         dout("prepare_read_seq %p\n", con);
570         con->v1.in_base_pos = 0;
571         con->v1.in_tag = CEPH_MSGR_TAG_SEQ;
572 }
573
574 static void prepare_read_tag(struct ceph_connection *con)
575 {
576         dout("prepare_read_tag %p\n", con);
577         con->v1.in_base_pos = 0;
578         con->v1.in_tag = CEPH_MSGR_TAG_READY;
579 }
580
581 static void prepare_read_keepalive_ack(struct ceph_connection *con)
582 {
583         dout("prepare_read_keepalive_ack %p\n", con);
584         con->v1.in_base_pos = 0;
585 }
586
587 /*
588  * Prepare to read a message.
589  */
590 static int prepare_read_message(struct ceph_connection *con)
591 {
592         dout("prepare_read_message %p\n", con);
593         BUG_ON(con->in_msg != NULL);
594         con->v1.in_base_pos = 0;
595         con->in_front_crc = con->in_middle_crc = con->in_data_crc = 0;
596         return 0;
597 }
598
599 static int read_partial(struct ceph_connection *con,
600                         int end, int size, void *object)
601 {
602         while (con->v1.in_base_pos < end) {
603                 int left = end - con->v1.in_base_pos;
604                 int have = size - left;
605                 int ret = ceph_tcp_recvmsg(con->sock, object + have, left);
606                 if (ret <= 0)
607                         return ret;
608                 con->v1.in_base_pos += ret;
609         }
610         return 1;
611 }
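/*
 * read_partial() treats @end as a running offset shared through
 * con->v1.in_base_pos, so callers chain it cumulatively.  From
 * read_partial_banner() below:
 *
 *	size = strlen(CEPH_BANNER);              end = size;
 *	ret = read_partial(con, end, size, con->v1.in_banner);
 *	...
 *	size = sizeof(con->v1.actual_peer_addr); end += size;
 *	ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
 *
 * "have" then works out to how much of @object has already been received,
 * so a short read resumes exactly where it left off.
 */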
612
613 /*
614  * Read all or part of the connect-side handshake on a new connection
615  */
616 static int read_partial_banner(struct ceph_connection *con)
617 {
618         int size;
619         int end;
620         int ret;
621
622         dout("read_partial_banner %p at %d\n", con, con->v1.in_base_pos);
623
624         /* peer's banner */
625         size = strlen(CEPH_BANNER);
626         end = size;
627         ret = read_partial(con, end, size, con->v1.in_banner);
628         if (ret <= 0)
629                 goto out;
630
631         size = sizeof(con->v1.actual_peer_addr);
632         end += size;
633         ret = read_partial(con, end, size, &con->v1.actual_peer_addr);
634         if (ret <= 0)
635                 goto out;
636         ceph_decode_banner_addr(&con->v1.actual_peer_addr);
637
638         size = sizeof(con->v1.peer_addr_for_me);
639         end += size;
640         ret = read_partial(con, end, size, &con->v1.peer_addr_for_me);
641         if (ret <= 0)
642                 goto out;
643         ceph_decode_banner_addr(&con->v1.peer_addr_for_me);
644
645 out:
646         return ret;
647 }
648
649 static int read_partial_connect(struct ceph_connection *con)
650 {
651         int size;
652         int end;
653         int ret;
654
655         dout("read_partial_connect %p at %d\n", con, con->v1.in_base_pos);
656
657         size = sizeof(con->v1.in_reply);
658         end = size;
659         ret = read_partial(con, end, size, &con->v1.in_reply);
660         if (ret <= 0)
661                 goto out;
662
663         if (con->v1.auth) {
664                 size = le32_to_cpu(con->v1.in_reply.authorizer_len);
665                 if (size > con->v1.auth->authorizer_reply_buf_len) {
666                         pr_err("authorizer reply too big: %d > %zu\n", size,
667                                con->v1.auth->authorizer_reply_buf_len);
668                         ret = -EINVAL;
669                         goto out;
670                 }
671
672                 end += size;
673                 ret = read_partial(con, end, size,
674                                    con->v1.auth->authorizer_reply_buf);
675                 if (ret <= 0)
676                         goto out;
677         }
678
679         dout("read_partial_connect %p tag %d, con_seq = %u, g_seq = %u\n",
680              con, con->v1.in_reply.tag,
681              le32_to_cpu(con->v1.in_reply.connect_seq),
682              le32_to_cpu(con->v1.in_reply.global_seq));
683 out:
684         return ret;
685 }
686
687 /*
688  * Verify the hello banner looks okay.
689  */
690 static int verify_hello(struct ceph_connection *con)
691 {
692         if (memcmp(con->v1.in_banner, CEPH_BANNER, strlen(CEPH_BANNER))) {
693                 pr_err("connect to %s got bad banner\n",
694                        ceph_pr_addr(&con->peer_addr));
695                 con->error_msg = "protocol error, bad banner";
696                 return -1;
697         }
698         return 0;
699 }
700
701 static int process_banner(struct ceph_connection *con)
702 {
703         struct ceph_entity_addr *my_addr = &con->msgr->inst.addr;
704
705         dout("process_banner on %p\n", con);
706
707         if (verify_hello(con) < 0)
708                 return -1;
709
710         /*
711          * Make sure the other end is who we wanted.  note that the other
712          * end may not yet know their ip address, so if it's 0.0.0.0, give
713          * them the benefit of the doubt.
714          */
715         if (memcmp(&con->peer_addr, &con->v1.actual_peer_addr,
716                    sizeof(con->peer_addr)) != 0 &&
717             !(ceph_addr_is_blank(&con->v1.actual_peer_addr) &&
718               con->v1.actual_peer_addr.nonce == con->peer_addr.nonce)) {
719                 pr_warn("wrong peer, want %s/%u, got %s/%u\n",
720                         ceph_pr_addr(&con->peer_addr),
721                         le32_to_cpu(con->peer_addr.nonce),
722                         ceph_pr_addr(&con->v1.actual_peer_addr),
723                         le32_to_cpu(con->v1.actual_peer_addr.nonce));
724                 con->error_msg = "wrong peer at address";
725                 return -1;
726         }
727
728         /*
729          * did we learn our address?
730          */
731         if (ceph_addr_is_blank(my_addr)) {
732                 memcpy(&my_addr->in_addr,
733                        &con->v1.peer_addr_for_me.in_addr,
734                        sizeof(con->v1.peer_addr_for_me.in_addr));
735                 ceph_addr_set_port(my_addr, 0);
736                 ceph_encode_my_addr(con->msgr);
737                 dout("process_banner learned my addr is %s\n",
738                      ceph_pr_addr(my_addr));
739         }
740
741         return 0;
742 }
743
744 static int process_connect(struct ceph_connection *con)
745 {
746         u64 sup_feat = from_msgr(con->msgr)->supported_features;
747         u64 req_feat = from_msgr(con->msgr)->required_features;
748         u64 server_feat = le64_to_cpu(con->v1.in_reply.features);
749         int ret;
750
751         dout("process_connect on %p tag %d\n", con, con->v1.in_tag);
752
753         if (con->v1.auth) {
754                 int len = le32_to_cpu(con->v1.in_reply.authorizer_len);
755
756                 /*
757                  * Any connection that defines ->get_authorizer()
758                  * should also define ->add_authorizer_challenge() and
759                  * ->verify_authorizer_reply().
760                  *
761                  * See get_connect_authorizer().
762                  */
763                 if (con->v1.in_reply.tag ==
764                                 CEPH_MSGR_TAG_CHALLENGE_AUTHORIZER) {
765                         ret = con->ops->add_authorizer_challenge(
766                                 con, con->v1.auth->authorizer_reply_buf, len);
767                         if (ret < 0)
768                                 return ret;
769
770                         con_out_kvec_reset(con);
771                         __prepare_write_connect(con);
772                         prepare_read_connect(con);
773                         return 0;
774                 }
775
776                 if (len) {
777                         ret = con->ops->verify_authorizer_reply(con);
778                         if (ret < 0) {
779                                 con->error_msg = "bad authorize reply";
780                                 return ret;
781                         }
782                 }
783         }
784
785         switch (con->v1.in_reply.tag) {
786         case CEPH_MSGR_TAG_FEATURES:
787                 pr_err("%s%lld %s feature set mismatch,"
788                        " my %llx < server's %llx, missing %llx\n",
789                        ENTITY_NAME(con->peer_name),
790                        ceph_pr_addr(&con->peer_addr),
791                        sup_feat, server_feat, server_feat & ~sup_feat);
792                 con->error_msg = "missing required protocol features";
793                 return -1;
794
795         case CEPH_MSGR_TAG_BADPROTOVER:
796                 pr_err("%s%lld %s protocol version mismatch,"
797                        " my %d != server's %d\n",
798                        ENTITY_NAME(con->peer_name),
799                        ceph_pr_addr(&con->peer_addr),
800                        le32_to_cpu(con->v1.out_connect.protocol_version),
801                        le32_to_cpu(con->v1.in_reply.protocol_version));
802                 con->error_msg = "protocol version mismatch";
803                 return -1;
804
805         case CEPH_MSGR_TAG_BADAUTHORIZER:
806                 con->v1.auth_retry++;
807                 dout("process_connect %p got BADAUTHORIZER attempt %d\n", con,
808                      con->v1.auth_retry);
809                 if (con->v1.auth_retry == 2) {
810                         con->error_msg = "connect authorization failure";
811                         return -1;
812                 }
813                 con_out_kvec_reset(con);
814                 ret = prepare_write_connect(con);
815                 if (ret < 0)
816                         return ret;
817                 prepare_read_connect(con);
818                 break;
819
820         case CEPH_MSGR_TAG_RESETSESSION:
821                 /*
822                  * If we connected with a large connect_seq but the peer
823                  * has no record of a session with us (no connection, or
824                  * connect_seq == 0), they will send RESETSESSION to indicate
825                  * that they must have reset their session, and may have
826                  * dropped messages.
827                  */
828                 dout("process_connect got RESET peer seq %u\n",
829                      le32_to_cpu(con->v1.in_reply.connect_seq));
830                 pr_info("%s%lld %s session reset\n",
831                         ENTITY_NAME(con->peer_name),
832                         ceph_pr_addr(&con->peer_addr));
833                 ceph_con_reset_session(con);
834                 con_out_kvec_reset(con);
835                 ret = prepare_write_connect(con);
836                 if (ret < 0)
837                         return ret;
838                 prepare_read_connect(con);
839
840                 /* Tell ceph about it. */
841                 mutex_unlock(&con->mutex);
842                 if (con->ops->peer_reset)
843                         con->ops->peer_reset(con);
844                 mutex_lock(&con->mutex);
845                 if (con->state != CEPH_CON_S_V1_CONNECT_MSG)
846                         return -EAGAIN;
847                 break;
848
849         case CEPH_MSGR_TAG_RETRY_SESSION:
850                 /*
851                  * If we sent a smaller connect_seq than the peer has, try
852                  * again with a larger value.
853                  */
854                 dout("process_connect got RETRY_SESSION my seq %u, peer %u\n",
855                      le32_to_cpu(con->v1.out_connect.connect_seq),
856                      le32_to_cpu(con->v1.in_reply.connect_seq));
857                 con->v1.connect_seq = le32_to_cpu(con->v1.in_reply.connect_seq);
858                 con_out_kvec_reset(con);
859                 ret = prepare_write_connect(con);
860                 if (ret < 0)
861                         return ret;
862                 prepare_read_connect(con);
863                 break;
864
865         case CEPH_MSGR_TAG_RETRY_GLOBAL:
866                 /*
867                  * If we sent a smaller global_seq than the peer has, try
868                  * again with a larger value.
869                  */
870                 dout("process_connect got RETRY_GLOBAL my %u peer_gseq %u\n",
871                      con->v1.peer_global_seq,
872                      le32_to_cpu(con->v1.in_reply.global_seq));
873                 ceph_get_global_seq(con->msgr,
874                                     le32_to_cpu(con->v1.in_reply.global_seq));
875                 con_out_kvec_reset(con);
876                 ret = prepare_write_connect(con);
877                 if (ret < 0)
878                         return ret;
879                 prepare_read_connect(con);
880                 break;
881
882         case CEPH_MSGR_TAG_SEQ:
883         case CEPH_MSGR_TAG_READY:
884                 if (req_feat & ~server_feat) {
885                         pr_err("%s%lld %s protocol feature mismatch,"
886                                " my required %llx > server's %llx, need %llx\n",
887                                ENTITY_NAME(con->peer_name),
888                                ceph_pr_addr(&con->peer_addr),
889                                req_feat, server_feat, req_feat & ~server_feat);
890                         con->error_msg = "missing required protocol features";
891                         return -1;
892                 }
893
894                 WARN_ON(con->state != CEPH_CON_S_V1_CONNECT_MSG);
895                 con->state = CEPH_CON_S_OPEN;
896                 con->v1.auth_retry = 0;    /* we authenticated; clear flag */
897                 con->v1.peer_global_seq =
898                         le32_to_cpu(con->v1.in_reply.global_seq);
899                 con->v1.connect_seq++;
900                 con->peer_features = server_feat;
901                 dout("process_connect got READY gseq %d cseq %d (%d)\n",
902                      con->v1.peer_global_seq,
903                      le32_to_cpu(con->v1.in_reply.connect_seq),
904                      con->v1.connect_seq);
905                 WARN_ON(con->v1.connect_seq !=
906                         le32_to_cpu(con->v1.in_reply.connect_seq));
907
908                 if (con->v1.in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
909                         ceph_con_flag_set(con, CEPH_CON_F_LOSSYTX);
910
911                 con->delay = 0;      /* reset backoff memory */
912
913                 if (con->v1.in_reply.tag == CEPH_MSGR_TAG_SEQ) {
914                         prepare_write_seq(con);
915                         prepare_read_seq(con);
916                 } else {
917                         prepare_read_tag(con);
918                 }
919                 break;
920
921         case CEPH_MSGR_TAG_WAIT:
922                 /*
923                  * If there is a connection race (we are opening
924                  * connections to each other), one of us may just have
925                  * to WAIT.  This shouldn't happen if we are the
926                  * client.
927                  */
928                 con->error_msg = "protocol error, got WAIT as client";
929                 return -1;
930
931         default:
932                 con->error_msg = "protocol error, garbage tag during connect";
933                 return -1;
934         }
935         return 0;
936 }
937
938 /*
939  * read (part of) an ack
940  */
941 static int read_partial_ack(struct ceph_connection *con)
942 {
943         int size = sizeof(con->v1.in_temp_ack);
944         int end = size;
945
946         return read_partial(con, end, size, &con->v1.in_temp_ack);
947 }
948
949 /*
950  * We can finally discard anything that's been acked.
951  */
952 static void process_ack(struct ceph_connection *con)
953 {
954         u64 ack = le64_to_cpu(con->v1.in_temp_ack);
955
956         if (con->v1.in_tag == CEPH_MSGR_TAG_ACK)
957                 ceph_con_discard_sent(con, ack);
958         else
959                 ceph_con_discard_requeued(con, ack);
960
961         prepare_read_tag(con);
962 }
963
964 static int read_partial_message_chunk(struct ceph_connection *con,
965                                       struct kvec *section,
966                                       unsigned int sec_len, u32 *crc)
967 {
968         int ret, left;
969
970         BUG_ON(!section);
971
972         while (section->iov_len < sec_len) {
973                 BUG_ON(section->iov_base == NULL);
974                 left = sec_len - section->iov_len;
975                 ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
976                                        section->iov_len, left);
977                 if (ret <= 0)
978                         return ret;
979                 section->iov_len += ret;
980         }
981         if (section->iov_len == sec_len)
982                 *crc = crc32c(*crc, section->iov_base, section->iov_len);
983
984         return 1;
985 }
986
987 static inline int read_partial_message_section(struct ceph_connection *con,
988                                                struct kvec *section,
989                                                unsigned int sec_len, u32 *crc)
990 {
991         *crc = 0;
992         return read_partial_message_chunk(con, section, sec_len, crc);
993 }
994
995 static int read_partial_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
996 {
997         struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
998         bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);
999
1000         if (do_bounce && unlikely(!con->bounce_page)) {
1001                 con->bounce_page = alloc_page(GFP_NOIO);
1002                 if (!con->bounce_page) {
1003                         pr_err("failed to allocate bounce page\n");
1004                         return -ENOMEM;
1005                 }
1006         }
1007
1008         while (cursor->sr_resid > 0) {
1009                 struct page *page, *rpage;
1010                 size_t off, len;
1011                 int ret;
1012
1013                 page = ceph_msg_data_next(cursor, &off, &len);
1014                 rpage = do_bounce ? con->bounce_page : page;
1015
1016                 /* clamp to what remains in extent */
1017                 len = min_t(int, len, cursor->sr_resid);
1018                 ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
1019                 if (ret <= 0)
1020                         return ret;
1021                 *crc = ceph_crc32c_page(*crc, rpage, off, ret);
1022                 ceph_msg_data_advance(cursor, (size_t)ret);
1023                 cursor->sr_resid -= ret;
1024                 if (do_bounce)
1025                         memcpy_page(page, off, rpage, off, ret);
1026         }
1027         return 1;
1028 }
1029
1030 static int read_partial_sparse_msg_data(struct ceph_connection *con)
1031 {
1032         struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1033         bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1034         u32 crc = 0;
1035         int ret = 1;
1036
1037         if (do_datacrc)
1038                 crc = con->in_data_crc;
1039
1040         while (cursor->total_resid) {
1041                 if (con->v1.in_sr_kvec.iov_base)
1042                         ret = read_partial_message_chunk(con,
1043                                                          &con->v1.in_sr_kvec,
1044                                                          con->v1.in_sr_len,
1045                                                          &crc);
1046                 else if (cursor->sr_resid > 0)
1047                         ret = read_partial_sparse_msg_extent(con, &crc);
1048                 if (ret <= 0)
1049                         break;
1050
1051                 memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec));
1052                 ret = con->ops->sparse_read(con, cursor,
1053                                 (char **)&con->v1.in_sr_kvec.iov_base);
1054                 if (ret <= 0) {
1055                         ret = ret ? ret : 1;  /* must return > 0 to indicate success */
1056                         break;
1057                 }
1058                 con->v1.in_sr_len = ret;
1059         }
1060
1061         if (do_datacrc)
1062                 con->in_data_crc = crc;
1063
1064         return ret;
1065 }
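/*
 * The sparse-read loop above alternates between draining whatever the
 * ->sparse_read() op last asked for - either a plain kvec
 * (con->v1.in_sr_kvec) or a data extent tracked by cursor->sr_resid -
 * and calling ->sparse_read() again to learn what comes next.  A return
 * of 0 from the op means the message data is complete.
 */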
1066
1067 static int read_partial_msg_data(struct ceph_connection *con)
1068 {
1069         struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1070         bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1071         struct page *page;
1072         size_t page_offset;
1073         size_t length;
1074         u32 crc = 0;
1075         int ret;
1076
1077         if (do_datacrc)
1078                 crc = con->in_data_crc;
1079         while (cursor->total_resid) {
1080                 if (!cursor->resid) {
1081                         ceph_msg_data_advance(cursor, 0);
1082                         continue;
1083                 }
1084
1085                 page = ceph_msg_data_next(cursor, &page_offset, &length);
1086                 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
1087                 if (ret <= 0) {
1088                         if (do_datacrc)
1089                                 con->in_data_crc = crc;
1090
1091                         return ret;
1092                 }
1093
1094                 if (do_datacrc)
1095                         crc = ceph_crc32c_page(crc, page, page_offset, ret);
1096                 ceph_msg_data_advance(cursor, (size_t)ret);
1097         }
1098         if (do_datacrc)
1099                 con->in_data_crc = crc;
1100
1101         return 1;       /* must return > 0 to indicate success */
1102 }
1103
1104 static int read_partial_msg_data_bounce(struct ceph_connection *con)
1105 {
1106         struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
1107         struct page *page;
1108         size_t off, len;
1109         u32 crc;
1110         int ret;
1111
1112         if (unlikely(!con->bounce_page)) {
1113                 con->bounce_page = alloc_page(GFP_NOIO);
1114                 if (!con->bounce_page) {
1115                         pr_err("failed to allocate bounce page\n");
1116                         return -ENOMEM;
1117                 }
1118         }
1119
1120         crc = con->in_data_crc;
1121         while (cursor->total_resid) {
1122                 if (!cursor->resid) {
1123                         ceph_msg_data_advance(cursor, 0);
1124                         continue;
1125                 }
1126
1127                 page = ceph_msg_data_next(cursor, &off, &len);
1128                 ret = ceph_tcp_recvpage(con->sock, con->bounce_page, 0, len);
1129                 if (ret <= 0) {
1130                         con->in_data_crc = crc;
1131                         return ret;
1132                 }
1133
1134                 crc = crc32c(crc, page_address(con->bounce_page), ret);
1135                 memcpy_to_page(page, off, page_address(con->bounce_page), ret);
1136
1137                 ceph_msg_data_advance(cursor, ret);
1138         }
1139         con->in_data_crc = crc;
1140
1141         return 1;       /* must return > 0 to indicate success */
1142 }
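/*
 * The bounce-page variant receives into a private page, computes the crc
 * over that page, and only then copies into the destination, so the
 * checksum cannot be thrown off if the destination page is modified
 * while it is being filled - presumably the situation the RXBOUNCE
 * option exists for.
 */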
1143
1144 /*
1145  * read (part of) a message.
1146  */
1147 static int read_partial_message(struct ceph_connection *con)
1148 {
1149         struct ceph_msg *m = con->in_msg;
1150         int size;
1151         int end;
1152         int ret;
1153         unsigned int front_len, middle_len, data_len;
1154         bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
1155         bool need_sign = (con->peer_features & CEPH_FEATURE_MSG_AUTH);
1156         u64 seq;
1157         u32 crc;
1158
1159         dout("read_partial_message con %p msg %p\n", con, m);
1160
1161         /* header */
1162         size = sizeof(con->v1.in_hdr);
1163         end = size;
1164         ret = read_partial(con, end, size, &con->v1.in_hdr);
1165         if (ret <= 0)
1166                 return ret;
1167
1168         crc = crc32c(0, &con->v1.in_hdr, offsetof(struct ceph_msg_header, crc));
1169         if (cpu_to_le32(crc) != con->v1.in_hdr.crc) {
1170                 pr_err("read_partial_message bad hdr crc %u != expected %u\n",
1171                        crc, con->v1.in_hdr.crc);
1172                 return -EBADMSG;
1173         }
1174
1175         front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1176         if (front_len > CEPH_MSG_MAX_FRONT_LEN)
1177                 return -EIO;
1178         middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1179         if (middle_len > CEPH_MSG_MAX_MIDDLE_LEN)
1180                 return -EIO;
1181         data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1182         if (data_len > CEPH_MSG_MAX_DATA_LEN)
1183                 return -EIO;
1184
1185         /* verify seq# */
1186         seq = le64_to_cpu(con->v1.in_hdr.seq);
1187         if ((s64)seq - (s64)con->in_seq < 1) {
1188                 pr_info("skipping %s%lld %s seq %lld expected %lld\n",
1189                         ENTITY_NAME(con->peer_name),
1190                         ceph_pr_addr(&con->peer_addr),
1191                         seq, con->in_seq + 1);
1192                 con->v1.in_base_pos = -front_len - middle_len - data_len -
1193                                       sizeof_footer(con);
1194                 con->v1.in_tag = CEPH_MSGR_TAG_READY;
1195                 return 1;
1196         } else if ((s64)seq - (s64)con->in_seq > 1) {
1197                 pr_err("read_partial_message bad seq %lld expected %lld\n",
1198                        seq, con->in_seq + 1);
1199                 con->error_msg = "bad message sequence # for incoming message";
1200                 return -EBADE;
1201         }
1202
1203         /* allocate message? */
1204         if (!con->in_msg) {
1205                 int skip = 0;
1206
1207                 dout("got hdr type %d front %d data %d\n", con->v1.in_hdr.type,
1208                      front_len, data_len);
1209                 ret = ceph_con_in_msg_alloc(con, &con->v1.in_hdr, &skip);
1210                 if (ret < 0)
1211                         return ret;
1212
1213                 BUG_ON((!con->in_msg) ^ skip);
1214                 if (skip) {
1215                         /* skip this message */
1216                         dout("alloc_msg said skip message\n");
1217                         con->v1.in_base_pos = -front_len - middle_len -
1218                                               data_len - sizeof_footer(con);
1219                         con->v1.in_tag = CEPH_MSGR_TAG_READY;
1220                         con->in_seq++;
1221                         return 1;
1222                 }
1223
1224                 BUG_ON(!con->in_msg);
1225                 BUG_ON(con->in_msg->con != con);
1226                 m = con->in_msg;
1227                 m->front.iov_len = 0;    /* haven't read it yet */
1228                 if (m->middle)
1229                         m->middle->vec.iov_len = 0;
1230
1231                 /* prepare for data payload, if any */
1232
1233                 if (data_len)
1234                         prepare_message_data(con->in_msg, data_len);
1235         }
1236
1237         /* front */
1238         ret = read_partial_message_section(con, &m->front, front_len,
1239                                            &con->in_front_crc);
1240         if (ret <= 0)
1241                 return ret;
1242
1243         /* middle */
1244         if (m->middle) {
1245                 ret = read_partial_message_section(con, &m->middle->vec,
1246                                                    middle_len,
1247                                                    &con->in_middle_crc);
1248                 if (ret <= 0)
1249                         return ret;
1250         }
1251
1252         /* (page) data */
1253         if (data_len) {
1254                 if (!m->num_data_items)
1255                         return -EIO;
1256
1257                 if (m->sparse_read_total)
1258                         ret = read_partial_sparse_msg_data(con);
1259                 else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
1260                         ret = read_partial_msg_data_bounce(con);
1261                 else
1262                         ret = read_partial_msg_data(con);
1263                 if (ret <= 0)
1264                         return ret;
1265         }
1266
1267         /* footer */
1268         size = sizeof_footer(con);
1269         end += size;
1270         ret = read_partial(con, end, size, &m->footer);
1271         if (ret <= 0)
1272                 return ret;
1273
1274         if (!need_sign) {
1275                 m->footer.flags = m->old_footer.flags;
1276                 m->footer.sig = 0;
1277         }
1278
1279         dout("read_partial_message got msg %p %d (%u) + %d (%u) + %d (%u)\n",
1280              m, front_len, m->footer.front_crc, middle_len,
1281              m->footer.middle_crc, data_len, m->footer.data_crc);
1282
1283         /* crc ok? */
1284         if (con->in_front_crc != le32_to_cpu(m->footer.front_crc)) {
1285                 pr_err("read_partial_message %p front crc %u != exp. %u\n",
1286                        m, con->in_front_crc, m->footer.front_crc);
1287                 return -EBADMSG;
1288         }
1289         if (con->in_middle_crc != le32_to_cpu(m->footer.middle_crc)) {
1290                 pr_err("read_partial_message %p middle crc %u != exp %u\n",
1291                        m, con->in_middle_crc, m->footer.middle_crc);
1292                 return -EBADMSG;
1293         }
1294         if (do_datacrc &&
1295             (m->footer.flags & CEPH_MSG_FOOTER_NOCRC) == 0 &&
1296             con->in_data_crc != le32_to_cpu(m->footer.data_crc)) {
1297                 pr_err("read_partial_message %p data crc %u != exp. %u\n", m,
1298                        con->in_data_crc, le32_to_cpu(m->footer.data_crc));
1299                 return -EBADMSG;
1300         }
1301
1302         if (need_sign && con->ops->check_message_signature &&
1303             con->ops->check_message_signature(m)) {
1304                 pr_err("read_partial_message %p signature check failed\n", m);
1305                 return -EBADMSG;
1306         }
1307
1308         return 1; /* done! */
1309 }
1310
1311 static int read_keepalive_ack(struct ceph_connection *con)
1312 {
1313         struct ceph_timespec ceph_ts;
1314         size_t size = sizeof(ceph_ts);
1315         int ret = read_partial(con, size, size, &ceph_ts);
1316         if (ret <= 0)
1317                 return ret;
1318         ceph_decode_timespec64(&con->last_keepalive_ack, &ceph_ts);
1319         prepare_read_tag(con);
1320         return 1;
1321 }
1322
1323 /*
1324  * Read what we can from the socket.
1325  */
1326 int ceph_con_v1_try_read(struct ceph_connection *con)
1327 {
1328         int ret = -1;
1329
1330 more:
1331         dout("try_read start %p state %d\n", con, con->state);
1332         if (con->state != CEPH_CON_S_V1_BANNER &&
1333             con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1334             con->state != CEPH_CON_S_OPEN)
1335                 return 0;
1336
1337         BUG_ON(!con->sock);
1338
1339         dout("try_read tag %d in_base_pos %d\n", con->v1.in_tag,
1340              con->v1.in_base_pos);
1341
1342         if (con->state == CEPH_CON_S_V1_BANNER) {
1343                 ret = read_partial_banner(con);
1344                 if (ret <= 0)
1345                         goto out;
1346                 ret = process_banner(con);
1347                 if (ret < 0)
1348                         goto out;
1349
1350                 con->state = CEPH_CON_S_V1_CONNECT_MSG;
1351
1352                 /*
1353                  * Received banner is good, exchange connection info.
1354                  * Do not reset out_kvec, as sending our banner raced
1355                  * with receiving peer banner after connect completed.
1356                  */
1357                 ret = prepare_write_connect(con);
1358                 if (ret < 0)
1359                         goto out;
1360                 prepare_read_connect(con);
1361
1362                 /* Send connection info before awaiting response */
1363                 goto out;
1364         }
1365
1366         if (con->state == CEPH_CON_S_V1_CONNECT_MSG) {
1367                 ret = read_partial_connect(con);
1368                 if (ret <= 0)
1369                         goto out;
1370                 ret = process_connect(con);
1371                 if (ret < 0)
1372                         goto out;
1373                 goto more;
1374         }
1375
1376         WARN_ON(con->state != CEPH_CON_S_OPEN);
1377
1378         if (con->v1.in_base_pos < 0) {
1379                 /*
1380                  * skipping + discarding content.
1381                  */
1382                 ret = ceph_tcp_recvmsg(con->sock, NULL, -con->v1.in_base_pos);
1383                 if (ret <= 0)
1384                         goto out;
1385                 dout("skipped %d / %d bytes\n", ret, -con->v1.in_base_pos);
1386                 con->v1.in_base_pos += ret;
1387                 if (con->v1.in_base_pos)
1388                         goto more;
1389         }
1390         if (con->v1.in_tag == CEPH_MSGR_TAG_READY) {
1391                 /*
1392                  * what's next?
1393                  */
1394                 ret = ceph_tcp_recvmsg(con->sock, &con->v1.in_tag, 1);
1395                 if (ret <= 0)
1396                         goto out;
1397                 dout("try_read got tag %d\n", con->v1.in_tag);
1398                 switch (con->v1.in_tag) {
1399                 case CEPH_MSGR_TAG_MSG:
1400                         prepare_read_message(con);
1401                         break;
1402                 case CEPH_MSGR_TAG_ACK:
1403                         prepare_read_ack(con);
1404                         break;
1405                 case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
1406                         prepare_read_keepalive_ack(con);
1407                         break;
1408                 case CEPH_MSGR_TAG_CLOSE:
1409                         ceph_con_close_socket(con);
1410                         con->state = CEPH_CON_S_CLOSED;
1411                         goto out;
1412                 default:
1413                         goto bad_tag;
1414                 }
1415         }
1416         if (con->v1.in_tag == CEPH_MSGR_TAG_MSG) {
1417                 ret = read_partial_message(con);
1418                 if (ret <= 0) {
1419                         switch (ret) {
1420                         case -EBADMSG:
1421                                 con->error_msg = "bad crc/signature";
1422                                 fallthrough;
1423                         case -EBADE:
1424                                 ret = -EIO;
1425                                 break;
1426                         case -EIO:
1427                                 con->error_msg = "io error";
1428                                 break;
1429                         }
1430                         goto out;
1431                 }
1432                 if (con->v1.in_tag == CEPH_MSGR_TAG_READY)
1433                         goto more;
1434                 ceph_con_process_message(con);
1435                 if (con->state == CEPH_CON_S_OPEN)
1436                         prepare_read_tag(con);
1437                 goto more;
1438         }
1439         if (con->v1.in_tag == CEPH_MSGR_TAG_ACK ||
1440             con->v1.in_tag == CEPH_MSGR_TAG_SEQ) {
1441                 /*
1442                  * the final handshake seq exchange is semantically
1443                  * equivalent to an ACK
1444                  */
1445                 ret = read_partial_ack(con);
1446                 if (ret <= 0)
1447                         goto out;
1448                 process_ack(con);
1449                 goto more;
1450         }
1451         if (con->v1.in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
1452                 ret = read_keepalive_ack(con);
1453                 if (ret <= 0)
1454                         goto out;
1455                 goto more;
1456         }
1457
1458 out:
1459         dout("try_read done on %p ret %d\n", con, ret);
1460         return ret;
1461
1462 bad_tag:
1463         pr_err("try_read bad tag %d\n", con->v1.in_tag);
1464         con->error_msg = "protocol error, garbage tag";
1465         ret = -1;
1466         goto out;
1467 }
1468
1469 /*
1470  * Write something to the socket.  Called in a worker thread when the
1471  * socket appears to be writeable and we have something ready to send.
1472  */
1473 int ceph_con_v1_try_write(struct ceph_connection *con)
1474 {
1475         int ret = 1;
1476
1477         dout("try_write start %p state %d\n", con, con->state);
1478         if (con->state != CEPH_CON_S_PREOPEN &&
1479             con->state != CEPH_CON_S_V1_BANNER &&
1480             con->state != CEPH_CON_S_V1_CONNECT_MSG &&
1481             con->state != CEPH_CON_S_OPEN)
1482                 return 0;
1483
1484         /* open the socket first? */
1485         if (con->state == CEPH_CON_S_PREOPEN) {
1486                 BUG_ON(con->sock);
1487                 con->state = CEPH_CON_S_V1_BANNER;
1488
1489                 con_out_kvec_reset(con);
1490                 prepare_write_banner(con);
1491                 prepare_read_banner(con);
1492
1493                 BUG_ON(con->in_msg);
1494                 con->v1.in_tag = CEPH_MSGR_TAG_READY;
1495                 dout("try_write initiating connect on %p new state %d\n",
1496                      con, con->state);
1497                 ret = ceph_tcp_connect(con);
1498                 if (ret < 0) {
1499                         con->error_msg = "connect error";
1500                         goto out;
1501                 }
1502         }
1503
1504 more:
1505         dout("try_write out_kvec_bytes %d\n", con->v1.out_kvec_bytes);
1506         BUG_ON(!con->sock);
1507
1508         /* kvec data queued? */
1509         if (con->v1.out_kvec_left) {
1510                 ret = write_partial_kvec(con);
1511                 if (ret <= 0)
1512                         goto out;
1513         }
1514         if (con->v1.out_skip) {
1515                 ret = write_partial_skip(con);
1516                 if (ret <= 0)
1517                         goto out;
1518         }
1519
1520         /* msg pages? */
1521         if (con->out_msg) {
1522                 if (con->v1.out_msg_done) {
1523                         ceph_msg_put(con->out_msg);
1524                         con->out_msg = NULL;   /* we're done with this one */
1525                         goto do_next;
1526                 }
1527
1528                 ret = write_partial_message_data(con);
1529                 if (ret == 1)
1530                         goto more;  /* we need to send the footer, too! */
1531                 if (ret == 0)
1532                         goto out;
1533                 if (ret < 0) {
1534                         dout("try_write write_partial_message_data err %d\n",
1535                              ret);
1536                         goto out;
1537                 }
1538         }
1539
1540 do_next:
1541         if (con->state == CEPH_CON_S_OPEN) {
1542                 if (ceph_con_flag_test_and_clear(con,
1543                                 CEPH_CON_F_KEEPALIVE_PENDING)) {
1544                         prepare_write_keepalive(con);
1545                         goto more;
1546                 }
1547                 /* is anything else pending? */
1548                 if (!list_empty(&con->out_queue)) {
1549                         prepare_write_message(con);
1550                         goto more;
1551                 }
1552                 if (con->in_seq > con->in_seq_acked) {
1553                         prepare_write_ack(con);
1554                         goto more;
1555                 }
1556         }
1557
1558         /* Nothing to do! */
1559         ceph_con_flag_clear(con, CEPH_CON_F_WRITE_PENDING);
1560         dout("try_write nothing else to write.\n");
1561         ret = 0;
1562 out:
1563         dout("try_write done on %p ret %d\n", con, ret);
1564         return ret;
1565 }
1566
1567 void ceph_con_v1_revoke(struct ceph_connection *con)
1568 {
1569         struct ceph_msg *msg = con->out_msg;
1570
1571         WARN_ON(con->v1.out_skip);
1572         /* footer */
1573         if (con->v1.out_msg_done) {
1574                 con->v1.out_skip += con_out_kvec_skip(con);
1575         } else {
1576                 WARN_ON(!msg->data_length);
1577                 con->v1.out_skip += sizeof_footer(con);
1578         }
1579         /* data, middle, front */
1580         if (msg->data_length)
1581                 con->v1.out_skip += msg->cursor.total_resid;
1582         if (msg->middle)
1583                 con->v1.out_skip += con_out_kvec_skip(con);
1584         con->v1.out_skip += con_out_kvec_skip(con);
1585
1586         dout("%s con %p out_kvec_bytes %d out_skip %d\n", __func__, con,
1587              con->v1.out_kvec_bytes, con->v1.out_skip);
1588 }
1589
1590 void ceph_con_v1_revoke_incoming(struct ceph_connection *con)
1591 {
1592         unsigned int front_len = le32_to_cpu(con->v1.in_hdr.front_len);
1593         unsigned int middle_len = le32_to_cpu(con->v1.in_hdr.middle_len);
1594         unsigned int data_len = le32_to_cpu(con->v1.in_hdr.data_len);
1595
1596         /* skip rest of message */
1597         con->v1.in_base_pos = con->v1.in_base_pos -
1598                         sizeof(struct ceph_msg_header) -
1599                         front_len -
1600                         middle_len -
1601                         data_len -
1602                         sizeof(struct ceph_msg_footer);
1603
1604         con->v1.in_tag = CEPH_MSGR_TAG_READY;
1605         con->in_seq++;
1606
1607         dout("%s con %p in_base_pos %d\n", __func__, con, con->v1.in_base_pos);
1608 }
1609
1610 bool ceph_con_v1_opened(struct ceph_connection *con)
1611 {
1612         return con->v1.connect_seq;
1613 }
1614
1615 void ceph_con_v1_reset_session(struct ceph_connection *con)
1616 {
1617         con->v1.connect_seq = 0;
1618         con->v1.peer_global_seq = 0;
1619 }
1620
1621 void ceph_con_v1_reset_protocol(struct ceph_connection *con)
1622 {
1623         con->v1.out_skip = 0;
1624 }