tipc: improve function tipc_wait_for_rcvmsg()
[sfrench/cifs-2.6.git] / net / tipc / socket.c
index 8f34db2a97857bbdf0b3fc9bf33353a6e1740912..684f2125fc6b6ed45fd47cf6fff0c2d3ec971cab 100644 (file)
@@ -46,6 +46,7 @@
 #include "bcast.h"
 #include "netlink.h"
 #include "group.h"
+#include "trace.h"
 
 #define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
 #define CONN_PROBING_INTV      msecs_to_jiffies(3600000)  /* [ms] => 1 h */
@@ -233,6 +234,7 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
  */
 static void tsk_advance_rx_queue(struct sock *sk)
 {
+       trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
@@ -247,6 +249,7 @@ static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
        if (!tipc_msg_reverse(onode, &skb, err))
                return;
 
+       trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
        dnode = msg_destnode(buf_msg(skb));
        selector = msg_origport(buf_msg(skb));
        tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
@@ -385,7 +388,7 @@ static int tipc_sk_sock_err(struct socket *sock, long *timeout)
                rc_ = tipc_sk_sock_err((sock_), timeo_);                       \
                if (rc_)                                                       \
                        break;                                                 \
-               prepare_to_wait(sk_sleep(sk_), &wait_, TASK_INTERRUPTIBLE);    \
+               add_wait_queue(sk_sleep(sk_), &wait_);                         \
                release_sock(sk_);                                             \
                *(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
                sched_annotate_sleep();                                        \
@@ -482,6 +485,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                        tsk_set_unreliable(tsk, true);
        }
 
+       trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
        return 0;
 }
 
@@ -571,6 +575,7 @@ static int tipc_release(struct socket *sock)
        tsk = tipc_sk(sk);
        lock_sock(sk);
 
+       trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
        __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
        sk->sk_shutdown = SHUTDOWN_MASK;
        tipc_sk_leave(tsk);
@@ -718,6 +723,7 @@ static __poll_t tipc_poll(struct file *file, struct socket *sock,
        __poll_t revents = 0;
 
        sock_poll_wait(file, sock, wait);
+       trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");
 
        if (sk->sk_shutdown & RCV_SHUTDOWN)
                revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
@@ -804,9 +810,12 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
        rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
        /* Send message if build was successful */
-       if (unlikely(rc == dlen))
+       if (unlikely(rc == dlen)) {
+               trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
+                                       TIPC_DUMP_SK_SNDQ, " ");
                rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
                                     &tsk->cong_link_cnt);
+       }
 
        tipc_nlist_purge(&dsts);
 
@@ -1212,8 +1221,10 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
        bool conn_cong;
 
        /* Ignore if connection cannot be validated: */
-       if (!tsk_peer_msg(tsk, hdr))
+       if (!tsk_peer_msg(tsk, hdr)) {
+               trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
                goto exit;
+       }
 
        if (unlikely(msg_errcode(hdr))) {
                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
@@ -1381,6 +1392,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
        if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue)))
                return -ENOMEM;
 
+       trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
        if (unlikely(rc == -ELINKCONG)) {
                tipc_dest_push(clinks, dnode, 0);
@@ -1458,6 +1470,8 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
                if (unlikely(rc != send))
                        break;
 
+               trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+                                        TIPC_DUMP_SK_SNDQ, " ");
                rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
                if (unlikely(rc == -ELINKCONG)) {
                        tsk->cong_link_cnt = 1;
@@ -1663,7 +1677,7 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk)
 static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
 {
        struct sock *sk = sock->sk;
-       DEFINE_WAIT(wait);
+       DEFINE_WAIT_FUNC(wait, woken_wake_function);
        long timeo = *timeop;
        int err = sock_error(sk);
 
@@ -1671,15 +1685,17 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
                return err;
 
        for (;;) {
-               prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
                        if (sk->sk_shutdown & RCV_SHUTDOWN) {
                                err = -ENOTCONN;
                                break;
                        }
+                       add_wait_queue(sk_sleep(sk), &wait);
                        release_sock(sk);
-                       timeo = schedule_timeout(timeo);
+                       timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
+                       sched_annotate_sleep();
                        lock_sock(sk);
+                       remove_wait_queue(sk_sleep(sk), &wait);
                }
                err = 0;
                if (!skb_queue_empty(&sk->sk_receive_queue))
@@ -1695,7 +1711,6 @@ static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
                if (err)
                        break;
        }
-       finish_wait(sk_sleep(sk), &wait);
        *timeop = timeo;
        return err;
 }
@@ -2132,6 +2147,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
        struct sk_buff_head inputq;
        int limit, err = TIPC_OK;
 
+       trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
        TIPC_SKB_CB(skb)->bytes_read = 0;
        __skb_queue_head_init(&inputq);
        __skb_queue_tail(&inputq, skb);
@@ -2151,17 +2167,25 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
                    (!grp && msg_in_group(hdr)))
                        err = TIPC_ERR_NO_PORT;
                else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
+                       trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
+                                          "err_overload2!");
                        atomic_inc(&sk->sk_drops);
                        err = TIPC_ERR_OVERLOAD;
                }
 
                if (unlikely(err)) {
-                       tipc_skb_reject(net, err, skb, xmitq);
+                       if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
+                               trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
+                                                     "@filter_rcv!");
+                               __skb_queue_tail(xmitq, skb);
+                       }
                        err = TIPC_OK;
                        continue;
                }
                __skb_queue_tail(&sk->sk_receive_queue, skb);
                skb_set_owner_r(skb, sk);
+               trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
+                                        "rcvq >90% allocated!");
                sk->sk_data_ready(sk);
        }
 }
@@ -2227,14 +2251,21 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
                if (!sk->sk_backlog.len)
                        atomic_set(dcnt, 0);
                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
-               if (likely(!sk_add_backlog(sk, skb, lim)))
+               if (likely(!sk_add_backlog(sk, skb, lim))) {
+                       trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
+                                                "bklg & rcvq >90% allocated!");
                        continue;
+               }
 
+               trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
                /* Overload => reject message back to sender */
                onode = tipc_own_addr(sock_net(sk));
                atomic_inc(&sk->sk_drops);
-               if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+               if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
+                       trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
+                                             "@sk_enqueue!");
                        __skb_queue_tail(xmitq, skb);
+               }
                break;
        }
 }
@@ -2283,6 +2314,8 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
                /* Prepare for message rejection */
                if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
                        continue;
+
+               trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
 xmit:
                dnode = msg_destnode(buf_msg(skb));
                tipc_node_xmit_skb(net, skb, dnode, dport);
@@ -2556,6 +2589,7 @@ static int tipc_shutdown(struct socket *sock, int how)
 
        lock_sock(sk);
 
+       trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
        __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
        sk->sk_shutdown = SEND_SHUTDOWN;
 
@@ -3572,3 +3606,187 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
 
        return skb->len;
 }
+
+/**
+ * tipc_sk_filtering - check if a socket should be traced
+ * @sk: the socket to be examined
+ * @sysctl_tipc_sk_filter[]: the socket tuple for filtering,
+ *  (portid, sock type, name type, name lower, name upper)
+ *
+ * Returns true if the socket meets the socket tuple data
+ * (value 0 = 'any') or when there is no tuple set (all = 0),
+ * otherwise false
+ */
+bool tipc_sk_filtering(struct sock *sk)
+{
+       struct tipc_sock *tsk;
+       struct publication *p;
+       u32 _port, _sktype, _type, _lower, _upper;
+       u32 type = 0, lower = 0, upper = 0;
+
+       if (!sk)
+               return true;
+
+       tsk = tipc_sk(sk);
+
+       _port = sysctl_tipc_sk_filter[0];
+       _sktype = sysctl_tipc_sk_filter[1];
+       _type = sysctl_tipc_sk_filter[2];
+       _lower = sysctl_tipc_sk_filter[3];
+       _upper = sysctl_tipc_sk_filter[4];
+
+       if (!_port && !_sktype && !_type && !_lower && !_upper)
+               return true;
+
+       if (_port)
+               return (_port == tsk->portid);
+
+       if (_sktype && _sktype != sk->sk_type)
+               return false;
+
+       if (tsk->published) {
+               p = list_first_entry_or_null(&tsk->publications,
+                                            struct publication, binding_sock);
+               if (p) {
+                       type = p->type;
+                       lower = p->lower;
+                       upper = p->upper;
+               }
+       }
+
+       if (!tipc_sk_type_connectionless(sk)) {
+               type = tsk->conn_type;
+               lower = tsk->conn_instance;
+               upper = tsk->conn_instance;
+       }
+
+       if ((_type && _type != type) || (_lower && _lower != lower) ||
+           (_upper && _upper != upper))
+               return false;
+
+       return true;
+}
+
+u32 tipc_sock_get_portid(struct sock *sk)
+{
+       return (sk) ? (tipc_sk(sk))->portid : 0;
+}
+
+/**
+ * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
+ *                     both the rcv and backlog queues are considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
+{
+       atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+       unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+       unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
+ *                     only the rcv queue is considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
+{
+       unsigned int lim = rcvbuf_limit(sk, skb);
+       unsigned int qsize = sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_dump - dump TIPC socket
+ * @sk: tipc sk to be dumped
+ * @dqueues: bitmask to decide if any socket queue to be dumped?
+ *           - TIPC_DUMP_NONE: don't dump socket queues
+ *           - TIPC_DUMP_SK_SNDQ: dump socket send queue
+ *           - TIPC_DUMP_SK_RCVQ: dump socket rcv queue
+ *           - TIPC_DUMP_SK_BKLGQ: dump socket backlog queue
+ *           - TIPC_DUMP_ALL: dump all the socket queues above
+ * @buf: returned buffer of dump data in format
+ */
+int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf)
+{
+       int i = 0;
+       size_t sz = (dqueues) ? SK_LMAX : SK_LMIN;
+       struct tipc_sock *tsk;
+       struct publication *p;
+       bool tsk_connected;
+
+       if (!sk) {
+               i += scnprintf(buf, sz, "sk data: (null)\n");
+               return i;
+       }
+
+       tsk = tipc_sk(sk);
+       tsk_connected = !tipc_sk_type_connectionless(sk);
+
+       i += scnprintf(buf, sz, "sk data: %u", sk->sk_type);
+       i += scnprintf(buf + i, sz - i, " %d", sk->sk_state);
+       i += scnprintf(buf + i, sz - i, " %x", tsk_own_node(tsk));
+       i += scnprintf(buf + i, sz - i, " %u", tsk->portid);
+       i += scnprintf(buf + i, sz - i, " | %u", tsk_connected);
+       if (tsk_connected) {
+               i += scnprintf(buf + i, sz - i, " %x", tsk_peer_node(tsk));
+               i += scnprintf(buf + i, sz - i, " %u", tsk_peer_port(tsk));
+               i += scnprintf(buf + i, sz - i, " %u", tsk->conn_type);
+               i += scnprintf(buf + i, sz - i, " %u", tsk->conn_instance);
+       }
+       i += scnprintf(buf + i, sz - i, " | %u", tsk->published);
+       if (tsk->published) {
+               p = list_first_entry_or_null(&tsk->publications,
+                                            struct publication, binding_sock);
+               i += scnprintf(buf + i, sz - i, " %u", (p) ? p->type : 0);
+               i += scnprintf(buf + i, sz - i, " %u", (p) ? p->lower : 0);
+               i += scnprintf(buf + i, sz - i, " %u", (p) ? p->upper : 0);
+       }
+       i += scnprintf(buf + i, sz - i, " | %u", tsk->snd_win);
+       i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_win);
+       i += scnprintf(buf + i, sz - i, " %u", tsk->max_pkt);
+       i += scnprintf(buf + i, sz - i, " %x", tsk->peer_caps);
+       i += scnprintf(buf + i, sz - i, " %u", tsk->cong_link_cnt);
+       i += scnprintf(buf + i, sz - i, " %u", tsk->snt_unacked);
+       i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_unacked);
+       i += scnprintf(buf + i, sz - i, " %u", atomic_read(&tsk->dupl_rcvcnt));
+       i += scnprintf(buf + i, sz - i, " %u", sk->sk_shutdown);
+       i += scnprintf(buf + i, sz - i, " | %d", sk_wmem_alloc_get(sk));
+       i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf);
+       i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk));
+       i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf);
+       i += scnprintf(buf + i, sz - i, " | %d\n", sk->sk_backlog.len);
+
+       if (dqueues & TIPC_DUMP_SK_SNDQ) {
+               i += scnprintf(buf + i, sz - i, "sk_write_queue: ");
+               i += tipc_list_dump(&sk->sk_write_queue, false, buf + i);
+       }
+
+       if (dqueues & TIPC_DUMP_SK_RCVQ) {
+               i += scnprintf(buf + i, sz - i, "sk_receive_queue: ");
+               i += tipc_list_dump(&sk->sk_receive_queue, false, buf + i);
+       }
+
+       if (dqueues & TIPC_DUMP_SK_BKLGQ) {
+               i += scnprintf(buf + i, sz - i, "sk_backlog:\n  head ");
+               i += tipc_skb_dump(sk->sk_backlog.head, false, buf + i);
+               if (sk->sk_backlog.tail != sk->sk_backlog.head) {
+                       i += scnprintf(buf + i, sz - i, "  tail ");
+                       i += tipc_skb_dump(sk->sk_backlog.tail, false,
+                                          buf + i);
+               }
+       }
+
+       return i;
+}