rxrpc: Implement a mechanism to send an event notification to a call
authorDavid Howells <dhowells@redhat.com>
Mon, 10 Oct 2022 14:51:39 +0000 (15:51 +0100)
committerDavid Howells <dhowells@redhat.com>
Thu, 1 Dec 2022 13:36:41 +0000 (13:36 +0000)
Provide a means by which an event notification can be sent to a call such
that the I/O thread can process it rather than it being done in a separate
workqueue.  This will allow a lot of locking to be removed.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/call_object.c
net/rxrpc/input.c
net/rxrpc/io_thread.c
net/rxrpc/local_object.c

index 44a9be9836f985e3d1642a18f96403905b8462a4..0b12d96c7921d85365d390ebb684b1bceae54457 100644 (file)
 /*
  * Declare tracing information enums and their string mappings for display.
  */
+#define rxrpc_call_poke_traces \
+       EM(rxrpc_call_poke_error,               "Error")        \
+       EM(rxrpc_call_poke_idle,                "Idle")         \
+       EM(rxrpc_call_poke_start,               "Start")        \
+       EM(rxrpc_call_poke_timer,               "Timer")        \
+       E_(rxrpc_call_poke_timer_now,           "Timer-now")
+
 #define rxrpc_skb_traces \
        EM(rxrpc_skb_eaten_by_unshare,          "ETN unshare  ") \
        EM(rxrpc_skb_eaten_by_unshare_nomem,    "ETN unshar-nm") \
        EM(rxrpc_call_get_input,                "GET input   ") \
        EM(rxrpc_call_get_kernel_service,       "GET krnl-srv") \
        EM(rxrpc_call_get_notify_socket,        "GET notify  ") \
+       EM(rxrpc_call_get_poke,                 "GET poke    ") \
        EM(rxrpc_call_get_recvmsg,              "GET recvmsg ") \
        EM(rxrpc_call_get_release_sock,         "GET rel-sock") \
        EM(rxrpc_call_get_sendmsg,              "GET sendmsg ") \
        EM(rxrpc_call_put_discard_prealloc,     "PUT disc-pre") \
        EM(rxrpc_call_put_input,                "PUT input   ") \
        EM(rxrpc_call_put_kernel,               "PUT kernel  ") \
+       EM(rxrpc_call_put_poke,                 "PUT poke    ") \
        EM(rxrpc_call_put_recvmsg,              "PUT recvmsg ") \
        EM(rxrpc_call_put_release_sock,         "PUT rls-sock") \
        EM(rxrpc_call_put_release_sock_tba,     "PUT rls-sk-a") \
 #define E_(a, b) a
 
 enum rxrpc_bundle_trace                { rxrpc_bundle_traces } __mode(byte);
+enum rxrpc_call_poke_trace     { rxrpc_call_poke_traces } __mode(byte);
 enum rxrpc_call_trace          { rxrpc_call_traces } __mode(byte);
 enum rxrpc_client_trace                { rxrpc_client_traces } __mode(byte);
 enum rxrpc_congest_change      { rxrpc_congest_changes } __mode(byte);
@@ -408,6 +418,7 @@ enum rxrpc_txqueue_trace    { rxrpc_txqueue_traces } __mode(byte);
 #define E_(a, b) TRACE_DEFINE_ENUM(a);
 
 rxrpc_bundle_traces;
+rxrpc_call_poke_traces;
 rxrpc_call_traces;
 rxrpc_client_traces;
 rxrpc_congest_changes;
@@ -1747,6 +1758,47 @@ TRACE_EVENT(rxrpc_txbuf,
                      __entry->ref)
            );
 
+TRACE_EVENT(rxrpc_poke_call,
+           TP_PROTO(struct rxrpc_call *call, bool busy,
+                    enum rxrpc_call_poke_trace what),
+
+           TP_ARGS(call, busy, what),
+
+           TP_STRUCT__entry(
+                   __field(unsigned int,               call_debug_id   )
+                   __field(bool,                       busy            )
+                   __field(enum rxrpc_call_poke_trace, what            )
+                            ),
+
+           TP_fast_assign(
+                   __entry->call_debug_id = call->debug_id;
+                   __entry->busy = busy;
+                   __entry->what = what;
+                          ),
+
+           TP_printk("c=%08x %s%s",
+                     __entry->call_debug_id,
+                     __print_symbolic(__entry->what, rxrpc_call_poke_traces),
+                     __entry->busy ? "!" : "")
+           );
+
+TRACE_EVENT(rxrpc_call_poked,
+           TP_PROTO(struct rxrpc_call *call),
+
+           TP_ARGS(call),
+
+           TP_STRUCT__entry(
+                   __field(unsigned int,               call_debug_id   )
+                            ),
+
+           TP_fast_assign(
+                   __entry->call_debug_id = call->debug_id;
+                          ),
+
+           TP_printk("c=%08x",
+                     __entry->call_debug_id)
+           );
+
 #undef EM
 #undef E_
 #endif /* _TRACE_RXRPC_H */
index 654e9dab107c521cce131ea8c326ee3110d71c58..a80655fa9dfb0a04dd79b07ac000a9bf2b469fac 100644 (file)
@@ -292,6 +292,7 @@ struct rxrpc_local {
        struct sk_buff_head     reject_queue;   /* packets awaiting rejection */
        struct sk_buff_head     event_queue;    /* endpoint event packets awaiting processing */
        struct sk_buff_head     rx_queue;       /* Received packets */
+       struct list_head        call_attend_q;  /* Calls requiring immediate attention */
        struct rb_root          client_bundles; /* Client connection bundles by socket params */
        spinlock_t              client_bundles_lock; /* Lock for client_bundles */
        spinlock_t              lock;           /* access lock */
@@ -616,6 +617,7 @@ struct rxrpc_call {
        struct list_head        recvmsg_link;   /* Link in rx->recvmsg_q */
        struct list_head        sock_link;      /* Link in rx->sock_calls */
        struct rb_node          sock_node;      /* Node in rx->calls */
+       struct list_head        attend_link;    /* Link in local->call_attend_q */
        struct rxrpc_txbuf      *tx_pending;    /* Tx buffer being filled */
        wait_queue_head_t       waitq;          /* Wait queue for channel or Tx */
        s64                     tx_total_len;   /* Total length left to be transmitted (or -1) */
@@ -843,6 +845,7 @@ extern const char *const rxrpc_call_states[];
 extern const char *const rxrpc_call_completions[];
 extern struct kmem_cache *rxrpc_call_jar;
 
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what);
 struct rxrpc_call *rxrpc_find_call_by_user_ID(struct rxrpc_sock *, unsigned long);
 struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *, gfp_t, unsigned int);
 struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *,
@@ -951,7 +954,7 @@ void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 /*
  * input.c
  */
-void rxrpc_input_call_packet(struct rxrpc_call *, struct sk_buff *);
+void rxrpc_input_call_event(struct rxrpc_call *, struct sk_buff *);
 void rxrpc_input_implicit_end_call(struct rxrpc_sock *, struct rxrpc_connection *,
                                   struct rxrpc_call *);
 
index f6d1b3a6f8c6348fe25fa358b6c214571faeabe9..997641e3d1c8cbb12348fe737586a935dafe16a8 100644 (file)
@@ -45,6 +45,29 @@ static struct semaphore rxrpc_call_limiter =
 static struct semaphore rxrpc_kernel_call_limiter =
        __SEMAPHORE_INITIALIZER(rxrpc_kernel_call_limiter, 1000);
 
+void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
+{
+       struct rxrpc_local *local;
+       struct rxrpc_peer *peer = call->peer;
+       bool busy;
+
+       if (WARN_ON_ONCE(!peer))
+               return;
+       local = peer->local;
+
+       if (call->state < RXRPC_CALL_COMPLETE) {
+               spin_lock_bh(&local->lock);
+               busy = !list_empty(&call->attend_link);
+               trace_rxrpc_poke_call(call, busy, what);
+               if (!busy) {
+                       rxrpc_get_call(call, rxrpc_call_get_poke);
+                       list_add_tail(&call->attend_link, &local->call_attend_q);
+               }
+               spin_unlock_bh(&local->lock);
+               rxrpc_wake_up_io_thread(local);
+       }
+}
+
 static void rxrpc_call_timer_expired(struct timer_list *t)
 {
        struct rxrpc_call *call = from_timer(call, t, timer);
@@ -137,6 +160,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
        INIT_LIST_HEAD(&call->accept_link);
        INIT_LIST_HEAD(&call->recvmsg_link);
        INIT_LIST_HEAD(&call->sock_link);
+       INIT_LIST_HEAD(&call->attend_link);
        INIT_LIST_HEAD(&call->tx_buffer);
        skb_queue_head_init(&call->recvmsg_queue);
        skb_queue_head_init(&call->rx_oos_queue);
index 13c52145a926cc1042749d46227afe68ca7764c2..036f02371051549c1fb97598622f5829436b54c1 100644 (file)
@@ -1017,8 +1017,7 @@ static void rxrpc_input_abort(struct rxrpc_call *call, struct sk_buff *skb)
 /*
  * Process an incoming call packet.
  */
-void rxrpc_input_call_packet(struct rxrpc_call *call,
-                                   struct sk_buff *skb)
+void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
 {
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        unsigned long timo;
index 416c6101cf78150da0e60f6f5a673258791f50c4..cc249bc6b8cd5b70f67d1d1da3b86e6034d0d11f 100644 (file)
@@ -366,7 +366,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *skb)
        /* Process a call packet; this either discards or passes on the ref
         * elsewhere.
         */
-       rxrpc_input_call_packet(call, skb);
+       rxrpc_input_call_event(call, skb);
        goto out;
 
 discard:
@@ -413,6 +413,7 @@ int rxrpc_io_thread(void *data)
 {
        struct sk_buff_head rx_queue;
        struct rxrpc_local *local = data;
+       struct rxrpc_call *call;
        struct sk_buff *skb;
 
        skb_queue_head_init(&rx_queue);
@@ -422,6 +423,20 @@ int rxrpc_io_thread(void *data)
        for (;;) {
                rxrpc_inc_stat(local->rxnet, stat_io_loop);
 
+               /* Deal with calls that want immediate attention. */
+               if ((call = list_first_entry_or_null(&local->call_attend_q,
+                                                    struct rxrpc_call,
+                                                    attend_link))) {
+                       spin_lock_bh(&local->lock);
+                       list_del_init(&call->attend_link);
+                       spin_unlock_bh(&local->lock);
+
+                       trace_rxrpc_call_poked(call);
+                       rxrpc_input_call_event(call, NULL);
+                       rxrpc_put_call(call, rxrpc_call_put_poke);
+                       continue;
+               }
+
                /* Process received packets and errors. */
                if ((skb = __skb_dequeue(&rx_queue))) {
                        switch (skb->mark) {
@@ -450,7 +465,8 @@ int rxrpc_io_thread(void *data)
                }
 
                set_current_state(TASK_INTERRUPTIBLE);
-               if (!skb_queue_empty(&local->rx_queue)) {
+               if (!skb_queue_empty(&local->rx_queue) ||
+                   !list_empty(&local->call_attend_q)) {
                        __set_current_state(TASK_RUNNING);
                        continue;
                }
index 6b4d77219f3679c64ab47f3cb0299b7f6ce1f52d..03f491cc23efb604519e77743ffbbc5afb0e8a86 100644 (file)
@@ -104,6 +104,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct rxrpc_net *rxnet,
                skb_queue_head_init(&local->reject_queue);
                skb_queue_head_init(&local->event_queue);
                skb_queue_head_init(&local->rx_queue);
+               INIT_LIST_HEAD(&local->call_attend_q);
                local->client_bundles = RB_ROOT;
                spin_lock_init(&local->client_bundles_lock);
                spin_lock_init(&local->lock);