rxrpc: Move call state changes from sendmsg to I/O thread
author David Howells <dhowells@redhat.com>
Fri, 11 Nov 2022 08:35:36 +0000 (08:35 +0000)
committer David Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:33 +0000 (09:43 +0000)
Move all the call state changes that are made in rxrpc_sendmsg() to the I/O
thread.  This is a step towards removing the call state lock.

This requires the switch to the RXRPC_CALL_CLIENT_AWAIT_REPLY and
RXRPC_CALL_SERVER_SEND_REPLY states to be done when the last packet is
decanted from ->tx_sendmsg to ->tx_buffer in the I/O thread, not when it is
added to ->tx_sendmsg by sendmsg().

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

Documentation/networking/rxrpc.rst
net/rxrpc/call_event.c
net/rxrpc/sendmsg.c

index 39494a6ea739cf02217f088721ff23e1751521fe..e1af54424192b375f701992a95ffeeb68a76de8c 100644 (file)
@@ -880,8 +880,8 @@ The kernel interface functions are as follows:
 
      notify_end_rx can be NULL or it can be used to specify a function to be
      called when the call changes state to end the Tx phase.  This function is
-     called with the call-state spinlock held to prevent any reply or final ACK
-     from being delivered first.
+     called with a spinlock held to prevent the last DATA packet from being
+     transmitted until the function returns.
 
  (#) Receive data from a call::
 
index 695aeb70d1a6bcd396f0b15a0887ceb2823730a4..2e3c01060d59a977dd51839d0b1d9553c9572867 100644 (file)
@@ -251,6 +251,50 @@ out:
        _leave("");
 }
 
+/*
+ * Start transmitting the reply to a service.  This cancels the need to ACK the
+ * request if we haven't yet done so.
+ */
+static void rxrpc_begin_service_reply(struct rxrpc_call *call)
+{
+       unsigned long now;
+
+       write_lock(&call->state_lock);
+
+       if (call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
+               now = jiffies;
+               call->state = RXRPC_CALL_SERVER_SEND_REPLY;
+               WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
+               if (call->ackr_reason == RXRPC_ACK_DELAY)
+                       call->ackr_reason = 0;
+               trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
+       }
+
+       write_unlock(&call->state_lock);
+}
+
+/*
+ * Close the transmission phase.  After this point there is no more data to be
+ * transmitted in the call.
+ */
+static void rxrpc_close_tx_phase(struct rxrpc_call *call)
+{
+       _debug("________awaiting reply/ACK__________");
+
+       write_lock(&call->state_lock);
+       switch (call->state) {
+       case RXRPC_CALL_CLIENT_SEND_REQUEST:
+               call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
+               break;
+       case RXRPC_CALL_SERVER_SEND_REPLY:
+               call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
+               break;
+       default:
+               break;
+       }
+       write_unlock(&call->state_lock);
+}
+
 static bool rxrpc_tx_window_has_space(struct rxrpc_call *call)
 {
        unsigned int winsize = min_t(unsigned int, call->tx_winsize,
@@ -285,6 +329,9 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
                call->tx_top = txb->seq;
                list_add_tail(&txb->call_link, &call->tx_buffer);
 
+               if (txb->wire.flags & RXRPC_LAST_PACKET)
+                       rxrpc_close_tx_phase(call);
+
                rxrpc_transmit_one(call, txb);
 
                if (!rxrpc_tx_window_has_space(call))
@@ -298,12 +345,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
        case RXRPC_CALL_SERVER_ACK_REQUEST:
                if (list_empty(&call->tx_sendmsg))
                        return;
+               rxrpc_begin_service_reply(call);
                fallthrough;
 
        case RXRPC_CALL_SERVER_SEND_REPLY:
-       case RXRPC_CALL_SERVER_AWAIT_ACK:
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
-       case RXRPC_CALL_CLIENT_AWAIT_REPLY:
                if (!rxrpc_tx_window_has_space(call))
                        return;
                if (list_empty(&call->tx_sendmsg)) {
index f0b5822f3e04c62b8cc85e165274b310e3f67853..0428528abbf499cecf2d690ca3625acbc0e9231d 100644 (file)
@@ -189,7 +189,6 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
                               struct rxrpc_txbuf *txb,
                               rxrpc_notify_end_tx_t notify_end_tx)
 {
-       unsigned long now;
        rxrpc_seq_t seq = txb->seq;
        bool last = test_bit(RXRPC_TXBUF_LAST, &txb->flags), poke;
 
@@ -212,36 +211,10 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
        poke = list_empty(&call->tx_sendmsg);
        list_add_tail(&txb->call_link, &call->tx_sendmsg);
        call->tx_prepared = seq;
+       if (last)
+               rxrpc_notify_end_tx(rx, call, notify_end_tx);
        spin_unlock(&call->tx_lock);
 
-       if (last || call->state == RXRPC_CALL_SERVER_ACK_REQUEST) {
-               _debug("________awaiting reply/ACK__________");
-               write_lock(&call->state_lock);
-               switch (call->state) {
-               case RXRPC_CALL_CLIENT_SEND_REQUEST:
-                       call->state = RXRPC_CALL_CLIENT_AWAIT_REPLY;
-                       rxrpc_notify_end_tx(rx, call, notify_end_tx);
-                       break;
-               case RXRPC_CALL_SERVER_ACK_REQUEST:
-                       call->state = RXRPC_CALL_SERVER_SEND_REPLY;
-                       now = jiffies;
-                       WRITE_ONCE(call->delay_ack_at, now + MAX_JIFFY_OFFSET);
-                       if (call->ackr_reason == RXRPC_ACK_DELAY)
-                               call->ackr_reason = 0;
-                       trace_rxrpc_timer(call, rxrpc_timer_init_for_send_reply, now);
-                       if (!last)
-                               break;
-                       fallthrough;
-               case RXRPC_CALL_SERVER_SEND_REPLY:
-                       call->state = RXRPC_CALL_SERVER_AWAIT_ACK;
-                       rxrpc_notify_end_tx(rx, call, notify_end_tx);
-                       break;
-               default:
-                       break;
-               }
-               write_unlock(&call->state_lock);
-       }
-
        if (poke)
                rxrpc_poke_call(call, rxrpc_call_poke_start);
 }
@@ -280,8 +253,13 @@ reload:
        ret = -EPROTO;
        if (state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
            state != RXRPC_CALL_SERVER_ACK_REQUEST &&
-           state != RXRPC_CALL_SERVER_SEND_REPLY)
+           state != RXRPC_CALL_SERVER_SEND_REPLY) {
+               /* Request phase complete for this client call */
+               trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
+                                 call->cid, call->call_id, call->rx_consumed,
+                                 0, -EPROTO);
                goto maybe_error;
+       }
 
        ret = -EMSGSIZE;
        if (call->tx_total_len != -1) {
@@ -573,7 +551,6 @@ rxrpc_new_client_call_for_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg,
 int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
        __releases(&rx->sk.sk_lock.slock)
 {
-       enum rxrpc_call_state state;
        struct rxrpc_call *call;
        unsigned long now, j;
        bool dropped_lock = false;
@@ -672,11 +649,7 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
                break;
        }
 
-       state = rxrpc_call_state(call);
-       _debug("CALL %d USR %lx ST %d on CONN %p",
-              call->debug_id, call->user_call_ID, state, call->conn);
-
-       if (state >= RXRPC_CALL_COMPLETE) {
+       if (rxrpc_call_is_complete(call)) {
                /* it's too late for this call */
                ret = -ESHUTDOWN;
        } else if (p.command == RXRPC_CMD_SEND_ABORT) {
@@ -722,7 +695,7 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
        bool dropped_lock = false;
        int ret;
 
-       _enter("{%d,%s},", call->debug_id, rxrpc_call_states[call->state]);
+       _enter("{%d},", call->debug_id);
 
        ASSERTCMP(msg->msg_name, ==, NULL);
        ASSERTCMP(msg->msg_control, ==, NULL);
@@ -732,26 +705,10 @@ int rxrpc_kernel_send_data(struct socket *sock, struct rxrpc_call *call,
        _debug("CALL %d USR %lx ST %d on CONN %p",
               call->debug_id, call->user_call_ID, call->state, call->conn);
 
-       switch (rxrpc_call_state(call)) {
-       case RXRPC_CALL_CLIENT_SEND_REQUEST:
-       case RXRPC_CALL_SERVER_ACK_REQUEST:
-       case RXRPC_CALL_SERVER_SEND_REPLY:
-               ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
-                                     notify_end_tx, &dropped_lock);
-               break;
-       case RXRPC_CALL_COMPLETE:
-               read_lock(&call->state_lock);
+       ret = rxrpc_send_data(rxrpc_sk(sock->sk), call, msg, len,
+                             notify_end_tx, &dropped_lock);
+       if (ret == -ESHUTDOWN)
                ret = call->error;
-               read_unlock(&call->state_lock);
-               break;
-       default:
-               /* Request phase complete for this client call */
-               trace_rxrpc_abort(call->debug_id, rxrpc_sendmsg_late_send,
-                                 call->cid, call->call_id, call->rx_consumed,
-                                 0, -EPROTO);
-               ret = -EPROTO;
-               break;
-       }
 
        if (!dropped_lock)
                mutex_unlock(&call->user_mutex);