udp: keep the sk_receive_queue held when splicing
author Paolo Abeni <pabeni@redhat.com>
Tue, 16 May 2017 09:20:15 +0000 (11:20 +0200)
committer David S. Miller <davem@davemloft.net>
Tue, 16 May 2017 19:41:30 +0000 (15:41 -0400)
On packet reception, when we are forced to splice the
sk_receive_queue, we can keep the related lock held, so
that we can avoid re-acquiring it if fwd memory
scheduling is required.
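
The same idea in a minimal, self-contained userspace sketch (an
illustration only, not kernel code: struct and function names below are
hypothetical, and a pthread mutex stands in for the sk_receive_queue
spinlock). The release helper takes a flag telling it whether the caller
already holds the queue lock, so a caller that has just spliced under
that lock can reclaim fwd memory without a second lock/unlock cycle:

/* Illustration only (not kernel code): rx_queue, rmem_release() and
 * recv_path() are hypothetical names; a pthread mutex stands in for
 * the sk_receive_queue spinlock.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct rx_queue {
        pthread_mutex_t lock;
        int fwd_alloc;          /* forward-allocated memory, in bytes */
};

/* reclaim 'size' bytes; take the queue lock only if the caller does
 * not hold it already (mirrors the rx_queue_lock_held flag)
 */
static void rmem_release(struct rx_queue *q, int size, bool lock_held)
{
        if (!lock_held)
                pthread_mutex_lock(&q->lock);

        q->fwd_alloc += size;

        if (!lock_held)
                pthread_mutex_unlock(&q->lock);
}

/* receive path: the lock taken for splicing is kept held across the
 * reclaim, so no second lock/unlock cycle is needed
 */
static void recv_path(struct rx_queue *q)
{
        pthread_mutex_lock(&q->lock);
        /* ... splice/dequeue work would happen here ... */
        rmem_release(q, 1500, true);
        pthread_mutex_unlock(&q->lock);
}

int main(void)
{
        struct rx_queue q = {
                .lock = PTHREAD_MUTEX_INITIALIZER,
                .fwd_alloc = 0,
        };

        recv_path(&q);
        rmem_release(&q, 500, false);   /* caller without the lock */
        printf("fwd_alloc = %d\n", q.fwd_alloc);
        return 0;
}

Building with something like "gcc -pthread sketch.c" should be enough
to run the sketch.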

v1 -> v2:
  the rx_queue_lock_held param in udp_rmem_release() is
  now a bool

Signed-off-by: Paolo Abeni <pabeni@redhat.com>
Acked-by: Eric Dumazet <edumazet@google.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/ipv4/udp.c

index 492c76be9230f8d99020a59be7675423a92e4494..7bd56c9889b3ffe2d1b5931f0a98b9fa9514c228 100644
@@ -1164,7 +1164,8 @@ out:
 }
 
 /* fully reclaim rmem/fwd memory allocated for skb */
-static void udp_rmem_release(struct sock *sk, int size, int partial)
+static void udp_rmem_release(struct sock *sk, int size, int partial,
+                            bool rx_queue_lock_held)
 {
        struct udp_sock *up = udp_sk(sk);
        struct sk_buff_head *sk_queue;
@@ -1181,9 +1182,13 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
        }
        up->forward_deficit = 0;
 
-       /* acquire the sk_receive_queue for fwd allocated memory scheduling */
+       /* acquire the sk_receive_queue for fwd allocated memory scheduling,
+        * if the caller doesn't hold it already
+        */
        sk_queue = &sk->sk_receive_queue;
-       spin_lock(&sk_queue->lock);
+       if (!rx_queue_lock_held)
+               spin_lock(&sk_queue->lock);
+
 
        sk->sk_forward_alloc += size;
        amt = (sk->sk_forward_alloc - partial) & ~(SK_MEM_QUANTUM - 1);
@@ -1197,7 +1202,8 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
        /* this can save us from acquiring the rx queue lock on next receive */
        skb_queue_splice_tail_init(sk_queue, &up->reader_queue);
 
-       spin_unlock(&sk_queue->lock);
+       if (!rx_queue_lock_held)
+               spin_unlock(&sk_queue->lock);
 }
 
 /* Note: called with reader_queue.lock held.
@@ -1207,10 +1213,16 @@ static void udp_rmem_release(struct sock *sk, int size, int partial)
  */
 void udp_skb_destructor(struct sock *sk, struct sk_buff *skb)
 {
-       udp_rmem_release(sk, skb->dev_scratch, 1);
+       udp_rmem_release(sk, skb->dev_scratch, 1, false);
 }
 EXPORT_SYMBOL(udp_skb_destructor);
 
+/* as above, but the caller holds the rx queue lock, too */
+void udp_skb_dtor_locked(struct sock *sk, struct sk_buff *skb)
+{
+       udp_rmem_release(sk, skb->dev_scratch, 1, true);
+}
+
 /* Idea of busylocks is to let producers grab an extra spinlock
  * to relieve pressure on the receive_queue spinlock shared by consumer.
  * Under flood, this means that only one producer can be in line
@@ -1325,7 +1337,7 @@ void udp_destruct_sock(struct sock *sk)
                total += skb->truesize;
                kfree_skb(skb);
        }
-       udp_rmem_release(sk, total, 0);
+       udp_rmem_release(sk, total, 0, true);
 
        inet_sock_destruct(sk);
 }
@@ -1397,7 +1409,7 @@ static int first_packet_length(struct sock *sk)
        }
        res = skb ? skb->len : -1;
        if (total)
-               udp_rmem_release(sk, total, 1);
+               udp_rmem_release(sk, total, 1, false);
        spin_unlock_bh(&rcvq->lock);
        return res;
 }
@@ -1471,16 +1483,20 @@ struct sk_buff *__skb_recv_udp(struct sock *sk, unsigned int flags,
                                goto busy_check;
                        }
 
-                       /* refill the reader queue and walk it again */
+                       /* refill the reader queue and walk it again;
+                        * keep both queues locked to avoid re-acquiring
+                        * the sk_receive_queue lock if fwd memory scheduling
+                        * is needed.
+                        */
                        _off = *off;
                        spin_lock(&sk_queue->lock);
                        skb_queue_splice_tail_init(sk_queue, queue);
-                       spin_unlock(&sk_queue->lock);
 
                        skb = __skb_try_recv_from_queue(sk, queue, flags,
-                                                       udp_skb_destructor,
+                                                       udp_skb_dtor_locked,
                                                        peeked, &_off, err,
                                                        &last);
+                       spin_unlock(&sk_queue->lock);
                        spin_unlock_bh(&queue->lock);
                        if (skb) {
                                *off = _off;