Merge git://git.kernel.org/pub/scm/linux/kernel/git/herbert/crypto-2.6
[sfrench/cifs-2.6.git] / net / tipc / link.c
index da6018beb6ebc149b72efd816f4d7154bc6cdfd7..c5190ab75290d04202b99a3e923a69fe1a9dad38 100644 (file)
@@ -77,19 +77,19 @@ static const char *link_unk_evt = "Unknown link event ";
 
 static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                                       struct sk_buff *buf);
-static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf);
-static int  tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
+static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf);
+static int  tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
                                 struct sk_buff **buf);
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance);
-static int  link_send_sections_long(struct tipc_port *sender,
-                                   struct iovec const *msg_sect,
-                                   unsigned int len, u32 destnode);
+static int  tipc_link_iovec_long_xmit(struct tipc_port *sender,
+                                     struct iovec const *msg_sect,
+                                     unsigned int len, u32 destnode);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
-static void tipc_link_send_sync(struct tipc_link *l);
-static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
+static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf);
+static void tipc_link_sync_xmit(struct tipc_link *l);
+static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 
 /*
  *  Simple link routines
@@ -147,11 +147,6 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
 /**
  * link_timeout - handle expiration of link timer
  * @l_ptr: pointer to link
- *
- * This routine must not grab "tipc_net_lock" to avoid a potential deadlock conflict
- * with tipc_link_delete().  (There is no risk that the node will be deleted by
- * another thread because tipc_link_delete() always cancels the link timer before
- * tipc_node_delete() is called.)
  */
 static void link_timeout(struct tipc_link *l_ptr)
 {
@@ -213,8 +208,8 @@ static void link_set_timer(struct tipc_link *l_ptr, u32 time)
  * Returns pointer to link.
  */
 struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
-                             struct tipc_bearer *b_ptr,
-                             const struct tipc_media_addr *media_addr)
+                                  struct tipc_bearer *b_ptr,
+                                  const struct tipc_media_addr *media_addr)
 {
        struct tipc_link *l_ptr;
        struct tipc_msg *msg;
@@ -279,41 +274,44 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 
        k_init_timer(&l_ptr->timer, (Handler)link_timeout,
                     (unsigned long)l_ptr);
-       list_add_tail(&l_ptr->link_list, &b_ptr->links);
 
        link_state_event(l_ptr, STARTING_EVT);
 
        return l_ptr;
 }
 
-/**
- * tipc_link_delete - delete a link
- * @l_ptr: pointer to link
- *
- * Note: 'tipc_net_lock' is write_locked, bearer is locked.
- * This routine must not grab the node lock until after link timer cancellation
- * to avoid a potential deadlock situation.
- */
-void tipc_link_delete(struct tipc_link *l_ptr)
+void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 {
-       if (!l_ptr) {
-               pr_err("Attempt to delete non-existent link\n");
-               return;
-       }
-
-       k_cancel_timer(&l_ptr->timer);
+       struct tipc_link *l_ptr;
+       struct tipc_node *n_ptr;
 
-       tipc_node_lock(l_ptr->owner);
-       tipc_link_reset(l_ptr);
-       tipc_node_detach_link(l_ptr->owner, l_ptr);
-       tipc_link_purge_queues(l_ptr);
-       list_del_init(&l_ptr->link_list);
-       tipc_node_unlock(l_ptr->owner);
-       k_term_timer(&l_ptr->timer);
-       kfree(l_ptr);
+       rcu_read_lock();
+       list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
+               spin_lock_bh(&n_ptr->lock);
+               l_ptr = n_ptr->links[bearer_id];
+               if (l_ptr) {
+                       tipc_link_reset(l_ptr);
+                       if (shutting_down || !tipc_node_is_up(n_ptr)) {
+                               tipc_node_detach_link(l_ptr->owner, l_ptr);
+                               tipc_link_reset_fragments(l_ptr);
+                               spin_unlock_bh(&n_ptr->lock);
+
+                               /* Nobody else can access this link now: */
+                               del_timer_sync(&l_ptr->timer);
+                               kfree(l_ptr);
+                       } else {
+                               /* Detach/delete when failover is finished: */
+                               l_ptr->flags |= LINK_STOPPED;
+                               spin_unlock_bh(&n_ptr->lock);
+                               del_timer_sync(&l_ptr->timer);
+                       }
+                       continue;
+               }
+               spin_unlock_bh(&n_ptr->lock);
+       }
+       rcu_read_unlock();
 }
 
-
 /**
  * link_schedule_port - schedule port for deferred sending
  * @l_ptr: pointer to link
@@ -330,8 +328,6 @@ static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
        spin_lock_bh(&tipc_port_list_lock);
        p_ptr = tipc_port_lock(origport);
        if (p_ptr) {
-               if (!p_ptr->wakeup)
-                       goto exit;
                if (!list_empty(&p_ptr->wait_list))
                        goto exit;
                p_ptr->congested = 1;
@@ -366,7 +362,7 @@ void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
                list_del_init(&p_ptr->wait_list);
                spin_lock_bh(p_ptr->lock);
                p_ptr->congested = 0;
-               p_ptr->wakeup(p_ptr);
+               tipc_port_wakeup(p_ptr);
                win -= p_ptr->waiting_pkts;
                spin_unlock_bh(p_ptr->lock);
        }
@@ -461,6 +457,21 @@ void tipc_link_reset(struct tipc_link *l_ptr)
        link_reset_statistics(l_ptr);
 }
 
+void tipc_link_reset_list(unsigned int bearer_id)
+{
+       struct tipc_link *l_ptr;
+       struct tipc_node *n_ptr;
+
+       rcu_read_lock();
+       list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
+               spin_lock_bh(&n_ptr->lock);
+               l_ptr = n_ptr->links[bearer_id];
+               if (l_ptr)
+                       tipc_link_reset(l_ptr);
+               spin_unlock_bh(&n_ptr->lock);
+       }
+       rcu_read_unlock();
+}
 
 static void link_activate(struct tipc_link *l_ptr)
 {
@@ -479,7 +490,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
        struct tipc_link *other;
        u32 cont_intv = l_ptr->continuity_interval;
 
-       if (!l_ptr->started && (event != STARTING_EVT))
+       if (l_ptr->flags & LINK_STOPPED)
+               return;
+
+       if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
                return;         /* Not yet. */
 
        /* Check whether changeover is going on */
@@ -499,12 +513,12 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
                                l_ptr->checkpoint = l_ptr->next_in_no;
                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
-                                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
-                                                                0, 0, 0, 0, 0);
+                                       tipc_link_proto_xmit(l_ptr, STATE_MSG,
+                                                            0, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
-                                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
-                                                                1, 0, 0, 0, 0);
+                                       tipc_link_proto_xmit(l_ptr, STATE_MSG,
+                                                            1, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                }
                                link_set_timer(l_ptr, cont_intv);
@@ -512,7 +526,7 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        }
                        l_ptr->state = WORKING_UNKNOWN;
                        l_ptr->fsm_msg_cnt = 0;
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv / 4);
                        break;
@@ -522,7 +536,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        tipc_link_reset(l_ptr);
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
-                       tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
+                                            0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
@@ -544,7 +559,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        tipc_link_reset(l_ptr);
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
-                       tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
+                                            0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
@@ -554,14 +570,14 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                                l_ptr->fsm_msg_cnt = 0;
                                l_ptr->checkpoint = l_ptr->next_in_no;
                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
-                                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
-                                                                0, 0, 0, 0, 0);
+                                       tipc_link_proto_xmit(l_ptr, STATE_MSG,
+                                                            0, 0, 0, 0, 0);
                                        l_ptr->fsm_msg_cnt++;
                                }
                                link_set_timer(l_ptr, cont_intv);
                        } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
-                               tipc_link_send_proto_msg(l_ptr, STATE_MSG,
-                                                        1, 0, 0, 0, 0);
+                               tipc_link_proto_xmit(l_ptr, STATE_MSG,
+                                                    1, 0, 0, 0, 0);
                                l_ptr->fsm_msg_cnt++;
                                link_set_timer(l_ptr, cont_intv / 4);
                        } else {        /* Link has failed */
@@ -570,8 +586,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                                tipc_link_reset(l_ptr);
                                l_ptr->state = RESET_UNKNOWN;
                                l_ptr->fsm_msg_cnt = 0;
-                               tipc_link_send_proto_msg(l_ptr, RESET_MSG,
-                                                        0, 0, 0, 0, 0);
+                               tipc_link_proto_xmit(l_ptr, RESET_MSG,
+                                                    0, 0, 0, 0, 0);
                                l_ptr->fsm_msg_cnt++;
                                link_set_timer(l_ptr, cont_intv);
                        }
@@ -591,24 +607,25 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        l_ptr->state = WORKING_WORKING;
                        l_ptr->fsm_msg_cnt = 0;
                        link_activate(l_ptr);
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        if (l_ptr->owner->working_links == 1)
-                               tipc_link_send_sync(l_ptr);
+                               tipc_link_sync_xmit(l_ptr);
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case RESET_MSG:
                        l_ptr->state = RESET_RESET;
                        l_ptr->fsm_msg_cnt = 0;
-                       tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 1, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
+                                            1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case STARTING_EVT:
-                       l_ptr->started = 1;
+                       l_ptr->flags |= LINK_STARTED;
                        /* fall through */
                case TIMEOUT_EVT:
-                       tipc_link_send_proto_msg(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
@@ -626,16 +643,17 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
                        l_ptr->state = WORKING_WORKING;
                        l_ptr->fsm_msg_cnt = 0;
                        link_activate(l_ptr);
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        if (l_ptr->owner->working_links == 1)
-                               tipc_link_send_sync(l_ptr);
+                               tipc_link_sync_xmit(l_ptr);
                        link_set_timer(l_ptr, cont_intv);
                        break;
                case RESET_MSG:
                        break;
                case TIMEOUT_EVT:
-                       tipc_link_send_proto_msg(l_ptr, ACTIVATE_MSG, 0, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
+                                            0, 0, 0, 0, 0);
                        l_ptr->fsm_msg_cnt++;
                        link_set_timer(l_ptr, cont_intv);
                        break;
@@ -721,11 +739,11 @@ static void link_add_chain_to_outqueue(struct tipc_link *l_ptr,
 }
 
 /*
- * tipc_link_send_buf() is the 'full path' for messages, called from
- * inside TIPC when the 'fast path' in tipc_send_buf
+ * tipc_link_xmit() is the 'full path' for messages, called from
+ * inside TIPC when the 'fast path' in tipc_send_xmit
  * has failed, and from link_send()
  */
-int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
 {
        struct tipc_msg *msg = buf_msg(buf);
        u32 size = msg_size(msg);
@@ -753,7 +771,7 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 
        /* Fragmentation needed ? */
        if (size > max_packet)
-               return link_send_long_buf(l_ptr, buf);
+               return tipc_link_frag_xmit(l_ptr, buf);
 
        /* Packet can be queued or sent. */
        if (likely(!link_congested(l_ptr))) {
@@ -797,11 +815,11 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 }
 
 /*
- * tipc_link_send(): same as tipc_link_send_buf(), but the link to use has
- * not been selected yet, and the the owner node is not locked
+ * tipc_link_xmit(): same as __tipc_link_xmit(), but the link to use
+ * has not been selected yet, and the the owner node is not locked
  * Called by TIPC internal users, e.g. the name distributor
  */
-int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
+int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector)
 {
        struct tipc_link *l_ptr;
        struct tipc_node *n_ptr;
@@ -813,7 +831,7 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
                tipc_node_lock(n_ptr);
                l_ptr = n_ptr->active_links[selector & 1];
                if (l_ptr)
-                       res = tipc_link_send_buf(l_ptr, buf);
+                       res = __tipc_link_xmit(l_ptr, buf);
                else
                        kfree_skb(buf);
                tipc_node_unlock(n_ptr);
@@ -825,14 +843,14 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
 }
 
 /*
- * tipc_link_send_sync - synchronize broadcast link endpoints.
+ * tipc_link_sync_xmit - synchronize broadcast link endpoints.
  *
  * Give a newly added peer node the sequence number where it should
  * start receiving and acking broadcast packets.
  *
  * Called with node locked
  */
-static void tipc_link_send_sync(struct tipc_link *l)
+static void tipc_link_sync_xmit(struct tipc_link *l)
 {
        struct sk_buff *buf;
        struct tipc_msg *msg;
@@ -849,14 +867,14 @@ static void tipc_link_send_sync(struct tipc_link *l)
 }
 
 /*
- * tipc_link_recv_sync - synchronize broadcast link endpoints.
+ * tipc_link_sync_rcv - synchronize broadcast link endpoints.
  * Receive the sequence number where we should start receiving and
  * acking broadcast packets from a newly added peer node, and open
  * up for reception of such packets.
  *
  * Called with node locked
  */
-static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
+static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 {
        struct tipc_msg *msg = buf_msg(buf);
 
@@ -866,7 +884,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
 }
 
 /*
- * tipc_link_send_names - send name table entries to new neighbor
+ * tipc_link_names_xmit - send name table entries to new neighbor
  *
  * Send routine for bulk delivery of name table messages when contact
  * with a new neighbor occurs. No link congestion checking is performed
@@ -874,7 +892,7 @@ static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
  * small enough not to require fragmentation.
  * Called without any locks held.
  */
-void tipc_link_send_names(struct list_head *message_list, u32 dest)
+void tipc_link_names_xmit(struct list_head *message_list, u32 dest)
 {
        struct tipc_node *n_ptr;
        struct tipc_link *l_ptr;
@@ -909,13 +927,13 @@ void tipc_link_send_names(struct list_head *message_list, u32 dest)
 }
 
 /*
- * link_send_buf_fast: Entry for data messages where the
+ * tipc_link_xmit_fast: Entry for data messages where the
  * destination link is known and the header is complete,
  * inclusive total message length. Very time critical.
  * Link is locked. Returns user data length.
  */
-static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
-                             u32 *used_max_pkt)
+static int tipc_link_xmit_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
+                              u32 *used_max_pkt)
 {
        struct tipc_msg *msg = buf_msg(buf);
        int res = msg_data_sz(msg);
@@ -931,18 +949,18 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
                else
                        *used_max_pkt = l_ptr->max_pkt;
        }
-       return tipc_link_send_buf(l_ptr, buf);  /* All other cases */
+       return __tipc_link_xmit(l_ptr, buf);  /* All other cases */
 }
 
 /*
- * tipc_link_send_sections_fast: Entry for messages where the
+ * tipc_link_iovec_xmit_fast: Entry for messages where the
  * destination processor is known and the header is complete,
  * except for total message length.
  * Returns user data length or errno.
  */
-int tipc_link_send_sections_fast(struct tipc_port *sender,
-                                struct iovec const *msg_sect,
-                                unsigned int len, u32 destaddr)
+int tipc_link_iovec_xmit_fast(struct tipc_port *sender,
+                             struct iovec const *msg_sect,
+                             unsigned int len, u32 destaddr)
 {
        struct tipc_msg *hdr = &sender->phdr;
        struct tipc_link *l_ptr;
@@ -968,8 +986,8 @@ again:
                l_ptr = node->active_links[selector];
                if (likely(l_ptr)) {
                        if (likely(buf)) {
-                               res = link_send_buf_fast(l_ptr, buf,
-                                                        &sender->max_pkt);
+                               res = tipc_link_xmit_fast(l_ptr, buf,
+                                                         &sender->max_pkt);
 exit:
                                tipc_node_unlock(node);
                                read_unlock_bh(&tipc_net_lock);
@@ -995,24 +1013,21 @@ exit:
                        if ((msg_hdr_sz(hdr) + res) <= sender->max_pkt)
                                goto again;
 
-                       return link_send_sections_long(sender, msg_sect, len,
-                                                      destaddr);
+                       return tipc_link_iovec_long_xmit(sender, msg_sect,
+                                                        len, destaddr);
                }
                tipc_node_unlock(node);
        }
        read_unlock_bh(&tipc_net_lock);
 
        /* Couldn't find a link to the destination node */
-       if (buf)
-               return tipc_reject_msg(buf, TIPC_ERR_NO_NODE);
-       if (res >= 0)
-               return tipc_port_reject_sections(sender, hdr, msg_sect,
-                                                len, TIPC_ERR_NO_NODE);
-       return res;
+       kfree_skb(buf);
+       tipc_port_iovec_reject(sender, hdr, msg_sect, len, TIPC_ERR_NO_NODE);
+       return -ENETUNREACH;
 }
 
 /*
- * link_send_sections_long(): Entry for long messages where the
+ * tipc_link_iovec_long_xmit(): Entry for long messages where the
  * destination node is known and the header is complete,
  * inclusive total message length.
  * Link and bearer congestion status have been checked to be ok,
@@ -1025,9 +1040,9 @@ exit:
  *
  * Returns user data length or errno.
  */
-static int link_send_sections_long(struct tipc_port *sender,
-                                  struct iovec const *msg_sect,
-                                  unsigned int len, u32 destaddr)
+static int tipc_link_iovec_long_xmit(struct tipc_port *sender,
+                                    struct iovec const *msg_sect,
+                                    unsigned int len, u32 destaddr)
 {
        struct tipc_link *l_ptr;
        struct tipc_node *node;
@@ -1146,8 +1161,9 @@ error:
        } else {
 reject:
                kfree_skb_list(buf_chain);
-               return tipc_port_reject_sections(sender, hdr, msg_sect,
-                                                len, TIPC_ERR_NO_NODE);
+               tipc_port_iovec_reject(sender, hdr, msg_sect, len,
+                                      TIPC_ERR_NO_NODE);
+               return -ENETUNREACH;
        }
 
        /* Append chain of fragments to send queue & send them */
@@ -1441,15 +1457,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
                u32 seq_no;
                u32 ackd;
                u32 released = 0;
-               int type;
 
                head = head->next;
                buf->next = NULL;
 
-               /* Ensure bearer is still enabled */
-               if (unlikely(!b_ptr->active))
-                       goto discard;
-
                /* Ensure message is well-formed */
                if (unlikely(!link_recv_buf_validate(buf)))
                        goto discard;
@@ -1463,9 +1474,9 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 
                if (unlikely(msg_non_seq(msg))) {
                        if (msg_user(msg) ==  LINK_CONFIG)
-                               tipc_disc_recv_msg(buf, b_ptr);
+                               tipc_disc_rcv(buf, b_ptr);
                        else
-                               tipc_bclink_recv_pkt(buf);
+                               tipc_bclink_rcv(buf);
                        continue;
                }
 
@@ -1489,7 +1500,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
                if ((n_ptr->block_setup & WAIT_PEER_DOWN) &&
                        msg_user(msg) == LINK_PROTOCOL &&
                        (msg_type(msg) == RESET_MSG ||
-                                       msg_type(msg) == ACTIVATE_MSG) &&
+                        msg_type(msg) == ACTIVATE_MSG) &&
                        !msg_redundant_link(msg))
                        n_ptr->block_setup &= ~WAIT_PEER_DOWN;
 
@@ -1508,7 +1519,6 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
                while ((crs != l_ptr->next_out) &&
                       less_eq(buf_seqno(crs), ackd)) {
                        struct sk_buff *next = crs->next;
-
                        kfree_skb(crs);
                        crs = next;
                        released++;
@@ -1521,18 +1531,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
                /* Try sending any messages link endpoint has pending */
                if (unlikely(l_ptr->next_out))
                        tipc_link_push_queue(l_ptr);
+
                if (unlikely(!list_empty(&l_ptr->waiting_ports)))
                        tipc_link_wakeup_ports(l_ptr, 0);
+
                if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
                        l_ptr->stats.sent_acks++;
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
                }
 
-               /* Now (finally!) process the incoming message */
-protocol_check:
+               /* Process the incoming packet */
                if (unlikely(!link_working_working(l_ptr))) {
                        if (msg_user(msg) == LINK_PROTOCOL) {
-                               link_recv_proto_msg(l_ptr, buf);
+                               tipc_link_proto_rcv(l_ptr, buf);
                                head = link_insert_deferred_queue(l_ptr, head);
                                tipc_node_unlock(n_ptr);
                                continue;
@@ -1561,67 +1572,65 @@ protocol_check:
                l_ptr->next_in_no++;
                if (unlikely(l_ptr->oldest_deferred_in))
                        head = link_insert_deferred_queue(l_ptr, head);
-deliver:
-               if (likely(msg_isdata(msg))) {
-                       tipc_node_unlock(n_ptr);
-                       tipc_port_recv_msg(buf);
-                       continue;
+
+               /* Deliver packet/message to correct user: */
+               if (unlikely(msg_user(msg) ==  CHANGEOVER_PROTOCOL)) {
+                       if (!tipc_link_tunnel_rcv(n_ptr, &buf)) {
+                               tipc_node_unlock(n_ptr);
+                               continue;
+                       }
+                       msg = buf_msg(buf);
+               } else if (msg_user(msg) == MSG_FRAGMENTER) {
+                       int rc;
+
+                       l_ptr->stats.recv_fragments++;
+                       rc = tipc_link_frag_rcv(&l_ptr->reasm_head,
+                                               &l_ptr->reasm_tail,
+                                               &buf);
+                       if (rc == LINK_REASM_COMPLETE) {
+                               l_ptr->stats.recv_fragmented++;
+                               msg = buf_msg(buf);
+                       } else {
+                               if (rc == LINK_REASM_ERROR)
+                                       tipc_link_reset(l_ptr);
+                               tipc_node_unlock(n_ptr);
+                               continue;
+                       }
                }
+
                switch (msg_user(msg)) {
-                       int ret;
+               case TIPC_LOW_IMPORTANCE:
+               case TIPC_MEDIUM_IMPORTANCE:
+               case TIPC_HIGH_IMPORTANCE:
+               case TIPC_CRITICAL_IMPORTANCE:
+                       tipc_node_unlock(n_ptr);
+                       tipc_port_rcv(buf);
+                       continue;
                case MSG_BUNDLER:
                        l_ptr->stats.recv_bundles++;
                        l_ptr->stats.recv_bundled += msg_msgcnt(msg);
                        tipc_node_unlock(n_ptr);
-                       tipc_link_recv_bundle(buf);
+                       tipc_link_bundle_rcv(buf);
                        continue;
                case NAME_DISTRIBUTOR:
                        n_ptr->bclink.recv_permitted = true;
                        tipc_node_unlock(n_ptr);
-                       tipc_named_recv(buf);
-                       continue;
-               case BCAST_PROTOCOL:
-                       tipc_link_recv_sync(n_ptr, buf);
-                       tipc_node_unlock(n_ptr);
+                       tipc_named_rcv(buf);
                        continue;
                case CONN_MANAGER:
                        tipc_node_unlock(n_ptr);
-                       tipc_port_recv_proto_msg(buf);
+                       tipc_port_proto_rcv(buf);
                        continue;
-               case MSG_FRAGMENTER:
-                       l_ptr->stats.recv_fragments++;
-                       ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
-                                                     &l_ptr->reasm_tail,
-                                                     &buf);
-                       if (ret == LINK_REASM_COMPLETE) {
-                               l_ptr->stats.recv_fragmented++;
-                               msg = buf_msg(buf);
-                               goto deliver;
-                       }
-                       if (ret == LINK_REASM_ERROR)
-                               tipc_link_reset(l_ptr);
-                       tipc_node_unlock(n_ptr);
-                       continue;
-               case CHANGEOVER_PROTOCOL:
-                       type = msg_type(msg);
-                       if (tipc_link_tunnel_rcv(&l_ptr, &buf)) {
-                               msg = buf_msg(buf);
-                               seq_no = msg_seqno(msg);
-                               if (type == ORIGINAL_MSG)
-                                       goto deliver;
-                               goto protocol_check;
-                       }
+               case BCAST_PROTOCOL:
+                       tipc_link_sync_rcv(n_ptr, buf);
                        break;
                default:
                        kfree_skb(buf);
-                       buf = NULL;
                        break;
                }
                tipc_node_unlock(n_ptr);
-               tipc_net_route_msg(buf);
                continue;
 unlock_discard:
-
                tipc_node_unlock(n_ptr);
 discard:
                kfree_skb(buf);
@@ -1688,7 +1697,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
        u32 seq_no = buf_seqno(buf);
 
        if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
-               link_recv_proto_msg(l_ptr, buf);
+               tipc_link_proto_rcv(l_ptr, buf);
                return;
        }
 
@@ -1711,7 +1720,7 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
                l_ptr->stats.deferred_recv++;
                TIPC_SKB_CB(buf)->deferred = true;
                if ((l_ptr->deferred_inqueue_sz % 16) == 1)
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
+                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
        } else
                l_ptr->stats.duplicates++;
 }
@@ -1719,9 +1728,8 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 /*
  * Send protocol message to the other endpoint.
  */
-void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
-                             int probe_msg, u32 gap, u32 tolerance,
-                             u32 priority, u32 ack_mtu)
+void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
+                         u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
 {
        struct sk_buff *buf = NULL;
        struct tipc_msg *msg = l_ptr->pmsg;
@@ -1820,7 +1828,7 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
  * Note that network plane id propagates through the network, and may
  * change at any time. The node with lowest address rules
  */
-static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
+static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
 {
        u32 rec_gap = 0;
        u32 max_pkt_info;
@@ -1939,8 +1947,8 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
                                                      msg_last_bcast(msg));
 
                if (rec_gap || (msg_probe(msg))) {
-                       tipc_link_send_proto_msg(l_ptr, STATE_MSG,
-                                                0, rec_gap, 0, 0, max_pkt_ack);
+                       tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
+                                            0, max_pkt_ack);
                }
                if (msg_seq_gap(msg)) {
                        l_ptr->stats.recv_nacks++;
@@ -1979,7 +1987,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
        }
        skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
        skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
-       tipc_link_send_buf(tunnel, buf);
+       __tipc_link_xmit(tunnel, buf);
 }
 
 
@@ -2012,7 +2020,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
                if (buf) {
                        skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
                        msg_set_size(&tunnel_hdr, INT_H_SIZE);
-                       tipc_link_send_buf(tunnel, buf);
+                       __tipc_link_xmit(tunnel, buf);
                } else {
                        pr_warn("%sunable to send changeover msg\n",
                                link_co_err);
@@ -2046,7 +2054,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
        }
 }
 
-/* tipc_link_dup_send_queue(): A second link has become active. Tunnel a
+/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
  * duplicate of the first link's send queue via the new link. This way, we
  * are guaranteed that currently queued packets from a socket are delivered
  * before future traffic from the same socket, even if this is using the
@@ -2055,7 +2063,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
  * and sequence order is preserved per sender/receiver socket pair.
  * Owner node is locked.
  */
-void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
+void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
                              struct tipc_link *tunnel)
 {
        struct sk_buff *iter;
@@ -2085,7 +2093,7 @@ void tipc_link_dup_send_queue(struct tipc_link *l_ptr,
                skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
                skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
                                               length);
-               tipc_link_send_buf(tunnel, outbuf);
+               __tipc_link_xmit(tunnel, outbuf);
                if (!tipc_link_is_up(l_ptr))
                        return;
                iter = iter->next;
@@ -2112,89 +2120,114 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
        return eb;
 }
 
-/*  tipc_link_tunnel_rcv(): Receive a tunneled packet, sent
- *  via other link as result of a failover (ORIGINAL_MSG) or
- *  a new active link (DUPLICATE_MSG). Failover packets are
- *  returned to the active link for delivery upwards.
+
+
+/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
+ * Owner node is locked.
+ */
+static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
+                             struct sk_buff *t_buf)
+{
+       struct sk_buff *buf;
+
+       if (!tipc_link_is_up(l_ptr))
+               return;
+
+       buf = buf_extract(t_buf, INT_H_SIZE);
+       if (buf == NULL) {
+               pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
+               return;
+       }
+
+       /* Add buffer to deferred queue, if applicable: */
+       link_handle_out_of_seq_msg(l_ptr, buf);
+}
+
+/*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
  *  Owner node is locked.
  */
-static int tipc_link_tunnel_rcv(struct tipc_link **l_ptr,
-                               struct sk_buff **buf)
+static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
+                                             struct sk_buff *t_buf)
 {
-       struct sk_buff *tunnel_buf = *buf;
-       struct tipc_link *dest_link;
+       struct tipc_msg *t_msg = buf_msg(t_buf);
+       struct sk_buff *buf = NULL;
        struct tipc_msg *msg;
-       struct tipc_msg *tunnel_msg = buf_msg(tunnel_buf);
-       u32 msg_typ = msg_type(tunnel_msg);
-       u32 msg_count = msg_msgcnt(tunnel_msg);
-       u32 bearer_id = msg_bearer_id(tunnel_msg);
 
-       if (bearer_id >= MAX_BEARERS)
-               goto exit;
-       dest_link = (*l_ptr)->owner->links[bearer_id];
-       if (!dest_link)
-               goto exit;
-       if (dest_link == *l_ptr) {
-               pr_err("Unexpected changeover message on link <%s>\n",
-                      (*l_ptr)->name);
-               goto exit;
-       }
-       *l_ptr = dest_link;
-       msg = msg_get_wrapped(tunnel_msg);
+       if (tipc_link_is_up(l_ptr))
+               tipc_link_reset(l_ptr);
 
-       if (msg_typ == DUPLICATE_MSG) {
-               if (less(msg_seqno(msg), mod(dest_link->next_in_no)))
-                       goto exit;
-               *buf = buf_extract(tunnel_buf, INT_H_SIZE);
-               if (*buf == NULL) {
-                       pr_warn("%sduplicate msg dropped\n", link_co_err);
+       /* First failover packet? */
+       if (l_ptr->exp_msg_count == START_CHANGEOVER)
+               l_ptr->exp_msg_count = msg_msgcnt(t_msg);
+
+       /* Should there be an inner packet? */
+       if (l_ptr->exp_msg_count) {
+               l_ptr->exp_msg_count--;
+               buf = buf_extract(t_buf, INT_H_SIZE);
+               if (buf == NULL) {
+                       pr_warn("%sno inner failover pkt\n", link_co_err);
                        goto exit;
                }
-               kfree_skb(tunnel_buf);
-               return 1;
-       }
+               msg = buf_msg(buf);
 
-       /* First original message ?: */
-       if (tipc_link_is_up(dest_link)) {
-               pr_info("%s<%s>, changeover initiated by peer\n", link_rst_msg,
-                       dest_link->name);
-               tipc_link_reset(dest_link);
-               dest_link->exp_msg_count = msg_count;
-               if (!msg_count)
-                       goto exit;
-       } else if (dest_link->exp_msg_count == START_CHANGEOVER) {
-               dest_link->exp_msg_count = msg_count;
-               if (!msg_count)
+               if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
+                       kfree_skb(buf);
+                       buf = NULL;
                        goto exit;
+               }
+               if (msg_user(msg) == MSG_FRAGMENTER) {
+                       l_ptr->stats.recv_fragments++;
+                       tipc_link_frag_rcv(&l_ptr->reasm_head,
+                                          &l_ptr->reasm_tail,
+                                          &buf);
+               }
+       }
+exit:
+       if ((l_ptr->exp_msg_count == 0) && (l_ptr->flags & LINK_STOPPED)) {
+               tipc_node_detach_link(l_ptr->owner, l_ptr);
+               kfree(l_ptr);
        }
+       return buf;
+}
 
-       /* Receive original message */
-       if (dest_link->exp_msg_count == 0) {
-               pr_warn("%sgot too many tunnelled messages\n", link_co_err);
+/*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
+ *  via other link as result of a failover (ORIGINAL_MSG) or
+ *  a new active link (DUPLICATE_MSG). Failover packets are
+ *  returned to the active link for delivery upwards.
+ *  Owner node is locked.
+ */
+static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
+                               struct sk_buff **buf)
+{
+       struct sk_buff *t_buf = *buf;
+       struct tipc_link *l_ptr;
+       struct tipc_msg *t_msg = buf_msg(t_buf);
+       u32 bearer_id = msg_bearer_id(t_msg);
+
+       *buf = NULL;
+
+       if (bearer_id >= MAX_BEARERS)
                goto exit;
-       }
-       dest_link->exp_msg_count--;
-       if (less(msg_seqno(msg), dest_link->reset_checkpoint)) {
+
+       l_ptr = n_ptr->links[bearer_id];
+       if (!l_ptr)
                goto exit;
-       } else {
-               *buf = buf_extract(tunnel_buf, INT_H_SIZE);
-               if (*buf != NULL) {
-                       kfree_skb(tunnel_buf);
-                       return 1;
-               } else {
-                       pr_warn("%soriginal msg dropped\n", link_co_err);
-               }
-       }
+
+       if (msg_type(t_msg) == DUPLICATE_MSG)
+               tipc_link_dup_rcv(l_ptr, t_buf);
+       else if (msg_type(t_msg) == ORIGINAL_MSG)
+               *buf = tipc_link_failover_rcv(l_ptr, t_buf);
+       else
+               pr_warn("%sunknown tunnel pkt received\n", link_co_err);
 exit:
-       *buf = NULL;
-       kfree_skb(tunnel_buf);
-       return 0;
+       kfree_skb(t_buf);
+       return *buf != NULL;
 }
 
 /*
  *  Bundler functionality:
  */
-void tipc_link_recv_bundle(struct sk_buff *buf)
+void tipc_link_bundle_rcv(struct sk_buff *buf)
 {
        u32 msgcount = msg_msgcnt(buf_msg(buf));
        u32 pos = INT_H_SIZE;
@@ -2217,11 +2250,11 @@ void tipc_link_recv_bundle(struct sk_buff *buf)
  */
 
 /*
- * link_send_long_buf: Entry for buffers needing fragmentation.
+ * tipc_link_frag_xmit: Entry for buffers needing fragmentation.
  * The buffer is complete, inclusive total message length.
  * Returns user data length.
  */
-static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
+static int tipc_link_frag_xmit(struct tipc_link *l_ptr, struct sk_buff *buf)
 {
        struct sk_buff *buf_chain = NULL;
        struct sk_buff *buf_chain_tail = (struct sk_buff *)&buf_chain;
@@ -2284,12 +2317,11 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
        return dsz;
 }
 
-/*
- * tipc_link_recv_fragment(): Called with node lock on. Returns
+/* tipc_link_frag_rcv(): Called with node lock on. Returns
  * the reassembled buffer if message is complete.
  */
-int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
-                           struct sk_buff **fbuf)
+int tipc_link_frag_rcv(struct sk_buff **head, struct sk_buff **tail,
+                      struct sk_buff **fbuf)
 {
        struct sk_buff *frag = *fbuf;
        struct tipc_msg *msg = buf_msg(frag);
@@ -2303,6 +2335,7 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
                        goto out_free;
                *head = frag;
                skb_frag_list_init(*head);
+               *fbuf = NULL;
                return 0;
        } else if (*head &&
                   skb_try_coalesce(*head, frag, &headstolen, &delta)) {
@@ -2322,10 +2355,12 @@ int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
                *tail = *head = NULL;
                return LINK_REASM_COMPLETE;
        }
+       *fbuf = NULL;
        return 0;
 out_free:
        pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
        kfree_skb(*fbuf);
+       *fbuf = NULL;
        return LINK_REASM_ERROR;
 }
 
@@ -2359,35 +2394,41 @@ void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
        l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
 }
 
-/**
- * link_find_link - locate link by name
- * @name: ptr to link name string
- * @node: ptr to area to be filled with ptr to associated node
- *
+/* tipc_link_find_owner - locate owner node of link by link's name
+ * @name: pointer to link name string
+ * @bearer_id: pointer to index in 'node->links' array where the link was found.
  * Caller must hold 'tipc_net_lock' to ensure node and bearer are not deleted;
  * this also prevents link deletion.
  *
- * Returns pointer to link (or 0 if invalid link name).
+ * Returns pointer to node owning the link, or 0 if no matching link is found.
  */
-static struct tipc_link *link_find_link(const char *name,
-                                       struct tipc_node **node)
+static struct tipc_node *tipc_link_find_owner(const char *link_name,
+                                             unsigned int *bearer_id)
 {
        struct tipc_link *l_ptr;
        struct tipc_node *n_ptr;
+       struct tipc_node *found_node = 0;
        int i;
 
-       list_for_each_entry(n_ptr, &tipc_node_list, list) {
+       *bearer_id = 0;
+       rcu_read_lock();
+       list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
+               tipc_node_lock(n_ptr);
                for (i = 0; i < MAX_BEARERS; i++) {
                        l_ptr = n_ptr->links[i];
-                       if (l_ptr && !strcmp(l_ptr->name, name))
-                               goto found;
+                       if (l_ptr && !strcmp(l_ptr->name, link_name)) {
+                               *bearer_id = i;
+                               found_node = n_ptr;
+                               break;
+                       }
                }
+               tipc_node_unlock(n_ptr);
+               if (found_node)
+                       break;
        }
-       l_ptr = NULL;
-       n_ptr = NULL;
-found:
-       *node = n_ptr;
-       return l_ptr;
+       rcu_read_unlock();
+
+       return found_node;
 }
 
 /**
@@ -2429,32 +2470,33 @@ static int link_cmd_set_value(const char *name, u32 new_value, u16 cmd)
        struct tipc_link *l_ptr;
        struct tipc_bearer *b_ptr;
        struct tipc_media *m_ptr;
+       int bearer_id;
        int res = 0;
 
-       l_ptr = link_find_link(name, &node);
-       if (l_ptr) {
-               /*
-                * acquire node lock for tipc_link_send_proto_msg().
-                * see "TIPC locking policy" in net.c.
-                */
+       node = tipc_link_find_owner(name, &bearer_id);
+       if (node) {
                tipc_node_lock(node);
-               switch (cmd) {
-               case TIPC_CMD_SET_LINK_TOL:
-                       link_set_supervision_props(l_ptr, new_value);
-                       tipc_link_send_proto_msg(l_ptr,
-                               STATE_MSG, 0, 0, new_value, 0, 0);
-                       break;
-               case TIPC_CMD_SET_LINK_PRI:
-                       l_ptr->priority = new_value;
-                       tipc_link_send_proto_msg(l_ptr,
-                               STATE_MSG, 0, 0, 0, new_value, 0);
-                       break;
-               case TIPC_CMD_SET_LINK_WINDOW:
-                       tipc_link_set_queue_limits(l_ptr, new_value);
-                       break;
-               default:
-                       res = -EINVAL;
-                       break;
+               l_ptr = node->links[bearer_id];
+
+               if (l_ptr) {
+                       switch (cmd) {
+                       case TIPC_CMD_SET_LINK_TOL:
+                               link_set_supervision_props(l_ptr, new_value);
+                               tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
+                                                    new_value, 0, 0);
+                               break;
+                       case TIPC_CMD_SET_LINK_PRI:
+                               l_ptr->priority = new_value;
+                               tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0,
+                                                    0, new_value, 0);
+                               break;
+                       case TIPC_CMD_SET_LINK_WINDOW:
+                               tipc_link_set_queue_limits(l_ptr, new_value);
+                               break;
+                       default:
+                               res = -EINVAL;
+                               break;
+                       }
                }
                tipc_node_unlock(node);
                return res;
@@ -2549,6 +2591,7 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
        char *link_name;
        struct tipc_link *l_ptr;
        struct tipc_node *node;
+       unsigned int bearer_id;
 
        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_LINK_NAME))
                return tipc_cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
@@ -2559,15 +2602,19 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, int req_tlv_
                        return tipc_cfg_reply_error_string("link not found");
                return tipc_cfg_reply_none();
        }
-
        read_lock_bh(&tipc_net_lock);
-       l_ptr = link_find_link(link_name, &node);
-       if (!l_ptr) {
+       node = tipc_link_find_owner(link_name, &bearer_id);
+       if (!node) {
                read_unlock_bh(&tipc_net_lock);
                return tipc_cfg_reply_error_string("link not found");
        }
-
        tipc_node_lock(node);
+       l_ptr = node->links[bearer_id];
+       if (!l_ptr) {
+               tipc_node_unlock(node);
+               read_unlock_bh(&tipc_net_lock);
+               return tipc_cfg_reply_error_string("link not found");
+       }
        link_reset_statistics(l_ptr);
        tipc_node_unlock(node);
        read_unlock_bh(&tipc_net_lock);
@@ -2597,18 +2644,27 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
        struct tipc_node *node;
        char *status;
        u32 profile_total = 0;
+       unsigned int bearer_id;
        int ret;
 
        if (!strcmp(name, tipc_bclink_name))
                return tipc_bclink_stats(buf, buf_size);
 
        read_lock_bh(&tipc_net_lock);
-       l = link_find_link(name, &node);
-       if (!l) {
+       node = tipc_link_find_owner(name, &bearer_id);
+       if (!node) {
                read_unlock_bh(&tipc_net_lock);
                return 0;
        }
        tipc_node_lock(node);
+
+       l = node->links[bearer_id];
+       if (!l) {
+               tipc_node_unlock(node);
+               read_unlock_bh(&tipc_net_lock);
+               return 0;
+       }
+
        s = &l->stats;
 
        if (tipc_link_is_active(l))